Storage Serialization Guide

This guide documents Sparkless’s storage serialization capabilities, covering CSV, JSON, and custom formats.

Overview

Sparkless provides a complete storage serialization system that supports multiple data formats:

  • Parquet Storage - Columnar storage format (default for the Polars backend in v3.0.0+)

  • CSV Serialization - Comma-separated values format

  • JSON Serialization - JavaScript Object Notation format

  • Custom Formats - Extensible serialization framework

Note: With the Polars backend (v3.0.0+), tables are persisted as Parquet files by default. CSV and JSON serialization remain available for export and import operations.

CSV Serialization

The CSVSerializer class provides CSV serialization and deserialization capabilities.

Basic Usage

from sparkless.storage.serialization.csv import CSVSerializer
from sparkless.sql import SparkSession

# Create Spark session
spark = SparkSession("CSVExample")

# Create sample data
data = [
    {"name": "Alice", "age": 25, "city": "New York"},
    {"name": "Bob", "age": 30, "city": "San Francisco"},
    {"name": "Charlie", "age": 35, "city": "Chicago"}
]
df = spark.createDataFrame(data)

# Serialize to CSV
csv_serializer = CSVSerializer()
csv_data = csv_serializer.serialize(df.collect())
print(csv_data)
# name,age,city
# Alice,25,New York
# Bob,30,San Francisco
# Charlie,35,Chicago

CSV Options

# Custom CSV options
csv_serializer = CSVSerializer(
    delimiter=',',           # Field delimiter
    quote_char='"',          # Quote character
    escape_char='\\',        # Escape character
    header=True,             # Include header row
    null_value='',           # Null value representation
    date_format='yyyy-MM-dd', # Date format
    timestamp_format='yyyy-MM-dd HH:mm:ss' # Timestamp format
)

csv_data = csv_serializer.serialize(df.collect())
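
To make the delimiter, quoting, and null-value options concrete, here is a minimal sketch using Python's standard csv module. It is an analogy only: it assumes that options such as delimiter, quote_char, and null_value behave like their standard-library counterparts, which this guide does not confirm. The sample rows and the variable names are made up for the illustration.

```python
import csv
import io

# Sample rows: one value contains the delimiter, one value is null.
rows = [
    {"name": "Alice", "city": "New York, NY"},
    {"name": "Bob", "city": None},
]

buffer = io.StringIO()
writer = csv.DictWriter(
    buffer,
    fieldnames=["name", "city"],
    delimiter=",",               # field delimiter
    quotechar='"',               # quote character
    quoting=csv.QUOTE_MINIMAL,   # quote only fields that need it
    lineterminator="\n",
)
writer.writeheader()             # header row
for row in rows:
    # Map None to the chosen null representation (an empty string here).
    writer.writerow({k: ("" if v is None else v) for k, v in row.items()})

csv_text = buffer.getvalue()
print(csv_text)
# name,city
# Alice,"New York, NY"
# Bob,
```

The field containing a comma is quoted so that it survives a round trip, and the null value becomes an empty field.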

Deserialization

# Deserialize CSV data
csv_data = """name,age,city
Alice,25,New York
Bob,30,San Francisco
Charlie,35,Chicago"""

deserialized_data = csv_serializer.deserialize(csv_data)
print(deserialized_data)
# [{'name': 'Alice', 'age': 25, 'city': 'New York'}, ...]
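
For comparison, the sketch below parses the same CSV text with Python's standard csv.DictReader. It is not Sparkless code. It shows that a plain CSV parser returns every value as a string, so any numeric typing in the deserialized output has to come from type inference or a schema.

```python
import csv
import io

csv_text = "name,age,city\nAlice,25,New York\nBob,30,San Francisco\n"

# DictReader uses the first line as field names and yields one dict per row.
records = [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]

print(records[0])
# {'name': 'Alice', 'age': '25', 'city': 'New York'}
print(type(records[0]["age"]).__name__)
# str
```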

Schema-Aware Serialization

from sparkless.spark_types import MockStructType, MockStructField, StringType, IntegerType

# Define schema
schema = MockStructType([
    MockStructField("name", StringType()),
    MockStructField("age", IntegerType()),
    MockStructField("city", StringType())
])

# Serialize with schema
csv_data = csv_serializer.serialize_with_schema(df.collect(), schema)

# Deserialize with schema
deserialized_data = csv_serializer.deserialize_with_schema(csv_data, schema)
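
Conceptually, schema-aware deserialization converts each raw string into the type its field declares. The sketch below illustrates that idea in plain Python. FIELD_TYPES and cast_row are hypothetical names for this example, and the treatment of the empty string as null is an assumption, not documented Sparkless behavior.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in for a schema: field name -> converter function.
FIELD_TYPES: Dict[str, Callable[[str], Any]] = {
    "name": str,
    "age": int,
    "city": str,
}


def cast_row(row: Dict[str, str], null_value: str = "") -> Dict[str, Any]:
    """Convert raw CSV strings to the declared types, mapping null_value to None."""
    typed: Dict[str, Any] = {}
    for field, convert in FIELD_TYPES.items():
        raw = row.get(field)
        typed[field] = None if raw is None or raw == null_value else convert(raw)
    return typed


raw_rows = [
    {"name": "Alice", "age": "25", "city": "New York"},
    {"name": "Bob", "age": "", "city": "San Francisco"},
]
typed_rows = [cast_row(row) for row in raw_rows]
print(typed_rows[0]["age"], typed_rows[1]["age"])
# 25 None
```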

Advanced CSV Features

# Custom field formatting
def format_age(age):
    return f"Age: {age}"

csv_serializer.add_formatter("age", format_age)

# Custom field parsing
def parse_age(age_str):
    return int(age_str.replace("Age: ", ""))

csv_serializer.add_parser("age", parse_age)

# Serialize with custom formatting
csv_data = csv_serializer.serialize(df.collect())
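
A formatter and its parser should be inverses, otherwise data changes on a round trip. The following standalone sketch checks that property for the age functions above. The apply_field_functions helper is hypothetical and only mimics how per-field callbacks might be applied.

```python
from typing import Any, Callable, Dict


def format_age(age: int) -> str:
    return f"Age: {age}"


def parse_age(age_str: str) -> int:
    return int(age_str.replace("Age: ", ""))


def apply_field_functions(
    row: Dict[str, Any], functions: Dict[str, Callable[[Any], Any]]
) -> Dict[str, Any]:
    """Apply a per-field function where one is registered; pass other fields through."""
    return {
        key: functions[key](value) if key in functions else value
        for key, value in row.items()
    }


row = {"name": "Alice", "age": 25}
formatted = apply_field_functions(row, {"age": format_age})
restored = apply_field_functions(formatted, {"age": parse_age})

print(formatted)
# {'name': 'Alice', 'age': 'Age: 25'}
print(restored == row)
# True
```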

JSON Serialization

The JSONSerializer class provides JSON serialization and deserialization capabilities.

Basic Usage

from sparkless.storage.serialization.json import JSONSerializer

# Create JSON serializer
json_serializer = JSONSerializer()

# Serialize to JSON
json_data = json_serializer.serialize(df.collect())
print(json_data)
# [{"name": "Alice", "age": 25, "city": "New York"}, ...]

JSON Options

# Custom JSON options
json_serializer = JSONSerializer(
    indent=2,                # Pretty print with indentation
    ensure_ascii=False,      # Allow non-ASCII characters
    sort_keys=True,          # Sort dictionary keys
    separators=(',', ':'),   # Custom separators
    default=str              # Default serializer for unknown types
)

json_data = json_serializer.serialize(df.collect())
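
The option names above match keyword arguments of Python's json.dumps. Assuming they are passed through with the same meaning (an assumption, not something this guide states), the sketch below shows their effect using only the standard library. The sample record is made up.

```python
import json
from datetime import date

record = {"name": "Zoë", "joined": date(2024, 1, 15), "age": 25}

# Compact output: sorted keys, no whitespace, non-ASCII kept as-is,
# and str() used for types json cannot encode (the date here).
compact = json.dumps(
    record, ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str
)
print(compact)
# {"age":25,"joined":"2024-01-15","name":"Zoë"}

# Pretty-printed output with two-space indentation.
pretty = json.dumps(record, indent=2, ensure_ascii=False, sort_keys=True, default=str)

# With the default ensure_ascii=True, non-ASCII characters are escaped.
escaped = json.dumps({"name": "Zoë"})
print(escaped)
# {"name": "Zo\u00eb"}
```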

Deserialization

# Deserialize JSON data
json_data = '[{"name": "Alice", "age": 25, "city": "New York"}]'
deserialized_data = json_serializer.deserialize(json_data)
print(deserialized_data)

Schema-Aware JSON

# Serialize with schema (reuses the schema object defined in the CSV section)
json_data = json_serializer.serialize_with_schema(df.collect(), schema)

# Deserialize with schema
deserialized_data = json_serializer.deserialize_with_schema(json_data, schema)

Custom JSON Serialization

import json
from datetime import datetime

# Custom JSON encoder
class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

# Use custom encoder
json_serializer = JSONSerializer(encoder=CustomJSONEncoder)
json_data = json_serializer.serialize(df.collect())

Storage Integration

DataFrame Integration

# Write DataFrame to CSV
df.write.format("csv").option("header", "true").save("/path/to/csv")

# Read DataFrame from CSV
df_read = spark.read.format("csv").option("header", "true").load("/path/to/csv")

# Write DataFrame to JSON
df.write.format("json").save("/path/to/json")

# Read DataFrame from JSON
df_read = spark.read.format("json").load("/path/to/json")

Storage Backend Integration

# Note: Polars is the default backend (v3.0.0+).
# The DuckDB backend remains available as an optional legacy backend.

from sparkless.storage.backends.memory import MemoryStorageManager

# Create storage manager (in-memory by default)
storage = MemoryStorageManager()

# Store serialized data
csv_data = csv_serializer.serialize(df.collect())
storage.store_data("table_name", csv_data, format="csv")

# Retrieve and deserialize data
retrieved_data = storage.retrieve_data("table_name", format="csv")
deserialized_data = csv_serializer.deserialize(retrieved_data)

Custom Serialization Formats

Creating Custom Serializers

from sparkless.storage.serialization.base import BaseSerializer

class CustomSerializer(BaseSerializer):
    def serialize(self, data):
        """Serialize data to custom format."""
        # custom_format_data is a placeholder for your own encoding function
        return custom_format_data(data)

    def deserialize(self, data):
        """Deserialize data from custom format."""
        # custom_parse_data is a placeholder for your own decoding function
        return custom_parse_data(data)

    def get_extension(self):
        """Return file extension for this format."""
        return ".custom"

# Use custom serializer
custom_serializer = CustomSerializer()
custom_data = custom_serializer.serialize(df.collect())
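
As a concrete instance of the three methods, here is a standalone JSON Lines serializer sketch. It deliberately does not subclass BaseSerializer so that it runs without Sparkless installed. In a real integration the class would inherit from BaseSerializer, whose exact interface is assumed here to be the three methods shown above.

```python
import json
from typing import Any, Dict, List


class JSONLinesSerializer:
    """Illustrative serializer: one JSON object per line."""

    def serialize(self, data: List[Dict[str, Any]]) -> str:
        return "\n".join(json.dumps(row, sort_keys=True) for row in data)

    def deserialize(self, data: str) -> List[Dict[str, Any]]:
        # Skip blank lines so a trailing newline does not break parsing.
        return [json.loads(line) for line in data.splitlines() if line.strip()]

    def get_extension(self) -> str:
        return ".jsonl"


rows = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
serializer = JSONLinesSerializer()
text = serializer.serialize(rows)
print(text)
# {"age": 25, "name": "Alice"}
# {"age": 30, "name": "Bob"}
round_tripped = serializer.deserialize(text)
```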

Registering Custom Formats

from sparkless.storage.serialization.registry import SerializationRegistry

# Register custom format
registry = SerializationRegistry()
registry.register("custom", CustomSerializer)

# Use registered format
serializer = registry.get_serializer("custom")
custom_data = serializer.serialize(df.collect())
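
The registry pattern itself is a mapping from format name to serializer class. The sketch below shows one way such a registry could work. SimpleRegistry and UppercaseSerializer are hypothetical, and the behavior on duplicate or unknown names is an assumption rather than documented SerializationRegistry behavior.

```python
from typing import Any, Callable, Dict


class UppercaseSerializer:
    """Toy serializer used only to exercise the registry sketch."""

    def serialize(self, data: str) -> str:
        return data.upper()


class SimpleRegistry:
    """Hypothetical name-to-factory registry; not Sparkless's implementation."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], Any]] = {}

    def register(self, name: str, factory: Callable[[], Any]) -> None:
        if name in self._factories:
            raise ValueError(f"format {name!r} is already registered")
        self._factories[name] = factory

    def get_serializer(self, name: str) -> Any:
        if name not in self._factories:
            raise KeyError(f"no serializer registered for format {name!r}")
        # Instantiate on lookup so each caller gets a fresh serializer.
        return self._factories[name]()


registry = SimpleRegistry()
registry.register("upper", UppercaseSerializer)
result = registry.get_serializer("upper").serialize("abc")
print(result)
# ABC
```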

Performance Optimization

Streaming Serialization

# Stream large datasets
def stream_serialize(data_stream, serializer):
    for batch in data_stream:
        yield serializer.serialize(batch)

# Use streaming for large datasets
# (large_data_stream is any iterable of row batches; process_batch is your own handler)
for serialized_batch in stream_serialize(large_data_stream, csv_serializer):
    # Process serialized batch
    process_batch(serialized_batch)
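
One detail to watch when streaming CSV: if every batch is serialized independently with a header row, the concatenated output contains repeated headers. The sketch below, written against Python's standard csv module rather than CSVSerializer, emits the header only with the first batch. The stream_csv function is a hypothetical helper.

```python
import csv
import io
from typing import Any, Dict, Iterable, Iterator, List


def stream_csv(
    batches: Iterable[List[Dict[str, Any]]], fieldnames: List[str]
) -> Iterator[str]:
    """Yield one CSV text chunk per batch, writing the header only once."""
    first = True
    for batch in batches:
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
        if first:
            writer.writeheader()
            first = False
        writer.writerows(batch)
        yield buffer.getvalue()


batches = [
    [{"name": "Alice", "age": 25}],
    [{"name": "Bob", "age": 30}, {"name": "Charlie", "age": 35}],
]
chunks = list(stream_csv(batches, ["name", "age"]))
print("".join(chunks))
# name,age
# Alice,25
# Bob,30
# Charlie,35
```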

Compression Support

import gzip

# Compressed serialization
def compress_serialize(data, serializer):
    serialized = serializer.serialize(data)
    # Encode explicitly as UTF-8 so the bytes do not depend on platform defaults
    return gzip.compress(serialized.encode("utf-8"))

def decompress_deserialize(compressed_data, serializer):
    decompressed = gzip.decompress(compressed_data).decode("utf-8")
    return serializer.deserialize(decompressed)

# Use compression
compressed_data = compress_serialize(df.collect(), csv_serializer)
decompressed_data = decompress_deserialize(compressed_data, csv_serializer)
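
When the compressed output is headed for disk, gzip.open in text mode avoids building the compressed bytes in memory first. This is a standard-library sketch that assumes the serialized data is already available as a string. The file name people.csv.gz is arbitrary.

```python
import gzip
import os
import tempfile

csv_text = "name,age,city\n" + "Alice,25,New York\n" * 1000

path = os.path.join(tempfile.mkdtemp(), "people.csv.gz")

# Text mode ("wt"/"rt") handles encoding; newline="" disables newline translation.
with gzip.open(path, "wt", encoding="utf-8", newline="") as handle:
    handle.write(csv_text)

with gzip.open(path, "rt", encoding="utf-8", newline="") as handle:
    restored = handle.read()

compressed_size = os.path.getsize(path)
original_size = len(csv_text.encode("utf-8"))
print(restored == csv_text, compressed_size < original_size)
# True True
```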

Memory-Efficient Processing

# Process data in chunks (data must support len() and slicing, e.g. a list)
def chunked_serialize(data, serializer, chunk_size=1000):
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        yield serializer.serialize(chunk)

# Use chunked processing
for serialized_chunk in chunked_serialize(large_data, csv_serializer):
    # Process chunk
    process_chunk(serialized_chunk)
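
The slicing approach needs the whole dataset in a list. For generators or other iterables without a length, a chunker based on itertools.islice keeps only one chunk in memory at a time. The iter_chunks helper below is a hypothetical name for this sketch.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_chunks(rows: Iterable[T], chunk_size: int = 1000) -> Iterator[List[T]]:
    """Yield lists of up to chunk_size items from any iterable."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


chunks = list(iter_chunks(range(5), chunk_size=2))
print(chunks)
# [[0, 1], [2, 3], [4]]
```

Each chunk can then be passed to serializer.serialize in the same way as in the loop above.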

Error Handling

Serialization Errors

from sparkless.storage.serialization.exceptions import SerializationError

try:
    csv_data = csv_serializer.serialize(invalid_data)
except SerializationError as e:
    print(f"Serialization error: {e}")
    # Handle error appropriately

Deserialization Errors

try:
    data = csv_serializer.deserialize(invalid_csv)
except SerializationError as e:
    print(f"Deserialization error: {e}")
    # Handle error appropriately

Validation Errors

from sparkless.storage.serialization.exceptions import ValidationError

try:
    validated_data = csv_serializer.validate_and_deserialize(csv_data, schema)
except ValidationError as e:
    print(f"Validation error: {e}")
    # Handle validation error
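
To illustrate what schema validation checks, here is a plain-Python sketch that verifies required fields and value types before data is used. RowValidationError, EXPECTED_TYPES, and validate_rows are hypothetical names. The actual checks performed by validate_and_deserialize are not described in this guide.

```python
from typing import Any, Dict, List


class RowValidationError(ValueError):
    """Raised when a row does not match the expected fields or types."""


# Hypothetical stand-in for a schema: field name -> expected Python type.
EXPECTED_TYPES: Dict[str, type] = {"name": str, "age": int, "city": str}


def validate_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    for index, row in enumerate(rows):
        missing = EXPECTED_TYPES.keys() - row.keys()
        if missing:
            raise RowValidationError(f"row {index}: missing fields {sorted(missing)}")
        for field, expected in EXPECTED_TYPES.items():
            value = row[field]
            # None is allowed here and treated as a null value.
            if value is not None and not isinstance(value, expected):
                raise RowValidationError(
                    f"row {index}: field {field!r} expected {expected.__name__}, "
                    f"got {type(value).__name__}"
                )
    return rows


good = validate_rows([{"name": "Alice", "age": 25, "city": "New York"}])

try:
    validate_rows([{"name": "Bob", "age": "30", "city": "Chicago"}])
    error_message = ""
except RowValidationError as exc:
    error_message = str(exc)
print(error_message)
# row 0: field 'age' expected int, got str
```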

Best Practices

  1. Choose an appropriate format - CSV for flat, tabular data; JSON for nested structures

  2. Use schema validation - Validate deserialized data against the expected schema

  3. Handle errors gracefully - Implement proper error handling

  4. Optimize for size - Use compression for large datasets

  5. Stream large data - Process large datasets in chunks

  6. Validate input - Check input rows for missing or mistyped fields before serialization

Troubleshooting

Common Issues

  1. Encoding problems - Use UTF-8 encoding for international characters

  2. Schema mismatches - Ensure data matches expected schema

  3. Memory issues - Use streaming for large datasets

  4. Performance problems - Prefer Parquet (the Polars backend default) for large tables, and reserve CSV and JSON for export and import
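
For the encoding issue in particular, the usual remedy is to pass the encoding explicitly whenever serialized text is written to or read from a file, instead of relying on the platform default. A minimal standard-library sketch, with an arbitrary file name and made-up sample data:

```python
import os
import tempfile

csv_text = "name,city\nZoë,Zürich\n"
path = os.path.join(tempfile.mkdtemp(), "people.csv")

# Always name the encoding; newline="" keeps line endings exactly as written.
with open(path, "w", encoding="utf-8", newline="") as handle:
    handle.write(csv_text)

with open(path, "r", encoding="utf-8", newline="") as handle:
    restored = handle.read()

with open(path, "rb") as handle:
    raw_bytes = handle.read()

# Each of the two accented characters takes two bytes in UTF-8.
print(restored == csv_text, len(raw_bytes) - len(csv_text))
# True 2
```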

Debug Mode

# Enable debug mode
csv_serializer.set_debug(True)
json_serializer.set_debug(True)

# Debug information will be printed during serialization/deserialization

This guide covers the CSV, JSON, and custom-format serialization paths in Sparkless, along with storage integration, performance techniques, and error handling.