"""
Polars materializer for lazy DataFrame operations.
This module provides materialization of lazy DataFrame operations using Polars,
replacing SQL-based materialization with Polars DataFrame operations.
"""
import contextlib
from typing import Any, Dict, List, Optional, Set, Tuple, cast
import polars as pl
from sparkless.spark_types import StructType, Row, get_row_value
from sparkless.functions import ColumnOperation
from sparkless.functions.core.column import Column
from .expression_translator import PolarsExpressionTranslator
from .operation_executor import PolarsOperationExecutor
[docs]
class PolarsMaterializer:
"""Materializes lazy operations using Polars."""
# Explicit capability declarations
SUPPORTED_OPERATIONS = {
"select",
"filter",
"withColumn",
"drop",
"join",
"union",
"orderBy",
"limit",
"offset",
"groupBy",
"distinct",
"withColumnRenamed",
}
# Operations that are explicitly unsupported (require manual materialization)
UNSUPPORTED_OPERATIONS = {
"months_between",
"pi",
"e",
}
# Optional: Operation-specific metadata (for future extensibility)
OPERATION_METADATA: Dict[str, Dict[str, Any]] = {}
[docs]
def __init__(self) -> None:
"""Initialize Polars materializer."""
self.translator = PolarsExpressionTranslator()
self.operation_executor = PolarsOperationExecutor(self.translator)
def _get_case_sensitive(self) -> bool:
"""Get case sensitivity setting from active session.
Returns:
True if case-sensitive mode is enabled, False otherwise.
Defaults to False (case-insensitive) to match PySpark behavior.
"""
try:
from sparkless.session.core.session import SparkSession
active_sessions = getattr(SparkSession, "_active_sessions", [])
if active_sessions:
session = active_sessions[-1]
if hasattr(session, "conf"):
return bool(session.conf.is_case_sensitive())
except (AttributeError, TypeError):
pass
return False # Default to case-insensitive (matching PySpark)
@staticmethod
def _extract_isin_column_dtypes(
condition: Any, polars_schema: Any
) -> Dict[str, Any]:
"""Recursively extract column dtypes for isin ops in OR/AND trees (Issue #419)."""
from ...core.column_resolver import ColumnResolver
result: Dict[str, Any] = {}
polars_col_names = (
list(polars_schema.names())
if hasattr(polars_schema, "names")
else list(polars_schema.keys())
)
def _walk(expr: Any) -> None:
if isinstance(expr, ColumnOperation):
if expr.operation in ("isin", "between") and hasattr(expr, "column"):
col_ref = expr.column
col_name = getattr(col_ref, "name", None)
if col_name is None and hasattr(col_ref, "_original_column"):
col_name = getattr(col_ref._original_column, "name", None)
if col_name:
resolved = ColumnResolver.resolve_column_name(
col_name, polars_col_names, case_sensitive=False
)
if resolved and resolved not in result:
result[resolved] = polars_schema[resolved]
elif expr.operation in ("|", "&"):
_walk(expr.column)
if hasattr(expr, "value"):
_walk(expr.value)
elif expr.operation in ("!", "~") and hasattr(expr, "column"):
_walk(expr.column)
_walk(condition)
return result
[docs]
def materialize(
self,
data: List[Dict[str, Any]],
schema: StructType,
operations: List[Tuple[str, Any]],
) -> List[Row]:
"""Materialize lazy operations into actual data.
Args:
data: Initial data
schema: DataFrame schema
operations: List of queued operations (operation_name, payload)
Returns:
List of result rows
"""
# Check if we have operations that require processing even with empty data
# (e.g., union with non-empty DataFrame)
has_union_operation = any(op_name == "union" for op_name, _ in operations)
if not data and not has_union_operation:
# Empty DataFrame with no operations that need processing
return []
# Convert data to Polars DataFrame
# For empty DataFrames, create from schema if available
if not data and schema.fields:
from .type_mapper import mock_type_to_polars_dtype
schema_dict = {}
for field in schema.fields:
polars_dtype = mock_type_to_polars_dtype(field.dataType)
schema_dict[field.name] = pl.Series(field.name, [], dtype=polars_dtype)
df = pl.DataFrame(schema_dict)
elif not data:
# Empty DataFrame with no schema
df = pl.DataFrame()
else:
# Create DataFrame from data
# Handle tuple/list format by converting to dicts using schema field names
# Note: Type signature says List[dict], but we defensively handle tuples at runtime
if data:
first_row: Any = data[0] # Allow runtime check for tuples/lists
if isinstance(first_row, (list, tuple)):
# Convert tuples to dicts using schema field names
converted_data = []
field_names = [f.name for f in schema.fields]
for row in data:
row_any: Any = row # Allow runtime check for tuples/lists
if isinstance(row_any, (list, tuple)):
# Only iterate up to the minimum of row length and field_names length
# This handles cases where select operations reduce the number of columns
converted_data.append(
{
field_names[i]: row_any[i]
for i in range(min(len(row_any), len(field_names)))
}
)
else:
converted_data.append(row_any)
# Issue #413: tuple path - use pl.from_dicts with schema when union present
# (pl.DataFrame can reorder, breaking position-based union).
# Dict path uses pl.DataFrame - schema may be projected (post-select)
# and would drop columns like StructValue needed for select expressions.
df = (
pl.from_dicts(converted_data, schema=field_names)
if has_union_operation
else pl.DataFrame(converted_data)
)
else:
# Dict data: always use pl.DataFrame - passed-in schema may be
# final/projected (e.g. after select) and would mismatch base data
df = pl.DataFrame(data)
else:
df = pl.DataFrame(data)
# Enforce schema types when:
# 1. Union operation (to prevent type mismatches), or
# 2. Column inferred as Null but schema expects List/Array (e.g. [{"a": None}]
# with ArrayType - Polars infers Null, but array_distinct needs List type)
if schema.fields:
from .type_mapper import mock_type_to_polars_dtype
cast_exprs = []
for field in schema.fields:
if field.name not in df.columns:
continue
polars_dtype = mock_type_to_polars_dtype(field.dataType)
inferred = df[field.name].dtype
if inferred == polars_dtype:
continue
# Cast when: union (numeric types only), or Null->List for array_distinct
if has_union_operation and polars_dtype in (
pl.Int32,
pl.Int64,
pl.Float32,
pl.Float64,
):
cast_exprs.append(pl.col(field.name).cast(polars_dtype))
elif inferred == pl.Null and str(polars_dtype).startswith("List"):
# All-null column: Polars infers Null, but schema expects List(X)
cast_exprs.append(pl.col(field.name).cast(polars_dtype))
if cast_exprs:
df = df.with_columns(cast_exprs)
# Use lazy evaluation for better performance
lazy_df = df.lazy()
# Track original schema BEFORE any operations
# The schema parameter should be the original schema before any operations
# If we have data, we can verify by checking the data keys match schema fields
original_schema = schema
# Verify schema matches data (if data exists)
if data and len(data) > 0:
first_row = data[0]
# Handle both dict and tuple formats
is_dict_format = isinstance(first_row, dict)
is_tuple_format = isinstance(first_row, (list, tuple))
if is_dict_format:
data_keys = set(first_row.keys())
elif is_tuple_format:
# Tuple/list format - use schema field names
data_keys = {f.name for f in schema.fields}
else:
# Fallback: try to get keys if possible
data_keys = set(getattr(first_row, "keys", lambda: [])())
schema_keys = {field.name for field in schema.fields}
# If we have operations, infer base schema from data for the op loop.
# Issue #413: pass column_order to preserve data key order (infer_from_data
# sorts alphabetically by default, which breaks union position semantics).
if data and operations and is_dict_format:
from ...core.schema_inference import SchemaInferenceEngine
try:
column_order = (
list(data[0].keys())
if data and isinstance(data[0], dict)
else None
)
inferred_schema, _ = SchemaInferenceEngine.infer_from_data(
data, column_order=column_order
)
original_schema = inferred_schema
except (ValueError, Exception):
pass
elif is_dict_format and data_keys != schema_keys and not operations:
# No ops: infer when schema doesn't match data
from ...core.schema_inference import SchemaInferenceEngine
inferred_schema, _ = SchemaInferenceEngine.infer_from_data(data)
original_schema = inferred_schema
original_columns = {field.name for field in original_schema.fields}
# Track current schema as operations are applied
current_schema = original_schema
# Build mapping of computed expressions to column names for optimization
from ...dataframe.lazy import LazyEvaluationEngine
computed_expressions = LazyEvaluationEngine._build_computed_expressions_map(
operations
)
# Build column dependency graph for drop operation optimization
# This tracks which columns depend on which other columns
# IMPORTANT: Use the projected schema (which includes computed columns) to determine
# available columns, not just original_columns. This ensures that intermediate
# columns created by earlier operations are available for dependency tracking.
# The schema parameter already reflects the projected schema after all operations.
projected_columns = {field.name for field in schema.fields}
column_dependencies = LazyEvaluationEngine._build_column_dependency_graph(
operations, projected_columns
)
# Track current columns as we process operations (for dependency tracking)
# Start with projected columns (which includes computed columns from earlier operations)
current_available_columns = projected_columns.copy()
# Track if we have a materialized DataFrame available (e.g., after drop operation)
# This avoids schema issues when converting back to lazy and then collecting again
df_materialized: Optional[pl.DataFrame] = None
# Group operations to handle filter-before-select optimization
# When filter references original columns not in current schema, push it before select
optimized_operations = []
i = 0
while i < len(operations):
op_name, payload = operations[i]
if op_name == "select" and i + 1 < len(operations):
# Check if next operation is a filter that references original columns
next_op_name, next_payload = operations[i + 1]
if next_op_name == "filter":
# Check if filter references columns not in select result
# Extract column names that will be in the result of select
# (using schema inference, not just looking at the expressions)
from ...dataframe.schema.schema_manager import SchemaManager
select_schema = SchemaManager.project_schema_with_operations(
current_schema, [(op_name, payload)]
)
current_columns = {field.name for field in select_schema.fields}
# Check if filter references original columns not in current columns
missing_cols = LazyEvaluationEngine._extract_column_names(
next_payload, current_columns
)
# If filter references original columns, push it before select
if missing_cols and missing_cols.issubset(original_columns):
# Filter references original columns - apply it before select
# Add filter first, then select
optimized_operations.append((next_op_name, next_payload))
optimized_operations.append(
(op_name, payload)
) # Add select after filter
i += 2 # Skip both select and filter
continue
optimized_operations.append((op_name, payload))
i += 1
# Apply optimized operations in sequence
# If no optimization happened, optimized_operations will be the same as operations
for current_op_index, (op_name, payload) in enumerate(optimized_operations):
if op_name == "filter":
# Filter operation - optimize to use computed columns if available
# Note: When filter is pushed before select, computed_expressions will be empty
# because select hasn't been applied yet. This is fine - the filter references
# original columns, not computed ones.
# If we have a materialized DataFrame, convert it back to lazy first
if df_materialized is not None:
lazy_df = df_materialized.lazy()
df_materialized = None
optimized_condition = (
LazyEvaluationEngine._replace_with_computed_column(
payload, computed_expressions
)
)
# Get available columns from lazy DataFrame schema for case-insensitive matching
available_columns = (
list(lazy_df.collect_schema().names())
if hasattr(lazy_df, "collect_schema")
else []
)
# Check if condition contains struct field paths that need special handling
# (e.g., F.col("StructVal")["E1"] > 2 creates ColumnOperation with Column("StructVal.E1"))
# This must be checked BEFORE translation to avoid ColumnNotFoundError
is_struct_field_path = False
if isinstance(optimized_condition, ColumnOperation):
if (
isinstance(optimized_condition.column, Column)
and "." in optimized_condition.column.name
):
is_struct_field_path = True
elif (
isinstance(optimized_condition, Column)
and "." in optimized_condition.name
):
is_struct_field_path = True
if is_struct_field_path:
# Handle struct field path in filter
# Use apply_filter which has the struct field path handling
df_collected = lazy_df.collect()
filtered_df = self.operation_executor.apply_filter(
df_collected, optimized_condition
)
lazy_df = filtered_df.lazy()
continue
# Extract column dtype for isin operations to enable type coercion (Issue #369, #419)
# Handles both direct isin and ~col.isin([...]) so string column vs int list works
# For OR/AND, recursively find nested isin ops and build column_dtypes map
input_col_dtype = None
isin_op = None
column_dtypes: Dict[str, Any] = {}
if isinstance(optimized_condition, ColumnOperation):
if optimized_condition.operation == "isin":
isin_op = optimized_condition
elif optimized_condition.operation in ["!", "~"] and isinstance(
optimized_condition.column, ColumnOperation
):
inner = optimized_condition.column
if inner.operation == "isin":
isin_op = inner
if isin_op is not None and hasattr(isin_op, "column"):
col_ref = isin_op.column
col_name = getattr(col_ref, "name", None)
if col_name is None and hasattr(col_ref, "_original_column"):
col_name = getattr(col_ref._original_column, "name", None)
if col_name and hasattr(lazy_df, "collect_schema"):
from ...core.column_resolver import ColumnResolver
polars_schema = lazy_df.collect_schema()
polars_col_names: List[str] = (
list(polars_schema.names())
if hasattr(polars_schema, "names")
else list(polars_schema.keys())
)
resolved_name = ColumnResolver.resolve_column_name(
col_name, polars_col_names, case_sensitive=False
)
if resolved_name is not None:
input_col_dtype = polars_schema[resolved_name]
# Recursively extract isin column dtypes for OR/AND (Issue #419)
if hasattr(lazy_df, "collect_schema"):
column_dtypes = PolarsMaterializer._extract_isin_column_dtypes(
optimized_condition, lazy_df.collect_schema()
)
# Issue #370: isin with numeric literals on string column - apply on eager
# DataFrame so cast(Utf8).is_in([...]) is evaluated correctly (LazyFrame
# path can yield 0 rows in some Polars/contexts).
if isin_op is not None:
isin_value = getattr(isin_op, "value", None)
values_numeric = (
isinstance(isin_value, list)
and isin_value
and all(isinstance(v, (int, float)) for v in isin_value)
) or isinstance(isin_value, (int, float))
if values_numeric and (
input_col_dtype is None
or getattr(input_col_dtype, "name", None) == "Utf8"
or str(getattr(input_col_dtype, "name", "")).lower()
in ("string", "utf8")
):
df_collected = lazy_df.collect()
filtered_df = self.operation_executor.apply_filter(
df_collected, optimized_condition
)
lazy_df = filtered_df.lazy()
continue
try:
filter_expr = self.translator.translate(
optimized_condition,
input_col_dtype=input_col_dtype,
available_columns=available_columns,
column_dtypes=column_dtypes,
)
except ValueError as e:
# Check if this is a WindowFunction comparison that should be handled
error_msg = str(e)
from sparkless.functions.window_execution import WindowFunction
# Note: Column is already imported at the top of the file
is_window_function_comparison = (
"WindowFunction comparison" in error_msg
and isinstance(optimized_condition, ColumnOperation)
and isinstance(optimized_condition.column, WindowFunction)
)
if is_window_function_comparison:
# Handle WindowFunction comparison in filter
# Use apply_filter which has the WindowFunction comparison handling
df_collected = lazy_df.collect()
filtered_df = self.operation_executor.apply_filter(
df_collected, optimized_condition
)
lazy_df = filtered_df.lazy()
continue
# Check if this is a struct field path comparison that should be handled
# (e.g., F.col("StructVal")["E1"] > 2 creates ColumnOperation with Column("StructVal.E1"))
# Note: Column is already imported at the top of the file
is_struct_field_path = False
if isinstance(optimized_condition, ColumnOperation):
if (
isinstance(optimized_condition.column, Column)
and "." in optimized_condition.column.name
):
is_struct_field_path = True
elif (
isinstance(optimized_condition, Column)
and "." in optimized_condition.name
):
is_struct_field_path = True
if is_struct_field_path:
# Handle struct field path in filter
# Use apply_filter which has the struct field path handling
df_collected = lazy_df.collect()
filtered_df = self.operation_executor.apply_filter(
df_collected, optimized_condition
)
lazy_df = filtered_df.lazy()
continue
# Fallback to Python evaluation for unsupported operations (e.g., + with strings)
if "+ operation requires Python evaluation" in error_msg:
# Convert to eager DataFrame for Python evaluation
df_collected = lazy_df.collect()
data = df_collected.to_dicts()
from sparkless.dataframe.evaluation.expression_evaluator import (
ExpressionEvaluator,
)
evaluator = ExpressionEvaluator()
# Evaluate filter condition for each row
filtered_data = [
row
for row in data
if evaluator.evaluate_expression(row, optimized_condition)
]
# Recreate DataFrame from filtered data
if filtered_data:
lazy_df = pl.DataFrame(filtered_data).lazy()
else:
# Empty result - create empty DataFrame with same schema
if hasattr(lazy_df, "collect_schema"):
schema = lazy_df.collect_schema()
schema_dict = {
col: pl.Series(col, [], dtype=schema[col])
for col in schema.names()
}
lazy_df = pl.DataFrame(schema_dict).lazy()
else:
lazy_df = pl.DataFrame().lazy()
continue
else:
raise
# Apply filter to lazy DataFrame
# Catch Polars ColumnNotFoundError and convert to SparkColumnNotFoundError
try:
lazy_df = lazy_df.filter(filter_expr)
except pl.exceptions.ColumnNotFoundError as e:
# Check if this is a struct field path error (e.g., "StructVal.E1" not found)
# If so, handle it by collecting and using apply_filter
error_msg = str(e)
import re
# Note: Column is already imported at the top of the file
# Check if the error is about a struct field path
col_match = re.search(
r'unable to find column\s+"([^"]+)"', error_msg
)
if col_match:
col_name = col_match.group(1)
# Check if this is a struct field path (contains a dot)
if "." in col_name:
# Check if the condition contains this struct field path
is_struct_field_path = False
if isinstance(optimized_condition, ColumnOperation):
if (
isinstance(optimized_condition.column, Column)
and "." in optimized_condition.column.name
):
is_struct_field_path = True
elif (
isinstance(optimized_condition, Column)
and "." in optimized_condition.name
):
is_struct_field_path = True
if is_struct_field_path:
# Handle struct field path in filter
# Use apply_filter which has the struct field path handling
df_collected = lazy_df.collect()
filtered_df = self.operation_executor.apply_filter(
df_collected, optimized_condition
)
lazy_df = filtered_df.lazy()
continue
# Convert Polars error to our consistent error format
from ...core.exceptions.operation import SparkColumnNotFoundError
# Extract column name from error message
# Polars error format: "unable to find column "col_name"; valid columns: [...]"
# Extract column name and available columns
valid_match = re.search(r"valid columns:\s*\[([^\]]+)\]", error_msg)
if col_match and valid_match:
col_name = col_match.group(1)
valid_cols_str = valid_match.group(1)
# Parse valid columns (remove quotes and split)
available_columns = [
col.strip().strip('"').strip("'")
for col in valid_cols_str.split(",")
]
raise SparkColumnNotFoundError(col_name, available_columns)
else:
# Fallback: try to extract column name from error message
# or use the original error message
raise SparkColumnNotFoundError(
"unknown_column",
list(lazy_df.collect().columns)
if lazy_df is not None
else [],
f"Column not found during filter operation: {error_msg}",
)
# Verify filter worked by checking row count (for debugging)
# Note: We don't update current_schema for filter as it doesn't change columns
elif op_name == "select":
# Select operation - need to collect first for window functions
# Use materialized DataFrame if available, otherwise collect from lazy
# Preserve materialized state if it contains computed values (e.g., from to_timestamp)
if df_materialized is not None:
df_collected = df_materialized
# Don't clear df_materialized yet - we'll clear it after select if needed
# This ensures computed values are preserved through select
else:
df_collected = lazy_df.collect()
# Check if columns are being dropped (columns before select vs after)
columns_before = set(df_collected.columns)
# Validate column names exist, but preserve requested column names for output
# apply_select will handle resolution and aliasing to preserve requested names
# This ensures PySpark behavior: requested column name is used as output name
available_cols = list(df_collected.columns)
case_sensitive = self._get_case_sensitive()
for col in payload:
if isinstance(col, str) and col != "*":
from sparkless.core.column_resolver import ColumnResolver
# Validate column exists (for error reporting), but keep original name
# Handle table-prefixed (e.g. "t1.id") or struct field (e.g. "Person.name")
if "." in col:
# Try full string first - ColumnResolver maps "t1.id" to "id" or "t1_id"
resolved_full = ColumnResolver.resolve_column_name(
col, available_cols, case_sensitive
)
if resolved_full is not None:
# Table-prefixed column (e.g. t1.id -> id or t1_id)
pass # apply_select will resolve; skip struct validation
else:
# Struct field access, or right-alias.column after join (_right_X)
parts = col.split(".", 1)
struct_col = parts[0]
field_name = parts[1]
resolved_struct_col = (
ColumnResolver.resolve_column_name(
struct_col, available_cols, case_sensitive
)
)
if resolved_struct_col is None:
# After join with right prefix, "c.name" -> _right_name (#380)
right_prefixed = f"_right_{field_name}"
if right_prefixed in available_cols:
pass # valid; apply_select will resolve
else:
from ...core.exceptions.operation import (
SparkColumnNotFoundError,
)
raise SparkColumnNotFoundError(
struct_col, available_cols
)
# Keep original nested column name for output (apply_select will handle resolution)
# Validation is done above, now preserve the original requested name for output
# apply_select will resolve and alias correctly
else:
# Validate column exists, but preserve requested name
resolved_col: Optional[str] = (
ColumnResolver.resolve_column_name(
col, available_cols, case_sensitive
)
)
if resolved_col is None:
from ...core.exceptions.operation import (
SparkColumnNotFoundError,
)
raise SparkColumnNotFoundError(col, available_cols)
# Don't replace col with resolved_col - preserve requested name
# apply_select will resolve and apply alias to preserve requested name
# Apply select with original column names (not resolved)
# apply_select will handle resolution and aliasing to preserve requested names
# This ensures the output column names match the requested names (PySpark behavior)
result_df = self.operation_executor.apply_select(df_collected, payload)
# If we started with df_materialized, keep the result materialized
# to preserve computed values (e.g., from to_timestamp operations)
had_materialized_before_select = df_materialized is not None
# Get columns after select
columns_after = set(result_df.columns)
# Check if select includes computed columns (columns created by withColumn)
# Computed columns are those in the current schema but not in the original schema
# or columns that were created by previous withColumn operations
original_column_names = {field.name for field in original_schema.fields}
selected_columns = set()
for col in payload:
if isinstance(col, str) and col != "*":
selected_columns.add(col)
elif hasattr(col, "name"):
# Column object - get the name
selected_columns.add(col.name)
elif hasattr(col, "_alias") and col._alias:
# Column with alias - use alias name
selected_columns.add(col._alias)
# Check if any selected column is a computed column (not in original schema)
has_computed_columns = bool(selected_columns - original_column_names)
# If we had materialized data before OR if we're selecting computed columns,
# keep it materialized after select to preserve computed values
# This is especially important when distinct follows select, as distinct
# needs the materialized DataFrame to preserve computed column values
if had_materialized_before_select or has_computed_columns:
df_materialized = result_df
lazy_df = None
else:
lazy_df = result_df.lazy()
df_materialized = None
# If columns were dropped, clear the expression cache to invalidate
# cached expressions that reference the dropped columns
# This fixes issue #160 where cached expressions reference dropped columns
if columns_before - columns_after:
self.translator.clear_cache()
# Update schema after select
from ...dataframe.schema.schema_manager import SchemaManager
# Get case sensitivity from session for schema projection
case_sensitive = self._get_case_sensitive()
current_schema = SchemaManager.project_schema_with_operations(
current_schema, [(op_name, payload)], case_sensitive
)
elif op_name == "withColumn":
# WithColumn operation - need to collect first for window functions
# Use materialized DataFrame if available, otherwise collect from lazy
# This avoids schema mismatch issues when converting back to lazy after drop
# Track if we had materialized data before clearing it (to preserve computed values)
had_materialized_before = df_materialized is not None
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None
elif lazy_df is not None:
df_collected = lazy_df.collect()
else:
# Should not happen, but handle gracefully
raise ValueError("No DataFrame available for withColumn operation")
column_name, expression = payload
# Get the expected schema after this operation BEFORE applying withColumn
# This allows us to pass the expected type to enforce correct casting
from ...dataframe.schema.schema_manager import SchemaManager
updated_schema = SchemaManager.project_schema_with_operations(
current_schema, [(op_name, payload)]
)
# Find the expected type for this column from the updated schema
expected_field = None
for field in updated_schema.fields:
if field.name == column_name:
expected_field = field
break
# Apply withColumn with expected schema type enforcement
result_df = self.operation_executor.apply_with_column(
df_collected, column_name, expression, expected_field
)
# Don't align after withColumn - the result should already match the schema
# Alignment can cause issues when the expression produces the correct type
# but alignment tries to enforce a different type
# Keep materialized DataFrame only if next operation is withColumnRenamed
# This ensures columns added by withColumn are preserved through rename operations
# For to_timestamp operations, keep materialized to avoid schema validation issues
# when converting back to lazy (Polars validates against expression input types)
# Also keep materialized if we had materialized data before (to preserve computed values)
from sparkless.spark_types import TimestampType
is_timestamp_column = expected_field is not None and isinstance(
expected_field.dataType, TimestampType
)
next_op_index = current_op_index + 1
if next_op_index < len(optimized_operations):
next_op_name, _ = optimized_operations[next_op_index]
# Keep materialized if:
# 1. Next operation is withColumnRenamed
# 2. This is a timestamp column (to avoid validation issues)
# 3. We had materialized data before (to preserve computed values through multiple withColumn ops)
# 4. Next operation is select (to preserve computed column values for select->distinct sequences)
if (
next_op_name == "withColumnRenamed"
or is_timestamp_column
or had_materialized_before
or next_op_name == "select"
):
# Keep materialized for next withColumnRenamed operation, for to_timestamp,
# or to preserve computed values from previous withColumn operations
# This avoids Polars schema validation issues when converting to lazy
# and ensures computed values are preserved
df_materialized = result_df
# For to_timestamp, when preserving materialized state, or when next op is select,
# don't create lazy_df. The materialized DataFrame will be used directly.
lazy_df = (
None
if (
is_timestamp_column
or had_materialized_before
or next_op_name == "select"
)
else result_df.lazy()
) # Don't create lazy frame for to_timestamp, when preserving materialized, or when next is select
else:
# Convert result back to lazy for other operations
lazy_df = result_df.lazy()
df_materialized = None
else:
# No more operations
# Keep materialized for to_timestamp, when we had materialized data,
# or when this is a computed column (to preserve values for final collection)
# This avoids validation on final collection and preserves computed values
if is_timestamp_column or had_materialized_before:
df_materialized = result_df
lazy_df = None # Don't create lazy frame for to_timestamp or when preserving materialized
else:
# For computed columns (like struct field paths), keep materialized to preserve values
# Check if this column was created from a struct field path by checking the expression
# Column is already imported at module level
is_computed_column = (
isinstance(expression, Column) and "." in expression.name
) or (
hasattr(expression, "column")
and isinstance(expression.column, Column)
and "." in expression.column.name
)
if is_computed_column:
df_materialized = result_df
lazy_df = None
else:
# Convert result back to lazy for final collection
lazy_df = result_df.lazy()
df_materialized = None
# Update schema and available columns after withColumn
from ...dataframe.schema.schema_manager import SchemaManager
# Get case sensitivity from session for schema projection
case_sensitive = self._get_case_sensitive()
current_schema = SchemaManager.project_schema_with_operations(
current_schema, [(op_name, payload)], case_sensitive
)
current_available_columns.add(column_name)
elif op_name == "join":
# Join operation - need to handle separately
other_df, on, how = payload
right_alias = getattr(
other_df, "_alias", None
) # For compound join resolution
# Materialize other_df if it has lazy operations before converting to Polars
# This ensures renamed columns and other operations are applied
if not isinstance(other_df, pl.DataFrame):
# Materialize if it has lazy operations
if (
hasattr(other_df, "_operations_queue")
and other_df._operations_queue
):
# Materialize the other DataFrame first
materialized_other = other_df._materialize_if_lazy()
# Convert materialized DataFrame to Polars
if hasattr(materialized_other, "collect"):
# It's still a Sparkless DataFrame, get the data
other_rows = materialized_other.collect()
other_schema = getattr(materialized_other, "schema", None)
def _row_to_dict(row: Any) -> Dict[str, Any]:
# Sparkless Row
if hasattr(row, "asDict") and callable(row.asDict):
return cast("Dict[str, Any]", row.asDict())
# Already a mapping
if isinstance(row, dict):
return cast("Dict[str, Any]", row)
# Sequence-like rows (e.g. tuple/list) with a known schema
if other_schema is not None and hasattr(
other_schema, "fields"
):
try:
values = list(row)
return {
field.name: values[i]
for i, field in enumerate(
other_schema.fields
)
if i < len(values)
}
except (TypeError, IndexError, KeyError):
pass
# Fallback: iterables of (k, v) pairs
return cast("Dict[str, Any]", dict(row))
other_data = (
[_row_to_dict(row) for row in other_rows]
if other_rows
else []
)
if not other_data:
# Empty DataFrame - create from schema if available
if hasattr(materialized_other, "schema"):
from .type_mapper import mock_type_to_polars_dtype
schema_dict = {}
for field in materialized_other.schema.fields:
polars_dtype = mock_type_to_polars_dtype(
field.dataType
)
schema_dict[field.name] = pl.Series(
field.name, [], dtype=polars_dtype
)
other_df = pl.DataFrame(schema_dict)
else:
other_df = pl.DataFrame()
else:
# Use schema to ensure column names are preserved correctly
if hasattr(materialized_other, "schema"):
# Verify column names match schema (safety check)
schema_cols = set(
materialized_other.schema.fieldNames()
)
data_cols = (
set(other_data[0].keys())
if other_data
else set()
)
if schema_cols != data_cols:
# Column mismatch - use schema to create DataFrame with correct column order
# Create DataFrame ensuring column order matches schema
ordered_data = []
for row in other_data:
ordered_row = {
field.name: get_row_value(
row, field.name
)
for field in materialized_other.schema.fields
}
ordered_data.append(ordered_row)
other_df = pl.DataFrame(ordered_data)
else:
other_df = pl.DataFrame(other_data)
else:
other_df = pl.DataFrame(other_data)
else:
other_data = getattr(materialized_other, "data", [])
if not other_data:
# Empty DataFrame - create from schema if available
if hasattr(materialized_other, "schema"):
from .type_mapper import mock_type_to_polars_dtype
schema_dict = {}
for field in materialized_other.schema.fields:
polars_dtype = mock_type_to_polars_dtype(
field.dataType
)
schema_dict[field.name] = pl.Series(
field.name, [], dtype=polars_dtype
)
other_df = pl.DataFrame(schema_dict)
else:
other_df = pl.DataFrame()
else:
other_df = pl.DataFrame(other_data)
else:
other_data = getattr(other_df, "data", [])
if not other_data:
# Empty DataFrame - create from schema if available
if hasattr(other_df, "schema"):
from .type_mapper import mock_type_to_polars_dtype
schema_dict = {}
for field in other_df.schema.fields:
polars_dtype = mock_type_to_polars_dtype(
field.dataType
)
schema_dict[field.name] = pl.Series(
field.name, [], dtype=polars_dtype
)
other_df = pl.DataFrame(schema_dict)
else:
other_df = pl.DataFrame()
else:
other_df = pl.DataFrame(other_data)
# Collect before joining - use df_materialized when available
# (e.g. after select with computed columns like struct field + alias)
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None
else:
df_collected = lazy_df.collect()
result_df = self.operation_executor.apply_join(
df_collected, other_df, on=on, how=how, right_alias=right_alias
)
lazy_df = result_df.lazy()
elif op_name == "union":
# Union operation - need to collect first
# Use df_materialized when available (e.g. after select with computed columns)
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None
else:
df_collected = lazy_df.collect()
other_df_payload = payload
# Validate schema compatibility before union (PySpark compatibility)
# current_schema is the schema after all previous operations
if hasattr(other_df_payload, "schema"):
other_schema = other_df_payload.schema
else:
# If other_df doesn't have schema, we can't validate - skip validation
# This shouldn't happen in normal usage
other_schema = None
if other_schema is not None:
from ...dataframe.operations.set_operations import SetOperations
from ...core.exceptions.analysis import AnalysisException
# Check column count
if len(current_schema.fields) != len(other_schema.fields):
raise AnalysisException(
f"Union can only be performed on tables with the same number of columns, "
f"but the first table has {len(current_schema.fields)} columns and "
f"the second table has {len(other_schema.fields)} columns"
)
# PySpark union() matches by position, not by name (Issue #413).
# Only check type compatibility at each position.
for i, (field1, field2) in enumerate(
zip(current_schema.fields, other_schema.fields)
):
if not SetOperations._are_types_compatible(
field1.dataType, field2.dataType
):
raise AnalysisException(
f"Union can only be performed on tables with compatible column types. "
f"Column {i} type mismatch: "
f"{field1.dataType} vs {field2.dataType}"
)
# Convert other_df to Polars DataFrame if needed
if not isinstance(other_df_payload, pl.DataFrame):
# Check if other_df_payload has lazy operations that need materialization
# (e.g., withColumn operations that create computed columns like struct field paths)
if (
hasattr(other_df_payload, "_operations_queue")
and other_df_payload._operations_queue
):
# Materialize the other DataFrame to ensure computed columns are available
from ...dataframe.lazy import LazyEvaluationEngine
materialized_other = LazyEvaluationEngine.materialize(
other_df_payload
)
other_rows = materialized_other.collect()
other_schema = getattr(materialized_other, "schema", None)
def _row_to_dict(row: Any) -> Dict[str, Any]:
# Sparkless Row
if hasattr(row, "asDict") and callable(row.asDict):
return cast("Dict[str, Any]", row.asDict())
# Already a mapping
if isinstance(row, dict):
return cast("Dict[str, Any]", row)
# Sequence-like rows (e.g. tuple/list) with a known schema
if other_schema is not None and hasattr(
other_schema, "fields"
):
try:
values = list(row)
return {
field.name: values[i]
for i, field in enumerate(other_schema.fields)
if i < len(values)
}
except Exception:
pass
# Fallback: iterables of (k, v) pairs
return cast("Dict[str, Any]", dict(row))
other_data = (
[_row_to_dict(row) for row in other_rows]
if other_rows
else []
)
else:
other_data = getattr(other_df_payload, "data", [])
if not other_data:
# Empty DataFrame - create from schema if available
if hasattr(other_df_payload, "schema"):
from .type_mapper import mock_type_to_polars_dtype
schema_dict = {}
for field in other_df_payload.schema.fields:
polars_dtype = mock_type_to_polars_dtype(field.dataType)
schema_dict[field.name] = pl.Series(
field.name, [], dtype=polars_dtype
)
other_df = pl.DataFrame(schema_dict)
else:
other_df = pl.DataFrame()
else:
# Create DataFrame from data with explicit schema order (Issue #413)
# Ensure column order matches other_schema for position-based union
other_schema_obj = (
other_schema
if other_schema is not None
else getattr(other_df_payload, "schema", None)
)
if other_schema_obj is not None:
# Build dicts in schema order; use from_dicts with schema to
# preserve column order (pl.DataFrame sorts dict keys)
ordered_data = [
{
f.name: (
row.get(f.name)
if isinstance(row, dict)
else get_row_value(row, f.name)
)
for f in other_schema_obj.fields
}
for row in other_data
]
schema_names = [f.name for f in other_schema_obj.fields]
other_df = pl.from_dicts(ordered_data, schema=schema_names)
else:
other_df = pl.DataFrame(other_data)
if (
hasattr(other_df_payload, "schema")
and other_df_payload.schema.fields
):
from .type_mapper import mock_type_to_polars_dtype
cast_exprs = []
for field in other_df_payload.schema.fields:
polars_dtype = mock_type_to_polars_dtype(field.dataType)
# Only cast if column exists and type doesn't match
# Only cast numeric types to prevent Int32/Int64 mismatches
# Only cast numeric types (Int32/Int64) to prevent union issues
# Don't cast string/datetime types as they can cause schema errors
if (
field.name in other_df.columns
and other_df[field.name].dtype != polars_dtype
and polars_dtype
in (pl.Int32, pl.Int64, pl.Float32, pl.Float64)
):
cast_exprs.append(
pl.col(field.name).cast(polars_dtype)
)
if cast_exprs:
other_df = other_df.with_columns(cast_exprs)
else:
other_df = other_df_payload
# Position-based union (Issue #413): reorder other_df when column order differs
# (union matches by position, not name)
if (
other_schema is not None
and isinstance(other_df, pl.DataFrame)
and other_df.width > 0
):
left_cols = [f.name for f in current_schema.fields]
right_cols = list(other_df.columns)
if right_cols != left_cols:
other_df = pl.DataFrame(
{
current_schema.fields[i].name: other_df[
other_schema.fields[i].name
]
for i in range(len(current_schema.fields))
}
)
result_df = self.operation_executor.apply_union(df_collected, other_df)
# Schema may change after union if type coercion occurred
# PySpark normalizes types (e.g., LongType + StringType -> StringType)
# Update current_schema to reflect coerced types from result_df
from .type_mapper import polars_dtype_to_mock_type
if result_df.schema:
# Extract schema from result DataFrame (reflects coerced types)
result_schema_fields = []
for col_name, dtype in result_df.schema.items():
mock_type = polars_dtype_to_mock_type(dtype)
from ...spark_types import StructField
result_schema_fields.append(
StructField(col_name, mock_type, nullable=True)
)
if result_schema_fields:
current_schema = StructType(result_schema_fields)
lazy_df = result_df.lazy()
elif op_name == "orderBy":
# OrderBy operation - can be done lazily
# Use df_materialized when available (e.g. after select with computed columns)
if df_materialized is not None:
lazy_df = df_materialized.lazy()
df_materialized = None
# Payload can be (columns, ascending) tuple or just columns (for backward compatibility)
if isinstance(payload, tuple) and len(payload) == 2:
# New format: (columns, ascending)
columns_raw, ascending = payload
# Handle case where columns is a single list/tuple
if isinstance(columns_raw, (list, tuple)):
columns = tuple(columns_raw)
else:
columns = (columns_raw,)
elif (
isinstance(payload, tuple)
and len(payload) == 1
and isinstance(payload[0], (list, tuple))
):
# Old format: tuple containing a single list/tuple
# (e.g., when df.sort(["col1", "col2"]) is called)
columns = tuple(payload[0])
ascending = True
elif isinstance(payload, (tuple, list)):
# Old format: just columns tuple/list
columns = tuple(payload) if isinstance(payload, list) else payload
ascending = True
else:
# Old format: single column
columns = (payload,)
ascending = True
# Optimize orderBy columns to use computed columns if available
optimized_columns = []
for col in columns:
optimized_col = LazyEvaluationEngine._replace_with_computed_column(
col, computed_expressions
)
optimized_columns.append(optimized_col)
# Build sort expressions with descending and nulls_last parameters
# Polars sort() accepts:
# - by: list of column names (str) or expressions
# - descending: list of bools for direction
# - nulls_last: list of bools for nulls handling
# Get available columns from lazy DataFrame schema for case-insensitive matching
available_columns = (
list(lazy_df.collect_schema().names())
if hasattr(lazy_df, "collect_schema")
else []
)
sort_by = []
descending_flags = []
nulls_last_flags = []
for col in optimized_columns:
is_desc = False
nulls_last = None # None means default behavior
col_name = None
if isinstance(col, str):
col_name = col
is_desc = not ascending
nulls_last = True # PySpark default: nulls last
elif hasattr(col, "operation"):
operation = col.operation
col_name = (
col.column.name if hasattr(col, "column") else col.name
)
# Handle nulls variant operations
if operation == "desc_nulls_last":
is_desc = True
nulls_last = True
elif operation == "desc_nulls_first":
is_desc = True
nulls_last = False
elif operation == "asc_nulls_last":
is_desc = False
nulls_last = True
elif operation == "asc_nulls_first":
is_desc = False
nulls_last = False
elif operation == "desc":
is_desc = True
nulls_last = True # PySpark default: nulls last for desc()
elif operation == "asc":
is_desc = False
nulls_last = True # PySpark default: nulls last for asc()
else:
# Fallback for other operations
is_desc = not ascending
nulls_last = True # PySpark default: nulls last
else:
# For ColumnOperation with asc/desc, get the actual column name
if hasattr(col, "column") and hasattr(col.column, "name"):
col_name = col.column.name
elif hasattr(col, "name"):
col_name = col.name
else:
col_name = str(col)
# Remove any " ASC" or " DESC" suffix that might be in the name
col_name = (
col_name.replace(" ASC", "")
.replace(" DESC", "")
.replace(" NULLS FIRST", "")
.replace(" NULLS LAST", "")
.strip()
)
is_desc = not ascending
nulls_last = True # PySpark default: nulls last
# Resolve column name using ColumnResolver if available columns are provided
if col_name is not None:
if available_columns:
from sparkless.core.column_resolver import ColumnResolver
# Get case sensitivity from session
case_sensitive = self._get_case_sensitive()
actual_col_name = ColumnResolver.resolve_column_name(
col_name, available_columns, case_sensitive
)
if actual_col_name:
col_name = actual_col_name
sort_by.append(col_name)
descending_flags.append(is_desc)
nulls_last_flags.append(nulls_last)
if sort_by:
# Use sort() with by, descending, and nulls_last parameters
# nulls_last can be None (default), True, or False
# We only pass nulls_last if at least one value is not None
has_nulls_specification = any(
n is not None for n in nulls_last_flags
)
if has_nulls_specification:
lazy_df = lazy_df.sort(
sort_by,
descending=descending_flags,
nulls_last=nulls_last_flags,
)
else:
# No nulls specification, use default
lazy_df = lazy_df.sort(sort_by, descending=descending_flags)
elif op_name == "limit":
# Limit operation
n = payload
lazy_df = lazy_df.head(n)
elif op_name == "offset":
# Offset operation (skip first n rows)
n = payload
lazy_df = lazy_df.slice(n)
elif op_name == "groupBy":
# GroupBy operation - need to collect first
# Use materialized DataFrame if available, otherwise collect from lazy
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None # Clear after use
elif lazy_df is not None:
df_collected = lazy_df.collect()
else:
raise ValueError("No DataFrame available for groupBy operation")
group_by, aggs = payload
result_df = self.operation_executor.apply_group_by_agg(
df_collected, group_by, aggs
)
lazy_df = result_df.lazy()
elif op_name == "distinct":
# Distinct operation
# For distinct to work correctly with computed columns (like struct field paths),
# we need to ensure the DataFrame is materialized first
# This is especially important when distinct follows a select operation
# Use materialized DataFrame if available, otherwise collect from lazy
if df_materialized is not None:
df_collected = df_materialized
else:
df_collected = lazy_df.collect()
result_df = df_collected.unique()
lazy_df = result_df.lazy()
df_materialized = result_df
elif op_name == "drop":
# Drop operation - need to handle Polars lazy evaluation limitation
# Polars drops columns that depend on dropped columns during lazy evaluation
# Solution: materialize ALL columns before dropping, then re-select what we need
# This breaks the lazy dependency chain
columns_to_drop = (
payload if isinstance(payload, (list, tuple)) else [payload]
)
# Filter out non-existent columns (PySpark allows dropping non-existent columns silently)
# Resolve column names using ColumnResolver
from sparkless.core.column_resolver import ColumnResolver
available_cols_list = list(current_available_columns)
# Get case sensitivity from session
case_sensitive = self._get_case_sensitive()
existing_columns_to_drop = []
for col in columns_to_drop:
# Find actual column name using ColumnResolver
actual_col = ColumnResolver.resolve_column_name(
col, available_cols_list, case_sensitive
)
if actual_col:
existing_columns_to_drop.append(actual_col)
# If no columns to actually drop, skip this operation
if not existing_columns_to_drop:
continue
columns_to_drop = existing_columns_to_drop
# Find all columns that depend on the columns being dropped
columns_to_preserve: Set[str] = set()
for col_name, deps in column_dependencies.items():
# If this column depends on any column being dropped, we need to preserve it
if deps.intersection(columns_to_drop):
columns_to_preserve.add(col_name)
# Also check subsequent operations for columns they need
current_op_index = optimized_operations.index((op_name, payload))
for future_op_name, future_payload in optimized_operations[
current_op_index + 1 :
]:
if future_op_name == "withColumn":
col_name, expr = future_payload
# Extract all columns this expression depends on
expr_deps = (
LazyEvaluationEngine._extract_all_column_dependencies(expr)
)
# If expr depends on a column that depends on dropped columns, preserve it
for dep_col in expr_deps:
if dep_col in column_dependencies:
dep_deps = column_dependencies[dep_col]
if dep_deps.intersection(columns_to_drop):
columns_to_preserve.add(dep_col)
# Also check if dep_col is in current available columns
elif (
dep_col in current_available_columns
and dep_col in column_dependencies
and column_dependencies[dep_col].intersection(
columns_to_drop
)
):
# This column exists and is needed - preserve it if it depends on dropped columns
columns_to_preserve.add(dep_col)
# Always materialize before dropping if there are subsequent operations
# This ensures dependent columns are preserved even if dependency graph is incomplete
if current_op_index + 1 < len(optimized_operations):
# Collect current state to materialize all columns
# Use materialized DataFrame if available, otherwise collect from lazy
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None # Clear after use
elif lazy_df is not None:
df_collected = lazy_df.collect()
else:
raise ValueError("No DataFrame available for drop operation")
# Drop columns using select to preserve schema correctly
# Using select instead of drop ensures schema is properly maintained
cols_to_keep = [
col
for col in df_collected.columns
if col not in columns_to_drop
]
# Handle edge case: dropping all columns
# PySpark preserves row count even when all columns are dropped
if not cols_to_keep:
# Create empty DataFrame with same number of rows
# Use select with empty column list to preserve row structure
row_count = df_collected.height
# Create a dummy column, select it, then drop it to preserve row count
# Actually, Polars can't represent a DataFrame with rows but no columns
# So we create a single dummy column with null values
df_collected = pl.DataFrame({"_dummy": [None] * row_count})
# But wait, PySpark behavior is to return empty schema but preserve rows
# For now, we'll use the dummy column approach and handle it in schema projection
# The schema will show no columns, but the row count will be preserved
else:
df_collected = df_collected.select(cols_to_keep)
# Store the materialized DataFrame instead of converting back to lazy
# This avoids schema mismatch issues when subsequent operations (like withColumn)
# need to collect again. They can use the materialized DataFrame directly.
# Only convert to lazy if the next operation requires it (like filter)
next_op_index = current_op_index + 1
if next_op_index < len(optimized_operations):
next_op_name, _ = optimized_operations[next_op_index]
# Operations that can work with lazy frames (don't collect)
lazy_ops = {
"filter",
"distinct",
"limit",
"offset",
"orderBy",
"sort",
}
if next_op_name in lazy_ops:
# Next operation works with lazy, so convert back to lazy
# Create a fresh lazy frame by converting to dicts and back to avoid schema issues
lazy_df = df_collected.lazy()
df_materialized = None
else:
# Next operation will collect anyway (withColumn, select, etc.)
# Keep it materialized to avoid schema issues when converting back to lazy
# Store the materialized DataFrame directly - no conversion to dicts needed
# as we'll use it directly in the next operation
df_materialized = df_collected
# Don't create lazy_df here - we'll use df_materialized directly
# Set lazy_df to None or keep it as is, but operations will check df_materialized first
lazy_df = (
df_collected.lazy()
) # Still need for final collection if no more ops
else:
# No more operations, convert back to lazy for final collection
lazy_df = df_collected.lazy()
df_materialized = None
elif columns_to_preserve:
# No future operations but we have columns to preserve
# Use materialized DataFrame if available, otherwise collect from lazy
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None # Clear after use
elif lazy_df is not None:
df_collected = lazy_df.collect()
else:
raise ValueError("No DataFrame available for drop operation")
current_cols = set(df_collected.columns)
cols_to_keep = list(current_cols - set(columns_to_drop))
if not cols_to_keep:
# All columns dropped - preserve row count
row_count = df_collected.height
df_collected = pl.DataFrame({"_dummy": [None] * row_count})
else:
df_collected = df_collected.select(cols_to_keep)
lazy_df = df_collected.lazy()
else:
# No dependencies and no future operations - can drop directly
# But we still need to handle non-existent columns and all-columns case
# Use materialized DataFrame if available, otherwise collect from lazy
if df_materialized is not None:
df_collected = df_materialized
df_materialized = None # Clear after use
elif lazy_df is not None:
df_collected = lazy_df.collect()
else:
raise ValueError("No DataFrame available for drop operation")
cols_to_keep = [
col
for col in df_collected.columns
if col not in columns_to_drop
]
if not cols_to_keep:
# All columns dropped - preserve row count
row_count = df_collected.height
df_collected = pl.DataFrame({"_dummy": [None] * row_count})
else:
df_collected = df_collected.select(cols_to_keep)
# Store materialized for final collection
df_materialized = df_collected
lazy_df = None
# Update available columns after drop
for col in columns_to_drop:
current_available_columns.discard(col)
# Update schema after drop
from ...dataframe.schema.schema_manager import SchemaManager
# Get case sensitivity from session for schema projection
case_sensitive = self._get_case_sensitive()
current_schema = SchemaManager.project_schema_with_operations(
current_schema, [(op_name, payload)], case_sensitive
)
elif op_name == "withColumnRenamed":
# WithColumnRenamed operation
# If we have a materialized DataFrame, rename directly in it to preserve columns
# Otherwise, rename in the lazy DataFrame
old_name, new_name = payload
if df_materialized is not None:
# Rename directly in the materialized DataFrame to ensure all columns are preserved
df_materialized = df_materialized.rename({old_name: new_name})
# Update lazy_df to match (for consistency)
lazy_df = df_materialized.lazy()
# Keep df_materialized set if there are more operations
# This ensures columns are preserved for subsequent operations
next_op_index = current_op_index + 1
if next_op_index < len(optimized_operations):
next_op_name, _ = optimized_operations[next_op_index]
# Only convert to lazy for operations that explicitly need it
lazy_ops = {
"filter",
"distinct",
"limit",
"offset",
"orderBy",
"sort",
}
if next_op_name in lazy_ops:
# Next operation needs lazy - convert
lazy_df = df_materialized.lazy()
df_materialized = None
# Otherwise, keep df_materialized set for operations like select, withColumn, withColumnRenamed
# If no more operations, keep df_materialized for final collection
else:
# Rename in the lazy DataFrame
lazy_df = lazy_df.rename({old_name: new_name})
# Update schema and available columns after rename
from ...dataframe.schema.schema_manager import SchemaManager
# Get case sensitivity from session for schema projection
case_sensitive = self._get_case_sensitive()
current_schema = SchemaManager.project_schema_with_operations(
current_schema, [(op_name, payload)], case_sensitive
)
# Update available columns set
if old_name in current_available_columns:
current_available_columns.remove(old_name)
current_available_columns.add(new_name)
else:
raise ValueError(f"Unsupported operation: {op_name}")
# Materialize (collect) the lazy DataFrame
# Use materialized DataFrame if available, otherwise collect from lazy
if df_materialized is not None:
result_df = df_materialized
elif lazy_df is not None:
result_df = lazy_df.collect()
else:
# Should not happen, but handle gracefully
raise ValueError("No DataFrame to materialize")
# Convert to List[Row]
# Handle special case: if all columns were dropped, result_df may have a _dummy column
# We need to convert this to empty dicts to match PySpark's behavior
if "_dummy" in result_df.columns and len(result_df.columns) == 1:
# All columns were dropped - create empty rows preserving row count
rows = []
row_count = result_df.height
for _ in range(row_count):
rows.append(Row({}, schema=None))
return rows
# For joins with duplicate columns, Polars uses _right suffix.
# Build schema->polars column mapping so we produce Rows matching PySpark order.
rows = []
# Build mapping: schema field -> polars column (handles _right suffix for duplicates)
schema_to_polars: List[Tuple[str, str]] = []
name_occurrence: Dict[str, int] = {}
schema_has_duplicates = len(current_schema.fields) != len(
{f.name for f in current_schema.fields}
)
for field in current_schema.fields:
name = field.name
occ = name_occurrence.get(name, 0)
name_occurrence[name] = occ + 1
if occ == 0 and name in result_df.columns:
schema_to_polars.append((name, name))
elif f"{name}_right" in result_df.columns:
schema_to_polars.append((name, f"{name}_right"))
elif name in result_df.columns:
schema_to_polars.append((name, name))
else:
schema_to_polars.append((name, name))
# Convert Polars DataFrame to dicts and preserve date/timestamp types
# Polars to_dicts() converts dates to strings, we need to convert them back
import datetime as dt_module
from .type_mapper import polars_dtype_to_mock_type
from sparkless.spark_types import DateType, TimestampType
# Get column types from Polars DataFrame schema
polars_schema = result_df.schema
column_types = {
col: polars_dtype_to_mock_type(dtype)
for col, dtype in polars_schema.items()
}
for row_dict in result_df.to_dicts():
# When schema has duplicates, build Row from schema order so dict uses last-wins
if schema_has_duplicates and schema_to_polars:
ordered_pairs: List[Tuple[str, Any]] = []
for field_name, polars_col in schema_to_polars:
value = row_dict.get(polars_col)
col_type = column_types.get(polars_col)
if isinstance(value, dict):
ordered_pairs.append((field_name, value))
continue
if (
col_type is not None
and isinstance(col_type, DateType)
and isinstance(value, str)
):
with contextlib.suppress(ValueError, AttributeError):
value = dt_module.date.fromisoformat(value)
elif (
col_type is not None
and isinstance(col_type, TimestampType)
and isinstance(value, str)
):
with contextlib.suppress(ValueError, AttributeError):
value = dt_module.datetime.fromisoformat(
value.replace("Z", "+00:00") if "T" in value else value
)
ordered_pairs.append((field_name, value))
rows.append(Row(ordered_pairs, schema=None))
continue
# Convert date/timestamp strings back to date/datetime objects
# Polars to_dicts() converts dates to ISO format strings
converted_row_dict: Dict[str, Any] = {}
for col, value in row_dict.items():
# Always preserve dict values - they represent structs (from Object dtype or StructType)
if isinstance(value, dict):
converted_row_dict[col] = value
continue
col_type = column_types.get(col)
# Handle Object dtype - preserve Python dicts/objects as-is
# Check if this column has Object dtype in the Polars DataFrame
try:
if col in result_df.columns:
polars_dtype = result_df[col].dtype
if polars_dtype == pl.Object:
# This is an Object dtype column - preserve the value as-is
converted_row_dict[col] = value
continue
except (KeyError, AttributeError, IndexError):
# Column might not exist or dtype check failed - continue with normal processing
pass
# Also check for empty StructType (fallback for Object dtype mapping)
if (
col_type is not None
and isinstance(col_type, StructType)
and len(col_type.fields) == 0
):
# This is an Object dtype that was converted to empty StructType
# Preserve the value as-is (should be a dict)
converted_row_dict[col] = value
continue
elif isinstance(col_type, DateType) and isinstance(value, str):
# Convert ISO date string back to date object
try:
converted_row_dict[col] = dt_module.date.fromisoformat(value)
except (ValueError, AttributeError):
# If parsing fails, keep as string
converted_row_dict[col] = value
elif isinstance(col_type, TimestampType) and isinstance(value, str):
# Convert ISO timestamp string back to datetime object
try:
# Handle various ISO formats
if "T" in value:
converted_row_dict[col] = dt_module.datetime.fromisoformat(
value.replace("Z", "+00:00")
)
else:
converted_row_dict[col] = dt_module.datetime.fromisoformat(
value
)
except (ValueError, AttributeError):
# If parsing fails, keep as string
converted_row_dict[col] = value
else:
converted_row_dict[col] = value
# Create Row from dict - Row will handle the conversion
# The schema will be applied later in _convert_materialized_rows
rows.append(Row(converted_row_dict, schema=None))
return rows
def _has_window_function(self, expr: Any) -> bool:
"""Check if expression contains WindowFunction objects that require manual materialization.
Args:
expr: Expression to check (Column, ColumnOperation, WindowFunction, or nested structure)
Returns:
True if expression contains WindowFunction objects
"""
# Check if this is a WindowFunction directly
if hasattr(expr, "__class__") and expr.__class__.__name__ == "WindowFunction":
return True
# Also try isinstance check as backup
try:
from sparkless.functions.window_execution import WindowFunction
if isinstance(expr, WindowFunction):
return True
except (ImportError, AttributeError):
pass
# Recursively check nested expressions
if hasattr(expr, "column") and self._has_window_function(expr.column):
return True
if hasattr(expr, "value") and self._has_window_function(expr.value):
return True
return bool(
hasattr(expr, "function") and self._has_window_function(expr.function)
)
def _has_expr_expression(self, expr: Any) -> bool:
"""Check if expression contains F.expr() or complex operations that need manual materialization.
Args:
expr: Expression to check (Column, ColumnOperation, or nested structure)
Returns:
True if expression contains operations that require manual materialization
"""
# Do NOT reject expressions just because they came from F.expr(); we can translate
# them (e.g. "col IN (lit)" for Issue #370).
# Check if this is a ColumnOperation with expr operation
if hasattr(expr, "operation"):
# Check for direct expr operation (old F.expr() style)
if expr.operation == "expr":
return True
# Check for function_name="expr" (another F.expr() marker)
if hasattr(expr, "function_name") and expr.function_name == "expr":
return True
# Recursively check nested expressions
if hasattr(expr, "column") and self._has_expr_expression(expr.column):
return True
if hasattr(expr, "value") and self._has_expr_expression(expr.value):
return True
# Check if this is a Column (simple reference, no issue)
elif hasattr(expr, "name") and not hasattr(expr, "operation"):
return False
return False
def _has_unsupported_operation(self, expr: Any) -> bool:
"""Check if expression contains unsupported operations.
Args:
expr: Expression to check (Column, ColumnOperation, or nested structure)
Returns:
True if expression contains unsupported operations
"""
# Check if this is a ColumnOperation with an unsupported operation
if hasattr(expr, "operation") and expr.operation in self.UNSUPPORTED_OPERATIONS:
return True
# Recursively check nested expressions
if hasattr(expr, "column") and self._has_unsupported_operation(expr.column):
return True
if hasattr(expr, "value") and self._has_unsupported_operation(expr.value):
return True
return bool(
hasattr(expr, "function") and self._has_unsupported_operation(expr.function)
)
[docs]
def can_handle_operation(self, op_name: str, op_payload: Any) -> bool:
"""Check if this materializer can handle a specific operation.
Args:
op_name: Name of the operation (e.g., "to_timestamp", "filter")
op_payload: Operation payload (operation-specific)
Returns:
True if the materializer can handle this operation, False otherwise
"""
# Check unsupported operations first
if op_name in self.UNSUPPORTED_OPERATIONS:
return False
# For complex operations, inspect payload for unsupported nested operations
if op_name == "select":
# Payload is a list of column expressions
if isinstance(op_payload, (list, tuple)):
for col in op_payload:
# Window functions (incl. arithmetic) handled by operation_executor.apply_select
# Check for unsupported operations only
if self._has_unsupported_operation(col):
return False
return op_name in self.SUPPORTED_OPERATIONS
elif op_name == "withColumn":
# Payload is (col_name, expression)
if isinstance(op_payload, (list, tuple)) and len(op_payload) == 2:
_, expression = op_payload
# Window functions handled by operation_executor.apply_withColumn
if self._has_unsupported_operation(expression):
return False
return op_name in self.SUPPORTED_OPERATIONS
elif op_name == "filter":
# Payload is filter expression
# Check for F.expr() expressions
if self._has_expr_expression(op_payload):
return False
# Check for unsupported operations
if self._has_unsupported_operation(op_payload):
return False
return op_name in self.SUPPORTED_OPERATIONS
# For simple operations, check supported operations set
# Default: assume unsupported for safety
return op_name in self.SUPPORTED_OPERATIONS
[docs]
def can_handle_operations(
self, operations: List[Tuple[str, Any]]
) -> Tuple[bool, List[str]]:
"""Check if this materializer can handle a list of operations.
Args:
operations: List of (operation_name, payload) tuples
Returns:
Tuple of (can_handle_all, unsupported_operations)
- can_handle_all: True if all operations are supported
- unsupported_operations: List of operation names that are unsupported
"""
unsupported_operations: List[str] = []
for op_name, op_payload in operations:
if not self.can_handle_operation(op_name, op_payload):
unsupported_operations.append(op_name)
can_handle_all = len(unsupported_operations) == 0
return (can_handle_all, unsupported_operations)
[docs]
def materialize_from_plan(
self,
data: List[Dict[str, Any]],
schema: StructType,
logical_plan: List[Dict[str, Any]],
) -> List[Row]:
"""Materialize from a serialized logical plan using the Polars plan interpreter."""
from .plan_interpreter import execute_plan
return execute_plan(data, schema, logical_plan)
[docs]
def close(self) -> None:
"""Close the materializer and clean up resources."""
# Polars doesn't require explicit cleanup
pass