DataFrame API
DataFrame
Mock DataFrame implementation for Sparkless.
This module provides a complete mock implementation of PySpark DataFrame that behaves identically to the real PySpark DataFrame for testing and development purposes. It supports all major DataFrame operations including selection, filtering, grouping, joining, and window functions.
- Key Features:
Complete PySpark API compatibility
100% type-safe operations with mypy compliance
Window function support with partitioning and ordering
Comprehensive error handling matching PySpark exceptions
In-memory storage for fast test execution
Mockable methods for error testing scenarios
Enhanced DataFrameWriter with all save modes
Advanced data type support (15+ types including complex types)
Example
>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
>>> df = spark.createDataFrame(data)
>>> df.select("name", "age").filter(F.col("age") > 25).show()
DataFrame[1 rows, 2 columns]
name age
Bob 30
- class sparkless.dataframe.dataframe.DataFrame(data, schema, storage=None, operations=None)[source]
Bases: object
Mock DataFrame implementation with complete PySpark API compatibility.
Provides a comprehensive mock implementation of PySpark DataFrame that supports all major operations including selection, filtering, grouping, joining, and window functions. Designed for testing and development without requiring JVM.
- data
List of dictionaries representing DataFrame rows.
- schema
StructType defining the DataFrame schema.
- storage
Optional storage manager for persistence operations.
Example
>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
>>> df = spark.createDataFrame(data)
>>> df.filter(F.col("age") > 25).select("name").show()
DataFrame[1 rows, 1 columns]
name
Bob
- __init__(data, schema, storage=None, operations=None)[source]
Initialize DataFrame.
- Parameters:
data (List[Dict[str, Any]]) – List of dictionaries representing DataFrame rows.
schema (StructType) – StructType defining the DataFrame schema.
storage (Optional[IStorageManager]) – Optional storage manager for persistence operations. Defaults to a new MemoryStorageManager instance.
operations (Optional[List[Tuple[str, Any]]]) – Optional list of queued operations as (operation_name, payload) tuples.
- selectExpr(*exprs)[source]
Select columns or expressions using SQL-like syntax.
- Parameters:
exprs (str)
- Return type:
SupportsDataFrameOps
- filter(condition)[source]
Filter rows based on condition.
- Parameters:
condition (Union[ColumnOperation, Column, Literal, str]) – Filter condition. Can be a ColumnOperation or Column (e.g., df.salary > 55000), a SQL expression string (e.g., "salary > 55000"), or a literal boolean value.
- Return type:
SupportsDataFrameOps
- where(condition)[source]
Alias for filter() - Filter rows based on condition.
- Parameters:
condition (Union[ColumnOperation, Column, str]) – Filter condition. Can be a ColumnOperation or Column (e.g., df.salary > 55000) or a SQL expression string (e.g., "salary > 55000").
- Return type:
SupportsDataFrameOps
- withColumn(col_name, col)[source]
Add or replace column.
- Parameters:
col_name (str)
col (Union[Column, ColumnOperation, Literal, Any])
- Return type:
SupportsDataFrameOps
- offset(n)[source]
Skip first n rows (SQL OFFSET clause).
- Parameters:
n (int)
- Return type:
SupportsDataFrameOps
- coalesce(numPartitions)[source]
Coalesce partitions (no-op in mock; returns self).
- Parameters:
numPartitions (int)
- Return type:
SupportsDataFrameOps
- join(other, on, how='inner')[source]
Join with another DataFrame.
- Parameters:
other (SupportsDataFrameOps)
on (Union[str, List[str], ColumnOperation])
how (str)
- Return type:
SupportsDataFrameOps
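To make the `on` and `how` parameters concrete, here is a minimal pure-Python sketch of an equi-join over rows stored as dictionaries. It is not the Sparkless implementation: the function name `join_on_names`, the explicit `right_columns` argument, and the restriction to `inner` and `left` joins on column names are assumptions made for illustration. Null keys never match, as in SQL.

```python
from collections import defaultdict
from typing import Any, Dict, List, Union

Row = Dict[str, Any]  # stand-in for a DataFrame row


def join_on_names(
    left: List[Row],
    right: List[Row],
    right_columns: List[str],
    on: Union[str, List[str]],
    how: str = "inner",
) -> List[Row]:
    """Equi-join two row lists on one or more shared column names."""
    keys = [on] if isinstance(on, str) else list(on)
    if how not in ("inner", "left"):
        raise ValueError(f"unsupported join type in this sketch: {how}")

    # Columns contributed by the right side (join keys appear only once).
    extra = [c for c in right_columns if c not in keys]

    # Index right rows by key; keys containing None are never matchable.
    index = defaultdict(list)
    for r in right:
        key = tuple(r[k] for k in keys)
        if None not in key:
            index[key].append(r)

    out: List[Row] = []
    for l in left:
        key = tuple(l[k] for k in keys)
        matches = index.get(key, []) if None not in key else []
        for r in matches:
            out.append({**l, **{c: r[c] for c in extra}})
        if not matches and how == "left":
            # Left join keeps unmatched left rows, padding with nulls.
            out.append({**l, **{c: None for c in extra}})
    return out
```

The sketch does not handle name collisions between non-key columns on the two sides, and it ignores the ColumnOperation form of `on`.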
- crossJoin(other)[source]
Cross join (Cartesian product) with another DataFrame.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
- union(other)[source]
Union with another DataFrame.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
- unionByName(other, allowMissingColumns=False)[source]
Union with another DataFrame by column names.
- Parameters:
other (SupportsDataFrameOps)
allowMissingColumns (bool)
- Return type:
SupportsDataFrameOps
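The effect of `allowMissingColumns` can be sketched in plain Python. The helper below is illustrative only (the name `union_by_name` and the list-of-dicts representation are assumptions, not Sparkless internals) and follows the PySpark convention that columns present on only one side are filled with nulls and appended after the left side's columns.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def union_by_name(
    left_cols: List[str],
    left_rows: List[Row],
    right_cols: List[str],
    right_rows: List[Row],
    allow_missing_columns: bool = False,
) -> Tuple[List[str], List[Row]]:
    """Union two row sets, matching columns by name rather than position."""
    if allow_missing_columns:
        # Left columns first, then any columns found only on the right.
        out_cols = list(left_cols) + [c for c in right_cols if c not in left_cols]
    else:
        if set(left_cols) != set(right_cols):
            raise ValueError("column names differ; pass allow_missing_columns=True")
        out_cols = list(left_cols)

    # dict.get returns None for a column the row does not have.
    out_rows = [{c: row.get(c) for c in out_cols} for row in left_rows + right_rows]
    return out_cols, out_rows
```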
- unionAll(other)[source]
Deprecated alias for union() - Use union() instead.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
- intersect(other)[source]
Intersect with another DataFrame.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
- intersectAll(other)[source]
Return intersection with duplicates.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
- exceptAll(other)[source]
Except all with another DataFrame.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
- subtract(other)[source]
Return rows in this DataFrame but not in another.
- Parameters:
other (SupportsDataFrameOps)
- Return type:
SupportsDataFrameOps
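The set-style methods intersect, intersectAll, exceptAll, and subtract differ mainly in how they treat duplicate rows. Assuming the mock follows PySpark's semantics (distinct results for intersect and subtract, multiset results for the "All" variants), the difference can be sketched with `collections.Counter`. Rows are modeled as tuples so they are hashable; the function names are illustrative, not Sparkless APIs.

```python
from collections import Counter
from typing import Hashable, List, Sequence

RowT = Hashable  # rows are modeled as hashable tuples in this sketch


def intersect(a: Sequence[RowT], b: Sequence[RowT]) -> List[RowT]:
    """Distinct rows present on both sides."""
    in_b = set(b)
    return [row for row in dict.fromkeys(a) if row in in_b]


def intersect_all(a: Sequence[RowT], b: Sequence[RowT]) -> List[RowT]:
    """Multiset intersection: each row kept min(count_a, count_b) times."""
    return list((Counter(a) & Counter(b)).elements())


def subtract(a: Sequence[RowT], b: Sequence[RowT]) -> List[RowT]:
    """Distinct rows of a that do not occur in b at all."""
    in_b = set(b)
    return [row for row in dict.fromkeys(a) if row not in in_b]


def except_all(a: Sequence[RowT], b: Sequence[RowT]) -> List[RowT]:
    """Multiset difference: each row kept max(count_a - count_b, 0) times."""
    return list((Counter(a) - Counter(b)).elements())
```

With `a = [(1,), (1,), (1,), (2,)]` and `b = [(1,), (2,)]`, `subtract` yields no rows because every value of `a` occurs in `b`, while `except_all` keeps the two surplus copies of `(1,)`.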
- groupBy(*columns)[source]
Group DataFrame by columns for aggregation operations.
- head(n=None)[source]
Return first n rows.
PySpark behavior: head() with no arguments returns a single Row, while head(1) or head(n) returns a list of Rows.
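The return-shape rule for head can be captured in a few lines. This is a sketch over a plain list of dictionaries, not the mock's actual code; returning None for an empty input when no argument is given follows PySpark's documented behavior.

```python
from typing import Any, Dict, List, Optional, Union

Row = Dict[str, Any]  # stand-in for a Row object


def head(rows: List[Row], n: Optional[int] = None) -> Union[Optional[Row], List[Row]]:
    """Return one row when n is omitted, otherwise a list of up to n rows."""
    if n is None:
        # head(): a single row, or None when there is nothing to return.
        return rows[0] if rows else None
    # head(n), including head(1): always a list, possibly shorter than n.
    return rows[:n]
```

Note that `head(rows, 1)` and `head(rows)` therefore differ in type even though both expose the same first row.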
- first()[source]
Return the first row, or None if the DataFrame is empty.
This method matches PySpark’s DataFrame.first() behavior: it returns a single Row object (not a list, as head(n) does), or None if the DataFrame is empty.
Example
>>> df = spark.createDataFrame([{"name": "Alice"}, {"name": "Bob"}])
>>> df.first()
Row(name='Alice')
>>> print(empty_df.first())
None
- toJSON()[source]
Return a single-column DataFrame of JSON strings.
- Return type:
SupportsDataFrameOps
- cache()[source]
Cache the DataFrame for reuse.
In PySpark, caching a DataFrame that uses string concatenation with + operator causes the concatenation to return None/null values. This method marks the DataFrame as cached to match that behavior.
- Return type:
SupportsDataFrameOps
- Returns:
Self (cached DataFrame).
- persist(storageLevel=None)[source]
Persist the DataFrame with the given storage level.
- Parameters:
storageLevel (Any) – Storage level (ignored in mock, but kept for API compatibility).
- Return type:
SupportsDataFrameOps
- Returns:
Self (persisted DataFrame).
- fillna(value, subset=None)[source]
Fill null values.
- Parameters:
value (Union[Any, Dict[str, Any]]) – Value to fill nulls with. Can be a single value or a dict mapping column names to fill values.
subset (Union[str, List[str], Tuple[str, ...], None]) – Optional column name(s) to limit the fillna operation to. Can be a string (single column), list, or tuple of column names. If value is a dict, subset is ignored.
- Return type:
SupportsDataFrameOps
- Returns:
DataFrame with null values filled.
- Raises:
ColumnNotFoundException – If any column in subset doesn’t exist.
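The interaction between `value` and `subset` is easier to see in code. The sketch below reproduces only the rules stated above (scalar versus dict value, string or sequence subset, subset ignored for dicts, error on an unknown subset column). `fill_nulls` is a hypothetical helper, `KeyError` stands in for ColumnNotFoundException, and any type-matching rules the real method may apply are not modeled.

```python
from typing import Any, Dict, List, Optional, Sequence, Union

Row = Dict[str, Any]


def fill_nulls(
    rows: List[Row],
    columns: List[str],
    value: Union[Any, Dict[str, Any]],
    subset: Optional[Union[str, Sequence[str]]] = None,
) -> List[Row]:
    """Return new rows with None replaced according to value and subset."""
    if isinstance(value, dict):
        # Dict form: per-column fill values; subset is ignored.
        fill_map = dict(value)
    else:
        if subset is None:
            targets = list(columns)
        elif isinstance(subset, str):
            targets = [subset]
        else:
            targets = list(subset)
        missing = [c for c in targets if c not in columns]
        if missing:
            # Stand-in for ColumnNotFoundException.
            raise KeyError(f"columns not found: {missing}")
        fill_map = {c: value for c in targets}

    return [
        {
            c: fill_map[c] if row.get(c) is None and c in fill_map else row.get(c)
            for c in columns
        }
        for row in rows
    ]
```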
- property na: Any
Access null-handling methods via .na namespace.
Provides PySpark-compatible API for handling null values in DataFrame.
- Returns:
NAHandler instance for null-handling operations.
Example
>>> df.na.fill(0)  # Fill all nulls with 0
>>> df.na.fill({"col1": 0, "col2": "default"})  # Fill with dict
- describe(*cols)[source]
Compute basic statistics for numeric columns.
- Parameters:
cols (str)
- Return type:
SupportsDataFrameOps
- summary(*stats)[source]
Compute extended statistics for numeric columns.
- Parameters:
stats (str)
- Return type:
SupportsDataFrameOps
- transform(func)[source]
Apply a function to transform a DataFrame.
- Parameters:
func (Any)
- Return type:
SupportsDataFrameOps
- mapPartitions(func, preservesPartitioning=False)[source]
Apply a function to each partition of the DataFrame.
- mapInPandas(func, schema)[source]
Map an iterator of pandas DataFrames to another iterator of pandas DataFrames.
- unpivot(ids, values, variableColumnName='variable', valueColumnName='value')[source]
Unpivot columns into rows.
- melt(ids=None, values=None, variableColumnName='variable', valueColumnName='value')[source]
Unpivot DataFrame from wide to long format.
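The wide-to-long reshaping performed by unpivot and melt can be sketched as follows. The helper works on a list of dictionaries and is illustrative only; the parameter names mirror the documented signature in snake_case, and the behavior when `ids` or `values` is None (allowed by melt) is not modeled.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def unpivot(
    rows: List[Row],
    ids: List[str],
    values: List[str],
    variable_column_name: str = "variable",
    value_column_name: str = "value",
) -> List[Row]:
    """Emit one output row per (input row, value column) pair."""
    out: List[Row] = []
    for row in rows:
        for column in values:
            long_row = {i: row[i] for i in ids}       # identifier columns are repeated
            long_row[variable_column_name] = column   # name of the source column
            long_row[value_column_name] = row[column] # value from that column
            out.append(long_row)
    return out
```

An input of N rows with K value columns therefore produces N × K output rows.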
- explain(extended=False, codegen=False, cost=False, formatted=False, mode=None)[source]
Explain the execution plan.
- toDF(*cols)[source]
Rename columns of DataFrame.
- Parameters:
cols (str)
- Return type:
SupportsDataFrameOps
- alias(alias)[source]
Give DataFrame an alias for join operations.
- Parameters:
alias (str)
- Return type:
SupportsDataFrameOps
- localCheckpoint(eager=True)[source]
Local checkpoint to truncate lineage.
- Parameters:
eager (bool)
- Return type:
SupportsDataFrameOps
- property schema: StructType
Get DataFrame schema.
If lazy with queued operations, project the resulting schema without materializing data.
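One way to picture schema projection over queued operations is to replay the (operation_name, payload) tuples against the column list alone, never touching row data. The sketch below is an assumption-laden illustration: the operation names "select", "withColumn", "drop", and "filter" and their payload shapes are hypothetical, and only column names (not types) are tracked.

```python
from typing import Any, List, Tuple


def project_columns(columns: List[str], operations: List[Tuple[str, Any]]) -> List[str]:
    """Compute the resulting column names without materializing any rows."""
    cols = list(columns)
    for name, payload in operations:
        if name == "select":
            # Hypothetical payload: list of selected column names.
            cols = list(payload)
        elif name == "withColumn":
            # Hypothetical payload: (column_name, expression).
            col_name = payload[0]
            if col_name not in cols:
                cols.append(col_name)  # replacing an existing column keeps its position
        elif name == "drop":
            # Hypothetical payload: list of column names to remove.
            cols = [c for c in cols if c not in payload]
        # Row-level operations such as "filter" leave the columns unchanged.
    return cols
```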
- __getitem__(key)[source]
Access column by name using dictionary-style syntax.
- Parameters:
key (Union[str, int, slice, List[str], Tuple[str, ...]]) – Column name (str), column index (int), or slice for multiple columns.
- Return type:
Union[Column, SupportsDataFrameOps]
- Returns:
Column object for single column, or DataFrame for slice.
Example
>>> df["name"]  # Returns Column
>>> df[["name", "age"]]  # Returns DataFrame with selected columns
- property rdd: MockRDD
Get RDD representation.
- createOrReplaceGlobalTempView(name)[source]
Create or replace a global temporary view (all PySpark versions).
Unlike createGlobalTempView, this method does not raise an error if the view already exists.
Example
>>> df.createOrReplaceGlobalTempView("my_global_view")
>>> spark.sql("SELECT * FROM global_temp.my_global_view")
- checkpoint(eager=False)[source]
Checkpoint the DataFrame (no-op in mock; returns self).
- Parameters:
eager (bool)
- Return type:
SupportsDataFrameOps
- observe(name, *exprs)[source]
Define observation metrics (PySpark 3.3+).
- Return type:
SupportsDataFrameOps
- Returns:
Same DataFrame with observation registered
Example
>>> df.observe("metrics", F.count(F.lit(1)).alias("count"))
- property write: DataFrameWriter
Get DataFrame writer (PySpark-compatible property).
GroupedData
Base grouped data implementation for Sparkless.
This module provides the core GroupedData class for DataFrame aggregation operations, maintaining compatibility with PySpark’s GroupedData interface.
- class sparkless.dataframe.grouped.base.GroupedData(df, group_columns, group_output_names=None)[source]
Bases: object
Mock grouped data for aggregation operations.
Provides grouped data functionality for DataFrame aggregation operations, maintaining compatibility with PySpark’s GroupedData interface.
- __init__(df, group_columns, group_output_names=None)[source]
Initialize GroupedData.
- Parameters:
df (SupportsDataFrameOps) – The DataFrame being grouped.
group_columns (List[str]) – List of column names to group by (for reading from rows).
group_output_names (Optional[List[str]]) – Optional output names (e.g. from alias). When provided, used for result row keys instead of group_columns (Issue #397).
- agg(*exprs)[source]
Aggregate grouped data.
- Parameters:
*exprs (Union[str, Column, ColumnOperation, AggregateFunction, Dict[str, str]]) – Aggregation expressions or dictionary mapping column names to aggregation functions.
- Returns:
New DataFrame with aggregated results.
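The dictionary form of agg, which maps column names to aggregation function names, can be sketched in plain Python. This is not how GroupedData is implemented; `group_agg` and the `AGGREGATES` table are hypothetical, output columns are named in the PySpark style `func(column)`, and nulls are skipped the way SQL aggregates skip them.

```python
from collections import defaultdict
from statistics import mean
from typing import Any, Dict, List

Row = Dict[str, Any]

# Hypothetical lookup from function name to a Python reducer.
AGGREGATES = {"sum": sum, "avg": mean, "min": min, "max": max, "count": len}


def group_agg(rows: List[Row], group_columns: List[str], spec: Dict[str, str]) -> List[Row]:
    """Group rows by the given columns and apply one aggregate per column."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[c] for c in group_columns)].append(row)

    out: List[Row] = []
    for key, members in groups.items():
        result = dict(zip(group_columns, key))
        for column, func_name in spec.items():
            # Null inputs are ignored by the aggregate.
            values = [m[column] for m in members if m[column] is not None]
            if not values and func_name != "count":
                result[f"{func_name}({column})"] = None  # no non-null input
            else:
                result[f"{func_name}({column})"] = AGGREGATES[func_name](values)
        out.append(result)
    return out
```

For example, `group_agg(rows, ["Name"], {"Value": "avg"})` corresponds in spirit to `df.groupBy("Name").agg({"Value": "avg"})`.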
- mean(*columns)[source]
Mean grouped data (alias for avg).
- Returns:
DataFrame with mean aggregations.
Example
>>> df.groupBy("Name").mean("Value")
- applyInPandas(func, schema)[source]
Apply a Python native function to each group using pandas DataFrames.
The function should take a pandas DataFrame and return a pandas DataFrame. For each group, the group data is passed as a pandas DataFrame to the function and the returned pandas DataFrame is used to construct the output rows.
- Returns:
Result of applying the function to each group.
Example
>>> def normalize(pdf):
...     pdf['normalized'] = (pdf['value'] - pdf['value'].mean()) / pdf['value'].std()
...     return pdf
>>> df.groupBy("category").applyInPandas(normalize, schema="category string, value double, normalized double")
- transform(func)[source]
Apply a function to each group and return a DataFrame with the same schema.
This is similar to applyInPandas but preserves the original schema. The function should take a pandas DataFrame and return a pandas DataFrame with the same columns (though it may add computed columns).
- Parameters:
func (Any) – A function that takes a pandas DataFrame and returns a pandas DataFrame.
- Returns:
Result of applying the function to each group.
Example
>>> def add_group_stats(pdf):
...     pdf['group_mean'] = pdf['value'].mean()
...     pdf['group_std'] = pdf['value'].std()
...     return pdf
>>> df.groupBy("category").transform(add_group_stats)
DataFrameWriter
Mock DataFrameWriter implementation for DataFrame write operations.
This module provides DataFrame writing functionality, maintaining compatibility with PySpark’s DataFrameWriter interface. Supports writing to various data sinks including tables, files, and custom storage backends with multiple save modes.
- Key Features:
Complete PySpark DataFrameWriter API compatibility
Support for multiple output formats (parquet, json, csv)
Multiple save modes (append, overwrite, error, ignore)
Flexible options configuration
Integration with storage manager
Table and file output support
Error handling for invalid configurations
Example
>>> from sparkless.sql import SparkSession
>>> spark = SparkSession("test")
>>> df = spark.createDataFrame([{"name": "Alice", "age": 25}])
>>> # Save as table
>>> df.write.mode("overwrite").saveAsTable("users")
>>> # Save to file with options
>>> df.write.format("parquet").option("compression", "snappy").save("/path")
- class sparkless.dataframe.writer.DataFrameWriter(df, storage)[source]
Bases: object
Mock DataFrame writer for saveAsTable operations.
Provides a PySpark-compatible interface for writing DataFrames to storage formats. Supports various formats and save modes for testing and development.
- df
The DataFrame to be written.
- storage
Storage manager for persisting data.
- format_name
Output format (e.g., ‘parquet’, ‘json’).
- save_mode
Save mode (‘append’, ‘overwrite’, ‘error’, ‘ignore’).
Example
>>> df.write.format("parquet").mode("overwrite").saveAsTable("my_table")
- __init__(df, storage)[source]
Initialize DataFrameWriter.
- Parameters:
df (DataFrame) – The DataFrame to be written.
storage (IStorageManager) – Storage manager for persisting data.
- format(source)[source]
Set the output format for the DataFrame writer.
- Parameters:
source (str) – The output format (e.g., 'parquet', 'json', 'csv').
- Returns:
Self for method chaining.
Example
>>> df.write.format("parquet")
- mode(mode)[source]
Set the save mode for the DataFrame writer.
- Parameters:
mode (str) – Save mode ('append', 'overwrite', 'error', 'ignore').
- Returns:
Self for method chaining.
- Raises:
IllegalArgumentException – If mode is not valid.
Example
>>> df.write.mode("overwrite")
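The four save modes differ only in what happens when the target already exists. The sketch below models that decision with an in-memory dictionary of tables. It is an illustration rather than the writer's code: `write_table` is a hypothetical helper, `ValueError` and `RuntimeError` stand in for IllegalArgumentException and AnalysisException, and defaulting to "error" is an assumption borrowed from PySpark.

```python
from typing import Any, Dict, List

VALID_MODES = ("append", "overwrite", "error", "ignore")


def write_table(
    tables: Dict[str, List[Any]],
    name: str,
    rows: List[Any],
    mode: str = "error",
) -> None:
    """Store rows under name, resolving conflicts according to mode."""
    if mode not in VALID_MODES:
        # Stand-in for IllegalArgumentException.
        raise ValueError(f"invalid save mode: {mode!r}")

    exists = name in tables
    if exists and mode == "error":
        # Stand-in for AnalysisException.
        raise RuntimeError(f"table {name!r} already exists")
    if exists and mode == "ignore":
        return  # keep existing data, silently skip the write
    if exists and mode == "append":
        tables[name] = tables[name] + list(rows)
        return
    # New table, or mode == "overwrite".
    tables[name] = list(rows)
```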
- property saveMode: str
Get the current save mode (PySpark compatibility).
- Returns:
Current save mode string.
- option(key, value)[source]
Set an option for the DataFrame writer.
- Returns:
Self for method chaining.
Example
>>> df.write.option("compression", "snappy")
- options(**kwargs)[source]
Set multiple options for the DataFrame writer.
- Parameters:
**kwargs (Any) – Option key-value pairs.
- Returns:
Self for method chaining.
Example
>>> df.write.options(compression="snappy", format="parquet")
- partitionBy(*cols)[source]
Partition output by given columns.
- Parameters:
*cols (str) – Column names to partition by.
- Returns:
Self for method chaining.
Example
>>> df.write.partitionBy("year", "month")
- saveAsTable(table_name)[source]
Save DataFrame as a table in storage.
- Parameters:
table_name (str) – Name of the table (can include schema, e.g., 'schema.table').
- Raises:
AnalysisException – If table operations fail.
IllegalArgumentException – If table name is invalid.
Example
>>> df.write.saveAsTable("my_table")
>>> df.write.saveAsTable("schema.my_table")
- save(path=None)[source]
Save DataFrame to a file path.
- Parameters:
path (Optional[str]) – Optional file path to save to. If None, uses a default path.
- Raises:
IllegalArgumentException – If path is invalid.
Example
>>> df.write.format("parquet").mode("overwrite").save("/path/to/file")
- parquet(path, **options)[source]
Save DataFrame in Parquet format.
Example
>>> df.write.parquet("/path/to/file.parquet")
- json(path, **options)[source]
Save DataFrame in JSON format.
Example
>>> df.write.json("/path/to/file.json")
- delta(path, **options)[source]
Save DataFrame in Delta Lake format.
This is a convenience method equivalent to: df.write.format("delta").save(path)
Example
>>> df.write.delta("/path/to/delta_table")
>>> df.write.delta("/path/to/delta_table", mergeSchema=True)
- csv(path, **options)[source]
Save DataFrame in CSV format.
Example
>>> df.write.csv("/path/to/file.csv")
DataFrameReader
Mock DataFrameReader implementation for DataFrame read operations.
This module provides DataFrame reading functionality, maintaining compatibility with PySpark’s DataFrameReader interface. Supports reading from various data sources including tables, files, and custom storage backends.
- Key Features:
Complete PySpark DataFrameReader API compatibility
Support for multiple data formats (parquet, json, csv, table)
Flexible options configuration
Integration with storage manager
Schema inference and validation
Error handling for missing data sources
Example
>>> from sparkless.sql import SparkSession
>>> spark = SparkSession("test")
>>> # Read from table
>>> df = spark.read.table("my_table")
>>> # Read with format and options
>>> df = spark.read.format("parquet").option("header", "true").load("/path")
- class sparkless.dataframe.reader.DataFrameReader(session)[source]
Bases: object
Mock DataFrameReader for reading data from various sources.
Provides a PySpark-compatible interface for reading DataFrames from storage formats and tables. Supports various formats and options for testing and development.
- session
Sparkless session instance.
- _format
Input format (e.g., ‘parquet’, ‘json’).
- _options
Additional options for the reader.
Example
>>> spark.read.format("parquet").load("/path/to/file")
>>> spark.read.table("my_table")
- __init__(session)[source]
Initialize DataFrameReader.
- Parameters:
session (ISession) – Sparkless session instance.
- format(source)[source]
Set input format.
- Parameters:
source (str) – Data source format.
- Returns:
Self for method chaining.
Example
>>> spark.read.format("parquet")
- option(key, value)[source]
Set option.
- Returns:
Self for method chaining.
Example
>>> spark.read.option("header", "true")
- options(**options)[source]
Set multiple options.
- Parameters:
**options (Any) – Option key-value pairs.
- Returns:
Self for method chaining.
Example
>>> spark.read.options(header="true", inferSchema="true")
- schema(schema)[source]
Set schema.
- Parameters:
schema (Union[StructType, str]) – Schema definition.
- Returns:
Self for method chaining.
Example
>>> spark.read.schema("name STRING, age INT")
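When the schema is given as a string such as "name STRING, age INT", it has to be split into field names and type names. The sketch below shows one way that could be done. It is not the parser Sparkless uses: `parse_ddl_schema` is a hypothetical helper that returns (name, type) pairs rather than a StructType, and it only splits on top-level commas so that types like DECIMAL(10,2) or ARRAY<STRING> stay intact.

```python
from typing import List, Tuple


def parse_ddl_schema(ddl: str) -> List[Tuple[str, str]]:
    """Split a 'name TYPE, name TYPE' string into (name, type) pairs."""
    # First pass: split on commas that are not nested inside <...> or (...).
    fields: List[str] = []
    current: List[str] = []
    depth = 0
    for ch in ddl:
        if ch in "<(":
            depth += 1
        elif ch in ">)":
            depth -= 1
        if ch == "," and depth == 0:
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    if current:
        fields.append("".join(current))

    # Second pass: the first token is the column name, the rest is the type.
    result: List[Tuple[str, str]] = []
    for field in fields:
        name, _, type_name = field.strip().partition(" ")
        type_name = type_name.strip()
        if not name or not type_name:
            raise ValueError(f"malformed field definition: {field!r}")
        result.append((name, type_name.lower()))
    return result
```

Type names are lower-cased here purely to make comparison simple; quoting of column names and nested struct fields are left out of the sketch.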
- load(path=None, format=None, **options)[source]
Load data.
- Return type:
IDataFrame
- Returns:
DataFrame with loaded data.
Example
>>> spark.read.load("/path/to/file")
>>> spark.read.format("parquet").load("/path/to/file")
- table(table_name)[source]
Load table.
- Parameters:
table_name (str) – Table name.
- Return type:
IDataFrame
- Returns:
DataFrame with table data.
Example
>>> spark.read.table("my_table")
>>> spark.read.format("delta").option("versionAsOf", 0).table("my_table")
- orc(path, **options)[source]
Load ORC data.
- Return type:
IDataFrame
- Returns:
DataFrame with ORC data.
Example
>>> spark.read.orc("/path/to/file.orc")