CTE Optimization
Overview
With the Polars backend (v3.0.0+), query optimization is handled automatically by Polars' native lazy evaluation. Polars builds an optimized execution plan for the entire chain of operations, so the manual CTE optimization is no longer needed.
Note: This guide describes the CTE-based optimization previously used with the DuckDB backend. With Polars, optimization is automatic and more efficient.
Polars Backend (v3.0.0+)
With the Polars backend, optimization is automatic:
Lazy Evaluation: Operations are built into a lazy execution plan
Automatic Optimization: Polars optimizes the entire plan before execution
No Intermediate Tables: No need to create temporary tables
Single Materialization: Only the final result is materialized
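How It Works
from sparkless.sql import SparkSession, functions as F
spark = SparkSession("MyApp")  # Uses Polars by default
# Sample rows (illustrative)
data = [{"name": "Alice", "age": 30, "salary": 50000.0}, {"name": "Bob", "age": 22, "salary": 40000.0}]
df = spark.createDataFrame(data)
# Polars automatically optimizes this entire chain; select() keeps "salary"
# because withColumn() computes "bonus" from it
result = (
    df.filter(F.col("age") > 25)
    .select("name", "age", "salary")
    .withColumn("bonus", F.col("salary") * 0.1)
)
# Materialization happens only when needed (show, collect, etc.)
result.show()  # Polars optimizes and executes the entire plan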
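To make the idea of a deferred plan concrete, here is a minimal pure-Python sketch; it is not sparkless or Polars code, and LazyPlan, its with_column method and the dict-per-row format are all hypothetical. Each call only records an operation; collect() chains generators so that every row passes through all steps once and only the final list is materialized. Polars goes further than this sketch by rewriting the plan (for example with predicate and projection pushdown) before running it.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def _apply(kind: str, arg: Any, rows: Iterator[Row]) -> Iterator[Row]:
    """Wrap one recorded operation around the incoming row stream (lazily)."""
    if kind == "filter":
        return (r for r in rows if arg(r))
    if kind == "select":
        return ({name: r[name] for name in arg} for r in rows)
    if kind == "with_column":
        name, fn = arg
        return ({**r, name: fn(r)} for r in rows)
    raise ValueError(f"unknown operation {kind!r}")


class LazyPlan:
    """Hypothetical lazy plan: records operations, runs them on collect()."""

    def __init__(self, rows: Iterable[Row], ops: Tuple[Tuple[str, Any], ...] = ()):
        self._rows = rows
        self._ops = ops

    def _then(self, kind: str, arg: Any) -> "LazyPlan":
        # Returns a new plan; no row is touched here
        return LazyPlan(self._rows, self._ops + ((kind, arg),))

    def filter(self, predicate: Callable[[Row], bool]) -> "LazyPlan":
        return self._then("filter", predicate)

    def select(self, *names: str) -> "LazyPlan":
        return self._then("select", names)

    def with_column(self, name: str, fn: Callable[[Row], Any]) -> "LazyPlan":
        return self._then("with_column", (name, fn))

    def collect(self) -> List[Row]:
        # Chain generators so each row flows through every step once;
        # only the final list is materialized
        stream: Iterator[Row] = iter(self._rows)
        for kind, arg in self._ops:
            stream = _apply(kind, arg, stream)
        return list(stream)


rows = [
    {"name": "Alice", "age": 30, "salary": 50000.0},
    {"name": "Bob", "age": 22, "salary": 40000.0},
    {"name": "Cara", "age": 41, "salary": 70000.0},
]
plan = (
    LazyPlan(rows)
    .filter(lambda r: r["age"] > 25)
    .select("name", "age", "salary")
    .with_column("bonus", lambda r: r["salary"] * 0.1)
)
print(plan.collect())
```

Building the plan does no work at all; a predicate that logs its calls stays silent until collect() is invoked.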
Performance Benefits
No SQL Generation: Direct DataFrame operations
Automatic Optimization: Polars optimizes the execution plan
Better Performance: Polars is optimized for DataFrame operations
Thread-Safe: No connection management overhead
Legacy: DuckDB CTE Optimization (v2.x)
Previously, with the DuckDB backend, each DataFrame operation would:
Create a new temporary table
Execute a SELECT query from the source table
Insert results into the new temporary table
The CTE optimization built a single SQL query with chained CTEs instead.
Example SQL generated for df.filter(F.col("age") > 25).select("name", "age", "salary").withColumn("bonus", F.col("salary") * 0.1) (the select keeps "salary" because bonus is computed from it):
WITH cte_0 AS (SELECT * FROM source_table WHERE "age" > 25),
cte_1 AS (SELECT "name", "age", "salary" FROM cte_0),
cte_2 AS (SELECT "name", "age", "salary", "salary" * 0.1 AS "bonus" FROM cte_1)
SELECT * FROM cte_2
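The query above can be checked against a real engine. The sketch below swaps DuckDB for Python's built-in sqlite3 module purely so it runs without extra dependencies, and the table contents are made-up sample rows. Note that the select step has to keep "salary", because the bonus column in the last CTE is computed from it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE source_table ("name" TEXT, "age" INTEGER, "salary" REAL)')
conn.executemany(
    "INSERT INTO source_table VALUES (?, ?, ?)",
    [("Alice", 30, 50000.0), ("Bob", 22, 40000.0), ("Cara", 41, 70000.0)],
)

# The CTE chain from the example, with "salary" kept by the select step
query = """
WITH cte_0 AS (SELECT * FROM source_table WHERE "age" > 25),
cte_1 AS (SELECT "name", "age", "salary" FROM cte_0),
cte_2 AS (SELECT "name", "age", "salary", "salary" * 0.1 AS "bonus" FROM cte_1)
SELECT * FROM cte_2
"""
cursor = conn.execute(query)
columns = [d[0] for d in cursor.description]
rows = cursor.fetchall()
print(columns)
for row in rows:
    print(row)
```

The whole chain is one statement, so the engine sees every step at once instead of one intermediate table per step.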
Implementation Details
Architecture
Note: CTE optimization is only relevant for the DuckDB backend. With Polars (the default in v3.0.0+), optimization is automatic.
The legacy implementation was in sparkless/backend/duckdb/query_executor.py with these key components:
1. Updated materialize() Method
import warnings

def materialize(self, data, schema, operations):
    """Materializes operations using CTEs with fallback to table-per-operation."""
    # Create initial table with data
    source_table_name = f"temp_table_{self._temp_table_counter}"
    self._create_table_with_data(source_table_name, data)
    # Try CTE-based approach first
    try:
        return self._materialize_with_cte(source_table_name, operations)
    except Exception as e:
        # Fallback to old approach for complex operations
        warnings.warn(f"CTE optimization failed, falling back: {e}")
        return self._materialize_with_tables(source_table_name, operations)
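2. CTE Query Builder
def _build_cte_query(self, source_table_name, operations):
    """Build a single SQL query with CTEs for all operations."""
    cte_definitions = []
    current_cte_name = source_table_name
    # source_table_obj (the source table's metadata, passed to the builders)
    # is obtained elsewhere; that step is not shown in this excerpt
    for i, (op_name, op_val) in enumerate(operations):
        cte_name = f"cte_{i}"
        # Build CTE SQL for each operation type
        if op_name == "filter":
            cte_sql = self._build_filter_cte(current_cte_name, op_val, source_table_obj)
        elif op_name == "select":
            cte_sql = self._build_select_cte(current_cte_name, op_val, source_table_obj)
        # ... other operations
        else:
            # No builder for this operation: raise so materialize() falls back
            raise NotImplementedError(f"No CTE builder for {op_name!r}")
        cte_definitions.append(f"{cte_name} AS ({cte_sql})")
        current_cte_name = cte_name
    if not cte_definitions:
        # No operations: read the source table directly
        return f"SELECT * FROM {current_cte_name}"
    # Build final query
    cte_clause = "WITH " + ",\n ".join(cte_definitions)
    return f"{cte_clause}\nSELECT * FROM {current_cte_name}"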
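For experimentation outside the backend, the loop above can be reduced to a self-contained function. This is a sketch under assumptions: build_cte_query and quote are hypothetical names, operations arrive as already-rendered SQL fragments rather than column objects, and only filter, select and withColumn are handled. Tracking the current column list is what lets withColumn either replace an existing column or append a new one, and an unknown operation raises so that a caller can fall back.

```python
def quote(name):
    """Double-quote an SQL identifier (embedded quotes are doubled)."""
    return '"' + name.replace('"', '""') + '"'


def build_cte_query(source_table, source_columns, operations):
    """Chain one CTE per operation and select from the last one.

    operations is a list of (op_name, op_val) pairs in a hypothetical format:
      ("filter", sql_condition)         e.g. ("filter", '"age" > 25')
      ("select", [column, ...])
      ("withColumn", (name, sql_expr))  adds or replaces a column
    """
    columns = list(source_columns)
    cte_definitions = []
    current = source_table
    for i, (op_name, op_val) in enumerate(operations):
        cte_name = f"cte_{i}"
        if op_name == "filter":
            cte_sql = f"SELECT * FROM {current} WHERE {op_val}"
        elif op_name == "select":
            columns = list(op_val)
            cte_sql = f"SELECT {', '.join(quote(c) for c in columns)} FROM {current}"
        elif op_name == "withColumn":
            name, expr = op_val
            # Replace the column in place if it exists, otherwise append it
            parts = [f"{expr} AS {quote(c)}" if c == name else quote(c) for c in columns]
            if name not in columns:
                parts.append(f"{expr} AS {quote(name)}")
                columns.append(name)
            cte_sql = f"SELECT {', '.join(parts)} FROM {current}"
        else:
            # Unsupported here: raising lets the caller fall back
            raise NotImplementedError(f"no CTE builder for {op_name!r}")
        cte_definitions.append(f"{cte_name} AS ({cte_sql})")
        current = cte_name
    if not cte_definitions:
        return f"SELECT * FROM {current}"
    return "WITH " + ",\n".join(cte_definitions) + f"\nSELECT * FROM {current}"


ops = [
    ("filter", '"age" > 25'),
    ("select", ["name", "age", "salary"]),
    ("withColumn", ("bonus", '"salary" * 0.1')),
]
query = build_cte_query("source_table", ["name", "age", "salary"], ops)
print(query)
```

For the three operations above this prints the same CTE chain as the earlier SQL example.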
3. Operation-Specific CTE Builders
Each operation has a corresponding _build_*_cte() method:
_build_filter_cte() - Generates WHERE clause
_build_select_cte() - Generates SELECT with column list
_build_with_column_cte() - Generates SELECT with new/replaced column
_build_order_by_cte() - Generates ORDER BY clause
_build_limit_cte() - Generates LIMIT clause
_build_join_cte() - Generates JOIN clause
_build_union_cte() - Generates UNION ALL clause
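Several of these builders are little more than string templates. Below is a hypothetical sketch of the ORDER BY, LIMIT, JOIN and UNION ALL cases using plain table names; the signatures are assumptions, not the backend's. One design caveat: SELECT * over a join keeps both copies of the join key, whereas Spark's join on a column name keeps one, so a real builder would need an explicit column list.

```python
JOIN_KEYWORDS = {
    "inner": "INNER JOIN",
    "left": "LEFT OUTER JOIN",
    "right": "RIGHT OUTER JOIN",
    "outer": "FULL OUTER JOIN",
}


def quote(name):
    """Double-quote an SQL identifier."""
    return '"' + name.replace('"', '""') + '"'


def build_order_by_cte(source, keys):
    """keys: list of (column, ascending) pairs."""
    order = ", ".join(f"{quote(c)} {'ASC' if asc else 'DESC'}" for c, asc in keys)
    return f"SELECT * FROM {source} ORDER BY {order}"


def build_limit_cte(source, n):
    # int() ensures only an integer literal ends up in the SQL text
    return f"SELECT * FROM {source} LIMIT {int(n)}"


def build_join_cte(source, other, on, how="inner"):
    """Equi-join on the shared column names listed in `on`."""
    condition = " AND ".join(f"{source}.{quote(c)} = {other}.{quote(c)}" for c in on)
    return f"SELECT * FROM {source} {JOIN_KEYWORDS[how]} {other} ON {condition}"


def build_union_cte(source, other):
    # UNION ALL keeps duplicate rows, like DataFrame.union
    return f"SELECT * FROM {source} UNION ALL SELECT * FROM {other}"


print(build_order_by_cte("cte_2", [("total", False)]))
print(build_limit_cte("cte_3", 10))
print(build_join_cte("cte_0", "other_table", ["id"], how="left"))
print(build_union_cte("cte_0", "other_table"))
```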
Supported Operations
✅ Fully Supported:
filter() - With simple column comparisons
select() - Column selection and aliasing
withColumn() - Adding/replacing columns with expressions
orderBy() - Sorting with ASC/DESC
limit() - Limiting result count
join() - All join types (inner, left, right, outer)
union() - Union of DataFrames
✅ Advanced Features:
Window functions in select/withColumn
Aggregate functions
String operations (upper, lower, etc.)
Arithmetic expressions
Complex column expressions
⚠️ Limitations:
Function calls in filter conditions (e.g., F.length(col("name")) > 3) currently fall back to table-per-operation
Very complex nested expressions may require fallback
Backward Compatibility
The implementation includes a fallback mechanism:
If CTE generation fails for any operation, it automatically falls back to the original table-per-operation approach
This ensures 100% backward compatibility
The original _apply_* methods are preserved and used by the _materialize_with_tables() fallback
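The fallback can be illustrated end to end with a small, hypothetical condition translator. It mirrors the documented limitation: plain comparisons become a single SQL query, while a function call such as length raises, which triggers a warning and a row-by-row path standing in for table-per-operation execution. SQLite is used instead of DuckDB so the sketch needs only the standard library; SQLite does support length(), and the translator rejects it only to reproduce the limitation.

```python
import operator
import sqlite3
import warnings

SQL_OPS = {"gt": ">", "lt": "<", "eq": "="}
PY_OPS = {"gt": operator.gt, "lt": operator.lt, "eq": operator.eq}


def condition_to_sql(cond):
    """Translate a hypothetical condition tree to SQL; raise if unsupported."""
    kind = cond[0]
    if kind == "col":
        return '"' + cond[1].replace('"', '""') + '"'
    if kind == "lit":
        value = cond[1]
        if isinstance(value, (int, float)):
            return repr(value)
        return "'" + str(value).replace("'", "''") + "'"
    if kind in SQL_OPS:
        return f"{condition_to_sql(cond[1])} {SQL_OPS[kind]} {condition_to_sql(cond[2])}"
    # Function calls such as ("call", "length", arg) are deliberately rejected
    raise NotImplementedError(f"cannot translate {kind!r} node to SQL")


def evaluate(cond, row):
    """Row-at-a-time evaluation used only by the fallback path."""
    kind = cond[0]
    if kind == "col":
        return row[cond[1]]
    if kind == "lit":
        return cond[1]
    if kind == "call" and cond[1] == "length":
        return len(evaluate(cond[2], row))
    if kind in PY_OPS:
        return PY_OPS[kind](evaluate(cond[1], row), evaluate(cond[2], row))
    raise ValueError(f"unknown node {kind!r}")


def filter_table(conn, table, cond):
    """Try one SQL query first; on failure warn and fall back."""
    try:
        where = condition_to_sql(cond)
        return conn.execute(f"SELECT * FROM {table} WHERE {where}").fetchall()
    except Exception as exc:
        warnings.warn(f"CTE optimization failed, falling back: {exc}")
        cursor = conn.execute(f"SELECT * FROM {table}")
        names = [d[0] for d in cursor.description]
        return [row for row in cursor.fetchall() if evaluate(cond, dict(zip(names, row)))]


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE people ("name" TEXT, "age" INTEGER)')
conn.executemany("INSERT INTO people VALUES (?, ?)", [("Al", 30), ("Bea", 22), ("Carol", 41)])

simple = ("gt", ("col", "age"), ("lit", 25))
with_call = ("gt", ("call", "length", ("col", "name")), ("lit", 3))

print(filter_table(conn, "people", simple))     # single SQL query
print(filter_table(conn, "people", with_call))  # warns, then falls back
```

Both paths return the same kind of result, so callers cannot tell which one ran apart from the warning.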
Performance Benefits
Polars Backend (v3.0.0+)
No I/O for Intermediate Results: Operations are chained in lazy evaluation
Automatic Optimization: Polars optimizes the entire execution plan
Memory Efficient: Only final result is materialized
Better Performance: Polars is optimized for DataFrame operations
Legacy DuckDB Backend
I/O Reduction:
Before: N intermediate table writes for N operations
After (CTE): 1 table creation + 1 SELECT with CTEs
Memory Efficiency:
Before: N temporary tables in memory/disk
After: 1 query execution with DuckDB's optimized execution plan
Query Optimization:
DuckDB can optimize the entire CTE chain at once
Better query planning and execution strategies
Reduced overhead from multiple separate queries
Benchmark Results
Polars Backend:
Operations are chained lazily - no intermediate materialization
Single materialization at the end
Automatic optimization by Polars
Legacy DuckDB (CTE optimization): For a typical pipeline with 5 operations:
Table-per-operation: 5 table creations + 5 SELECTs + 5 INSERTs
CTE optimization: 1 table creation + 1 SELECT with CTEs
Example operations: filter → select → withColumn → orderBy → limit
Old approach: ~5x the I/O operations
New approach: Single query execution
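The statement counts above can be reproduced on a toy pipeline. This sketch uses sqlite3 instead of DuckDB and represents each step as a hypothetical function from an input relation name to a SELECT; the legacy path issues a SELECT, a CREATE TABLE and an INSERT per step, while the CTE path issues one query. The source rows are made up, and the counts are statements issued from Python, not timings.

```python
import sqlite3

# Hypothetical step format: a function from the input relation name to a SELECT
STEPS = [
    lambda src: f'SELECT * FROM {src} WHERE "age" > 25',                # filter
    lambda src: f'SELECT "name", "age", "salary" FROM {src}',           # select
    lambda src: f'SELECT *, "salary" * 0.1 AS "bonus" FROM {src}',      # withColumn
    lambda src: f'SELECT *, "salary" + "bonus" AS "total" FROM {src}',  # withColumn
    lambda src: f'SELECT * FROM {src} WHERE "total" > 60000',           # filter
]


def seeded_connection():
    """Fresh in-memory database holding the sample source rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE source_table ("name" TEXT, "age" INTEGER, "salary" REAL)')
    conn.executemany(
        "INSERT INTO source_table VALUES (?, ?, ?)",
        [("Alice", 30, 50000.0), ("Bob", 22, 40000.0), ("Cara", 41, 70000.0), ("Dan", 35, 60000.0)],
    )
    return conn


def table_per_operation(conn, steps):
    """Legacy style: SELECT, CREATE TABLE and INSERT for every step."""
    issued, source, rows = 0, "source_table", []
    for i, step in enumerate(steps):
        cursor = conn.execute(step(source))
        rows = cursor.fetchall()
        columns = [d[0] for d in cursor.description]
        table = f"temp_table_{i}"
        quoted = ", ".join(f'"{c}"' for c in columns)
        placeholders = ", ".join(["?"] * len(columns))
        conn.execute(f"CREATE TEMP TABLE {table} ({quoted})")
        conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
        issued += 3  # one SELECT, one CREATE TABLE, one INSERT
        source = table
    return rows, issued


def single_cte_query(conn, steps):
    """CTE style: every step becomes a CTE inside one statement."""
    source, ctes = "source_table", []
    for i, step in enumerate(steps):
        ctes.append(f"cte_{i} AS ({step(source)})")
        source = f"cte_{i}"
    query = "WITH " + ",\n".join(ctes) + f"\nSELECT * FROM {source}"
    return conn.execute(query).fetchall(), 1


legacy_rows, legacy_count = table_per_operation(seeded_connection(), STEPS)
cte_rows, cte_count = single_cte_query(seeded_connection(), STEPS)
print(f"table-per-operation: {legacy_count} statements, CTE: {cte_count} statement")
print(sorted(legacy_rows) == sorted(cte_rows))
```

Both strategies return the same rows; only the number of round trips and intermediate tables differs.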
Usage
Polars Backend (v3.0.0+): Optimization is automatic - no configuration needed:
from sparkless.sql import SparkSession
spark = SparkSession("MyApp") # Uses Polars by default
# All operations are automatically optimized
Legacy DuckDB Backend: The CTE optimization is automatically applied when using lazy evaluation:
from sparkless.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("app").getOrCreate()
# Sample rows (illustrative)
data = [{"name": "Alice", "age": 30, "salary": 50000.0}, {"name": "Bob", "age": 22, "salary": 40000.0}]
# Create DataFrame with lazy evaluation enabled
df = spark.createDataFrame(data).withLazy(True)
# Chain operations - they're queued, not executed
result_df = (
    df.filter(F.col("age") > 25)
    .select("name", "age", "salary")
    .withColumn("bonus", F.col("salary") * 0.1)
    .withColumn("total", F.col("salary") + F.col("bonus"))
    .orderBy(F.desc("total"))
    .limit(10)
)
# Materialize with CTE optimization when action is called
results = result_df.collect()  # Single CTE query executed here
Testing
Comprehensive test suite in tests/unit/test_cte_optimization.py:
✅ Filter + Select + WithColumn chains
✅ OrderBy and Limit operations
✅ Complex expressions and nested operations
✅ Window functions
✅ Empty DataFrames
✅ Single operations
✅ String operations with upper/lower
✅ Fallback behavior
All existing tests pass with CTE optimization enabled, ensuring backward compatibility.
Future Enhancements
Potential improvements for future releases:
Enhanced Filter Support: Handle function calls in filter conditions without fallback
Aggregate Operations: Optimize groupBy + aggregate patterns
Complex Joins: Multi-DataFrame joins with shared CTEs
Query Plan Inspection: Add explain() method to show CTE structure
Metrics: Track CTE vs table-per-operation usage statistics
Technical Notes
SQL Generation
The CTE builder reuses existing SQL generation logic from the _apply_* methods:
_expression_to_sql() - Converts column expressions to SQL
_condition_to_sql() - Converts filter conditions to SQL
_window_spec_to_sql() - Converts window specifications to SQL
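As an example of what a window-spec helper has to produce, here is a hypothetical window_spec_to_sql() that renders an OVER (...) clause from partition and ordering columns, used inside a CTE the way a withColumn with a window function might be. It runs on the standard-library sqlite3 module rather than DuckDB; window functions there need SQLite 3.25 or later. The staff table is sample data.

```python
import sqlite3


def quote(name):
    """Double-quote an SQL identifier."""
    return '"' + name.replace('"', '""') + '"'


def window_spec_to_sql(partition_by=(), order_by=()):
    """Render OVER (...); order_by holds (column, ascending) pairs."""
    parts = []
    if partition_by:
        parts.append("PARTITION BY " + ", ".join(quote(c) for c in partition_by))
    if order_by:
        keys = ", ".join(f"{quote(c)} {'ASC' if asc else 'DESC'}" for c, asc in order_by)
        parts.append("ORDER BY " + keys)
    return "OVER (" + " ".join(parts) + ")"


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE staff ("name" TEXT, "dept" TEXT, "salary" REAL)')
conn.executemany(
    "INSERT INTO staff VALUES (?, ?, ?)",
    [("Alice", "eng", 50000.0), ("Bob", "eng", 70000.0), ("Cara", "ops", 40000.0)],
)

# Roughly what withColumn("rank", row_number().over(window)) could compile to
spec = window_spec_to_sql(["dept"], [("salary", False)])
query = f'WITH cte_0 AS (SELECT *, ROW_NUMBER() {spec} AS "rank" FROM staff)\nSELECT * FROM cte_0'
ranks = {name: rank for name, _dept, _salary, rank in conn.execute(query)}
print(spec)
print(ranks)
```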
Error Handling
The implementation includes robust error handling:
Catches exceptions during CTE generation
Falls back gracefully to table-per-operation
Warns users when fallback occurs
Preserves error messages for debugging
DuckDB Integration
The CTE optimization leverages DuckDB's capabilities:
Efficient CTE execution
Query optimization across CTE chain
Memory-efficient query planning
Support for complex SQL constructs
Conclusion
The CTE-based query optimization significantly improves performance for DataFrame operation chains while maintaining 100% backward compatibility through the fallback mechanism. The implementation is transparent to users and automatically applied when lazy evaluation is enabled.