API Reference

Overview

Sparkless provides 100% API compatibility with PySpark while using Polars as the default backend. This reference covers all supported functions, classes, and operations.

Session Management

SparkSession

from sparkless.sql import SparkSession

# Create session
spark = SparkSession("my_app")

# With configuration
spark = SparkSession("my_app", config={
    "spark.sql.debug": "true",
    "spark.sql.adaptive.enabled": "true"
})

Methods:

  • createDataFrame(data, schema=None) - Create DataFrame from Python data

  • read - Access to data source readers

  • sql(query) - Execute SQL queries

  • catalog - Access to catalog operations

  • conf - Configuration management

Configuration

# Set configuration
spark.conf.set("spark.sql.debug", "true")

# Get configuration
debug_mode = spark.conf.get("spark.sql.debug")

DataFrame Operations

Creation

# From Python data
df = spark.createDataFrame([
    {"id": 1, "name": "Alice", "age": 25},
    {"id": 2, "name": "Bob", "age": 30}
])

# With explicit schema
from sparkless.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(data, schema)

# Empty DataFrames require explicit StructType schema (PySpark compatibility)
# ❌ This will raise ValueError:
# df = spark.createDataFrame([], ['col1', 'col2'])

# ✅ Correct way:
from sparkless.sql.types import StructType, StructField, StringType, IntegerType
empty_schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", IntegerType(), True)
])
df = spark.createDataFrame([], empty_schema)

Set Operations

# Union operations require compatible schemas (PySpark compatibility)
df1 = spark.createDataFrame([("a", 1)], ["col1", "col2"])
df2 = spark.createDataFrame([("b", 2)], ["col1", "col2"])

# ✅ Compatible schemas - same column names and compatible types
result = df1.union(df2)

# ❌ This will raise AnalysisException (different column counts):
# df3 = spark.createDataFrame([("c",)], ["col1"])
# result = df1.union(df3)

# Numeric type promotion is allowed (IntegerType -> LongType -> FloatType -> DoubleType)
schema1 = StructType([StructField("value", IntegerType(), True)])
schema2 = StructType([StructField("value", LongType(), True)])
df1 = spark.createDataFrame([(1,)], schema1)
df2 = spark.createDataFrame([(2,)], schema2)
result = df1.union(df2)  # ✅ Works - numeric types are compatible

# unionByName - Union by column names (allows different column order)
df1 = spark.createDataFrame([("Alice", 25)], ["name", "age"])
df2 = spark.createDataFrame([(30, "Bob")], ["age", "name"])  # Different order

# ✅ unionByName matches columns by name, not position
result = df1.unionByName(df2)  # Works correctly despite different column order

# unionByName with allowMissingColumns - fills missing columns with null
df1 = spark.createDataFrame([("Alice", 25, "Engineer")], ["name", "age", "role"])
df2 = spark.createDataFrame([("Bob", 30)], ["name", "age"])  # Missing "role" column

# ✅ With allowMissingColumns=True, missing columns are filled with null
result = df1.unionByName(df2, allowMissingColumns=True)

# Diamond dependency support - unionByName correctly handles when the same DataFrame
# is used in multiple branches (e.g., different joins) and then combined
existing = spark.createDataFrame([(1, "a", 100), (2, "b", 200)], ["id", "name", "value"])
source = spark.createDataFrame([(1, "a", 150)], ["id", "name", "value"])

# Branch A: Anti-join (rows NOT in source)
branch_a = existing.join(source.select("id").distinct(), on=["id"], how="left_anti")

# Branch B: Inner-join (rows that ARE in source)
branch_b = existing.join(source.select("id").distinct(), on=["id"], how="inner")

# ✅ unionByName correctly combines branches without duplicating data
combined = branch_a.unionByName(branch_b, allowMissingColumns=True)

Column Access

# Both syntaxes supported
df.select("name", df.age)  # ✅ Direct column access
df.select(F.col("name"), F.col("age"))  # ✅ F.col syntax

Selection and Filtering

# Select columns
df.select("id", "name")
df.select(df.id, df.name)
df.select(F.col("id"), F.col("name"))

# Filter rows
df.filter(df.age > 25)
df.filter(F.col("age") > 25)
df.where(df.age > 25)

