# Source code for sparkless.functions.core.column

"""
Column implementation for Sparkless.

This module provides the Column class for DataFrame column operations,
maintaining compatibility with PySpark's Column interface.
"""

from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, Union

from ...core.interfaces.functions import IColumn
from ...spark_types import DataType, StringType

if TYPE_CHECKING:
    from ...window import WindowSpec
    from ..conditional import CaseWhen
    from ..window_execution import WindowFunction
    from ..base import AggregateFunction
    from .literals import Literal


class ColumnOperatorMixin:
    """Mixin providing common operator methods for Column and ColumnOperation.

    Every operator and helper defined here only *describes* an expression: it
    returns a ``ColumnOperation`` carrying the left operand, an operation tag
    and the right operand. Nothing is evaluated in this class.
    """

    if TYPE_CHECKING:
        # Declared for the type checker only; concrete classes supply ``name``.
        @property
        def name(self) -> str: ...

    def _create_operation(self, operation: str, other: Any) -> "ColumnOperation":
        """Build a ColumnOperation with ``self`` as the left operand.

        Args:
            operation: The operation tag (e.g., "==", "+", "isin").
            other: The other operand (``None`` for unary operations).

        Returns:
            ColumnOperation instance.
        """
        return ColumnOperation(self, operation, other)

    # ------------------------------------------------------------------
    # Comparisons
    # ------------------------------------------------------------------
    def __eq__(self, other: Any) -> "ColumnOperation":  # type: ignore[override]
        """Equality comparison (``col == other``)."""
        return self._create_operation("==", other)

    def eqNullSafe(self, other: Any) -> "ColumnOperation":
        """Null-safe equality comparison (PySpark eqNullSafe).

        Intended semantics, matching PySpark's eqNullSafe:

        - If both sides are null, the comparison is True.
        - If exactly one side is null, the comparison is False.
        - Otherwise it behaves like standard equality, including any
          backend-specific type coercion rules.

        This method only records the "eqNullSafe" tag; the semantics are
        applied wherever the operation is evaluated.
        """
        return self._create_operation("eqNullSafe", other)

    def __ne__(self, other: Any) -> "ColumnOperation":  # type: ignore[override]
        """Inequality comparison (``col != other``)."""
        return self._create_operation("!=", other)

    def __lt__(self, other: Any) -> "ColumnOperation":
        """Less than comparison."""
        return self._create_operation("<", other)

    def __le__(self, other: Any) -> "ColumnOperation":
        """Less than or equal comparison."""
        return self._create_operation("<=", other)

    def __gt__(self, other: Any) -> "ColumnOperation":
        """Greater than comparison."""
        return self._create_operation(">", other)

    def __ge__(self, other: Any) -> "ColumnOperation":
        """Greater than or equal comparison."""
        return self._create_operation(">=", other)

    # ------------------------------------------------------------------
    # Arithmetic
    # ------------------------------------------------------------------
    def __add__(self, other: Any) -> "ColumnOperation":
        """Addition operation."""
        return self._create_operation("+", other)

    def __sub__(self, other: Any) -> "ColumnOperation":
        """Subtraction operation."""
        return self._create_operation("-", other)

    def __mul__(self, other: Any) -> "ColumnOperation":
        """Multiplication operation."""
        return self._create_operation("*", other)

    def __truediv__(self, other: Any) -> "ColumnOperation":
        """Division operation."""
        return self._create_operation("/", other)

    def __mod__(self, other: Any) -> "ColumnOperation":
        """Modulo operation."""
        return self._create_operation("%", other)

    def __pow__(self, other: Any) -> "ColumnOperation":
        """Power operation (for `col ** 2`)."""
        return self._create_operation("**", other)

    def __radd__(self, other: Any) -> "ColumnOperation":
        """Reverse addition operation (for `2 + col`)."""
        # Addition is treated as commutative, so the operands are simply
        # swapped and the column stays on the left.
        return self._create_operation("+", other)

    def __rsub__(self, other: Any) -> "ColumnOperation":
        """Reverse subtraction operation (for `2 - col`)."""
        # Not commutative: the plain value must stay on the left so the
        # operation reads as `other - self`.
        return ColumnOperation(other, "-", self)

    def __rmul__(self, other: Any) -> "ColumnOperation":
        """Reverse multiplication operation (for `2 * col`)."""
        # Multiplication is treated as commutative, so the operands are swapped.
        return self._create_operation("*", other)

    def __rtruediv__(self, other: Any) -> "ColumnOperation":
        """Reverse division operation (for `2 / col`)."""
        # Not commutative: keep the plain value on the left (`other / self`).
        return ColumnOperation(other, "/", self)

    def __rmod__(self, other: Any) -> "ColumnOperation":
        """Reverse modulo operation (for `2 % col`)."""
        # Not commutative: keep the plain value on the left (`other % self`).
        return ColumnOperation(other, "%", self)

    def __rpow__(self, other: Any) -> "ColumnOperation":
        """Reverse power operation (for `2 ** col` or `3.0 ** col`)."""
        # Not commutative: keep the plain value on the left (`other ** self`).
        return ColumnOperation(other, "**", self)

    # ------------------------------------------------------------------
    # Logical / unary
    # ------------------------------------------------------------------
    def __and__(self, other: Any) -> "ColumnOperation":
        """Logical AND operation."""
        return self._create_operation("&", other)

    def __or__(self, other: Any) -> "ColumnOperation":
        """Logical OR operation."""
        return self._create_operation("|", other)

    def __invert__(self) -> "ColumnOperation":
        """Logical NOT operation."""
        return self._create_operation("!", None)

    def __neg__(self) -> "ColumnOperation":
        """Unary minus operation (-column)."""
        # Shares the "-" tag with subtraction; the missing operand (None)
        # is what distinguishes the unary form.
        return self._create_operation("-", None)

    # ------------------------------------------------------------------
    # Null checks
    # ------------------------------------------------------------------
    def isnull(self) -> "ColumnOperation":
        """Check if column value is null."""
        return self._create_operation("isnull", None)

    def isnotnull(self) -> "ColumnOperation":
        """Check if column value is not null."""
        return self._create_operation("isnotnull", None)

    def isNull(self) -> "ColumnOperation":
        """Check if column value is null (PySpark compatibility)."""
        return self.isnull()

    def isNotNull(self) -> "ColumnOperation":
        """Check if column value is not null (PySpark compatibility)."""
        return self.isnotnull()

    # ------------------------------------------------------------------
    # Membership / pattern matching
    # ------------------------------------------------------------------
    def isin(self, *values: Any) -> "ColumnOperation":
        """Check if column value is in a collection of values.

        Args:
            *values: Values to check against. Either individual arguments
                (``col.isin(1, 2, 3)``) or a single collection
                (``col.isin([1, 2, 3])``). A single ``list``, ``tuple``,
                ``set`` or ``frozenset`` is unpacked into its elements.

        Returns:
            ColumnOperation representing the isin check; its operand is
            always a ``list``.

        Example:
            >>> df.filter(F.col("value").isin(1, 2, 3))
            >>> df.filter(F.col("value").isin([1, 2, 3]))  # Also supported
            >>> df.filter(F.col("value").isin({1, 2, 3}))  # Also supported

        Note:
            Version 3.23.0 (Issue #226) added support for ``*values``
            arguments; per that note, type coercion for mixed types is
            handled when the operation is evaluated, not here.
        """
        # PySpark unwraps a lone list *or set*; tuple is kept for backward
        # compatibility with earlier releases of this method.
        if len(values) == 1 and isinstance(
            values[0], (list, tuple, set, frozenset)
        ):
            candidates = list(values[0])
        else:
            candidates = list(values)
        return self._create_operation("isin", candidates)

    def between(self, lower: Any, upper: Any) -> "ColumnOperation":
        """Check if column value is between lower and upper bounds."""
        return self._create_operation("between", (lower, upper))

    def like(self, pattern: str) -> "ColumnOperation":
        """SQL LIKE pattern matching."""
        return self._create_operation("like", pattern)

    def rlike(self, pattern: str) -> "ColumnOperation":
        """Regular expression pattern matching.

        The resulting operation is named ``RLIKE(<column name>, <pattern>)``
        with the pattern inserted unquoted.
        """
        pattern_text = pattern if isinstance(pattern, str) else str(pattern)
        return ColumnOperation(
            self, "rlike", pattern, name=f"RLIKE({self.name}, {pattern_text})"
        )

    def contains(self, literal: str) -> "ColumnOperation":
        """Check if column contains the literal string."""
        return self._create_operation("contains", literal)

    def startswith(self, literal: str) -> "ColumnOperation":
        """Check if column starts with the literal string."""
        return self._create_operation("startswith", literal)

    def endswith(self, literal: str) -> "ColumnOperation":
        """Check if column ends with the literal string."""
        return self._create_operation("endswith", literal)

    def substr(self, start: int, length: int) -> "ColumnOperation":
        """Extract substring from string column.

        Args:
            start: Starting position (1-indexed, can be negative for
                reverse indexing).
            length: Length of substring (required).

        Returns:
            ColumnOperation representing the substr operation; its operand
            is the ``(start, length)`` tuple.

        Example:
            >>> df.select(F.col("name").substr(1, 2))
        """
        return self._create_operation("substr", (start, length))

    # ------------------------------------------------------------------
    # Sort order
    # ------------------------------------------------------------------
    def asc(self) -> "ColumnOperation":
        """Ascending sort order."""
        return self._create_operation("asc", None)

    def desc(self) -> "ColumnOperation":
        """Descending sort order."""
        return self._create_operation("desc", None)

    def desc_nulls_last(self) -> "ColumnOperation":
        """Descending sort order with nulls last."""
        return self._create_operation("desc_nulls_last", None)

    def desc_nulls_first(self) -> "ColumnOperation":
        """Descending sort order with nulls first."""
        return self._create_operation("desc_nulls_first", None)

    def asc_nulls_last(self) -> "ColumnOperation":
        """Ascending sort order with nulls last."""
        return self._create_operation("asc_nulls_last", None)

    def asc_nulls_first(self) -> "ColumnOperation":
        """Ascending sort order with nulls first."""
        return self._create_operation("asc_nulls_first", None)

    # ------------------------------------------------------------------
    # Casting / element access
    # ------------------------------------------------------------------
    def cast(self, data_type: "DataType") -> "ColumnOperation":
        """Cast column to different data type."""
        return self._create_operation("cast", data_type)

    def astype(self, data_type: "Union[DataType, str]") -> "ColumnOperation":
        """Cast column to different data type (alias for cast).

        Args:
            data_type: The target data type (DataType object or string name
                like "date", "string", etc.).

        Returns:
            ColumnOperation representing the cast operation.

        Example:
            >>> df.select(F.col("name").astype("string"))
            >>> df.select(F.substring("date", 1, 10).astype("date"))
        """
        # cast() forwards its argument untouched, so a string works in
        # practice even though cast() is annotated with DataType only.
        return self.cast(data_type)  # type: ignore[arg-type]

    def getItem(self, key: Any) -> "ColumnOperation":
        """Get item from array by index or map by key.

        Args:
            key: Index (int) for array access or key (any) for map access.

        Returns:
            ColumnOperation representing the getItem operation.

        Example:
            >>> df.select(F.col("array_col").getItem(0))
            >>> df.select(F.col("map_col").getItem("key"))

        Note:
            Per the 3.23.0 release note (Issue #227), out-of-bounds array
            access evaluates to None instead of raising; that happens at
            evaluation time, not in this method.
        """
        return self._create_operation("getItem", key)

    def withField(
        self,
        fieldName: str,
        col: Union["Column", "ColumnOperation", "Literal", Any],  # type: ignore[name-defined,unused-ignore]
    ) -> "ColumnOperation":
        """Add or replace a field in a struct column.

        Args:
            fieldName: Name of the field to add or replace.
            col: Expression for the new field value. A ``str`` is treated as
                a column name; a Column, ColumnOperation, Literal or
                WindowFunction is used as-is; anything else is wrapped in a
                Literal.

        Returns:
            ColumnOperation tagged "withField" whose value is the dict
            ``{"fieldName": fieldName, "column": col}``.

        Example:
            >>> df.withColumn("my_struct", F.col("my_struct").withField("new_field", F.lit("value")))
            >>> df.withColumn("my_struct", F.col("my_struct").withField("existing_field", F.col("other_col")))

        Note:
            Mirrors the PySpark 3.1.0+ feature for struct columns: an
            existing field is replaced, a missing one is added.
        """
        # Imported lazily; at module level these names are only imported
        # under TYPE_CHECKING.
        from .literals import Literal
        from ..window_execution import WindowFunction

        if isinstance(col, str):
            col = Column(col)
        elif not isinstance(
            col, (WindowFunction, Column, ColumnOperation, Literal)
        ):
            # Issue #398: WindowFunction must NOT be wrapped in a Literal,
            # hence its presence in the pass-through tuple above.
            col = Literal(col)

        return ColumnOperation(
            self,
            "withField",
            value={"fieldName": fieldName, "column": col},
            name=f"{self.name}.withField({fieldName}, ...)",
        )
