DataFrame API

DataFrame

Mock DataFrame implementation for Sparkless.

This module provides a complete mock implementation of PySpark DataFrame that behaves identically to the real PySpark DataFrame for testing and development purposes. It supports all major DataFrame operations including selection, filtering, grouping, joining, and window functions.

Key Features:
  • Complete PySpark API compatibility

  • 100% type-safe operations with mypy compliance

  • Window function support with partitioning and ordering

  • Comprehensive error handling matching PySpark exceptions

  • In-memory storage for fast test execution

  • Mockable methods for error testing scenarios

  • Enhanced DataFrameWriter with all save modes

  • Advanced data type support (15+ types including complex types)

Example

>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
>>> df = spark.createDataFrame(data)
>>> df.select("name", "age").filter(F.col("age") > 25).show()
DataFrame[1 rows, 2 columns]

name age
Bob    30
class sparkless.dataframe.dataframe.DataFrame(data, schema, storage=None, operations=None)[source]

Bases: object

Mock DataFrame implementation with complete PySpark API compatibility.

Provides a comprehensive mock implementation of PySpark DataFrame that supports all major operations including selection, filtering, grouping, joining, and window functions. Designed for testing and development without requiring JVM.

data

List of dictionaries representing DataFrame rows.

schema

StructType defining the DataFrame schema.

storage

Optional storage manager for persistence operations.

Example

>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
>>> df = spark.createDataFrame(data)
>>> df.filter(F.col("age") > 25).select("name").show()
DataFrame[1 rows, 1 columns]

name
Bob
Parameters:

Initialize DataFrame.

Parameters:
  • data (List[Dict[str, Any]]) – List of dictionaries representing DataFrame rows.

  • schema (StructType) – StructType defining the DataFrame schema.

  • storage (Optional[IStorageManager]) – Optional storage manager for persistence operations. Defaults to a new MemoryStorageManager instance.

  • operations (Optional[List[Tuple[str, Any]]]) – Optional list of queued operations as (operation_name, payload) tuples.

__init__(data, schema, storage=None, operations=None)[source]

Initialize DataFrame.

Parameters:
  • data (List[Dict[str, Any]]) – List of dictionaries representing DataFrame rows.

  • schema (StructType) – StructType defining the DataFrame schema.

  • storage (Optional[IStorageManager]) – Optional storage manager for persistence operations. Defaults to a new MemoryStorageManager instance.

  • operations (Optional[List[Tuple[str, Any]]]) – Optional list of queued operations as (operation_name, payload) tuples.

data: List[Dict[str, Any]]
storage: Any
select(*columns)[source]

Select columns from the DataFrame.

Parameters:

columns (Union[str, Column, Literal, Any])

Return type:

SupportsDataFrameOps

selectExpr(*exprs)[source]

Select columns or expressions using SQL-like syntax.

Parameters:

exprs (str)

Return type:

SupportsDataFrameOps

filter(condition)[source]

Filter rows based on condition.

Parameters:

condition (Union[ColumnOperation, Column, Literal, str]) – Filter condition. Can be: - ColumnOperation or Column (e.g., df.salary > 55000) - String SQL expression (e.g., “salary > 55000”) - Literal boolean value

Return type:

SupportsDataFrameOps

where(condition)[source]

Alias for filter() - Filter rows based on condition.

Parameters:

condition (Union[ColumnOperation, Column, str]) – Filter condition. Can be: - ColumnOperation or Column (e.g., df.salary > 55000) - String SQL expression (e.g., “salary > 55000”)

Return type:

SupportsDataFrameOps

withColumn(col_name, col)[source]

Add or replace column.

Parameters:
Return type:

SupportsDataFrameOps

withColumns(colsMap)[source]

Add or replace multiple columns at once.

Parameters:

colsMap (Dict[str, Union[Column, ColumnOperation, Literal, Any]])

Return type:

SupportsDataFrameOps

withColumnRenamed(existing, new)[source]

Rename a column.

Parameters:
Return type:

SupportsDataFrameOps

withColumnsRenamed(colsMap)[source]

Rename multiple columns.

Parameters:

colsMap (Dict[str, str])

Return type:

SupportsDataFrameOps

drop(*cols)[source]

Drop columns.