# Null-safe equality (for comparing columns that may contain NULL)
# NULL <=> NULL returns True, NULL <=> non-NULL returns False
df.filter(F.col("id").eqNullSafe(F.col("manager_id")))
df.filter(F.col("value").eqNullSafe(F.lit(None)))

# Range checks with between (inclusive on both ends)
df.filter(F.col("age").between(18, 65))  # 18 <= age <= 65
df.filter(F.col("value").between(F.lit(0), F.lit(100)))  # Using literals
df.filter(F.col("salary").between(50000, 100000))  # Salary range

# Multiple conditions
df.filter((df.age > 25) & (df.salary > 50000))
df.filter(df.age > 25).filter(df.salary > 50000)

Column Operations

# Add new columns
df.withColumn("full_name", F.concat(df.first_name, F.lit(" "), df.last_name))
df.withColumn("age_group", F.when(df.age < 30, "young").otherwise("old"))

# Rename columns
df.withColumnRenamed("old_name", "new_name")

# Drop columns
df.drop("unwanted_column")
df.drop("col1", "col2")

Aggregations

# Simple aggregations
df.agg(F.count("*"))
df.agg(F.sum("salary"))
df.agg(F.avg("age"))

# Multiple aggregations
df.agg(
    F.count("*").alias("count"),
    F.avg("salary").alias("avg_salary"),
    F.max("salary").alias("max_salary")
)

# Group by aggregations
df.groupBy("department").agg(
    F.count("*").alias("count"),
    F.avg("salary").alias("avg_salary")
)

Functions Reference

Column Functions

Basic Operations

from sparkless.sql import functions as F

# Literal values
F.lit("constant")
F.lit(42)
F.lit(True)

# Column references
F.col("column_name")
F.col("table.column_name")

# Column operations
F.col("age") + 1
F.col("salary") * 1.1
F.col("name").isNull()
F.col("age").isNotNull()
F.col("age").between(18, 65)  # Range check (inclusive)
F.col("value").isin(1, 2, 3)  # Check if value is in list

String Functions

# String operations
F.concat(F.col("first"), F.lit(" "), F.col("last"))
F.concat_ws("-", F.col("col1"), F.col("col2"))
F.length(F.col("name"))
F.upper(F.col("name"))
F.lower(F.col("name"))
F.trim(F.col("name"))
F.ltrim(F.col("name"))
F.rtrim(F.col("name"))

