Migration from PySpark
Overview
Sparkless is a drop-in replacement for PySpark designed for testing and development. It provides 100% API compatibility with PySpark while using Polars as the default backend for fast, in-memory processing.
Drop-in Replacement
Basic Migration
Change one line in your imports:
# Before
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
# After
from sparkless.sql import SparkSession, functions as F
from sparkless.sql.types import *
Module structure (PySpark compatibility)
Sparkless mirrors PySpark’s module layout for drop-in compatibility:
sparkless.sql–SparkSession,functions(asF), and SQL entry points.sparkless.sql.types–StructType,StructField,StringType,IntegerType, and other types (re-exports fromsparkless.spark_typeswhere applicable).sparkless.sql.utils– Exceptions such asAnalysisException,ParseException,IllegalArgumentExceptionfor parity withpyspark.sql.utils.
The functions object is a module (e.g. import sparkless.sql.functions as F), so isinstance(F, ModuleType) is true as in PySpark. Use from sparkless.sql import SparkSession, functions as F and from sparkless.sql.types import * (or explicit type imports) for minimal code changes when migrating.
Session Creation
# PySpark
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Sparkless (identical API)
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
DataFrame Operations
All DataFrame operations work identically:
# Both PySpark and Sparkless
df = spark.createDataFrame([
{"id": 1, "name": "Alice", "age": 25},
{"id": 2, "name": "Bob", "age": 30}
])
# Column access (both syntaxes supported)
df.select("name", df.age, F.col("id")) # ✅ Works
df.select(df.name, df.age, F.col("id")) # ✅ Works
# Operations
result = df.filter(df.age > 25) \
.withColumn("category", F.when(df.age > 30, "senior").otherwise("junior")) \
.groupBy("category") \
.agg(F.count("*").alias("count"))
API Compatibility
✅ Fully Supported
DataFrame Operations: select, filter, withColumn, drop, join, union, etc.
Column Access: Both
df.column_nameandF.col("column_name")syntaxFunctions: All PySpark functions (F.col, F.when, F.lit, etc.)
Window Functions: row_number, rank, dense_rank, lag, lead, first, last
Aggregations: count, sum, avg, min, max, countDistinct, first, last
Datetime Functions: to_timestamp, hour, dayofweek, current_date, etc.
String Functions: contains, startswith, endswith, like, rlike, regexp
Type Casting: cast to string, int, long, double, float, boolean, date, timestamp
Catalog Operations: createDatabase, setCurrentDatabase, currentDatabase, listTables
Data Types: All PySpark data types (StringType, IntegerType, etc.)
🔄 Enhanced Features
Better Error Messages: Clear, actionable error messages with suggestions
Debug Mode: Enable SQL logging for troubleshooting
Validation Rules: String and list-based validation expressions
Performance: 10x faster than PySpark for most operations
Storage API: Convenient
.storageAPI for database and table management (sparkless-specific)
📝 Sparkless-Specific Features
Storage API
Sparkless provides a convenient .storage API for managing databases and tables. This is a sparkless-specific feature that doesn’t exist in PySpark. In PySpark, you would use SQL commands instead.
Sparkless (using .storage API):
from sparkless.sql import SparkSession
from sparkless.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession("MyApp")
# Create database/schema
spark._storage.create_schema("test_db")
# Create table with schema
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
spark._storage.create_table("test_db", "users", schema)
# Insert data
spark._storage.insert_data("test_db", "users", [
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30}
])
PySpark (using SQL commands):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("MyApp").getOrCreate()
# Create database/schema
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
# Create table with schema
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.createDataFrame([], schema)
df.write.saveAsTable("test_db.users")
# Insert data
spark.sql("INSERT INTO test_db.users VALUES ('Alice', 25), ('Bob', 30)")
Compatibility Note: For maximum compatibility with PySpark, use SQL commands in sparkless. Both approaches work in sparkless, but SQL commands are portable to PySpark:
# Works in both sparkless and PySpark
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
spark.sql("CREATE TABLE test_db.users (name STRING, age INT)")
spark.sql("INSERT INTO test_db.users VALUES ('Alice', 25), ('Bob', 30)")
⚠️ Known Limitations
SQL Generation Issues
Some complex operations may have SQL generation issues that are being addressed:
# May have issues with complex CAST operations
df.select(df.column.cast("timestamp")) # ⚠️ SQL generation issue
# Workaround: Use explicit type conversion
df.select(F.to_timestamp(df.column))
Window Function SQL Translation
Window functions work correctly but may not be optimized in SQL generation:
# Works but may not be SQL-optimized
window = Window.partitionBy("category").orderBy("timestamp")
df.withColumn("row_num", F.row_number().over(window))
Complex Type Operations
Some complex type operations may need workarounds:
# Array operations may need explicit handling
df.select(F.explode(df.array_column)) # ✅ Works
df.select(F.array_contains(df.array_column, "value")) # ✅ Works
Performance Considerations
Speed Improvements
Sparkless is significantly faster than PySpark for most operations:
Simple Operations: 10-50x faster
Complex Aggregations: 5-20x faster
Window Functions: 3-10x faster
Joins: 2-5x faster
Memory Usage
Lower Memory Footprint: Uses Polars’ efficient columnar storage
No JVM Overhead: Pure Python implementation
Automatic Cleanup: Temporary tables are automatically cleaned up
Debugging Guide
Enable Debug Mode
# Enable SQL logging
spark.conf.set("spark.sql.debug", "true")
# Or set globally
import sparkless
sparkless.set_debug_mode(True)
Common Error Messages
Column Not Found
MockSparkColumnNotFoundError: Column 'invalid_column' not found
Available columns: ['id', 'name', 'age']
Solution: Check column name spelling and case sensitivity.
Type Mismatch
MockSparkTypeMismatchError: Cannot cast 'string' to 'integer'
Column: age, Value: 'not_a_number'
Solution: Ensure data types are compatible or use proper type conversion.
SQL Generation Error
MockSparkSQLGenerationError: SQL syntax error in generated query
Operation: filter
Solution: Simplify the expression or use alternative syntax.
SQL Logging
When debug mode is enabled, you’ll see generated SQL:
[DEBUG] Operation: filter
[DEBUG] SQL: SELECT * FROM temp_table_123 WHERE age > 25
Testing Patterns
Unit Testing
import pytest
from sparkless.sql import SparkSession, functions as F
@pytest.fixture
def spark():
return SparkSession("test")
def test_data_processing(spark):
df = spark.createDataFrame([
{"id": 1, "value": 10},
{"id": 2, "value": 20}
])
result = df.filter(df.value > 15).collect()
assert len(result) == 1
assert result[0]["id"] == 2
Integration Testing
def test_complex_pipeline(spark):
# Test entire data pipeline
df = spark.read.json("data.json")
processed = df.filter(df.status == "active") \
.withColumn("processed_at", F.current_timestamp()) \
.groupBy("category") \
.agg(F.count("*").alias("count"))
results = processed.collect()
assert len(results) > 0
Performance Testing
import time
def test_performance(spark):
df = spark.createDataFrame(large_dataset)
start = time.time()
result = df.filter(df.value > 1000).count()
end = time.time()
assert result > 0
assert (end - start) < 1.0 # Should complete in under 1 second
Migration Checklist
Before Migration
Identify all PySpark imports in your codebase
List all DataFrame operations used
Note any custom UDFs or complex operations
Check for any PySpark-specific configurations
During Migration
Replace PySpark imports with Sparkless imports
Update session creation (usually no changes needed)
Test basic DataFrame operations
Verify column access patterns work
Test aggregation and window functions
Validate type casting operations
After Migration
Run full test suite
Verify performance improvements
Check error messages are helpful
Document any workarounds needed
Update CI/CD pipelines
Troubleshooting
Import Issues
# If you get import errors
from sparkless.sql import SparkSession, functions as F
from sparkless.sql.types import *
Session Issues
# If session creation fails
spark = SparkSession("my_app")
# Instead of
spark = SparkSession.builder.appName("my_app").getOrCreate()
Data Type Issues
# If you get type errors, check your data types
from sparkless.sql.types import StringType, IntegerType, StructType, StructField
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
Getting Help
Documentation
API Reference:
docs/api_reference.mdTesting Patterns:
docs/testing_patterns.mdType Reference:
docs/types.md
Community
GitHub Issues: Report bugs and request features
Discussions: Ask questions and share solutions
Examples: Check the
examples/directory
Support
For issues specific to Sparkless:
Check the error message for specific guidance
Enable debug mode to see generated SQL
Verify your PySpark code works with real PySpark
Check the API parity tests for examples
Open an issue with detailed error information
Examples
Basic Data Processing
from sparkless.sql import SparkSession, functions as F
# Create session
spark = SparkSession("data_processing")
# Create DataFrame
df = spark.createDataFrame([
{"id": 1, "name": "Alice", "age": 25, "salary": 50000},
{"id": 2, "name": "Bob", "age": 30, "salary": 60000},
{"id": 3, "name": "Charlie", "age": 35, "salary": 70000}
])
# Process data
result = df.filter(df.age > 25) \
.withColumn("senior", F.when(df.age > 30, True).otherwise(False)) \
.groupBy("senior") \
.agg(F.avg("salary").alias("avg_salary")) \
.collect()
print(result)
Window Functions
from sparkless.sql import Window
# Define window
window = Window.partitionBy("department").orderBy("salary")
# Apply window functions
result = df.withColumn("rank", F.rank().over(window)) \
.withColumn("row_num", F.row_number().over(window)) \
.withColumn("lag_salary", F.lag("salary", 1).over(window)) \
.collect()
Complex Aggregations
# Multiple aggregations
result = df.groupBy("department") \
.agg(
F.count("*").alias("count"),
F.avg("salary").alias("avg_salary"),
F.max("salary").alias("max_salary"),
F.min("salary").alias("min_salary")
) \
.collect()
This migration guide should help you successfully transition from PySpark to Sparkless while maintaining full functionality and gaining significant performance improvements.