Parameters:

cols (str)

Return type:

SupportsDataFrameOps

distinct()[source]

Return distinct rows.

Return type:

SupportsDataFrameOps

dropDuplicates(subset=None)[source]

Drop duplicate rows.

Parameters:

subset (Optional[List[str]])

Return type:

SupportsDataFrameOps

drop_duplicates(subset=None)[source]

Alias for dropDuplicates().

Parameters:

subset (Optional[List[str]])

Return type:

SupportsDataFrameOps

orderBy(*columns, ascending=True)[source]

Order by columns.

Parameters:
  • *columns (Union[str, Column]) – Column names or Column objects to order by

  • ascending (bool) – Whether to sort in ascending order (default: True)

Return type:

SupportsDataFrameOps

Returns:

DataFrame sorted by the specified columns

sort(*columns, **kwargs)[source]

Alias for orderBy() - Sort DataFrame by columns.

Parameters:
Return type:

SupportsDataFrameOps

limit(n)[source]

Limit number of rows.

Parameters:

n (int)

Return type:

SupportsDataFrameOps

offset(n)[source]

Skip first n rows (SQL OFFSET clause).

Parameters:

n (int)

Return type:

SupportsDataFrameOps

repartition(numPartitions, *cols)[source]

Repartition DataFrame (no-op in mock; returns self).

Parameters:
  • numPartitions (int)

  • cols (Any)

Return type:

SupportsDataFrameOps

coalesce(numPartitions)[source]

Coalesce partitions (no-op in mock; returns self).

Parameters:

numPartitions (int)

Return type:

SupportsDataFrameOps

replace(to_replace, value=None, subset=None)[source]

Replace values in DataFrame.

Parameters:
Return type:

SupportsDataFrameOps

colRegex(colName)[source]

Select columns matching a regex pattern.

Parameters:

colName (str)

Return type:

Column

join(other, on, how='inner')[source]

Join with another DataFrame.

Parameters:
Return type:

SupportsDataFrameOps

crossJoin(other)[source]

Cross join (Cartesian product) with another DataFrame.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

union(other)[source]

Union with another DataFrame.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

unionByName(other, allowMissingColumns=False)[source]

Union with another DataFrame by column names.

Parameters:
  • other (SupportsDataFrameOps)

  • allowMissingColumns (bool)

Return type:

SupportsDataFrameOps

unionAll(other)[source]

Deprecated alias for union() - Use union() instead.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

intersect(other)[source]

Intersect with another DataFrame.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

intersectAll(other)[source]

Return intersection with duplicates.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

exceptAll(other)[source]

Except all with another DataFrame.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

subtract(other)[source]

Return rows in this DataFrame but not in another.

Parameters:

other (SupportsDataFrameOps)

Return type:

SupportsDataFrameOps

groupBy(*columns)[source]

Group DataFrame by columns for aggregation operations.

Parameters:

columns (Union[str, Column])

Return type:

GroupedData

groupby(*cols, **kwargs)[source]

Lowercase alias for groupBy().

Parameters:
Return type:

GroupedData

rollup(*columns)[source]

Create rollup grouped data for hierarchical grouping.

Parameters:

columns (Union[str, Column])

Return type:

Any

cube(*columns)[source]

Create cube grouped data for multi-dimensional grouping.

Parameters:

columns (Union[str, Column])

Return type:

Any

agg(*exprs)[source]

Aggregate DataFrame without grouping (global aggregation).

Parameters:

exprs (Union[str, Column, ColumnOperation, Dict[str, str]])

Return type:

SupportsDataFrameOps

show(n=20, truncate=True)[source]

Display DataFrame content in a clean table format.

Parameters:
Return type:

None

printSchema()[source]

Print DataFrame schema.

Return type:

None

collect()[source]

Collect all data as list of Row objects.

Return type:

List[Row]

take(n)[source]

Take first n rows as list of Row objects.

Parameters:

n (int)

Return type:

List[Row]

head(n=None)[source]

Return first n rows.

PySpark behavior: - head() (no args) returns a single Row - head(1) or head(n) returns a list of Rows

Parameters:

n (Optional[int])

Return type:

Union[Row, List[Row]]

first()[source]

Return the first row, or None if the DataFrame is empty.