class Column(ColumnOperatorMixin, IColumn):
    """Mock column expression for DataFrame operations.

    Provides a PySpark-compatible column expression. Comparison, arithmetic
    and logical operators come from ``ColumnOperatorMixin``; this class adds
    naming/aliasing, struct-field subscripting, CASE WHEN / window entry
    points and aggregate shortcuts.
    """

    def __init__(self, name: str, column_type: Optional[DataType] = None):
        """Initialize Column.

        Args:
            name: Column name.
            column_type: Optional data type. Defaults to StringType if not
                specified.
        """
        self._name = name
        # Populated by alias(): the column this one was derived from and the
        # name it was given.
        self._original_column: Optional[Column] = None
        self._alias_name: Optional[str] = None
        self.column_name = name
        self.column_type = column_type or StringType()
        self.operation = None
        self.operand = None
        self._operations: List[ColumnOperation] = []
        # expr attribute kept for PySpark compatibility.
        self.expr = f"Column('{name}')"

    @property
    def name(self) -> str:
        """Get the column name (alias if set, otherwise original name)."""
        alias = getattr(self, "_alias_name", None)
        return self._name if alias is None else alias

    @property
    def original_column(self) -> "Column":
        """Get the original column (for aliased columns).

        NOTE(review): ``__init__`` always sets ``_original_column`` (to
        ``None`` for a non-aliased column), so the ``self`` fallback only
        applies to instances that skipped ``__init__``; a plain column
        therefore yields ``None`` here. Behaviour left unchanged - confirm
        what callers expect before tightening.
        """
        return getattr(self, "_original_column", self)

    def __eq__(self, other: Any) -> "ColumnOperation":  # type: ignore[override]
        """Equality comparison; builds an "==" ColumnOperation for any operand."""
        return ColumnOperation(self, "==", other)

    def __hash__(self) -> int:
        """Hash on (name, column_type) so Column stays hashable despite __eq__."""
        return hash((self.name, self.column_type))

    def __getitem__(self, key: Any) -> Union["Column", "ColumnOperation"]:
        """Support subscript notation for struct field access and map lookup.

        Args:
            key: Field name (string) for struct field access, or Column for
                map lookup.

        Returns:
            For a string key: new Column named ``"<base>.<key>"`` with this
            column's type. For a Column key: the ``getItem`` operation.

        Raises:
            TypeError: If ``key`` is neither a Column nor a string (ints are
                deliberately rejected here; use getItem/getField instead).

        Example:
            >>> F.col("StructVal")["E1"]  # Returns Column("StructVal.E1")
            >>> F.col("map_col")[F.col("key_col")]  # Map lookup by column (Issue #440)
        """
        # Map lookup: map_col[other_col] -> getItem (Issue #440).
        if isinstance(key, Column):
            return self.getItem(key)
        if not isinstance(key, str):
            raise TypeError(
                f"Column subscript access only supports string keys for struct fields, got {type(key).__name__}"
            )

        # An aliased column builds the path from the ORIGINAL column name:
        # F.col("StructVal").alias("SV")["E1"] -> "StructVal.E1", not "SV.E1".
        base_name = self.name
        source = getattr(self, "_original_column", None)
        if source is not None:
            base_name = source.name
        return Column(f"{base_name}.{key}", self.column_type)

    def __str__(self) -> str:
        """Return string representation of column for SQL generation."""
        return self.name

    def alias(self, name: str) -> "IColumn":
        """Create an alias for the column.

        Returns a new Column named ``name`` with the same type that remembers
        this column as its original.
        """
        renamed = Column(name, self.column_type)
        renamed._original_column = self
        renamed._alias_name = name
        return renamed

    def getField(
        self, index_or_name: Union[int, str]
    ) -> Union["Column", "ColumnOperation"]:
        """Access array element by index or struct field by name (PySpark getField).

        Args:
            index_or_name: str for a struct field (same as ``self[name]``),
                anything else is forwarded to ``getItem``.

        Returns:
            Column for struct field path, ColumnOperation for array/map access.

        Example:
            >>> df.select(F.col("ArrayVal").getField(0))
            >>> df.select(F.col("Person").getField("name"))
        """
        if isinstance(index_or_name, str):
            return self[index_or_name]
        return self.getItem(index_or_name)

    def when(self, condition: "ColumnOperation", value: Any) -> "CaseWhen":
        """Start a CASE WHEN expression."""
        from ..conditional import CaseWhen

        return CaseWhen(self, condition, value)

    def otherwise(self, value: Any) -> "CaseWhen":
        """End a CASE WHEN expression with default value."""
        from ..conditional import CaseWhen

        return CaseWhen(self, None, value)

    def over(self, window_spec: "WindowSpec") -> "WindowFunction":
        """Apply window function over window specification."""
        from ..window_execution import WindowFunction

        return WindowFunction(self, window_spec)

    def count(self) -> "ColumnOperation":
        """Count non-null values in this column.

        Returns:
            ColumnOperation representing the count operation.
        """
        return ColumnOperation(self, "count", None)

    def _aggregate_operation(self, function_name: str) -> "ColumnOperation":
        """Build the ColumnOperation shared by the aggregate shortcuts.

        An ``AggregateFunction`` (typed ``DoubleType``) is created first
        because it owns the naming logic; the returned ColumnOperation reuses
        that name and keeps the aggregate on ``_aggregate_function`` for
        evaluation.
        """
        from ..base import AggregateFunction
        from ...spark_types import DoubleType

        agg_func = AggregateFunction(self, function_name, DoubleType())
        op = ColumnOperation(self, function_name, value=None, name=agg_func.name)
        op._aggregate_function = agg_func
        return op

    def avg(self) -> "ColumnOperation":  # noqa: F821
        """Average values in this column.

        Returns:
            ColumnOperation representing the avg function (PySpark-compatible).
        """
        return self._aggregate_operation("avg")

    def sum(self) -> "ColumnOperation":  # noqa: F821
        """Sum values in this column.

        Returns:
            ColumnOperation representing the sum function (PySpark-compatible).
        """
        return self._aggregate_operation("sum")

    def max(self) -> "ColumnOperation":  # noqa: F821
        """Maximum value in this column.

        Returns:
            ColumnOperation representing the max function (PySpark-compatible).
        """
        return self._aggregate_operation("max")

    def min(self) -> "ColumnOperation":  # noqa: F821
        """Minimum value in this column.

        Returns:
            ColumnOperation representing the min function (PySpark-compatible).
        """
        return self._aggregate_operation("min")

    def stddev(self) -> "ColumnOperation":  # noqa: F821
        """Standard deviation of values in this column.

        Returns:
            ColumnOperation representing the stddev function (PySpark-compatible).
        """
        return self._aggregate_operation("stddev")

    def variance(self) -> "ColumnOperation":  # noqa: F821
        """Variance of values in this column.

        Returns:
            ColumnOperation representing the variance function (PySpark-compatible).
        """
        return self._aggregate_operation("variance")

    def bitwise_not(self) -> "ColumnOperation":
        """Bitwise NOT operation on this column.

        Returns:
            ColumnOperation representing the bitwise_not function, named
            ``~<column name>`` to match PySpark's column naming.
        """
        return ColumnOperation(self, "bitwise_not", name=f"~{self.name}")