# String matching
F.col("name").contains("Alice")
F.col("email").startswith("user@")
F.col("email").endswith(".com")
F.col("name").like("A%")
F.col("email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

# String replacement
F.regexp_replace(F.col("text"), r"\d+", "NUMBER")
F.translate(F.col("text"), "abc", "xyz")

Numeric Functions

# Basic arithmetic
F.col("salary") + F.col("bonus")
F.col("price") * 1.1
F.col("total") / F.col("count")

# Mathematical functions
F.abs(F.col("value"))
F.ceil(F.col("value"))
F.floor(F.col("value"))
F.round(F.col("value"), 2)
F.sqrt(F.col("value"))
F.pow(F.col("base"), F.col("exponent"))

# Aggregation functions
F.sum(F.col("amount"))
F.avg(F.col("price"))
F.min(F.col("date"))
F.max(F.col("date"))
F.count(F.col("id"))
F.countDistinct(F.col("user_id"))

Date and Time Functions

# Current date/time
F.current_date()
F.current_timestamp()

# Date extraction
F.year(F.col("date"))
F.month(F.col("date"))
F.day(F.col("date"))
F.hour(F.col("timestamp"))
F.minute(F.col("timestamp"))
F.second(F.col("timestamp"))
F.dayofweek(F.col("date"))
F.dayofyear(F.col("date"))
F.weekofyear(F.col("date"))

# Date conversion
F.to_date(F.col("date_string"))
F.to_timestamp(F.col("timestamp_string"))
F.to_timestamp(F.col("date_string"), "yyyy-MM-dd")

# Date arithmetic
F.date_add(F.col("date"), 7)
F.date_sub(F.col("date"), 7)
F.datediff(F.col("end_date"), F.col("start_date"))

# Date truncation (PySpark parity)
F.date_trunc("month", F.col("timestamp"))
F.date_trunc("day", F.col("date"))

Conditional Functions

# When/otherwise
F.when(F.col("age") > 65, "senior")
 .when(F.col("age") > 18, "adult")
 .otherwise("minor")

# Case statements
F.expr("CASE WHEN age > 65 THEN 'senior' WHEN age > 18 THEN 'adult' ELSE 'minor' END")

# Null handling
F.coalesce(F.col("col1"), F.col("col2"), F.lit("default"))
F.nvl(F.col("nullable_col"), F.lit("default"))
F.nanvl(F.col("float_col"), F.lit(0.0))

# Null checks
F.col("column").isNull()
F.col("column").isNotNull()

# Null-safe equality (PySpark <=> operator)
# NULL <=> NULL returns True, NULL <=> non-NULL returns False
F.col("id").eqNullSafe(F.col("manager_id"))
F.col("value").eqNullSafe(F.lit(None))

Type Casting

# Type conversion
F.col("string_number").cast("int")
F.col("int_value").cast("string")
F.col("date_string").cast("date")
F.col("timestamp_string").cast("timestamp")

# Safe casting
F.col("value").cast("int")  # May fail on invalid values

Window Functions

Window Specification

from sparkless.sql import Window

# Basic window
window = Window.partitionBy("department").orderBy("salary")

# With frame
window = Window.partitionBy("department") \
    .orderBy("salary") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Range frame
window = Window.partitionBy("category") \
    .orderBy("date") \
    .rangeBetween(-7, 0)  # 7 days before to current

Window Functions

# Ranking functions
F.row_number().over(window)
F.rank().over(window)
F.dense_rank().over(window)
F.percent_rank().over(window)
F.ntile(4).over(window)

# Offset functions
F.lag(F.col("value"), 1).over(window)
F.lead(F.col("value"), 1).over(window)

# Aggregate functions
F.sum(F.col("amount")).over(window)
F.avg(F.col("price")).over(window)
F.count("*").over(window)
F.max(F.col("date")).over(window)
F.min(F.col("date")).over(window)

# First/Last functions
F.first(F.col("value")).over(window)
F.last(F.col("value")).over(window)
F.first(F.col("value"), ignoreNulls=True).over(window)

Data Types

Primitive Types

from sparkless.sql.types import *

# Basic types
StringType()
IntegerType()
LongType()
DoubleType()
FloatType()
BooleanType()
DateType()
TimestampType()

Complex Types

# Array type
# PySpark convention (camelCase keyword - Issue #247)
ArrayType(elementType=StringType())
# Backward-compatible (snake_case keyword)
ArrayType(element_type=StringType())
# Positional argument
ArrayType(StringType())

# Map type
MapType(StringType(), IntegerType())

# Struct type
StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), False)
])

# Nested types
StructType([
    StructField("id", IntegerType(), False),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True)
    ]), True)
])

Catalog Operations

Database Management

# Create database
spark.catalog.createDatabase("my_db")

# Set current database
spark.catalog.setCurrentDatabase("my_db")

# Get current database
current_db = spark.catalog.currentDatabase()

# List databases
databases = spark.catalog.listDatabases()

# Drop database
spark.catalog.dropDatabase("my_db")

Table Management

# List tables
tables = spark.catalog.listTables()
tables = spark.catalog.listTables("my_db")

# Check if table exists
exists = spark.catalog.tableExists("my_table")
exists = spark.catalog.tableExists("my_db.my_table")

# Get table details
table = spark.catalog.getTable("my_table")

# Drop table
spark.catalog.dropTable("my_table")

Data Sources

Reading Data

# JSON
df = spark.read.json("data.json")
df = spark.read.json("data.json", schema=my_schema)

# CSV
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Parquet
df = spark.read.parquet("data.parquet")

# Table
df = spark.table("my_table")
df = spark.table("my_db.my_table")

Writing Data

# Save as table
df.write.saveAsTable("my_table")
df.write.saveAsTable("my_table", mode="overwrite")

# Save as file
df.write.json("output.json")
df.write.csv("output.csv")
df.write.parquet("output.parquet")

# Write modes
df.write.mode("overwrite").saveAsTable("my_table")
df.write.mode("append").saveAsTable("my_table")
df.write.mode("ignore").saveAsTable("my_table")
df.write.mode("error").saveAsTable("my_table")  # Default