This method matches PySpark’s DataFrame.first() behavior: - Returns a single Row object (not a list like head()) - Returns None if the DataFrame is empty

Return type:

Optional[Row]

Returns:

First Row, or None if DataFrame is empty.

Example

>>> df = spark.createDataFrame([{"name": "Alice"}, {"name": "Bob"}])
>>> df.first()
Row(name='Alice')
>>> empty_df.first()
None
tail(n=1)[source]

Return last n rows. Always returns a list, matching PySpark behavior.

Parameters:

n (int)

Return type:

List[Row]

toPandas()[source]

Convert to pandas DataFrame.

Return type:

Any

toJSON()[source]

Return a single-column DataFrame of JSON strings.

Return type:

SupportsDataFrameOps

count()[source]

Count number of rows.

Return type:

int

cache()[source]

Cache the DataFrame for reuse.

In PySpark, caching a DataFrame that uses string concatenation with + operator causes the concatenation to return None/null values. This method marks the DataFrame as cached to match that behavior.

Return type:

SupportsDataFrameOps

Returns:

Self (cached DataFrame).

persist(storageLevel=None)[source]

Persist the DataFrame with the given storage level.

Parameters:

storageLevel (Any) – Storage level (ignored in mock, but kept for API compatibility).

Return type:

SupportsDataFrameOps

Returns:

Self (persisted DataFrame).

isEmpty()[source]

Check if DataFrame is empty.

Return type:

bool

dropna(how='any', thresh=None, subset=None)[source]

Drop rows with null values.

Parameters:
Return type:

SupportsDataFrameOps

fillna(value, subset=None)[source]

Fill null values.

Parameters:
  • value (Union[Any, Dict[str, Any]]) – Value to fill nulls with. Can be a single value or a dict mapping column names to fill values.

  • subset (Union[str, List[str], Tuple[str, ...], None]) – Optional column name(s) to limit fillna operation to. Can be a string (single column), list, or tuple of column names. If value is a dict, subset is ignored.

Return type:

SupportsDataFrameOps

Returns:

DataFrame with null values filled.

Raises:

ColumnNotFoundException – If any column in subset doesn’t exist.

property na: Any

Access null-handling methods via .na namespace.

Provides PySpark-compatible API for handling null values in DataFrame.

Returns:

NAHandler instance for null-handling operations.

Example

>>> df.na.fill(0)  # Fill all nulls with 0
>>> df.na.fill({"col1": 0, "col2": "default"})  # Fill with dict
sample(fraction, seed=None, withReplacement=False)[source]

Sample rows from DataFrame.

Parameters:
Return type:

SupportsDataFrameOps

sampleBy(col, fractions, seed=None)[source]

Stratified sampling.

Parameters:
Return type:

SupportsDataFrameOps

randomSplit(weights, seed=None)[source]

Randomly split DataFrame into multiple DataFrames.

Parameters:
Return type:

List[SupportsDataFrameOps]

describe(*cols)[source]

Compute basic statistics for numeric columns.

Parameters:

cols (str)

Return type:

SupportsDataFrameOps

summary(*stats)[source]

Compute extended statistics for numeric columns.

Parameters:

stats (str)

Return type:

SupportsDataFrameOps

crosstab(col1, col2)[source]

Calculate cross-tabulation.

Parameters:
Return type:

SupportsDataFrameOps

freqItems(cols, support=None)[source]

Find frequent items.

Parameters:
Return type:

SupportsDataFrameOps

approxQuantile(col, probabilities, relativeError)[source]

Calculate approximate quantiles.

Parameters:
Return type:

Union[List[float], List[List[float]]]

cov(col1, col2)[source]

Calculate covariance between two columns.

Parameters:
Return type:

float

transform(func)[source]

Apply a function to transform a DataFrame.

Parameters:

func (Any)

Return type:

SupportsDataFrameOps

mapPartitions(func, preservesPartitioning=False)[source]

Apply a function to each partition of the DataFrame.

Parameters:
  • func (Any)

  • preservesPartitioning (bool)

Return type:

SupportsDataFrameOps

mapInPandas(func, schema)[source]

Map an iterator of pandas DataFrames to another iterator of pandas DataFrames.

Parameters:
Return type:

SupportsDataFrameOps

