Source code for sparkless.functions.json_csv

"""
JSON and CSV functions for Sparkless.

This module provides JSON and CSV processing functions that match PySpark's API.
Includes parsing, generation, and schema inference for JSON and CSV data.
"""

from typing import Any, Dict, Optional, Union
from sparkless.functions.base import Column, ColumnOperation


[docs] class JSONCSVFunctions: """Collection of JSON and CSV manipulation functions."""
[docs] @staticmethod def from_json( column: Union[Column, str], schema: Any, options: Optional[Dict[str, Any]] = None, ) -> ColumnOperation: """Parse JSON string column into struct/array column. Args: column: JSON string column schema: Target schema options: Optional parsing options Returns: ColumnOperation representing from_json """ if isinstance(column, str): column = Column(column) return ColumnOperation( column, "from_json", value=(schema, options), name=f"from_json({column.name})", )
[docs] @staticmethod def to_json(column: Union[Column, str]) -> ColumnOperation: """Convert struct/array column to JSON string. Args: column: Struct or array column Returns: ColumnOperation representing to_json """ if isinstance(column, str): column = Column(column) return ColumnOperation(column, "to_json", name=f"to_json({column.name})")
[docs] @staticmethod def get_json_object(column: Union[Column, str], path: str) -> ColumnOperation: """Extract JSON object at specified path. Args: column: JSON string column path: JSON path (e.g., '$.field') Returns: ColumnOperation representing get_json_object """ if isinstance(column, str): column = Column(column) return ColumnOperation( column, "get_json_object", value=path, name=f"get_json_object({column.name}, {path})", )
[docs] @staticmethod def json_tuple(column: Union[Column, str], *fields: str) -> ColumnOperation: """Extract multiple fields from JSON string. Args: column: JSON string column *fields: Field names to extract Returns: ColumnOperation representing json_tuple """ if isinstance(column, str): column = Column(column) return ColumnOperation( column, "json_tuple", value=fields, name=f"json_tuple({column.name}, ...)" )
[docs] @staticmethod def schema_of_json(json_string: str) -> ColumnOperation: """Infer schema from JSON string. Args: json_string: Sample JSON string Returns: ColumnOperation representing schema_of_json """ from sparkless.functions.core.literals import Literal return ColumnOperation( Literal(json_string), "schema_of_json", name="schema_of_json(...)" )
[docs] @staticmethod def from_csv( column: Union[Column, str], schema: Any, options: Optional[Dict[str, Any]] = None, ) -> ColumnOperation: """Parse CSV string column into struct column. Args: column: CSV string column schema: Target schema options: Optional parsing options Returns: ColumnOperation representing from_csv """ if isinstance(column, str): column = Column(column) return ColumnOperation( column, "from_csv", value=(schema, options), name=f"from_csv({column.name})" )
[docs] @staticmethod def to_csv(column: Union[Column, str]) -> ColumnOperation: """Convert struct column to CSV string. Args: column: Struct column Returns: ColumnOperation representing to_csv """ if isinstance(column, str): column = Column(column) return ColumnOperation(column, "to_csv", name=f"to_csv({column.name})")
[docs] @staticmethod def schema_of_csv(csv_string: str) -> ColumnOperation: """Infer schema from CSV string. Args: csv_string: Sample CSV string Returns: ColumnOperation representing schema_of_csv """ from sparkless.functions.core.literals import Literal return ColumnOperation( Literal(csv_string), "schema_of_csv", name="schema_of_csv(...)" )