# Source code for sparkless.storage.serialization.json

"""
JSON serialization module.

This module provides JSON serialization and deserialization for storage.
"""

import json
from typing import Any, Dict, List
from sparkless.spark_types import StructType, StructField


class JSONSerializer:
    """JSON serializer for storage operations.

    All methods are static. Files are always read and written as UTF-8 text,
    and the ``deserialize_*`` methods are best-effort: a missing or malformed
    file yields an empty result instead of raising.
    """

    @staticmethod
    def serialize_data(data: List[Dict[str, Any]], file_path: str) -> None:
        """Serialize data to JSON file.

        Values that are not natively JSON-serializable are written as their
        ``str()`` representation.

        Args:
            data: Data to serialize.
            file_path: Path to output file.
        """
        # Pin the encoding so the file does not depend on the platform's
        # locale default.
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)

    @staticmethod
    def deserialize_data(file_path: str) -> List[Dict[str, Any]]:
        """Deserialize data from JSON file.

        Args:
            file_path: Path to input file.

        Returns:
            Deserialized data. An empty list is returned when the file does
            not exist, is not valid UTF-8 encoded JSON, or its top-level
            value is not a list.
        """
        try:
            with open(file_path, encoding="utf-8") as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
            return []
        return data if isinstance(data, list) else []

    @staticmethod
    def serialize_schema(schema: StructType, file_path: str) -> None:
        """Serialize schema to JSON file.

        Each field is stored as its name, the class name of its data type,
        and its nullable flag.

        Args:
            schema: Schema to serialize.
            file_path: Path to output file.
        """
        schema_data = {
            "fields": [
                {
                    "name": field.name,
                    "data_type": type(field.dataType).__name__,
                    "nullable": field.nullable,
                }
                for field in schema.fields
            ]
        }
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(schema_data, f, indent=2)

    @staticmethod
    def deserialize_schema(file_path: str) -> StructType:
        """Deserialize schema from JSON file.

        Args:
            file_path: Path to input file.

        Returns:
            Deserialized schema. An empty ``StructType`` is returned when the
            file does not exist, is not valid UTF-8 encoded JSON, or does not
            have the expected ``{"fields": [{...}, ...]}`` layout.
        """
        try:
            with open(file_path, encoding="utf-8") as f:
                schema_data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
            return StructType([])

        # A payload that parses but has the wrong shape is treated like any
        # other malformed file, mirroring deserialize_data.
        raw_fields = (
            schema_data.get("fields", []) if isinstance(schema_data, dict) else None
        )
        if not isinstance(raw_fields, list):
            return StructType([])

        fields = []
        try:
            for field_data in raw_fields:
                if not isinstance(field_data, dict):
                    return StructType([])
                # Create appropriate data type based on type name
                data_type = JSONSerializer._create_data_type(field_data["data_type"])
                field = StructField(
                    field_data["name"], data_type, field_data.get("nullable", True)
                )
                fields.append(field)
            return StructType(fields)
        except KeyError:
            # A field entry is missing a required key ("name" / "data_type").
            return StructType([])

    @staticmethod
    def _create_data_type(type_name: str) -> Any:
        """Create data type from type name.

        Args:
            type_name: Name of the data type.

        Returns:
            Data type instance. Names other than the five listed below fall
            back to ``StringType``.
        """
        from ...spark_types import (
            StringType,
            IntegerType,
            LongType,
            DoubleType,
            BooleanType,
        )

        # Map names to classes so only the requested type is instantiated.
        type_mapping = {
            "StringType": StringType,
            "IntegerType": IntegerType,
            "LongType": LongType,
            "DoubleType": DoubleType,
            "BooleanType": BooleanType,
        }
        return type_mapping.get(type_name, StringType)()