SQL Operations

SQL Queries

# Register DataFrame as table
df.createOrReplaceTempView("my_table")

# Execute SQL
result = spark.sql("SELECT * FROM my_table WHERE age > 25")
result = spark.sql("""
    SELECT department, AVG(salary) as avg_salary
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
""")

SQL Functions

# SQL functions in expressions
F.expr("CASE WHEN age > 65 THEN 'senior' ELSE 'junior' END")
F.expr("SUBSTRING(name, 1, 3)")
F.expr("DATE_ADD(date_col, 7)")

Error Handling

Exception Types

from sparkless.core.exceptions import *

# Column not found
MockSparkColumnNotFoundError

# Type mismatch
MockSparkTypeMismatchError

# Operation errors
MockSparkOperationError

# SQL generation errors
MockSparkSQLGenerationError

# Query execution errors
MockSparkQueryExecutionError

Debug Mode

# Enable debug mode
spark.conf.set("spark.sql.debug", "true")

# Or globally
import sparkless
sparkless.set_debug_mode(True)

Performance Tips

Optimization

# Use column pruning
df.select("id", "name")  # Only select needed columns

# Filter early
df.filter(df.active == True).select("id", "name")

# Use appropriate data types
df.withColumn("id", F.col("id").cast("int"))

Memory Management

# Cache frequently used DataFrames
df.cache()

# Unpersist when done
df.unpersist()

# Check storage level
df.storageLevel

Testing

Unit Testing

import pytest
from sparkless import SparkSession

@pytest.fixture
def spark():
    return SparkSession("test")

def test_data_processing(spark):
    df = spark.createDataFrame([
        {"id": 1, "value": 10},
        {"id": 2, "value": 20}
    ])
    
    result = df.filter(df.value > 15).collect()
    assert len(result) == 1

Integration Testing

def test_complex_pipeline(spark):
    # Test entire data pipeline
    df = spark.read.json("data.json")
    
    processed = df.filter(df.status == "active") \
        .withColumn("processed_at", F.current_timestamp()) \
        .groupBy("category") \
        .agg(F.count("*").alias("count"))
    
    results = processed.collect()
    assert len(results) > 0

Compatibility Notes

PySpark Compatibility

  • 100% API Compatibility: All PySpark operations work identically

  • Column Access: Both df.column_name and F.col("column_name") supported

  • Data Types: All PySpark data types supported

  • Functions: All PySpark functions supported

Known Differences

  • Backend: Uses Polars instead of Spark SQL (DuckDB available as optional legacy backend)

  • Performance: 10x faster than PySpark for most operations

  • Memory: Lower memory usage, no JVM overhead

  • SQL Generation: Some complex operations may generate different SQL

Migration from PySpark

See docs/migration_from_pyspark.md for detailed migration guide.

Examples

Basic Data Processing

from sparkless.sql import SparkSession, functions as F

# Create session
spark = SparkSession("data_processing")

# Create DataFrame
df = spark.createDataFrame([
    {"id": 1, "name": "Alice", "age": 25, "salary": 50000},
    {"id": 2, "name": "Bob", "age": 30, "salary": 60000},
    {"id": 3, "name": "Charlie", "age": 35, "salary": 70000}
])

# Process data
result = df.filter(df.age > 25) \
    .withColumn("senior", F.when(df.age > 30, True).otherwise(False)) \
    .groupBy("senior") \
    .agg(F.avg("salary").alias("avg_salary")) \
    .collect()

print(result)

Window Functions

from sparkless.sql import Window

# Define window
window = Window.partitionBy("department").orderBy("salary")

# Apply window functions
result = df.withColumn("rank", F.rank().over(window)) \
    .withColumn("row_num", F.row_number().over(window)) \
    .withColumn("lag_salary", F.lag("salary", 1).over(window)) \
    .collect()

Complex Aggregations

# Multiple aggregations
result = df.groupBy("department") \
    .agg(
        F.count("*").alias("count"),
        F.avg("salary").alias("avg_salary"),
        F.max("salary").alias("max_salary"),
        F.min("salary").alias("min_salary")
    ) \
    .collect()

This API reference provides comprehensive documentation for all Sparkless functionality. For more examples and patterns, see docs/testing_patterns.md.