unpivot(ids, values, variableColumnName='variable', valueColumnName='value')[source]

Unpivot columns into rows.

Parameters:
Return type:

SupportsDataFrameOps

melt(ids=None, values=None, variableColumnName='variable', valueColumnName='value')[source]

Unpivot DataFrame from wide to long format.

Parameters:
Return type:

SupportsDataFrameOps

explain(extended=False, codegen=False, cost=False, formatted=False, mode=None)[source]

Explain the execution plan.

Parameters:
Return type:

Optional[str]

toDF(*cols)[source]

Rename columns of DataFrame.

Parameters:

cols (str)

Return type:

SupportsDataFrameOps

alias(alias)[source]

Give DataFrame an alias for join operations.

Parameters:

alias (str)

Return type:

SupportsDataFrameOps

hint(name, *parameters)[source]

Provide query optimization hints.

Parameters:
Return type:

SupportsDataFrameOps

withWatermark(eventTime, delayThreshold)[source]

Define watermark for streaming.

Parameters:
  • eventTime (str)

  • delayThreshold (str)

Return type:

SupportsDataFrameOps

sameSemantics(other)[source]

Check if this DataFrame has the same semantics as another.

Parameters:

other (DataFrame)

Return type:

bool

semanticHash()[source]

Return semantic hash of this DataFrame.

Return type:

int

inputFiles()[source]

Return list of input files for this DataFrame.

Return type:

List[str]

repartitionByRange(numPartitions, *cols)[source]

Repartition by range of column values.

Parameters:
Return type:

SupportsDataFrameOps

sortWithinPartitions(*cols, **kwargs)[source]

Sort within partitions.

Parameters:
Return type:

SupportsDataFrameOps

toLocalIterator(prefetchPartitions=False)[source]

Return iterator over rows.

Parameters:

prefetchPartitions (bool)

Return type:

Any

localCheckpoint(eager=True)[source]

Local checkpoint to truncate lineage.

Parameters:

eager (bool)

Return type:

SupportsDataFrameOps

isLocal()[source]

Check if running in local mode.

Return type:

bool

property isStreaming: bool

Whether this DataFrame is streaming (always False in mock).

foreach(f)[source]

Apply function to each row.

Parameters:

f (Any)

Return type:

None

foreachPartition(f)[source]

Apply function to each partition.

Parameters:

f (Any)

Return type:

None

writeTo(table)[source]

Write DataFrame to a table using the Table API.

Parameters:

table (str)

Return type:

Any

property columns: List[str]

Get column names.

property schema: StructType

Get DataFrame schema.

If lazy with queued operations, project the resulting schema without materializing data.

property dtypes: List[Tuple[str, str]]

Get column names and their data types.

__getattribute__(name)[source]

Custom attribute access for DataFrame.

Parameters:

name (str)

Return type:

Any

__getitem__(key)[source]

Access column by name using dictionary-style syntax.

Parameters:

key (Union[str, int, slice, List[str], Tuple[str, ...]]) – Column name (str), column index (int), or slice for multiple columns.

Return type:

Union[Column, SupportsDataFrameOps]

Returns:

Column object for single column, or DataFrame for slice.

Example

>>> df["name"]  # Returns Column
>>> df[["name", "age"]]  # Returns DataFrame with selected columns
__getattr__(name)[source]

Enable df.column_name syntax for column access (PySpark compatibility).

Parameters:

name (str)

Return type:

Column

property rdd: MockRDD

Get RDD representation.

registerTempTable(name)[source]

Register as temporary table.

Parameters:

name (str)

Return type:

None

createTempView(name)[source]

Create temporary view.

Parameters:

name (str)

Return type:

None

createOrReplaceTempView(name)[source]

Create or replace a temporary view of this DataFrame.

Parameters:

name (str)

Return type:

None

createGlobalTempView(name)[source]

Create a global temporary view (session-independent).

Parameters:

name (str)

Return type:

None

createOrReplaceGlobalTempView(name)[source]

Create or replace a global temporary view (all PySpark versions).

Unlike createGlobalTempView, this method does not raise an error if the view already exists.

Parameters:

name (str) – Name of the global temp view

Return type:

None

Example