class ColumnOperation(Column):
    """Represents a column operation (comparison, arithmetic, etc.).

    Stores the operand column, an operation tag and an optional value so the
    expression can be evaluated later. Subclasses Column so that
    ``isinstance(x, Column)`` checks succeed for operations too.
    """

    # Infix operations rendered WITHOUT surrounding parentheses, mapped to the
    # text placed between the operands.
    _BARE_INFIX_TEXT = {
        "==": "=",
        "!=": "!=",
        "eqNullSafe": "IS NOT DISTINCT FROM",
        "<": "<",
        "<=": "<=",
        ">": ">",
        ">=": ">=",
    }

    # Infix operations rendered as "(left op right)".
    _WRAPPED_INFIX_OPS = ("+", "-", "*", "/", "%", "&", "|")

    # Operations rendered as "<column> <KEYWORDS>" (keywords only when the
    # operand has no ``name`` attribute).
    _POSTFIX_TEXT = {
        "isnull": "IS NULL",
        "isnotnull": "IS NOT NULL",
        "asc": "ASC",
        "desc": "DESC",
        "desc_nulls_last": "DESC NULLS LAST",
        "desc_nulls_first": "DESC NULLS FIRST",
        "asc_nulls_last": "ASC NULLS LAST",
        "asc_nulls_first": "ASC NULLS FIRST",
    }

    # Operations rendered as "<keyword> <value>", prefixed with the column
    # name when available.
    _VALUE_KEYWORD_TEXT = {"isin": "IN", "like": "LIKE", "rlike": "RLIKE"}

    # Array helpers rendered as "<FUNC>(<column>)".
    _LIST_FUNCTION_TEXT = {
        "array_reverse": "LIST_REVERSE",
        "array_size": "LEN",
        "array_max": "LIST_MAX",
        "array_min": "LIST_MIN",
    }

    # PySpark type names -> DuckDB/SQL type names (DuckDB backend only).
    _CAST_TYPE_TEXT = {
        "int": "INTEGER",
        "integer": "INTEGER",
        "long": "BIGINT",
        "bigint": "BIGINT",
        "double": "DOUBLE",
        "float": "FLOAT",
        "string": "VARCHAR",
        "varchar": "VARCHAR",
        "boolean": "BOOLEAN",
        "bool": "BOOLEAN",
        "date": "DATE",
        "timestamp": "TIMESTAMP",
    }

    # Operations for which ``name`` falls back to the SQL text from __str__.
    _SQL_NAMED_OPS = (
        "hour",
        "minute",
        "second",
        "year",
        "month",
        "day",
        "dayofmonth",
        "dayofweek",
        "dayofyear",
        "weekofyear",
        "quarter",
        "to_date",
        "to_timestamp",
        "==",
        "!=",
        "<",
        ">",
        "<=",
        ">=",
    )

    def __init__(
        self,
        column: Any,  # Can be Column, ColumnOperation, IColumn, mixin, or None
        operation: str,
        value: Any = None,
        name: Optional[str] = None,
    ):
        """Initialize ColumnOperation.

        Args:
            column: The column being operated on (can be None for some
                operations).
            operation: The operation being performed.
            value: The value or operand for the operation.
            name: Optional custom name for the operation.
        """
        # Operand/value go first: they are independent of Column.__init__.
        self.column = column
        self.value = value

        # The base class needs a name up front, so derive one now unless the
        # caller supplied a (truthy) name.
        initial_name = name or ColumnOperation._generate_name_early_helper(
            column, operation, value
        )
        super().__init__(initial_name)

        # Column.__init__ reset ``operation`` to None; restore the real tag.
        self.operation = operation  # type: ignore[assignment]
        self.function_name = operation
        self.return_type: Optional[Any] = None

        # An explicitly supplied name always wins, even an empty string.
        if name is not None:
            self._name = name

        # Slots for aggregate functions, UDFs and window operations; they are
        # filled in after construction by whoever builds such operations.
        self._alias_names: Optional[Tuple[str, ...]] = None
        self._aggregate_function: Optional[AggregateFunction] = None
        self._udf_func: Optional[Any] = None
        self._udf_return_type: Optional[Any] = None
        self._udf_cols: Optional[List[Any]] = None
        self._is_pandas_udf: Optional[bool] = None
        self._is_table_udf: Optional[bool] = None
        self._window_duration: Optional[str] = None
        self._window_slide: Optional[str] = None
        self._window_start: Optional[str] = None

        # column_name should carry the operation name, not the raw argument
        # Column.__init__ stored.
        self.column_name = self._name

    @staticmethod
    def _generate_name_early_helper(column: Any, operation: str, value: Any) -> str:
        """Generate a name for an operation without needing an instance.

        Static so it can run before the Column base class is initialised.
        ``_generate_name()`` ends up calling this with the instance's
        attributes, so both paths share one implementation.

        Args:
            column: Left operand; ``None`` yields ``"<operation>()"``.
            operation: Operation tag.
            value: Right operand / argument of the operation.

        Returns:
            The generated name string.
        """
        # Objects exposing both ``value`` and ``data_type`` are treated as
        # Literals and contribute their wrapped value.
        if hasattr(value, "value") and hasattr(value, "data_type"):
            value_text = str(value.value)
        else:
            value_text = str(value)

        # Functions without column input (like current_date).
        if column is None:
            return operation + "()"

        named = hasattr(column, "name")
        column_ref = column.name if named else str(column)
        # Prefix for "<col> KEYWORD" renderings; empty when the operand has
        # no ``name`` attribute.
        lead = f"{column_ref} " if named else ""
        # Operand text for function-call renderings; falls back to the
        # literal word "column".
        call_arg = column_ref if named else "column"

        if operation == "bitwise_not":
            # PySpark uses ~column for bitwise_not.
            return f"~{column_ref}"

        if operation in ColumnOperation._BARE_INFIX_TEXT:
            symbol = ColumnOperation._BARE_INFIX_TEXT[operation]
            return f"{column_ref} {symbol} {value_text}"

        if operation in ColumnOperation._WRAPPED_INFIX_OPS:
            return f"({column_ref} {operation} {value_text})"

        if operation == "!":
            return f"(CASE WHEN {column_ref} THEN FALSE ELSE TRUE END)"

        if operation in ColumnOperation._POSTFIX_TEXT:
            return f"{lead}{ColumnOperation._POSTFIX_TEXT[operation]}"

        if operation in ColumnOperation._VALUE_KEYWORD_TEXT:
            # These use the raw value, not the Literal-unwrapped text.
            keyword = ColumnOperation._VALUE_KEYWORD_TEXT[operation]
            return f"{lead}{keyword} {value}"

        if operation == "between":
            if value is None:
                return f"{lead}BETWEEN NULL AND NULL"
            return f"{lead}BETWEEN {value[0]} AND {value[1]}"

        if operation == "cast":
            if isinstance(value, str):
                sql_type = ColumnOperation._CAST_TYPE_TEXT.get(
                    value.lower(), value.upper()
                )
            else:
                # Non-string targets (DataType objects) are rendered via str().
                sql_type = str(value)
            return f"CAST({call_arg} AS {sql_type})"

        if operation == "from_unixtime":
            if value is not None:
                return f"from_unixtime({call_arg}, '{value}')"
            return f"from_unixtime({call_arg})"

        if operation == "array_sort":
            # Anything that is not an actual bool means ascending.
            ascending = value if isinstance(value, bool) else True
            sorter = "LIST_SORT" if ascending else "LIST_REVERSE_SORT"
            return f"{sorter}({call_arg})"

        if operation in ColumnOperation._LIST_FUNCTION_TEXT:
            return f"{ColumnOperation._LIST_FUNCTION_TEXT[operation]}({call_arg})"

        if operation == "struct":
            if value is None or not isinstance(value, (list, tuple)):
                return "struct(...)"
            parts = []
            for member in value:
                if hasattr(member, "value") and hasattr(member, "data_type"):
                    parts.append(str(member.value))
                elif hasattr(member, "name"):
                    parts.append(member.name)
                else:
                    parts.append(str(member))
            # The leading operand belongs to the struct too unless it is the
            # placeholder column.
            if column and named and column.name != "__struct_dummy__":
                if hasattr(column, "value") and hasattr(column, "data_type"):
                    parts.insert(0, str(column.value))
                else:
                    parts.insert(0, column.name)
            return f"struct({', '.join(parts)})"

        # Aggregate functions and every other operation: "<op>(<column>)".
        return f"{operation}({column_ref})"

    def _generate_name_early(self) -> str:
        """Generate a name from this instance's column, operation and value.

        Thin wrapper around the static helper.
        """
        # self.operation is always a string on ColumnOperation instances.
        tag: str = self.operation  # type: ignore[assignment]
        return ColumnOperation._generate_name_early_helper(
            self.column, tag, self.value
        )

    @property
    def name(self) -> str:
        """Get column name.

        Resolution order: alias; for casts, the operand's own name; an
        explicitly stored ``_name`` that differs from the generated name or
        from the SQL text; for datetime/comparison tags, the SQL text;
        finally ``_name``.
        """
        alias = getattr(self, "_alias_name", None)
        if alias:
            return alias

        tag: str = self.operation  # type: ignore[assignment]

        # Casts keep the original column name, as PySpark does.
        if tag == "cast" and hasattr(self, "column") and hasattr(self.column, "name"):
            operand_name = getattr(self.column, "name", "")
            return operand_name if isinstance(operand_name, str) else str(operand_name)

        # A stored name that is not merely the auto-generated / SQL text was
        # set on purpose (e.g. PySpark-style "year(hire_date)"), so keep it.
        if self._name:
            if self._name != self._generate_name():
                return self._name
            if self._name != str(self):
                return self._name

        if tag in ColumnOperation._SQL_NAMED_OPS:
            return str(self)
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        """Set column name."""
        self._name = value

    def __str__(self) -> str:
        """Generate SQL representation of this operation."""
        tag = self.operation

        if tag in ["hour", "minute", "second"]:
            return f"extract({tag} from TRY_CAST({self.column.name} AS TIMESTAMP))"

        if tag in ["year", "month", "day", "dayofmonth"]:
            field = "day" if tag == "dayofmonth" else tag
            return f"extract({field} from TRY_CAST({self.column.name} AS DATE))"

        if tag in ["dayofweek", "dayofyear", "weekofyear", "quarter"]:
            extract_fields: Dict[str, str] = {
                "dayofweek": "dow",
                "dayofyear": "doy",
                "weekofyear": "week",
                "quarter": "quarter",
            }
            tag_text: str = tag  # type: ignore[assignment]
            field = extract_fields.get(tag_text, tag_text)
            extracted = f"extract({field} from TRY_CAST({self.column.name} AS DATE))"
            if tag == "dayofweek":
                # PySpark dayofweek is 1-7 (Sunday=1); DuckDB DOW is 0-6
                # (Sunday=0), so shift by one. DuckDB backend only.
                return f"CAST({extracted} + 1 AS INTEGER)"
            return f"CAST({extracted} AS INTEGER)"

        if tag in ["to_date", "to_timestamp"]:
            if self.value is not None:
                return f"STRPTIME({self.column.name}, '{self.value}')"
            sql_type = "DATE" if tag == "to_date" else "TIMESTAMP"
            return f"TRY_CAST({self.column.name} AS {sql_type})"

        if tag in ["==", "!=", "<", ">", "<=", ">=", "eqNullSafe"]:
            lhs = str(self.column)
            rhs = "NULL" if self.value is None else str(self.value)
            if tag == "eqNullSafe":
                # IS NOT DISTINCT FROM treats NULL = NULL as TRUE and NULL
                # versus non-NULL as FALSE.
                return f"({lhs} IS NOT DISTINCT FROM {rhs})"
            return f"({lhs} {tag} {rhs})"

        # Casts and every remaining operation use the generated name.
        return self._generate_name()

    def _generate_name(self) -> str:
        """Generate a name for this operation (delegates to _generate_name_early())."""
        return self._generate_name_early()

    def alias(self, *alias_names: str) -> "ColumnOperation":
        """Create an alias for this operation.

        Accepts one or more names (PySpark allows several, e.g. posexplode);
        the first becomes the alias name and the full tuple is stored on
        ``_alias_names``.

        Raises:
            ValueError: If no name is given.
        """
        if not alias_names:
            raise ValueError("alias() requires at least one name")

        tag: str = self.operation  # type: ignore[assignment]
        clone = ColumnOperation(self.column, tag, self.value, name=self._name)
        clone._alias_name = alias_names[0]
        clone._alias_names = tuple(alias_names)

        # Carry the aggregate over so aliased aggregates still evaluate.
        # NOTE(review): the UDF/window attributes and return_type are not
        # copied to the clone - confirm whether that is intended.
        if hasattr(self, "_aggregate_function"):
            clone._aggregate_function = self._aggregate_function
        return clone

    def getField(self, index_or_name: Union[int, str]) -> "ColumnOperation":
        """Access array element by index or struct field by name (PySpark getField)."""
        return self._create_operation("getField", index_or_name)

    def over(self, window_spec: "WindowSpec") -> "WindowFunction":
        """Apply window function over window specification."""
        from ..window_execution import WindowFunction

        return WindowFunction(self, window_spec)