Backend Architecture

Sparkless uses a modular backend architecture that lets different execution engines be plugged in behind common interfaces.

Backend Protocols

Protocol definitions for backend interfaces.

This module defines the protocols (interfaces) that backend implementations must satisfy. Using protocols enables dependency injection and makes modules testable independently.
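The sketch below shows the dependency-injection idea. It declares a local protocol that mirrors the documented QueryExecutor methods and injects a fake implementation into a consumer. QueryExecutorLike, FakeQueryExecutor and table_names are hypothetical names used only for illustration. The real protocol lives in sparkless.backend.protocols and is not imported here.

```python
from typing import Any, Dict, List, Protocol


class QueryExecutorLike(Protocol):
    """Local stand-in mirroring the documented QueryExecutor methods (hypothetical)."""

    def execute_query(self, query: str) -> List[Dict[str, Any]]: ...

    def create_table(self, name: str, schema: Any, data: Any) -> None: ...

    def close(self) -> None: ...


class FakeQueryExecutor:
    """Hypothetical test double that satisfies the protocol structurally, without inheritance."""

    def __init__(self, canned_rows: List[Dict[str, Any]]) -> None:
        self.canned_rows = canned_rows
        self.queries: List[str] = []
        self.closed = False

    def execute_query(self, query: str) -> List[Dict[str, Any]]:
        self.queries.append(query)  # record each query so a test can inspect it
        return list(self.canned_rows)

    def create_table(self, name: str, schema: Any, data: Any) -> None:
        pass  # not needed by this example

    def close(self) -> None:
        self.closed = True


def table_names(executor: QueryExecutorLike) -> List[str]:
    """Hypothetical consumer that depends only on the protocol, not on a concrete engine."""
    rows = executor.execute_query("SHOW TABLES")
    return [row["name"] for row in rows]


fake = FakeQueryExecutor([{"name": "users"}, {"name": "orders"}])
print(table_names(fake))  # ['users', 'orders']
```

Because Protocol uses structural typing, a type checker accepts FakeQueryExecutor wherever QueryExecutorLike is expected, even though the class never subclasses it. This is what allows a module to be tested without a real engine.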

class sparkless.backend.protocols.QueryExecutor(*args, **kwargs)[source]

Bases: Protocol

Protocol for executing queries on data.

This protocol defines the interface for query execution backends. Implementations can use different engines.

execute_query(query)[source]

Execute a SQL query and return results.

Parameters:

query (str) – SQL query string

Return type:

List[Dict[str, Any]]

Returns:

List of result rows as dictionaries

create_table(name, schema, data)[source]

Create a table with the given schema and data.

Parameters:
  • name – Table name

  • schema – Table schema

  • data – Data to populate the table with

Return type:

None

close()[source]

Close the query executor and clean up resources.

Return type:

None

__init__(*args, **kwargs)

class sparkless.backend.protocols.DataMaterializer(*args, **kwargs)[source]

Bases: Protocol

Protocol for materializing lazy DataFrame operations.

This protocol defines the interface for materializing queued operations on DataFrames. Implementations can use different execution engines.

Optional: a backend may also implement materialize_from_plan(self, data, schema, logical_plan) to execute from a serialized logical plan. When spark.sparkless.useLogicalPlan is true, or when the backend is robin, the engine calls materialize_from_plan() instead of materialize() if the backend provides it.
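The optional-method rule above can be sketched as a small dispatcher. The function run_materialization and the classes OpsOnly and PlanAware are hypothetical. The config key and the backend name "robin" come from the text above, but how the engine actually reads them is an assumption here.

```python
from typing import Any, List


def run_materialization(
    materializer: Any,
    data: List[Any],
    schema: Any,
    operations: List[Any],
    logical_plan: Any,
    use_logical_plan: bool,
    backend_name: str,
) -> List[Any]:
    """Hypothetical dispatcher: prefer materialize_from_plan() when enabled and present."""
    plan_method = getattr(materializer, "materialize_from_plan", None)
    wants_plan = use_logical_plan or backend_name == "robin"
    if wants_plan and callable(plan_method):
        return plan_method(data, schema, logical_plan)
    # Fall back to the method every DataMaterializer must provide.
    return materializer.materialize(data, schema, operations)


class OpsOnly:
    def materialize(self, data, schema, operations):
        return ["via-operations"]


class PlanAware(OpsOnly):
    def materialize_from_plan(self, data, schema, logical_plan):
        return ["via-plan"]


print(run_materialization(PlanAware(), [], None, [], {}, True, "polars"))   # ['via-plan']
print(run_materialization(OpsOnly(), [], None, [], {}, True, "polars"))     # ['via-operations']
print(run_materialization(PlanAware(), [], None, [], {}, False, "polars"))  # ['via-operations']
```

Probing with getattr keeps materialize_from_plan() truly optional. A backend that never defines it still satisfies the protocol and takes the materialize() path.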

materialize(data, schema, operations)[source]

Materialize lazy operations into actual data.

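Parameters:
  • data – Source data

  • schema – Schema of the source data

  • operations – Queued lazy operations to apply

Return type: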

List[Row]

Returns:

List of result rows

can_handle_operation(op_name, op_payload)[source]

Check if this materializer can handle a specific operation.

Parameters:
  • op_name (str) – Name of the operation (e.g., “to_timestamp”, “filter”)

  • op_payload (Any) – Operation payload (operation-specific)

Return type:

bool

Returns:

True if the materializer can handle this operation, False otherwise

can_handle_operations(operations)[source]

Check if this materializer can handle a list of operations.

Parameters:

operations (List[Tuple[str, Any]]) – List of (operation_name, payload) tuples

Return type:

Tuple[bool, List[str]]

Returns:

Tuple of (can_handle_all, unsupported_operations):

  • can_handle_all – True if all operations are supported

  • unsupported_operations – List of operation names that are unsupported
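One plausible way to build can_handle_operations() on top of can_handle_operation() is sketched below. NameBasedMaterializer is hypothetical, and its support check looks only at operation names. A real backend may also inspect op_payload, for example to reject specific functions inside an otherwise supported operation.

```python
from typing import Any, List, Set, Tuple


class NameBasedMaterializer:
    """Hypothetical materializer that decides support purely by operation name."""

    def __init__(self, supported: Set[str]) -> None:
        self.supported = supported

    def can_handle_operation(self, op_name: str, op_payload: Any) -> bool:
        # This sketch ignores op_payload.
        return op_name in self.supported

    def can_handle_operations(
        self, operations: List[Tuple[str, Any]]
    ) -> Tuple[bool, List[str]]:
        # Collect every unsupported name rather than stopping at the first,
        # so the caller can report all of them at once.
        unsupported = [
            name for name, payload in operations
            if not self.can_handle_operation(name, payload)
        ]
        return (len(unsupported) == 0, unsupported)


m = NameBasedMaterializer({"filter", "select", "join"})
print(m.can_handle_operations([("filter", None), ("select", None)]))
# (True, [])
print(m.can_handle_operations([("filter", None), ("months_between", None)]))
# (False, ['months_between'])
```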

close()[source]

Close the materializer and clean up resources.

Return type:

None

__init__(*args, **kwargs)

class sparkless.backend.protocols.ExportBackend(*args, **kwargs)[source]

Bases: Protocol

Protocol for DataFrame export operations.

This protocol defines the interface for exporting DataFrames to different formats and systems. Backend implementations should provide methods for exporting to their specific target systems.

__init__(*args, **kwargs)

Polars Backend

The default backend uses Polars for high-performance DataFrame operations.

class sparkless.backend.polars.operation_executor.PolarsOperationExecutor(expression_translator)[source]

Bases: object

Executes DataFrame operations using Polars DataFrame API.

__init__(expression_translator)[source]

Initialize operation executor.

Parameters:

expression_translator (PolarsExpressionTranslator) – Polars expression translator instance

apply_filter(df, condition)[source]

Apply a filter operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • condition (Any) – Filter condition (ColumnOperation or expression)

Return type:

DataFrame

Returns:

Filtered Polars DataFrame

apply_select(df, columns)[source]

Apply a select operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • columns (Tuple[Any, ...]) – Columns to select

Return type:

DataFrame

Returns:

Selected Polars DataFrame

apply_with_column(df, column_name, expression, expected_field=None)[source]

Apply a withColumn operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • column_name (str) – Name of new/updated column

  • expression (Any) – Expression for the column

  • expected_field (Any)

Return type:

DataFrame

Returns:

DataFrame with new column

apply_join(df1, df2, on=None, how='inner', right_alias=None)[source]

Apply a join operation.

Parameters:
  • df1 (DataFrame) – Left DataFrame

  • df2 (DataFrame) – Right DataFrame

  • on (Union[str, List[str], ColumnOperation, None]) – Join key(s) - column name(s), list of column names, or ColumnOperation with ==

  • how (str) – Join type (“inner”, “left”, “right”, “outer”, “cross”, “semi”, “anti”)

  • right_alias (Optional[str])

Return type:

DataFrame

Returns:

Joined DataFrame
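The "semi" and "anti" join types are less familiar than the others in the list above. The sketch below shows their semantics on plain lists of dicts. semi_join and anti_join are hypothetical helpers and do not call Polars. The sketch also ignores SQL NULL semantics, so a None key would match a None key here, which a SQL engine would not do.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def semi_join(left: List[Row], right: List[Row], on: str) -> List[Row]:
    """Keep left rows with at least one match; only left columns are returned."""
    keys = {r[on] for r in right}
    return [row for row in left if row[on] in keys]


def anti_join(left: List[Row], right: List[Row], on: str) -> List[Row]:
    """Keep left rows that have no match on the right."""
    keys = {r[on] for r in right}
    return [row for row in left if row[on] not in keys]


users = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}]
orders = [{"id": 1, "total": 5}, {"id": 1, "total": 7}, {"id": 3, "total": 2}]
print(semi_join(users, orders, "id"))  # [{'id': 1, 'name': 'a'}, {'id': 3, 'name': 'c'}]
print(anti_join(users, orders, "id"))  # [{'id': 2, 'name': 'b'}]
```

User 1 has two matching orders, but a semi join returns that user only once. An inner join would return the user twice.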

apply_union(df1, df2)[source]

Apply a union operation.

Parameters:
  • df1 (DataFrame) – First DataFrame

  • df2 (DataFrame) – Second DataFrame

Return type:

DataFrame

Returns:

Unioned DataFrame

apply_order_by(df, columns, ascending=True)[source]

Apply an orderBy operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • columns (List[Any]) – Columns to sort by

  • ascending (bool) – Sort direction

Return type:

DataFrame

Returns:

Sorted DataFrame

apply_limit(df, n)[source]

Apply a limit operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • n (int) – Number of rows to return

Return type:

DataFrame

Returns:

Limited DataFrame

apply_offset(df, n)[source]

Apply an offset operation (skip first n rows).

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • n (int) – Number of rows to skip

Return type:

DataFrame

Returns:

DataFrame with first n rows skipped

apply_group_by_agg(df, group_by, aggs)[source]

Apply a groupBy().agg() operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • group_by (List[Any]) – Columns to group by

  • aggs (List[Any]) – Aggregation expressions

Return type:

DataFrame

Returns:

Aggregated DataFrame

apply_distinct(df)[source]

Apply a distinct operation.

Parameters:

df (DataFrame) – Source Polars DataFrame

Return type:

DataFrame

Returns:

DataFrame with distinct rows

apply_drop(df, columns)[source]

Apply a drop operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • columns (List[str]) – Columns to drop

Return type:

DataFrame

Returns:

DataFrame with columns dropped

apply_with_column_renamed(df, old_name, new_name)[source]

Apply a withColumnRenamed operation.

Parameters:
  • df (DataFrame) – Source Polars DataFrame

  • old_name (str) – Old column name

  • new_name (str) – New column name

Return type:

DataFrame

Returns:

DataFrame with renamed column
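The apply_* methods each handle one operation type. A materializer can then fold a queue of (op_name, payload) tuples over the data by dispatching each name to its handler. The sketch below illustrates that pattern on lists of dicts instead of Polars DataFrames. ListOperationExecutor and run_operations are hypothetical, and the payload shapes (a Python predicate for filter, a tuple for withColumnRenamed) are assumptions. Real payloads are Sparkless column expressions.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


class ListOperationExecutor:
    """Hypothetical executor over lists of dicts, mirroring a few apply_* methods."""

    def apply_filter(self, rows: List[Row], condition: Callable[[Row], bool]) -> List[Row]:
        return [r for r in rows if condition(r)]

    def apply_select(self, rows: List[Row], columns: Tuple[str, ...]) -> List[Row]:
        return [{c: r[c] for c in columns} for r in rows]

    def apply_offset(self, rows: List[Row], n: int) -> List[Row]:
        return rows[n:]

    def apply_limit(self, rows: List[Row], n: int) -> List[Row]:
        return rows[:n]

    def apply_with_column_renamed(self, rows: List[Row], old: str, new: str) -> List[Row]:
        return [{(new if k == old else k): v for k, v in r.items()} for r in rows]


def run_operations(rows: List[Row], operations: List[Tuple[str, Any]]) -> List[Row]:
    """Apply queued (op_name, payload) tuples in order, one handler per name."""
    ex = ListOperationExecutor()
    handlers = {
        "filter": lambda df, p: ex.apply_filter(df, p),
        "select": lambda df, p: ex.apply_select(df, p),
        "offset": lambda df, p: ex.apply_offset(df, p),
        "limit": lambda df, p: ex.apply_limit(df, p),
        "withColumnRenamed": lambda df, p: ex.apply_with_column_renamed(df, *p),
    }
    for name, payload in operations:
        if name not in handlers:
            raise NotImplementedError(f"unsupported operation: {name}")
        rows = handlers[name](rows, payload)
    return rows


data = [{"id": i, "score": i * 10} for i in range(5)]
ops = [
    ("filter", lambda r: r["score"] >= 10),
    ("select", ("id",)),
    ("offset", 1),
    ("limit", 2),
    ("withColumnRenamed", ("id", "user_id")),
]
print(run_operations(data, ops))  # [{'user_id': 2}, {'user_id': 3}]
```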

Polars materializer for lazy DataFrame operations.

This module provides materialization of lazy DataFrame operations using Polars, replacing SQL-based materialization with Polars DataFrame operations.

class sparkless.backend.polars.materializer.PolarsMaterializer[source]

Bases: object

Materializes lazy operations using Polars.

SUPPORTED_OPERATIONS = {'distinct', 'drop', 'filter', 'groupBy', 'join', 'limit', 'offset', 'orderBy', 'select', 'union', 'withColumn', 'withColumnRenamed'}
UNSUPPORTED_OPERATIONS = {'e', 'months_between', 'pi'}
OPERATION_METADATA: Dict[str, Dict[str, Any]] = {}
__init__()[source]

Initialize Polars materializer.

materialize(data, schema, operations)[source]

Materialize lazy operations into actual data.

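Parameters:
  • data – Source data

  • schema – Schema of the source data

  • operations – Queued lazy operations to apply

Return type: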

List[Row]

Returns:

List of result rows

can_handle_operation(op_name, op_payload)[source]

Check if this materializer can handle a specific operation.

Parameters:
  • op_name (str) – Name of the operation (e.g., “to_timestamp”, “filter”)

  • op_payload (Any) – Operation payload (operation-specific)

Return type:

bool

Returns:

True if the materializer can handle this operation, False otherwise

can_handle_operations(operations)[source]

Check if this materializer can handle a list of operations.

Parameters:

operations (List[Tuple[str, Any]]) – List of (operation_name, payload) tuples

Return type:

Tuple[bool, List[str]]

Returns:

Tuple of (can_handle_all, unsupported_operations):

  • can_handle_all – True if all operations are supported

  • unsupported_operations – List of operation names that are unsupported

materialize_from_plan(data, schema, logical_plan)[source]

Materialize from a serialized logical plan using the Polars plan interpreter.

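Parameters:
  • data – Source data

  • schema – Schema of the source data

  • logical_plan – Serialized logical plan to execute

Return type: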

List[Row]

close()[source]

Close the materializer and clean up resources.

Return type:

None

Expression translator for converting Column expressions to Polars expressions.

This module translates Sparkless column expressions (Column, ColumnOperation) to Polars expressions (pl.Expr) for DataFrame operations.

class sparkless.backend.polars.expression_translator.PolarsExpressionTranslator[source]

Bases: object

Translates Column expressions to Polars expressions.

__init__()[source]

translate(expr, input_col_dtype=None, available_columns=None, case_sensitive=None, column_dtypes=None)[source]

Translate Column expression to Polars expression.

Parameters:
  • expr (Any) – Column, ColumnOperation, or other expression

  • input_col_dtype (Any) – Optional Polars dtype of input column (for to_timestamp optimization)

  • available_columns (Optional[List[str]]) – Optional list of available column names for case-insensitive matching

  • case_sensitive (Optional[bool]) – Optional case-sensitivity flag. If None, the setting is taken from the current session.

  • column_dtypes (Optional[Dict[str, Any]])

Return type:

Expr

Returns:

Polars expression (pl.Expr)
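The available_columns and case_sensitive parameters suggest that column references are resolved against the actual column names. The sketch below shows one way to do that. resolve_column is a hypothetical helper, and raising on ambiguous case-insensitive matches is an assumption modelled on Spark, where case-insensitive resolution (spark.sql.caseSensitive=false) is the default.

```python
from typing import List


def resolve_column(name: str, available_columns: List[str], case_sensitive: bool) -> str:
    """Hypothetical helper: map a requested column name to an actual column name."""
    if case_sensitive:
        if name in available_columns:
            return name
        raise KeyError(name)
    matches = [c for c in available_columns if c.lower() == name.lower()]
    if len(matches) == 1:
        return matches[0]
    if not matches:
        raise KeyError(name)
    # Two columns differing only by case cannot be told apart here.
    raise ValueError(f"ambiguous column {name!r}: {matches}")


cols = ["UserId", "Email"]
print(resolve_column("userid", cols, case_sensitive=False))  # UserId
```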

clear_cache()[source]

Clear the expression translation cache.

Call this after columns are dropped so that cached expressions referencing those columns are invalidated.

Return type:

None
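The sketch below shows why a translation cache needs explicit invalidation. CachingTranslator is hypothetical: it checks column names only on a cache miss. After a column is dropped, a cached entry would therefore keep being returned until the cache is cleared. The returned strings stand in for Polars expressions.

```python
from typing import Dict, Set


class CachingTranslator:
    """Hypothetical memoizing translator that validates names against live columns."""

    def __init__(self, columns: Set[str]) -> None:
        self.columns = columns
        self._cache: Dict[str, str] = {}

    def translate(self, name: str) -> str:
        if name in self._cache:
            return self._cache[name]  # cache hit: validation is skipped
        if name not in self.columns:
            raise KeyError(f"unknown column: {name}")
        self._cache[name] = f"col({name!r})"
        return self._cache[name]

    def clear_cache(self) -> None:
        self._cache.clear()


t = CachingTranslator({"a", "b"})
t.translate("b")
t.columns.discard("b")   # column "b" is dropped
print(t.translate("b"))  # stale hit: col('b')
t.clear_cache()          # after this, translate("b") raises KeyError
```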

Polars storage backend with Parquet file persistence.

This module provides a Polars-based storage implementation using Parquet files for persistence and in-memory DataFrames for active operations.

class sparkless.backend.polars.storage.PolarsTable(name, schema, schema_name='default', db_path=None)[source]

Bases: ITable

Table implementation using Polars DataFrame.

__init__(name, schema, schema_name='default', db_path=None)[source]

Initialize Polars table.

Parameters:
  • name (str) – Table name

  • schema (StructType) – StructType schema

  • schema_name (str) – Schema name (default: “default”)

  • db_path (Optional[str]) – Optional database path for persistence

property name: str

Get table name.

property schema: StructType

Get table schema.

property metadata: Dict[str, Any]

Get table metadata.

insert(data)[source]

Insert data into table.

Parameters:

data (List[Dict[str, Any]]) – List of dictionaries representing rows

Return type:

None

query(**filters)[source]

Query data from table.

Parameters:

**filters (Any) – Column filters (e.g., col_name=value)

Return type:

List[Dict[str, Any]]

Returns:

List of dictionaries representing rows
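The **filters form of query() suggests keyword equality filters such as col_name=value. The sketch below shows that reading, assuming that all filters must match (AND semantics) and that no filters returns every row. query_rows is hypothetical. It uses row.get(), so a filter value of None also matches a row that lacks that column.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def query_rows(rows: List[Row], **filters: Any) -> List[Row]:
    """Return rows whose columns equal every given keyword filter (AND semantics)."""
    return [
        row for row in rows
        if all(row.get(col) == value for col, value in filters.items())
    ]


rows = [
    {"name": "ada", "dept": "eng", "active": True},
    {"name": "bob", "dept": "ops", "active": True},
    {"name": "cy", "dept": "eng", "active": False},
]
print(query_rows(rows, dept="eng", active=True))  # [{'name': 'ada', 'dept': 'eng', 'active': True}]
print(len(query_rows(rows)))                      # 3
```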

count()[source]

Count rows in table.

Return type:

int

Returns:

Number of rows

truncate()[source]

Truncate table (remove all data).

Return type:

None

drop()[source]

Drop table (remove data and metadata).

Return type:

None

class sparkless.backend.polars.storage.PolarsSchema(name, db_path=None)[source]

Bases: object

Schema implementation for Polars storage.

__init__(name, db_path=None)[source]

Initialize Polars schema.

Parameters:
  • name (str) – Schema name

  • db_path (Optional[str]) – Optional database path

create_table(table, columns)[source]

Create a new table in this schema.

Parameters:
  • table – Table name

  • columns – Column definitions for the new table

Return type:

Optional[PolarsTable]

Returns:

PolarsTable instance

class sparkless.backend.polars.storage.PolarsStorageManager(db_path=None)[source]

Bases: IStorageManager

Polars-based storage manager implementing IStorageManager protocol.

__init__(db_path=None)[source]

Initialize Polars storage manager.

Parameters:

db_path (Optional[str]) – Optional database path for persistent storage. If None, uses in-memory storage only.

create_schema(schema_name)[source]

Create a new schema.

Parameters:

schema_name (str) – Schema name

Return type:

None

drop_schema(schema_name, cascade=False)[source]

Drop a schema.

Parameters:
  • schema_name (str) – Schema name

  • cascade (bool) – If True, drop all tables in schema

Return type:

None

schema_exists(schema_name)[source]

Check if schema exists.

Parameters:

schema_name (str) – Schema name

Return type:

bool

Returns:

True if schema exists, False otherwise

get_current_schema()[source]

Return the schema used for unqualified table references.

Return type:

str

set_current_schema(schema_name)[source]

Set the schema used for unqualified table references.

Parameters:

schema_name (str)

Return type:

None
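get_current_schema() and set_current_schema() determine how an unqualified table reference is resolved. The sketch below illustrates that resolution. Catalog is a hypothetical class, and rejecting a switch to a schema that does not exist is an assumption rather than documented behaviour.

```python
from typing import Dict, Set, Tuple


class Catalog:
    """Hypothetical catalog tracking schemas, their tables, and a current schema."""

    def __init__(self) -> None:
        self.tables: Dict[str, Set[str]] = {"default": set()}
        self.current_schema = "default"

    def create_schema(self, schema_name: str) -> None:
        self.tables.setdefault(schema_name, set())

    def set_current_schema(self, schema_name: str) -> None:
        if schema_name not in self.tables:
            raise ValueError(f"schema does not exist: {schema_name}")
        self.current_schema = schema_name

    def resolve(self, reference: str) -> Tuple[str, str]:
        # "schema.table" is used as-is; a bare "table" falls back to the current schema.
        if "." in reference:
            schema, table = reference.split(".", 1)
        else:
            schema, table = self.current_schema, reference
        return schema, table


cat = Catalog()
cat.create_schema("sales")
cat.set_current_schema("sales")
print(cat.resolve("orders"))         # ('sales', 'orders')
print(cat.resolve("default.users"))  # ('default', 'users')
```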

list_schemas()[source]

List all schemas.

Return type:

List[str]

Returns:

List of schema names

create_table(schema_name, table_name, fields)[source]

Create a new table.

Parameters:
  • schema_name – Schema name

  • table_name – Table name

  • fields – Table field definitions

Return type:

Optional[PolarsTable]

Returns:

PolarsTable instance

drop_table(schema_name, table_name)[source]

Drop a table.

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

Return type:

None

table_exists(schema_name, table_name)[source]

Check if table exists.

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

Return type:

bool

Returns:

True if table exists, False otherwise

list_tables(schema_name=None)[source]

List tables in schema.

Parameters:

schema_name (Optional[str]) – Schema name (None for all schemas)

Return type:

List[str]

Returns:

List of table names

get_table_schema(schema_name, table_name)[source]

Get table schema.

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

Return type:

StructType

Returns:

StructType schema

insert_data(schema_name, table_name, data)[source]

Insert data into table.

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

  • data (List[Dict[str, Any]]) – List of dictionaries representing rows

Return type:

None

query_data(schema_name, table_name, **filters)[source]

Query data from table.

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

  • **filters (Any) – Column filters

Return type:

List[Dict[str, Any]]

Returns:

List of dictionaries representing rows

get_table_metadata(schema_name, table_name)[source]

Get table metadata.

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

Return type:

Dict[str, Any]

Returns:

Dictionary with table metadata

get_data(schema_name, table_name)[source]

Get all data from table (optional method).

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

Return type:

List[Dict[str, Any]]

Returns:

List of dictionaries representing all rows

query_table(schema_name, table_name, filter_expr=None)[source]

Query table with filter expression (optional method).

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

  • filter_expr (Optional[str]) – Optional filter expression (ignored for Polars backend)

Return type:

List[Dict[str, Any]]

Returns:

List of dictionaries representing rows

Note

Filter expressions are not supported at the storage level for the Polars backend. Filter at the DataFrame level instead, by loading the data into a DataFrame and calling DataFrame.filter() on it. This method returns all data from the table regardless of the filter_expr parameter.

update_table_metadata(schema_name, table_name, metadata_updates)[source]

Update table metadata (optional method).

Parameters:
  • schema_name (str) – Schema name

  • table_name (str) – Table name

  • metadata_updates (Dict[str, Any]) – Dictionary of metadata updates

Return type:

None

create_temp_view(name, dataframe)[source]

Create a temporary view from a DataFrame.

Parameters:
  • name (str) – Name of the temporary view.

  • dataframe (Any) – DataFrame to create view from.

Return type:

None

Storage Backends

Storage manager module.

This module provides a unified storage manager that can use different backends.

class sparkless.storage.manager.TableMetadata(name, schema, created_at, table_schema, properties)[source]

Bases: object

Metadata for a table in the catalog.

__init__(name, schema, created_at, table_schema, properties)[source]

Initialize table metadata.

Parameters:
  • name (str) – Table name.

  • schema (str) – Schema/database name.

  • created_at (datetime) – Creation timestamp.

  • table_schema (StructType) – Table schema structure.

  • properties (Dict[str, Any]) – Table properties.

class sparkless.storage.manager.StorageManagerFactory[source]

Bases: object

Factory for creating storage managers.

static create_memory_manager()[source]

Create a memory storage manager.

Return type:

IStorageManager

Returns:

Memory storage manager instance.

static create_file_manager(base_path='sparkless_storage')[source]

Create a file storage manager.

Parameters:

base_path (str) – Base path for storage files.

Return type:

IStorageManager

Returns:

File storage manager instance.

static create_polars_manager(db_path=None)[source]

Create a Polars storage manager (default in v3.0.0+).

Parameters:

db_path (Optional[str]) – Optional path for persistent storage. If None, uses in-memory storage.

Return type:

IStorageManager

Returns:

Polars storage manager instance.

class sparkless.storage.manager.UnifiedStorageManager(backend)[source]

Bases: IStorageManager

Unified storage manager that can switch between backends.

__init__(backend)[source]

Initialize unified storage manager.

Parameters:

backend (IStorageManager) – Storage backend to use.

create_schema(schema)[source]

Create a new schema.

Parameters:

schema (str) – Name of the schema to create.

Return type:

None

schema_exists(schema)[source]

Check if schema exists.

Parameters:

schema (str) – Name of the schema to check.

Return type:

bool

Returns:

True if schema exists, False otherwise.

drop_schema(schema_name, cascade=False)[source]

Drop a schema.

Parameters:
  • schema_name (str) – Name of the schema to drop.

  • cascade (bool) – Whether to cascade the drop operation.

Return type:

None

list_schemas()[source]

List all schemas.

Return type:

List[str]

Returns:

List of schema names.

get_current_schema()[source]

Return the current schema used for unqualified table references.

Return type:

str

set_current_schema(schema_name)[source]

Set the current schema used for unqualified table references.

Parameters:

schema_name (str)

Return type:

None

table_exists(schema, table)[source]

Check if table exists.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

bool

Returns:

True if table exists, False otherwise.

create_table(schema, table, columns)[source]

Create a new table.

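Parameters:
  • schema – Name of the schema.

  • table – Name of the table.

  • columns – Column definitions for the table.

Return type: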

None

drop_table(schema, table)[source]

Drop a table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

None

insert_data(schema, table, data, mode='append')[source]

Insert data into table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

  • data (List[Dict[str, Any]]) – Data to insert.

  • mode (str) – Insert mode (“append”, “overwrite”, “ignore”).

Return type:

None
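The three insert modes can be illustrated with a pure function over lists of rows. insert_rows is hypothetical. The meaning of "ignore" is an assumption borrowed from Spark's save modes, and this sketch interprets it as "skip the write if the table already holds rows". Confirm the actual behaviour before relying on it.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def insert_rows(existing: List[Row], new_rows: List[Row], mode: str = "append") -> List[Row]:
    """Return the table contents after an insert under the given mode."""
    if mode == "append":
        return existing + new_rows  # keep old rows and add the new ones
    if mode == "overwrite":
        return list(new_rows)  # replace everything
    if mode == "ignore":
        # Assumed semantics: leave a non-empty table untouched.
        return existing if existing else list(new_rows)
    raise ValueError(f"unsupported mode: {mode}")


old = [{"id": 1}]
new = [{"id": 2}, {"id": 3}]
print(insert_rows(old, new, "append"))  # [{'id': 1}, {'id': 2}, {'id': 3}]
print(insert_rows(old, new, "ignore"))  # [{'id': 1}]
```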

query_table(schema, table, filter_expr=None)[source]

Query data from table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

  • filter_expr (Optional[str]) – Optional filter expression.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

get_table_schema(schema_name, table_name)[source]

Get table schema.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

Union[Any, StructType]

Returns:

Table schema.

get_data(schema, table)[source]

Get all data from table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

create_temp_view(name, dataframe)[source]

Create a temporary view from a DataFrame.

Parameters:
  • name (str) – Name of the temporary view.

  • dataframe (Any) – DataFrame to create view from.

Return type:

None

list_tables(schema_name=None)[source]

List tables in schema.

Parameters:

schema_name (Optional[str]) – Name of the schema. If None, list tables in all schemas.

Return type:

List[str]

Returns:

List of table names.

get_table_metadata(schema_name, table_name)[source]

Get table metadata including Delta-specific fields.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

Union[Any, Dict[str, Any]]

Returns:

Table metadata.

update_table_metadata(schema, table, metadata_updates)[source]

Update table metadata fields.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

  • metadata_updates (Dict[str, Any]) – Dictionary of metadata fields to update.

Return type:

None

switch_backend(backend)[source]

Switch to a different storage backend.

Parameters:

backend (IStorageManager) – New storage backend to use.

Return type:

None
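UnifiedStorageManager wraps a single IStorageManager, so switch_backend() can be read as replacing the object that calls are forwarded to. The sketch below shows that delegation pattern. DictBackend and DelegatingManager are hypothetical. The text above does not say whether switching migrates existing tables, and this sketch assumes it does not.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


class DictBackend:
    """Hypothetical minimal backend holding rows per (schema, table)."""

    def __init__(self) -> None:
        self.data: Dict[Tuple[str, str], List[Row]] = {}

    def insert_data(self, schema: str, table: str, data: List[Row]) -> None:
        self.data.setdefault((schema, table), []).extend(data)

    def get_data(self, schema: str, table: str) -> List[Row]:
        return list(self.data.get((schema, table), []))


class DelegatingManager:
    """Forwards every call to whichever backend is currently installed."""

    def __init__(self, backend: Any) -> None:
        self.backend = backend

    def switch_backend(self, backend: Any) -> None:
        self.backend = backend  # no data is copied between backends

    def insert_data(self, schema: str, table: str, data: List[Row]) -> None:
        self.backend.insert_data(schema, table, data)

    def get_data(self, schema: str, table: str) -> List[Row]:
        return self.backend.get_data(schema, table)


first, second = DictBackend(), DictBackend()
mgr = DelegatingManager(first)
mgr.insert_data("default", "t", [{"x": 1}])
mgr.switch_backend(second)
print(mgr.get_data("default", "t"))  # []
```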

save_table_metadata(qualified_name, metadata)[source]

Store table metadata.

Parameters:
  • qualified_name (str) – Qualified table name (schema.table).

  • metadata (TableMetadata) – Table metadata to store.

Return type:

None

get_table_metadata_by_name(qualified_name)[source]

Retrieve table metadata by qualified name.

Parameters:

qualified_name (str) – Qualified table name (schema.table).

Return type:

Optional[TableMetadata]

Returns:

Table metadata or None if not found.

list_table_metadata(schema)[source]

List all table metadata for a schema.

Parameters:

schema (str) – Schema name.

Return type:

List[TableMetadata]

Returns:

List of table metadata objects.
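The qualified-name methods above suggest a registry keyed by "schema.table". The sketch below shows one such registry. Meta is a hypothetical stand-in for TableMetadata with only three fields, and MetadataRegistry is likewise hypothetical.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class Meta:
    """Hypothetical stand-in for TableMetadata (only name, schema, created_at)."""
    name: str
    schema: str
    created_at: datetime = field(default_factory=datetime.now)


class MetadataRegistry:
    def __init__(self) -> None:
        self._by_name: Dict[str, Meta] = {}

    def save_table_metadata(self, qualified_name: str, metadata: Meta) -> None:
        self._by_name[qualified_name] = metadata  # key is "schema.table"

    def get_table_metadata_by_name(self, qualified_name: str) -> Optional[Meta]:
        return self._by_name.get(qualified_name)

    def list_table_metadata(self, schema: str) -> List[Meta]:
        # Match on "schema." so that "sales" does not also match "sales_archive".
        prefix = schema + "."
        return [m for key, m in self._by_name.items() if key.startswith(prefix)]


reg = MetadataRegistry()
reg.save_table_metadata("sales.orders", Meta("orders", "sales"))
reg.save_table_metadata("sales_archive.orders", Meta("orders", "sales_archive"))
print(len(reg.list_table_metadata("sales")))  # 1
```

Including the dot in the prefix is the design choice that matters here. A bare startswith(schema) check would also match other schemas whose names share the same prefix.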

cleanup_temp_tables()[source]

Clean up temporary tables to free memory.

Return type:

None

optimize_storage()[source]

Optimize storage by cleaning up and compacting data.

Return type:

None

get_memory_usage()[source]

Get current memory usage statistics.

Return type:

Dict[str, Any]

Returns:

Dictionary with memory usage information.

force_garbage_collection()[source]

Force garbage collection to free memory.

Return type:

None

get_table_sizes()[source]

Get estimated sizes of all tables.

Return type:

Dict[str, int]

Returns:

Dictionary mapping table names to estimated sizes.

cleanup_old_tables(max_age_hours=24)[source]

Clean up tables older than specified age.

Parameters:

max_age_hours (int) – Maximum age in hours before cleanup.

Return type:

int

Returns:

Number of tables cleaned up.
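The sketch below shows age-based cleanup as described above. It assumes each table's age is measured from a creation timestamp, like the created_at field of TableMetadata. cleanup_old_tables here is a standalone hypothetical function over a name-to-timestamp dict, not the manager method itself. The injectable now argument exists only to make the example deterministic.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional


def cleanup_old_tables(
    created_at: Dict[str, datetime],
    max_age_hours: int = 24,
    now: Optional[datetime] = None,
) -> int:
    """Remove entries older than max_age_hours in place; return how many were removed."""
    if now is None:
        now = datetime.now()
    cutoff = now - timedelta(hours=max_age_hours)
    stale = [name for name, ts in created_at.items() if ts < cutoff]
    for name in stale:  # collect first, then delete, so the dict is not mutated mid-iteration
        del created_at[name]
    return len(stale)


now = datetime(2024, 1, 2, 12, 0)
tables = {"old": now - timedelta(hours=30), "fresh": now - timedelta(hours=1)}
print(cleanup_old_tables(tables, max_age_hours=24, now=now))  # 1
print(list(tables))                                           # ['fresh']
```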

Memory storage backend.

This module provides an in-memory storage implementation.

class sparkless.storage.backends.memory.MemoryTable(name, schema)[source]

Bases: ITable

In-memory table implementation.

__init__(name, schema)[source]

Initialize memory table.

Parameters:
  • name (str) – Table name.

  • schema (StructType) – Table schema.

property name: str

Get table name.

property schema: StructType

Get table schema.

property metadata: Dict[str, Any]

Get table metadata.

insert_data(data, mode='append')[source]

Insert data into table.

Parameters:
  • data (List[Dict[str, Any]]) – Data to insert.

  • mode (str) – Insert mode (“append”, “overwrite”, “ignore”).

Return type:

None

query_data(filter_expr=None)[source]

Query data from table.

Parameters:

filter_expr (Optional[str]) – Optional filter expression.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

get_schema()[source]

Get table schema.

Return type:

StructType

Returns:

Table schema.

get_metadata()[source]

Get table metadata.

Return type:

Dict[str, Any]

Returns:

Table metadata.

insert(data)[source]

Insert data into table.

Parameters:

data (List[Dict[str, Any]])

Return type:

None

query(**filters)[source]

Query data from table.

Parameters:

filters (Any)

Return type:

List[Dict[str, Any]]

count()[source]

Count rows in table.

Return type:

int

truncate()[source]

Truncate table.

Return type:

None

drop()[source]

Drop table.

Return type:

None

class sparkless.storage.backends.memory.MemorySchema(name)[source]

Bases: object

In-memory database schema (namespace) implementation.

__init__(name)[source]

Initialize memory schema.

Parameters:

name (str) – Schema name.

create_table(table, columns)[source]

Create a new table in this schema.

Parameters:
  • table – Name of the table.

  • columns – Column definitions for the table.

Return type:

None

table_exists(table)[source]

Check if table exists in this schema.

Parameters:

table (str) – Name of the table.

Return type:

bool

Returns:

True if table exists, False otherwise.

drop_table(table)[source]

Drop a table from this schema.

Parameters:

table (str) – Name of the table.

Return type:

None

list_tables()[source]

List all tables in this schema.

Return type:

List[str]

Returns:

List of table names.

class sparkless.storage.backends.memory.MemoryStorageManager[source]

Bases: IStorageManager

In-memory storage manager implementation.

__init__()[source]

Initialize memory storage manager.

create_schema(schema)[source]

Create a new schema.

Parameters:

schema (str) – Name of the schema to create.

Return type:

None

schema_exists(schema)[source]

Check if schema exists.

Parameters:

schema (str) – Name of the schema to check.

Return type:

bool

Returns:

True if schema exists, False otherwise.

drop_schema(schema_name, cascade=False)[source]

Drop a schema.

Parameters:
  • schema_name (str) – Name of the schema to drop.

  • cascade (bool) – Whether to cascade the drop operation.

Return type:

None

list_schemas()[source]

List all schemas.

Return type:

List[str]

Returns:

List of schema names.

table_exists(schema, table)[source]

Check if table exists.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

bool

Returns:

True if table exists, False otherwise.

create_table(schema_name, table_name, fields)[source]

Create a new table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • fields (Union[List[Any], StructType]) – Table fields definition.

Return type:

None

drop_table(schema_name, table_name)[source]

Drop a table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

None

insert_data(schema_name, table_name, data)[source]

Insert data into table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • data (List[Dict[str, Any]]) – Data to insert.

Return type:

None

query_data(schema_name, table_name, **filters)[source]

Query data from table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • **filters (Any) – Optional filter parameters.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

query_table(schema, table, filter_expr=None)[source]

Query data from table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

  • filter_expr (Optional[str]) – Optional filter expression.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

get_table_schema(schema_name, table_name)[source]

Get table schema.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

StructType

Returns:

Table schema.

get_data(schema, table)[source]

Get all data from table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

create_temp_view(name, dataframe)[source]

Create a temporary view from a DataFrame.

Parameters:
  • name (str) – Name of the temporary view.

  • dataframe (Any) – DataFrame to create view from.

Return type:

None

list_tables(schema_name=None)[source]

List tables in schema.

Parameters:

schema_name (Optional[str]) – Name of the schema. If None, list tables in all schemas.

Return type:

List[str]

Returns:

List of table names.

get_table_metadata(schema_name, table_name)[source]

Get table metadata including Delta-specific fields.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

Dict[str, Any]

Returns:

Table metadata dictionary.

update_table_metadata(schema_name, table_name, metadata_updates)[source]

Update table metadata fields.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • metadata_updates (Dict[str, Any]) – Dictionary of metadata fields to update.

Return type:

None

close()[source]

Close storage backend and clean up resources.

For in-memory storage, this is a no-op as there are no external resources.

Return type:

None

File-based storage backend.

This module provides a file-based storage implementation using JSON files.
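The sketch below shows JSON-file table storage under stated assumptions: each table keeps all of its rows as a single JSON array in one file. JsonFileTable is hypothetical, and the real FileTable may lay out its files differently. The write path uses a temporary file followed by os.replace(), which is one common way to avoid leaving a half-written file behind if the process crashes mid-write.

```python
import json
import os
import tempfile
from typing import Any, Dict, List

Row = Dict[str, Any]


class JsonFileTable:
    """Hypothetical file-backed table: all rows live in one JSON array on disk."""

    def __init__(self, file_path: str) -> None:
        self.file_path = file_path

    def _read(self) -> List[Row]:
        if not os.path.exists(self.file_path):
            return []
        with open(self.file_path, "r", encoding="utf-8") as fh:
            return json.load(fh)

    def _write(self, rows: List[Row]) -> None:
        # Write to a temp file in the same directory, then replace the target,
        # so readers never see a partially written JSON file.
        directory = os.path.dirname(os.path.abspath(self.file_path))
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(rows, fh)
        os.replace(tmp_path, self.file_path)

    def insert(self, data: List[Row]) -> None:
        self._write(self._read() + data)

    def count(self) -> int:
        return len(self._read())

    def truncate(self) -> None:
        self._write([])


with tempfile.TemporaryDirectory() as tmp:
    table = JsonFileTable(os.path.join(tmp, "users.json"))
    table.insert([{"id": 1}, {"id": 2}])
    print(table.count())  # 2
```

Rewriting the whole file on every insert keeps the sketch simple but costs O(n) per write. That cost is acceptable for test-sized data, not for large tables.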

class sparkless.storage.backends.file.FileTable(name, schema, file_path)[source]

Bases: ITable

File-based table implementation.

__init__(name, schema, file_path)[source]

Initialize file table.

Parameters:
  • name (str) – Table name.

  • schema (StructType) – Table schema.

  • file_path (str) – Path to table data file.

property name: str

Get table name.

property schema: StructType

Get table schema.

property metadata: Dict[str, Any]

Get table metadata.

insert_data(data, mode='append')[source]

Insert data into table.

Parameters:
  • data (List[Dict[str, Any]]) – Data to insert.

  • mode (str) – Insert mode (“append”, “overwrite”, “ignore”).

Return type:

None

query_data(filter_expr=None)[source]

Query data from table.

Parameters:

filter_expr (Optional[str]) – Optional filter expression.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

get_schema()[source]

Get table schema.

Return type:

StructType

Returns:

Table schema.

get_metadata()[source]

Get table metadata.

Return type:

Dict[str, Any]

Returns:

Table metadata.

insert(data)[source]

Insert data into table.

Parameters:

data (List[Dict[str, Any]])

Return type:

None

query(**filters)[source]

Query data from table.

Parameters:

filters (Any)

Return type:

List[Dict[str, Any]]
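The signature does not document how `**filters` are matched against rows. The sketch below assumes that each keyword is an equality test on the column of the same name and that all filters must match (logical AND). `filter_rows` is a hypothetical helper, not the Sparkless implementation.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def filter_rows(rows: List[Row], **filters: Any) -> List[Row]:
    """Keep rows whose columns equal every given keyword value (assumed semantics)."""
    return [
        row
        for row in rows
        if all(column in row and row[column] == value for column, value in filters.items())
    ]


rows = [
    {"id": 1, "dept": "eng", "active": True},
    {"id": 2, "dept": "ops", "active": True},
    {"id": 3, "dept": "eng", "active": False},
]
print(filter_rows(rows, dept="eng", active=True))  # [{'id': 1, 'dept': 'eng', 'active': True}]
print(len(filter_rows(rows)))  # 3 (no filters returns every row)
```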

count()[source]

Count rows in table.

Return type:

int

truncate()[source]

Truncate table.

Return type:

None

drop()[source]

Drop table.

Return type:

None
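The docs do not spell out how `truncate()` differs from `drop()`. The sketch below assumes this backend's JSON-file layout: truncating rewrites the file as an empty array, so the table still exists with zero rows, while dropping deletes the file. The class `JsonTableFile` is a hypothetical stand-in for `FileTable`.

```python
import json
import os
import tempfile


class JsonTableFile:
    """Hypothetical stand-in for a table stored as one JSON array on disk."""

    def __init__(self, file_path: str) -> None:
        self.file_path = file_path

    def count(self) -> int:
        # A missing file counts as an empty table.
        if not os.path.exists(self.file_path):
            return 0
        with open(self.file_path, encoding="utf-8") as f:
            return len(json.load(f))

    def truncate(self) -> None:
        # Remove all rows but keep the file, so the table still exists.
        with open(self.file_path, "w", encoding="utf-8") as f:
            json.dump([], f)

    def drop(self) -> None:
        # Remove the backing file entirely; dropping twice is harmless.
        if os.path.exists(self.file_path):
            os.remove(self.file_path)


with tempfile.TemporaryDirectory() as base:
    table = JsonTableFile(os.path.join(base, "users.json"))
    with open(table.file_path, "w", encoding="utf-8") as f:
        json.dump([{"id": 1}, {"id": 2}], f)
    print(table.count())  # 2
    table.truncate()
    print(table.count(), os.path.exists(table.file_path))  # 0 True
    table.drop()
    print(os.path.exists(table.file_path))  # False
```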

class sparkless.storage.backends.file.FileSchema(name, base_path)[source]

Bases: ISchema

File-based schema implementation.

__init__(name, base_path)[source]

Initialize file schema.

Parameters:
  • name (str) – Schema name.

  • base_path (str) – Base path for schema files.

create_table(table, columns)[source]

Create a new table in this schema.

Parameters:
  • table (str) – Name of the table.

  • columns – Column definitions for the new table.

Return type:

None

table_exists(table)[source]

Check if table exists in this schema.

Parameters:

table (str) – Name of the table.

Return type:

bool

Returns:

True if table exists, False otherwise.

drop_table(table)[source]

Drop a table from this schema.

Parameters:

table (str) – Name of the table.

Return type:

None

list_tables()[source]

List all tables in this schema.

Return type:

List[str]

Returns:

List of table names.
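A schema rooted at `base_path` could answer `table_exists()` and `list_tables()` by inspecting its directory. The sketch below assumes that each table is stored as `<table>.json` inside the schema directory. Both the layout and the function names are hypothetical.

```python
import os
import tempfile
from typing import List


def list_tables(schema_dir: str) -> List[str]:
    """Table names are the stems of .json files in the schema directory (assumed layout)."""
    if not os.path.isdir(schema_dir):
        return []
    return sorted(
        os.path.splitext(entry)[0]
        for entry in os.listdir(schema_dir)
        if entry.endswith(".json") and os.path.isfile(os.path.join(schema_dir, entry))
    )


def table_exists(schema_dir: str, table: str) -> bool:
    return os.path.isfile(os.path.join(schema_dir, f"{table}.json"))


with tempfile.TemporaryDirectory() as schema_dir:
    for name in ("users", "orders"):
        with open(os.path.join(schema_dir, f"{name}.json"), "w", encoding="utf-8") as f:
            f.write("[]")
    print(list_tables(schema_dir))  # ['orders', 'users']
    print(table_exists(schema_dir, "users"))  # True
    print(table_exists(schema_dir, "missing"))  # False
```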

property fields: List[Any]

Get schema fields.

add_field(field)[source]

Add field to schema.

Parameters:

field (Any)

Return type:

None

remove_field(field_name)[source]

Remove field from schema.

Parameters:

field_name (str)

Return type:

None

get_field(field_name)[source]

Get field by name.

Parameters:

field_name (str)

Return type:

Optional[Any]

field_names()[source]

Get field names.

Return type:

List[str]

field_types()[source]

Get field types.

Return type:

Dict[str, Any]

__eq__(other)[source]

Check equality with another schema.

Parameters:

other (Any)

Return type:

bool

__hash__()[source]

Get hash for schema.

Return type:

int
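Because `FileSchema` defines both `__eq__()` and `__hash__()`, equal schemas must produce equal hashes for them to behave correctly in sets and as dict keys. The sketch below shows one consistent pairing. It assumes that equality is defined by the schema name plus its ordered field names. The class `SchemaKey` is hypothetical and may not compare the same attributes that Sparkless does.

```python
from typing import Any, List


class SchemaKey:
    """Hypothetical schema whose identity is its name plus ordered field names."""

    def __init__(self, name: str, field_names: List[str]) -> None:
        self.name = name
        self.field_names = list(field_names)

    def _key(self) -> tuple:
        return (self.name, tuple(self.field_names))

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, SchemaKey):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self) -> int:
        # Hash exactly the values __eq__ compares, so a == b implies hash(a) == hash(b).
        return hash(self._key())


a = SchemaKey("default", ["id", "name"])
b = SchemaKey("default", ["id", "name"])
print(a == b, hash(a) == hash(b), len({a, b}))  # True True 1
```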

__str__()[source]

Get string representation.

Return type:

str

__repr__()[source]

Get representation.

Return type:

str

class sparkless.storage.backends.file.FileStorageManager(base_path='sparkless_storage')[source]

Bases: IStorageManager

File-based storage manager implementation.

__init__(base_path='sparkless_storage')[source]

Initialize file storage manager.

Parameters:

base_path (str) – Base path for storage files.

create_schema(schema)[source]

Create a new schema.

Parameters:

schema (str) – Name of the schema to create.

Return type:

None

schema_exists(schema)[source]

Check if schema exists.

Parameters:

schema (str) – Name of the schema to check.

Return type:

bool

Returns:

True if schema exists, False otherwise.

drop_schema(schema_name, cascade=False)[source]

Drop a schema.

Parameters:
  • schema_name (str) – Name of the schema to drop.

  • cascade (bool) – Whether to cascade the drop operation.

Return type:

None
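The `cascade` flag most likely follows SQL `DROP SCHEMA ... CASCADE` semantics, but the docs do not confirm this. The sketch below assumes those semantics: without `cascade`, dropping a schema that still contains tables raises an error, and with it, the schema directory and all its table files are removed. The directory layout and `drop_schema_dir` are hypothetical, and Sparkless may behave differently.

```python
import os
import shutil
import tempfile


def drop_schema_dir(base_path: str, schema_name: str, cascade: bool = False) -> None:
    schema_dir = os.path.join(base_path, schema_name)
    if not os.path.isdir(schema_dir):
        return  # Nothing to drop.
    if os.listdir(schema_dir) and not cascade:
        raise ValueError(f"Schema {schema_name!r} is not empty; use cascade=True")
    # rmtree removes the directory together with any table files inside it.
    shutil.rmtree(schema_dir)


with tempfile.TemporaryDirectory() as base:
    os.makedirs(os.path.join(base, "sales"))
    with open(os.path.join(base, "sales", "orders.json"), "w", encoding="utf-8") as f:
        f.write("[]")
    try:
        drop_schema_dir(base, "sales")
    except ValueError as exc:
        print(exc)  # Schema 'sales' is not empty; use cascade=True
    drop_schema_dir(base, "sales", cascade=True)
    print(os.path.exists(os.path.join(base, "sales")))  # False
```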

list_schemas()[source]

List all schemas.

Return type:

List[str]

Returns:

List of schema names.

table_exists(schema, table)[source]

Check if table exists.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

bool

Returns:

True if table exists, False otherwise.

create_table(schema_name, table_name, fields)[source]

Create a new table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • fields (Union[List[StructField], StructType]) – Table fields definition.

Return type:

None
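`create_table()` accepts either a list of `StructField` objects or a `StructType`. A common way to support both is to normalize the argument to a plain list up front. The sketch below uses hypothetical stand-in dataclasses, since the real `StructField` and `StructType` classes are not reproduced here. It assumes that `StructType` exposes its fields through a `fields` attribute, as PySpark's `StructType` does.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class StructField:  # Hypothetical stand-in for the real StructField.
    name: str
    data_type: str


@dataclass
class StructType:  # Hypothetical stand-in; assumes fields live in `.fields`.
    fields: List[StructField] = field(default_factory=list)


def normalize_fields(fields: Union[List[StructField], StructType]) -> List[StructField]:
    """Accept either form and always return a new plain list of fields."""
    if isinstance(fields, StructType):
        return list(fields.fields)
    return list(fields)


cols = [StructField("id", "long"), StructField("name", "string")]
print([f.name for f in normalize_fields(StructType(cols))])  # ['id', 'name']
```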

drop_table(schema_name, table_name)[source]

Drop a table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

None

insert_data(schema_name, table_name, data)[source]

Insert data into table.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • data (List[Dict[str, Any]]) – Data to insert.

Return type:

None

query_data(schema_name, table_name, **filters)[source]

Query data from table, optionally filtered by keyword arguments.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • **filters (Any) – Optional filter parameters.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

query_table(schema, table, filter_expr=None)[source]

Query data from table, optionally filtered by a filter expression string.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

  • filter_expr (Optional[str]) – Optional filter expression.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

get_table_schema(schema_name, table_name)[source]

Get table schema.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

StructType

Returns:

Table schema.

get_data(schema, table)[source]

Get all data from table.

Parameters:
  • schema (str) – Name of the schema.

  • table (str) – Name of the table.

Return type:

List[Dict[str, Any]]

Returns:

List of data rows.

create_temp_view(name, dataframe)[source]

Create a temporary view from a DataFrame.

Parameters:
  • name (str) – Name of the temporary view.

  • dataframe (Any) – DataFrame to create view from.

Return type:

None
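In Spark, temporary views are scoped to the session, so a file backend has no obvious need to write them to disk. The sketch below assumes that views are kept in an in-memory registry keyed by name and that re-creating a view replaces the previous one. `TempViewRegistry` is hypothetical and is not the Sparkless implementation.

```python
from typing import Any, Dict, List


class TempViewRegistry:
    """Hypothetical in-memory registry; views vanish when the object is discarded."""

    def __init__(self) -> None:
        self._views: Dict[str, Any] = {}

    def create_temp_view(self, name: str, dataframe: Any) -> None:
        # Re-registering an existing name replaces the previous view.
        self._views[name] = dataframe

    def get(self, name: str) -> Any:
        if name not in self._views:
            raise KeyError(f"Temporary view {name!r} not found")
        return self._views[name]

    def names(self) -> List[str]:
        return sorted(self._views)


registry = TempViewRegistry()
registry.create_temp_view("people", [{"id": 1}])
registry.create_temp_view("people", [{"id": 2}])
print(registry.names(), registry.get("people"))  # ['people'] [{'id': 2}]
```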

list_tables(schema_name=None)[source]

List tables in schema.

Parameters:

schema_name (Optional[str]) – Name of the schema. If None, list tables in all schemas.

Return type:

List[str]

Returns:

List of table names.

get_table_metadata(schema_name, table_name)[source]

Get table metadata including Delta-specific fields.

Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

Return type:

Dict[str, Any]

update_table_metadata(schema_name, table_name, metadata_updates)[source]

Update table metadata fields.

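Parameters:
  • schema_name (str) – Name of the schema.

  • table_name (str) – Name of the table.

  • metadata_updates (Dict[str, Any]) – Dictionary of metadata fields to update.

Return type: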

None

close()[source]

Close storage backend and clean up resources.

For file-based storage, this is a no-op as files are managed per operation.

Return type:

None