Getting Started
Compatibility Snapshot: This guide targets Sparkless 3.31.0, which provides parity with PySpark 3.2–3.5 and ships with 2400+ passing regression tests.
Installation
Install Sparkless using pip:
pip install sparkless
For development with testing tools:
pip install sparkless[dev]
Quick Start
Basic Example
from sparkless.sql import SparkSession, functions as F
# Create session
spark = SparkSession("MyApp")
# Create DataFrame
data = [
{"id": 1, "name": "Alice", "age": 25},
{"id": 2, "name": "Bob", "age": 30},
]
df = spark.createDataFrame(data)
# Operations work just like PySpark
result = df.filter(F.col("age") > 25).select("name")
print(result.collect()) # [Row(name='Bob')]
Drop-in PySpark Replacement
Sparkless is designed to be a drop-in replacement for PySpark:
# Before (PySpark)
from pyspark.sql import SparkSession
# After (Sparkless)
from sparkless.sql import SparkSession
That's it! Your existing PySpark code works unchanged.
Core Features
DataFrame Operations
from sparkless.sql import SparkSession, functions as F
spark = SparkSession("Example")
data = [
{"name": "Alice", "dept": "Engineering", "salary": 80000},
{"name": "Bob", "dept": "Sales", "salary": 75000},
{"name": "Charlie", "dept": "Engineering", "salary": 90000},
]
df = spark.createDataFrame(data)
# Filter
high_earners = df.filter(F.col("salary") > 75000)
# Null-safe equality (for comparing columns that may contain NULL)
# NULL <=> NULL returns True, NULL <=> non-NULL returns False
# (assumes a DataFrame with "id" and "manager_id" columns; the sample data above has neither)
employees_with_managers = df.filter(F.col("id").eqNullSafe(F.col("manager_id")))
# Select
names = df.select("name", "dept")
# Aggregations
dept_avg = df.groupBy("dept").avg("salary")
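The null-safe equality rule in the comments above can be illustrated without a Spark session. The following is a plain-Python sketch of the semantics only, with None standing in for NULL; the helper names sql_equals and null_safe_equals are hypothetical and are not part of Sparkless or PySpark.

```python
from typing import Any, Optional


def sql_equals(left: Any, right: Any) -> Optional[bool]:
    """Ordinary SQL `=`: any comparison involving NULL yields NULL (None)."""
    if left is None or right is None:
        return None
    return left == right


def null_safe_equals(left: Any, right: Any) -> bool:
    """SQL `<=>`: always yields a boolean, even when NULLs are involved."""
    if left is None and right is None:
        return True
    if left is None or right is None:
        return False
    return left == right


# Compare both operators on the four interesting cases.
pairs = [(1, 1), (1, 2), (None, 1), (None, None)]
comparison = [(sql_equals(a, b), null_safe_equals(a, b)) for a, b in pairs]
```

Because a filter keeps only rows whose condition is true, a NULL result from ordinary equality drops the row, whereas the null-safe form keeps rows where both sides are NULL.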
Window Functions
from sparkless.sql import Window, functions as F
# Ranking within departments
window_spec = Window.partitionBy("dept").orderBy(F.desc("salary"))
ranked = df.select(
"name",
"dept",
"salary",
F.row_number().over(window_spec).alias("rank")
)
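To make the expected ranking concrete, here is a plain-Python sketch of what row_number over a window partitioned by dept and ordered by descending salary computes for the sample data. It illustrates the semantics only and is not how Sparkless implements window functions; the function name row_number_by_dept is hypothetical.

```python
from itertools import groupby
from operator import itemgetter

rows = [
    {"name": "Alice", "dept": "Engineering", "salary": 80000},
    {"name": "Bob", "dept": "Sales", "salary": 75000},
    {"name": "Charlie", "dept": "Engineering", "salary": 90000},
]


def row_number_by_dept(records):
    """Number rows 1..n within each dept, highest salary first."""
    # Sort by partition key, then by descending salary within the partition.
    ordered = sorted(records, key=lambda r: (r["dept"], -r["salary"]))
    ranked = []
    for _, group in groupby(ordered, key=itemgetter("dept")):
        # Numbering restarts at 1 for every partition.
        for position, record in enumerate(group, start=1):
            ranked.append({**record, "rank": position})
    return ranked


ranked = row_number_by_dept(rows)
for r in ranked:
    print(r["name"], r["dept"], r["salary"], r["rank"])
# Charlie Engineering 90000 1
# Alice Engineering 80000 2
# Bob Sales 75000 1
```

The numbering restarts in each department, so both Charlie and Bob receive rank 1.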
SQL Queries
# Create temporary view
df.createOrReplaceTempView("employees")
Storage Management
Sparkless provides two ways to manage databases and tables:
Option 1: SQL Commands (PySpark-Compatible - Recommended)
# Works in both sparkless and PySpark
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
spark.sql("CREATE TABLE test_db.users (name STRING, age INT)")
spark.sql("INSERT INTO test_db.users VALUES ('Alice', 25), ('Bob', 30)")
Option 2: Storage API (Sparkless-Specific)
# Convenient API, but sparkless-specific
from sparkless.sql.types import StructType, StructField, StringType, IntegerType
spark._storage.create_schema("test_db")
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
spark._storage.create_table("test_db", "users", schema)
spark._storage.insert_data("test_db", "users", [
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30}
])
Note: For maximum compatibility with PySpark, use SQL commands. The spark._storage API is a Sparkless convenience feature that doesn't exist in PySpark.
Run SQL Queries
# Run SQL queries
result = spark.sql("SELECT name, salary FROM employees WHERE salary > 80000")
result.show()
Testing with Sparkless
Unit Test Example
import pytest
from sparkless.sql import SparkSession, functions as F


def test_data_transformation():
    """Test DataFrame transformation logic."""
    spark = SparkSession("TestApp")
    # Test data
    data = [{"value": 10}, {"value": 20}, {"value": 30}]
    df = spark.createDataFrame(data)
    # Apply transformation
    result = df.filter(F.col("value") > 15)
    # Assertions
    assert result.count() == 2
    rows = result.collect()
    assert rows[0]["value"] == 20
    assert rows[1]["value"] == 30


def test_aggregation():
    """Test aggregation logic."""
    spark = SparkSession("TestApp")
    data = [
        {"category": "A", "value": 100},
        {"category": "A", "value": 200},
        {"category": "B", "value": 150},
    ]
    df = spark.createDataFrame(data)
    # Group and aggregate
    result = df.groupBy("category").sum("value")
    # Verify results
    assert result.count() == 2
Lazy Evaluation
Sparkless mirrors PySpark's lazy evaluation model:
# Transformations are queued (not executed)
filtered = df.filter(F.col("age") > 25)
selected = filtered.select("name")
# No execution yet!
# Actions trigger execution
rows = selected.collect() # Executes now
count = selected.count() # Executes now
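The split between queued transformations and executing actions can be sketched in plain Python. The LazyFrame class below is a hypothetical stand-in written for illustration; it is not Sparkless's implementation, and it takes plain callables as predicates rather than column expressions.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


class LazyFrame:
    """Hypothetical stand-in for a lazily evaluated DataFrame."""

    def __init__(self, rows: List[Record], plan: tuple = ()):
        self._rows = rows
        self._plan = plan  # queued transformations, not yet applied

    def filter(self, predicate: Callable[[Record], bool]) -> "LazyFrame":
        # Transformation: only records the step and returns a new frame.
        step = lambda rows: [r for r in rows if predicate(r)]
        return LazyFrame(self._rows, self._plan + (step,))

    def select(self, *names: str) -> "LazyFrame":
        # Transformation: also queued rather than executed.
        step = lambda rows: [{n: r[n] for n in names} for r in rows]
        return LazyFrame(self._rows, self._plan + (step,))

    def collect(self) -> List[Record]:
        # Action: runs every queued step in order.
        rows = self._rows
        for step in self._plan:
            rows = step(rows)
        return rows

    def count(self) -> int:
        # Action: executes the plan, then counts the resulting rows.
        return len(self.collect())


calls = []  # records each time the predicate actually runs


def older_than_25(row: Record) -> bool:
    calls.append(row["name"])
    return row["age"] > 25


frame = LazyFrame([{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}])
selected = frame.filter(older_than_25).select("name")
calls_before = list(calls)   # still empty: nothing has executed yet
result = selected.collect()  # the predicate runs here, once per row
```

In this sketch every action re-runs the whole plan, so calling count() after collect() evaluates the predicate a second time. Whether Sparkless caches results between actions is not stated in this guide.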
Control evaluation mode:
# Lazy (default, recommended)
spark = SparkSession("App", enable_lazy_evaluation=True)
# Eager (for legacy tests)
spark = SparkSession("App", enable_lazy_evaluation=False)
Performance
Sparkless provides significant speed improvements:
| Operation | PySpark | Sparkless | Speedup |
|---|---|---|---|
| Session Creation | 30-45s | 0.1s | 300x |
| Simple Query | 2-5s | 0.01s | 200x |
| Full Test Suite | 5-10min | 30-60s | 10x |
Advanced: Session-aware literals and schema tracking
Sparkless tracks the active SparkSession for functions that depend on it (e.g. F.col, F.lit, F.expr). When you call these, the engine uses the current session for catalog and configuration (e.g. case sensitivity, current database).
Session-aware helpers (e.g. current_catalog, current_database, current_schema, current_user) and schema tracking in the Polars storage backend ensure that operations like setCurrentDatabase take effect for subsequent SQL and DataFrame operations. Create the session before using F.col/F.lit and set the current database with spark.catalog.setCurrentDatabase("db_name") when using multiple databases. See Configuration and Troubleshooting.
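The general pattern behind session-aware functions can be sketched as a registry that remembers the most recently created session, which helper functions then consult. Everything below (the Session class, resolve_column, qualify_table, and the case_sensitive flag) is hypothetical and written for illustration; it does not reflect Sparkless internals.

```python
from typing import List, Optional


class Session:
    """Hypothetical session that registers itself as the active one."""

    _active: Optional["Session"] = None

    def __init__(self, app_name: str, case_sensitive: bool = False):
        self.app_name = app_name
        self.case_sensitive = case_sensitive
        self.current_database = "default"
        Session._active = self  # the newest session becomes the active one

    @classmethod
    def get_active(cls) -> "Session":
        if cls._active is None:
            raise RuntimeError("No active session; create one first")
        return cls._active


def resolve_column(name: str, available: List[str]) -> str:
    """Resolve a column name using the active session's case sensitivity."""
    session = Session.get_active()
    if session.case_sensitive:
        matches = [c for c in available if c == name]
    else:
        matches = [c for c in available if c.lower() == name.lower()]
    if not matches:
        raise KeyError(name)
    return matches[0]


def qualify_table(table: str) -> str:
    """Prefix an unqualified table name with the active session's database."""
    return f"{Session.get_active().current_database}.{table}"


session = Session("App")
resolved = resolve_column("NAME", ["name", "age"])  # case-insensitive match
session.current_database = "test_db"
qualified = qualify_table("users")
```

The sketch shows why ordering matters: a helper called before any session exists has nothing to consult, and a change to the current database affects only lookups made afterwards.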
Next Steps
API Reference - Complete API documentation
SQL Operations - SQL query examples
Testing Patterns - Test helpers and fixtures
Examples - More code examples
Getting Help
Documentation: docs/