>>> df.createOrReplaceGlobalTempView("my_global_view")
>>> spark.sql("SELECT * FROM global_temp.my_global_view")
checkpoint(eager=False)[source]

Checkpoint the DataFrame (no-op in mock; returns self).

Parameters:

eager (bool)

Return type:

SupportsDataFrameOps

observe(name, *exprs)[source]

Define observation metrics (PySpark 3.3+).

Parameters:
  • name (str) – Name of the observation

  • *exprs (Column) – Column expressions to observe

Return type:

SupportsDataFrameOps

Returns:

Same DataFrame with observation registered

Example

>>> df.observe("metrics", F.count(F.lit(1)).alias("count"))
property write: DataFrameWriter

Get DataFrame writer (PySpark-compatible property).

GroupedData

Base grouped data implementation for Sparkless.

This module provides the core GroupedData class for DataFrame aggregation operations, maintaining compatibility with PySpark’s GroupedData interface.

class sparkless.dataframe.grouped.base.GroupedData(df, group_columns, group_output_names=None)[source]

Bases: object

Mock grouped data for aggregation operations.

Provides grouped data functionality for DataFrame aggregation operations, maintaining compatibility with PySpark’s GroupedData interface.

Parameters:

Initialize GroupedData.

Parameters:
  • df (SupportsDataFrameOps) – The DataFrame being grouped.

  • group_columns (List[str]) – List of column names to group by (for reading from rows).

  • group_output_names (Optional[List[str]]) – Optional output names (e.g. from alias). When provided, used for result row keys instead of group_columns (Issue #397).

__init__(df, group_columns, group_output_names=None)[source]

Initialize GroupedData.

Parameters:
  • df (SupportsDataFrameOps) – The DataFrame being grouped.

  • group_columns (List[str]) – List of column names to group by (for reading from rows).

  • group_output_names (Optional[List[str]]) – Optional output names (e.g. from alias). When provided, used for result row keys instead of group_columns (Issue #397).

agg(*exprs)[source]

Aggregate grouped data.

Parameters:

*exprs (Union[str, Column, ColumnOperation, AggregateFunction, Dict[str, str]]) – Aggregation expressions or dictionary mapping column names to aggregation functions.

Return type:

DataFrame

Returns:

New DataFrame with aggregated results.

sum(*columns)[source]

Sum grouped data.

Parameters:

*columns (Union[str, Column]) – Columns to sum.

Return type:

DataFrame

Returns:

DataFrame with sum aggregations.

avg(*columns)[source]

Average grouped data.

Parameters:

*columns (Union[str, Column]) – Columns to average.

Return type:

DataFrame

Returns:

DataFrame with average aggregations.

mean(*columns)[source]

Mean grouped data (alias for avg).

Parameters:

*columns (Union[str, Column]) – Columns to get mean of.

Return type:

DataFrame

Returns:

DataFrame with mean aggregations.

Example

>>> df.groupBy("Name").mean("Value")
count(*columns)[source]

Count grouped data.

Parameters:

*columns (Union[str, Column]) – Columns to count.

Return type:

DataFrame

Returns:

DataFrame with count aggregations.

max(*columns)[source]

Max grouped data.

Parameters:

*columns (Union[str, Column]) – Columns to get max of.

Return type:

DataFrame

Returns:

DataFrame with max aggregations.

min(*columns)[source]

Min grouped data.

Parameters:

*columns (Union[str, Column]) – Columns to get min of.

Return type:

DataFrame

Returns:

DataFrame with min aggregations.

count_distinct(*columns)[source]

Count distinct values in columns.

Parameters:

*columns (Union[str, Column]) – Columns to count distinct values for.

Return type:

DataFrame

Returns:

DataFrame with count distinct results.

collect_set(*columns)[source]

Collect unique values into a set.

Parameters:

*columns (Union[str, Column]) – Columns to collect unique values for.

Return type:

DataFrame

Returns:

DataFrame with collect_set results.

first(*columns)[source]

Get first value in each group.

Parameters:

*columns (Union[str, Column]) – Columns to get first values for.

Return type:

DataFrame

Returns:

DataFrame with first values.

last(*columns)[source]

Get last value in each group.

Parameters:

*columns (Union[str, Column]) – Columns to get last values for.

Return type:

DataFrame

Returns:

DataFrame with last values.

stddev(*columns)[source]

Calculate standard deviation.

Parameters:

*columns (Union[str, Column]) – Columns to calculate standard deviation for.

Return type:

DataFrame

Returns:

DataFrame with standard deviation results.

variance(*columns)[source]

Calculate variance.

Parameters:

*columns (Union[str, Column]) – Columns to calculate variance for.

Return type:

DataFrame

Returns:

DataFrame with variance results.

rollup(*columns)[source]

Create rollup grouped data for hierarchical grouping.

Parameters:

*columns (Union[str, Column]) – Columns to rollup.

Return type:

RollupGroupedData

Returns:

RollupGroupedData for hierarchical grouping.

cube(*columns)[source]

Create cube grouped data for multi-dimensional grouping.

Parameters:

*columns (Union[str, Column]) – Columns to cube.

Return type:

CubeGroupedData

Returns:

CubeGroupedData for multi-dimensional grouping.

pivot(pivot_col, values=None)[source]

Create pivot grouped data.

Parameters:
  • pivot_col (str) – Column to pivot on.

  • values (Optional[List[Any]]) – Optional list of pivot values. If None, uses all unique values.

Return type:

PivotGroupedData

Returns:

PivotGroupedData for pivot operations.

applyInPandas(func, schema)[source]

Apply a Python native function to each group using pandas DataFrames.

The function should take a pandas DataFrame and return a pandas DataFrame. For each group, the group data is passed as a pandas DataFrame to the function and the returned pandas DataFrame is used to construct the output rows.

Parameters:
  • func (Any) – A function that takes a pandas DataFrame and returns a pandas DataFrame.

  • schema (Any) – The schema of the output DataFrame (StructType or DDL string).

Returns:

Result of applying the function to each group.

Return type:

DataFrame

Example

>>> def normalize(pdf):
...     pdf['normalized'] = (pdf['value'] - pdf['value'].mean()) / pdf['value'].std()
...     return pdf
>>> df.groupBy("category").applyInPandas(normalize, schema="category string, value double, normalized double")
transform(func)[source]

Apply a function to each group and return a DataFrame with the same schema.

This is similar to applyInPandas but preserves the original schema. The function should take a pandas DataFrame and return a pandas DataFrame with the same columns (though it may add computed columns).

Parameters:

func (Any) – A function that takes a pandas DataFrame and returns a pandas DataFrame.

Returns:

Result of applying the function to each group.

Return type:

DataFrame

Example

>>> def add_group_stats(pdf):
...     pdf['group_mean'] = pdf['value'].mean()
...     pdf['group_std'] = pdf['value'].std()
...     return pdf
>>> df.groupBy("category").transform(add_group_stats)

DataFrameWriter

Mock DataFrameWriter implementation for DataFrame write operations.

This module provides DataFrame writing functionality, maintaining compatibility with PySpark’s DataFrameWriter interface. Supports writing to various data sinks including tables, files, and custom storage backends with multiple save modes.

Key Features:
  • Complete PySpark DataFrameWriter API compatibility

  • Support for multiple output formats (parquet, json, csv)

  • Multiple save modes (append, overwrite, error, ignore)

  • Flexible options configuration

  • Integration with storage manager

  • Table and file output support

  • Error handling for invalid configurations

Example

>>> from sparkless.sql import SparkSession
>>> spark = SparkSession("test")
>>> df = spark.createDataFrame([{"name": "Alice", "age": 25}])
>>> # Save as table
>>> df.write.mode("overwrite").saveAsTable("users")
>>> # Save to file with options
>>> df.write.format("parquet").option("compression", "snappy").save("/path")
class sparkless.dataframe.writer.DataFrameWriter(df, storage)[source]

Bases: object

Mock DataFrame writer for saveAsTable operations.

Provides a PySpark-compatible interface for writing DataFrames to storage formats. Supports various formats and save modes for testing and development.

df

The DataFrame to be written.

storage

Storage manager for persisting data.

format_name

Output format (e.g., ‘parquet’, ‘json’).

save_mode

Save mode (‘append’, ‘overwrite’, ‘error’, ‘ignore’).

options[source]

Additional options for the writer.

Example

>>> df.write.format("parquet").mode("overwrite").saveAsTable("my_table")
Parameters:

Initialize DataFrameWriter.

Parameters:
  • df (DataFrame) – The DataFrame to be written.

  • storage (IStorageManager) – Storage manager for persisting data.

__init__(df, storage)[source]

Initialize DataFrameWriter.

Parameters:
  • df (DataFrame) – The DataFrame to be written.

  • storage (IStorageManager) – Storage manager for persisting data.

format(source)[source]

Set the output format for the DataFrame writer.

Parameters:

source (str) – The output format (e.g., ‘parquet’, ‘json’, ‘csv’).

Return type:

DataFrameWriter

Returns:

Self for method chaining.

Example

>>> df.write.format("parquet")
mode(mode)[source]

Set the save mode for the DataFrame writer.

Parameters:

mode (str) – Save mode (‘append’, ‘overwrite’, ‘error’, ‘ignore’).

Return type:

DataFrameWriter

Returns:

Self for method chaining.

Raises:

IllegalArgumentException – If mode is not valid.

Example

>>> df.write.mode("overwrite")
property saveMode: str

Get the current save mode (PySpark compatibility).

Returns:

Current save mode string.

option(key, value)[source]

Set an option for the DataFrame writer.

Parameters:
  • key (str) – Option key.

  • value (Any) – Option value.

Return type:

DataFrameWriter

Returns:

Self for method chaining.

Example

>>> df.write.option("compression", "snappy")
options(**kwargs)[source]

Set multiple options for the DataFrame writer.

Parameters:

**kwargs (Any) – Option key-value pairs.

Return type:

DataFrameWriter

Returns:

Self for method chaining.

Example

>>> df.write.options(compression="snappy", format="parquet")
partitionBy(*cols)[source]

Partition output by given columns.

Parameters:

*cols (str) – Column names to partition by.

Return type:

DataFrameWriter

Returns:

Self for method chaining.

Example

>>> df.write.partitionBy("year", "month")
saveAsTable(table_name)[source]

Save DataFrame as a table in storage.

Parameters:

table_name (str) – Name of the table (can include schema, e.g., ‘schema.table’).

Raises:
  • AnalysisException – If table operations fail.

  • IllegalArgumentException – If table name is invalid.

Return type:

None

Example

>>> df.write.saveAsTable("my_table")
>>> df.write.saveAsTable("schema.my_table")
save(path=None)[source]

Save DataFrame to a file path.

Parameters:

path (Optional[str]) – Optional file path to save to. If None, uses a default path.

Raises:

IllegalArgumentException – If path is invalid.

Return type:

None

Example

>>> df.write.format("parquet").mode("overwrite").save("/path/to/file")
parquet(path, **options)[source]

Save DataFrame in Parquet format.

Parameters:
  • path (str) – Path to save the Parquet file.

  • **options (Any) – Additional options for Parquet format.

Return type:

None

Example

>>> df.write.parquet("/path/to/file.parquet")
json(path, **options)[source]

Save DataFrame in JSON format.

Parameters:
  • path (str) – Path to save the JSON file.

  • **options (Any) – Additional options for JSON format.

Return type:

None

Example

>>> df.write.json("/path/to/file.json")
delta(path, **options)[source]

Save DataFrame in Delta Lake format.

This is a convenience method equivalent to: df.write.format(“delta”).save(path)

Parameters:
  • path (str) – Path to save the Delta Lake table.

  • **options (Any) – Additional options for Delta Lake format.

Return type:

None

Example

>>> df.write.delta("/path/to/delta_table")
>>> df.write.delta("/path/to/delta_table", mergeSchema=True)
csv(path, **options)[source]

Save DataFrame in CSV format.

Parameters:
  • path (str) – Path to save the CSV file.

  • **options (Any) – Additional options for CSV format.

Return type:

None

Example

>>> df.write.csv("/path/to/file.csv")
orc(path, **options)[source]

Save DataFrame in ORC format.

Parameters:
  • path (str) – Path to save the ORC file.

  • **options (Any) – Additional options for ORC format.

Return type:

None

Example

>>> df.write.orc("/path/to/file.orc")
text(path, **options)[source]

Save DataFrame in text format.

Parameters:
  • path (str) – Path to save the text file.

  • **options (Any) – Additional options for text format.

Return type:

None

Example

>>> df.write.text("/path/to/file.txt")

DataFrameReader

Mock DataFrameReader implementation for DataFrame read operations.

This module provides DataFrame reading functionality, maintaining compatibility with PySpark’s DataFrameReader interface. Supports reading from various data sources including tables, files, and custom storage backends.

Key Features:
  • Complete PySpark DataFrameReader API compatibility

  • Support for multiple data formats (parquet, json, csv, table)

  • Flexible options configuration

  • Integration with storage manager

  • Schema inference and validation

  • Error handling for missing data sources

Example

>>> from sparkless.sql import SparkSession
>>> spark = SparkSession("test")
>>> # Read from table
>>> df = spark.read.table("my_table")
>>> # Read with format and options
>>> df = spark.read.format("parquet").option("header", "true").load("/path")
class sparkless.dataframe.reader.DataFrameReader(session)[source]

Bases: object

Mock DataFrameReader for reading data from various sources.

Provides a PySpark-compatible interface for reading DataFrames from storage formats and tables. Supports various formats and options for testing and development.

session

Sparkless session instance.

_format

Input format (e.g., ‘parquet’, ‘json’).

_options

Additional options for the reader.

Example

>>> spark.read.format("parquet").load("/path/to/file")
>>> spark.read.table("my_table")
Parameters:

session (ISession)

Initialize DataFrameReader.

Parameters:

session (ISession) – Sparkless session instance.

__init__(session)[source]

Initialize DataFrameReader.

Parameters:

session (ISession) – Sparkless session instance.

format(source)[source]

Set input format.

Parameters:

source (str) – Data source format.

Return type:

DataFrameReader

Returns:

Self for method chaining.

Example

>>> spark.read.format("parquet")
option(key, value)[source]

Set option.

Parameters:
  • key (str) – Option key.

  • value (Any) – Option value.

Return type:

DataFrameReader

Returns:

Self for method chaining.

Example

>>> spark.read.option("header", "true")
options(**options)[source]

Set multiple options.

Parameters:

**options (Any) – Option key-value pairs.

Return type:

DataFrameReader

Returns:

Self for method chaining.

Example

>>> spark.read.options(header="true", inferSchema="true")
schema(schema)[source]

Set schema.

Parameters:

schema (Union[StructType, str]) – Schema definition.

Return type:

DataFrameReader

Returns:

Self for method chaining.

Example

>>> spark.read.schema("name STRING, age INT")
load(path=None, format=None, **options)[source]

Load data.

Parameters:
Return type:

IDataFrame

Returns:

DataFrame with loaded data.

Example

>>> spark.read.load("/path/to/file")
>>> spark.read.format("parquet").load("/path/to/file")
table(table_name)[source]

Load table.

Parameters:

table_name (str) – Table name.

Return type:

IDataFrame

Returns:

DataFrame with table data.

Example

>>> spark.read.table("my_table")
>>> spark.read.format("delta").option("versionAsOf", 0).table("my_table")
json(path, **options)[source]

Load JSON data from disk.

Parameters:
Return type:

IDataFrame

csv(path, **options)[source]

Load CSV data from disk.

Parameters:
Return type:

IDataFrame

parquet(path, **options)[source]

Load Parquet data from disk.

Parameters:
Return type:

IDataFrame

orc(path, **options)[source]

Load ORC data.

Parameters:
  • path (str) – Path to ORC file.

  • **options (Any) – Additional options.

Return type:

IDataFrame

Returns:

DataFrame with ORC data.

Example

>>> spark.read.orc("/path/to/file.orc")
text(path, **options)[source]

Load text data.

Parameters:
  • path (str) – Path to text file.

  • **options (Any) – Additional options.

Return type:

IDataFrame

Returns:

DataFrame with text data.

Example

>>> spark.read.text("/path/to/file.txt")
jdbc(url, table, **options)[source]

Load data from JDBC source.

Parameters:
  • url (str) – JDBC URL.

  • table (str) – Table name.

  • **options (Any) – Additional options.

Return type:

IDataFrame

Returns:

DataFrame with JDBC data.

Example

>>> spark.read.jdbc("jdbc:postgresql://localhost:5432/db", "table")