"""
Conditional functions for Sparkless.
This module contains conditional functions including CASE WHEN expressions.
"""
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Union, cast
from sparkless.functions.base import Column, ColumnOperation
from sparkless.core.condition_evaluator import ConditionEvaluator
from sparkless.core.type_utils import get_expression_name
from sparkless.spark_types import get_row_value
if TYPE_CHECKING:
from sparkless.spark_types import DataType
from sparkless.functions.aggregate import AggregateFunction
[docs]
def validate_rule(
    column: Union[Column, str], rule: Union[str, List[Any]]
) -> ColumnOperation:
    """Translate a declarative validation rule into a column expression.

    Args:
        column: Column to validate. A plain string is wrapped in ``Column``.
        rule: Either a rule name (``"not_null"``, ``"positive"``,
            ``"non_negative"``, ``"negative"``, ``"non_positive"``,
            ``"non_zero"``, ``"zero"``) or a list ``[operator, arg, ...]``
            whose operator is one of ``gt``, ``gte``, ``lt``, ``lte``, ``eq``,
            ``ne``, ``between``, ``in``, ``not_in``, ``contains``,
            ``starts_with``, ``ends_with`` or ``regex``.

    Returns:
        The expression produced by applying the rule to the column.

    Raises:
        ValueError: If the rule is of an unsupported type, is an empty list,
            names an unknown rule/operator, or lacks required arguments.
    """
    target = Column(column) if isinstance(column, str) else column

    if isinstance(rule, str):
        # Named rules take no arguments; builders are lazy so only the
        # selected comparison is ever constructed.
        named_rules = {
            "not_null": lambda: target.isNotNull(),
            "positive": lambda: target > 0,
            "non_negative": lambda: target >= 0,
            "negative": lambda: target < 0,
            "non_positive": lambda: target <= 0,
            "non_zero": lambda: target != 0,
            "zero": lambda: target == 0,
        }
        build_named = named_rules.get(rule)
        if build_named is None:
            raise ValueError(f"Unknown string validation rule: {rule}")
        return build_named()

    if not isinstance(rule, list):
        raise ValueError(f"Unknown validation rule type: {type(rule)}")
    if not rule:
        raise ValueError("Empty rule list")

    op = rule[0]
    # (operator, minimum rule length, error when too short, lazy builder)
    list_rules = (
        ("gt", 2, "gt rule requires a value",
         lambda: cast("ColumnOperation", target > rule[1])),
        ("gte", 2, "gte rule requires a value",
         lambda: cast("ColumnOperation", target >= rule[1])),
        ("lt", 2, "lt rule requires a value",
         lambda: cast("ColumnOperation", target < rule[1])),
        ("lte", 2, "lte rule requires a value",
         lambda: cast("ColumnOperation", target <= rule[1])),
        ("eq", 2, "eq rule requires a value",
         lambda: cast("ColumnOperation", target == rule[1])),
        ("ne", 2, "ne rule requires a value",
         lambda: cast("ColumnOperation", target != rule[1])),
        ("between", 3, "between rule requires two values",
         lambda: target.between(rule[1], rule[2])),
        ("in", 2, "in rule requires a list of values",
         lambda: target.isin(rule[1])),
        ("not_in", 2, "not_in rule requires a list of values",
         lambda: ~target.isin(rule[1])),
        ("contains", 2, "contains rule requires a value",
         lambda: target.contains(rule[1])),
        ("starts_with", 2, "starts_with rule requires a value",
         lambda: target.startswith(rule[1])),
        ("ends_with", 2, "ends_with rule requires a value",
         lambda: target.endswith(rule[1])),
        ("regex", 2, "regex rule requires a pattern",
         lambda: target.rlike(rule[1])),
    )
    for op_name, min_len, too_short, build in list_rules:
        if op == op_name:
            if len(rule) < min_len:
                raise ValueError(too_short)
            return build()
    raise ValueError(f"Unknown list validation rule: {op}")
[docs]
class CaseWhen:
    """Represents a CASE WHEN expression.

    Holds an ordered list of ``(condition, value)`` branches plus an optional
    default value, similar to a SQL ``CASE WHEN`` statement, and can evaluate
    them against a single row.
    """

    def __init__(self, column: Any = None, condition: Any = None, value: Any = None):
        """Initialize CaseWhen.

        Args:
            column: The column or expression being evaluated.
            condition: The condition for the first branch.
            value: The value to return if ``condition`` is true.

        Note:
            The first branch is recorded only when both ``condition`` and
            ``value`` are not ``None``. Use :meth:`when` to add a branch whose
            value is ``None``.
        """
        self.column = column
        self.conditions: List[Tuple[Any, Any]] = []
        self.default_value: Any = None
        if condition is not None and value is not None:
            self.conditions.append((condition, value))
        # Placeholder name: otherwise() replaces it with the rendered
        # expression and alias() with a caller-chosen name.
        self.name = "CASE WHEN"

    @property
    def else_value(self) -> Any:
        """Get the else value (alias for default_value for compatibility)."""
        return self.default_value

    @else_value.setter
    def else_value(self, value: Any) -> None:
        """Set the else value (alias for default_value for compatibility)."""
        self.default_value = value

    def when(self, condition: Any, value: Any) -> "CaseWhen":
        """Add another WHEN branch.

        Args:
            condition: The condition to check.
            value: The value to return if condition is true.

        Returns:
            Self for method chaining.
        """
        self.conditions.append((condition, value))
        return self

    def otherwise(self, value: Any) -> "CaseWhen":
        """Set the default value and regenerate the expression name.

        Args:
            value: The default value to return if no conditions match.

        Returns:
            Self for method chaining.
        """
        self.default_value = value
        # The generated name has the form
        #   CASE WHEN (condition) THEN value ELSE otherwise END
        # and is built from the first branch only.
        if self.conditions:
            condition, then_value = self.conditions[0]
            self.name = (
                f"CASE WHEN ({str(condition)}) THEN {then_value} ELSE {value} END"
            )
        return self

    def alias(self, name: str) -> "CaseWhen":
        """Rename the CASE WHEN expression.

        Args:
            name: The alias name.

        Returns:
            Self for method chaining.
        """
        self.name = name
        return self

    def cast(self, data_type: Any) -> ColumnOperation:
        """Cast the CASE WHEN expression to a different data type.

        Args:
            data_type: The target data type (DataType instance or string type name).

        Returns:
            ColumnOperation representing the cast operation.

        Example:
            >>> F.when(F.col("value") == "A", F.lit(100)).otherwise(F.lit(200)).cast("long")
        """
        return ColumnOperation(self, "cast", data_type)

    def _create_operation(self, operation: str, other: Any) -> ColumnOperation:
        """Create a ColumnOperation with this expression as the left operand.

        Args:
            operation: The operation to perform (e.g., "+", "-", "|", etc.)
            other: The other operand

        Returns:
            ColumnOperation instance
        """
        return ColumnOperation(self, operation, other)

    def __add__(self, other: Any) -> ColumnOperation:
        """Addition operation (PySpark-compatible)."""
        return self._create_operation("+", other)

    def __sub__(self, other: Any) -> ColumnOperation:
        """Subtraction operation (PySpark-compatible)."""
        return self._create_operation("-", other)

    def __mul__(self, other: Any) -> ColumnOperation:
        """Multiplication operation (PySpark-compatible)."""
        return self._create_operation("*", other)

    def __truediv__(self, other: Any) -> ColumnOperation:
        """Division operation (PySpark-compatible)."""
        return self._create_operation("/", other)

    def __mod__(self, other: Any) -> ColumnOperation:
        """Modulo operation (PySpark-compatible)."""
        return self._create_operation("%", other)

    def __radd__(self, other: Any) -> ColumnOperation:
        """Reverse addition operation (for `2 + case_when`)."""
        # Addition is commutative, so the operands are simply swapped.
        return self._create_operation("+", other)

    def __rsub__(self, other: Any) -> ColumnOperation:
        """Reverse subtraction operation (for `2 - case_when`)."""
        # Not commutative: the other operand must stay on the left.
        return ColumnOperation(other, "-", self)

    def __rmul__(self, other: Any) -> ColumnOperation:
        """Reverse multiplication operation (for `2 * case_when`)."""
        # Multiplication is commutative, so the operands are simply swapped.
        return self._create_operation("*", other)

    def __rtruediv__(self, other: Any) -> ColumnOperation:
        """Reverse division operation (for `2 / case_when`)."""
        # Not commutative: the other operand must stay on the left.
        return ColumnOperation(other, "/", self)

    def __rmod__(self, other: Any) -> ColumnOperation:
        """Reverse modulo operation (for `2 % case_when`)."""
        # Not commutative: the other operand must stay on the left.
        return ColumnOperation(other, "%", self)

    def __or__(self, other: Any) -> ColumnOperation:
        """Bitwise OR operation (PySpark-compatible)."""
        return self._create_operation("|", other)

    def __and__(self, other: Any) -> ColumnOperation:
        """Bitwise AND operation (PySpark-compatible)."""
        return self._create_operation("&", other)

    def __invert__(self) -> ColumnOperation:
        """Bitwise NOT operation (unary ~, PySpark-compatible)."""
        return ColumnOperation(self, "~", None)

    def evaluate(self, row: Dict[str, Any]) -> Any:
        """Evaluate the CASE WHEN expression for a given row.

        Args:
            row: The data row to evaluate against.

        Returns:
            The value of the first branch whose condition holds, otherwise
            the evaluated default value.
        """
        # Branches are tried in insertion order; the first match wins.
        for condition, value in self.conditions:
            if self._evaluate_condition(row, condition):
                return self._evaluate_value(row, value)
        return self._evaluate_value(row, self.default_value)

    def get_result_type(self) -> "DataType":
        """Infer the result type from the branch values and the default.

        The first non-None value whose type is recognised decides the result.
        When nothing is recognised the result is ``LongType``, nullable unless
        every value is a ``Literal`` or ``None``.
        """
        from ..spark_types import (
            BooleanType,
            IntegerType,
            StringType,
            DoubleType,
            LongType,
        )
        from .core.literals import Literal

        # Candidate values: every branch value, plus the default if set.
        all_values = [v for _, v in self.conditions]
        if self.default_value is not None:
            all_values.append(self.default_value)

        # Only consulted by the fallback at the bottom.
        all_literals = all(
            isinstance(val, Literal) or val is None for val in all_values
        )

        for val in all_values:
            if val is None:
                continue
            if isinstance(val, Literal):
                # Re-create the literal's type as non-nullable.
                data_type = val.data_type
                if isinstance(data_type, BooleanType):
                    return BooleanType(nullable=False)
                if isinstance(data_type, IntegerType):
                    return IntegerType(nullable=False)
                if isinstance(data_type, DoubleType):
                    return DoubleType(nullable=False)
                if isinstance(data_type, StringType):
                    return StringType(nullable=False)
                return data_type.__class__(nullable=False)
            # bool must be tested before int because bool subclasses int.
            if isinstance(val, bool):
                return BooleanType(nullable=False)
            if isinstance(val, int):
                return IntegerType(nullable=False)
            if isinstance(val, float):
                return DoubleType(nullable=False)
            if isinstance(val, str):
                return StringType(nullable=False)
            if hasattr(val, "operation") and hasattr(val, "column"):
                # Operation-like value: decide from the operation name.
                if val.operation in ["+", "-", "*", "/", "%", "abs"]:
                    return LongType(nullable=False)
                if val.operation in ["round"]:
                    return DoubleType(nullable=False)
                return StringType(nullable=False)
            # Anything else (e.g. a plain column reference) is skipped.

        # Default to LongType, not BooleanType.
        return LongType(nullable=not all_literals)

    def _evaluate_condition(self, row: Dict[str, Any], condition: Any) -> bool:
        """Evaluate a condition for a given row.

        Delegates to the shared ConditionEvaluator for consistency.

        Args:
            row: The data row to evaluate against.
            condition: The condition to evaluate.

        Returns:
            True if condition is met, False otherwise.
        """
        return bool(ConditionEvaluator.evaluate_condition(row, condition))

    def _evaluate_value(self, row: Dict[str, Any], value: Any) -> Any:
        """Resolve a branch value for a given row.

        Args:
            row: The data row to evaluate against.
            value: The value to evaluate.

        Returns:
            The evaluated value.
        """
        from .core.literals import Literal

        if isinstance(value, Literal):
            return value.value
        if hasattr(value, "operation") and hasattr(value, "column"):
            if isinstance(value, ColumnOperation):
                return self._evaluate_column_operation_value(row, value)
            # Operation-like objects that are not ColumnOperation instances
            # are not evaluated here and yield None.
            # NOTE(review): confirm None is the intended result for these.
            return None
        if hasattr(value, "name"):
            # Column-like object: look the value up in the row by name.
            return get_row_value(row, value.name)
        if hasattr(value, "value"):
            return value.value
        return value

    @staticmethod
    def _spark_remainder(left: Any, right: Any) -> Any:
        """Return ``left % right`` with the sign of the dividend.

        Python's ``%`` yields a result with the sign of the divisor
        (``-7 % 3 == 2``), whereas Spark's remainder operator follows the
        dividend (``-7 % 3 == -1``). ``right`` must be non-zero.
        """
        remainder = abs(left) % abs(right)
        return -remainder if left < 0 else remainder

    def _evaluate_column_operation_value(
        self, row: Dict[str, Any], operation: Any
    ) -> Any:
        """Evaluate an arithmetic column operation used as a branch value.

        Args:
            row: The data row.
            operation: The column operation to evaluate.

        Returns:
            The evaluated result; None when an operand is None or when
            dividing / taking the remainder by zero.
        """
        if operation.operation == "-" and operation.value is None:
            # Unary minus
            left_value = ConditionEvaluator._get_column_value(row, operation.column)
            if left_value is None:
                return None
            return -left_value
        elif operation.operation == "+" and operation.value is None:
            # Unary plus: the operand is returned unchanged.
            return ConditionEvaluator._get_column_value(row, operation.column)
        elif operation.operation in ["+", "-", "*", "/", "%"]:
            # Binary arithmetic; a None operand makes the result None.
            left_value = ConditionEvaluator._get_column_value(row, operation.column)
            right_value = ConditionEvaluator._get_column_value(row, operation.value)
            if left_value is None or right_value is None:
                return None
            if operation.operation == "+":
                return left_value + right_value
            elif operation.operation == "-":
                return left_value - right_value
            elif operation.operation == "*":
                return left_value * right_value
            elif operation.operation == "/":
                return left_value / right_value if right_value != 0 else None
            elif operation.operation == "%":
                if right_value == 0:
                    return None
                return self._spark_remainder(left_value, right_value)
        else:
            # Any other operation: fall back to the base column's value.
            return ConditionEvaluator._get_column_value(row, operation.column)
[docs]
class ConditionalFunctions:
"""Collection of conditional functions."""
[docs]
@staticmethod
def coalesce(*columns: Union[Column, str, Any]) -> ColumnOperation:
    """Build a ``coalesce`` expression over the given columns or values.

    Args:
        *columns: Columns or values to check, in priority order. Plain
            strings are wrapped in ``Column``; anything else is used as-is.

    Returns:
        ColumnOperation whose base is the first operand and whose value is
        the list of remaining operands, named ``coalesce(<names>)``.
    """
    operands = [Column(item) if isinstance(item, str) else item for item in columns]

    # The first operand is the base column; the rest travel as the value.
    result = ColumnOperation(operands[0], "coalesce", operands[1:])

    # get_expression_name (module-level import) renders each operand's name.
    rendered = ", ".join(get_expression_name(item) for item in operands)
    result.name = f"coalesce({rendered})"
    return result
[docs]
@staticmethod
def isnull(column: Union[Column, str]) -> ColumnOperation:
    """Build an ``isnull`` check for a column.

    Args:
        column: The column to check; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation for ``isnull`` named ``(<column> IS NULL)``.
    """
    target = Column(column) if isinstance(column, str) else column
    return ColumnOperation(target, "isnull", name=f"({target.name} IS NULL)")
[docs]
@staticmethod
def isnotnull(column: Union[Column, str]) -> ColumnOperation:
    """Build an ``isnotnull`` check for a column.

    Args:
        column: The column to check; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation for ``isnotnull`` named ``(NOT (<column> IS NULL))``.
    """
    target = Column(column) if isinstance(column, str) else column
    # The name mirrors PySpark, where isnotnull is rendered as the negation
    # of isnull.
    label = f"(NOT ({target.name} IS NULL))"
    return ColumnOperation(target, "isnotnull", name=label)
[docs]
@staticmethod
def isnan(column: Union[Column, str]) -> ColumnOperation:
    """Build an ``isnan`` (Not a Number) check for a column.

    Args:
        column: The column to check; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation for ``isnan`` named ``isnan(<column>)``.
    """
    target = Column(column) if isinstance(column, str) else column
    result = ColumnOperation(target, "isnan")
    # The name is assigned after construction rather than passed in.
    result.name = f"isnan({target.name})"
    return result
[docs]
@staticmethod
def when(condition: Any, value: Any = None) -> CaseWhen:
    """Start a CASE WHEN expression.

    Args:
        condition: The initial condition.
        value: Optional value for the condition.

    Returns:
        CaseWhen object for chaining.

    Note:
        ``value=None`` is indistinguishable from an omitted value, and
        ``CaseWhen.__init__`` records no branch in that case.
        NOTE(review): ``when(cond, None)`` therefore drops ``cond``; confirm
        whether a NULL-valued first branch should be supported.
    """
    # Passing value=None is the same as leaving it out of the constructor.
    return CaseWhen(condition=condition, value=value)
[docs]
@staticmethod
def assert_true(
    condition: Union[Column, ColumnOperation, str],  # str may be passed at runtime
) -> ColumnOperation:
    """Build an ``assert_true`` expression for a boolean condition.

    Args:
        condition: Boolean condition to assert: a column, a column operation,
            or a column name.

    Returns:
        ColumnOperation representing the assert_true function, named
        ``assert_true(<condition>)``.

    Raises:
        AssertionError: If ``condition`` is none of the supported kinds.

    Example:
        >>> df.select(F.assert_true(F.col("value") > 0))
    """
    from sparkless.functions import Column, ColumnOperation
    from sparkless.core.type_utils import (
        is_column,
        is_column_operation,
        get_expression_name,
    )

    if is_column(condition):
        col = condition
        value: Optional[ColumnOperation] = None
    elif is_column_operation(condition):
        # Cast to help mypy understand the type narrowing in Python 3.9
        col_op = cast("ColumnOperation", condition)  # type: ignore[redundant-cast,unused-ignore]
        col = col_op.column
        value = col_op
    elif isinstance(condition, str):
        col = Column(condition)
        value = None
    else:
        # The annotation is not enforced at runtime, so this branch is
        # reachable. Raise explicitly instead of `assert False`: asserts are
        # stripped under `python -O`, after which `col` would be unbound.
        raise AssertionError(f"Unexpected condition type: {type(condition)}")

    name_str = (
        get_expression_name(condition)
        if not isinstance(condition, str)
        else condition
    )
    return ColumnOperation(
        col,
        "assert_true",
        value,
        name=f"assert_true({name_str})",
    )
# Priority 2: Conditional/Null Functions
[docs]
@staticmethod
def ifnull(col1: Union[Column, str], col2: Union[Column, str]) -> ColumnOperation:
    """Return ``col2`` when ``col1`` is null (PySpark 3.5+).

    Implemented as an alias for ``coalesce(col1, col2)``, so the resulting
    operation is a ``coalesce`` operation.

    Args:
        col1: First column.
        col2: Second column (replacement for null).

    Returns:
        ColumnOperation produced by ``coalesce``.
    """
    operands = (col1, col2)
    return ConditionalFunctions.coalesce(*operands)
[docs]
@staticmethod
def equal_null(
    col1: Union[Column, str], col2: Union[Column, str, Any]
) -> ColumnOperation:
    """Build an equality check that treats NULL as equal.

    Args:
        col1: First column; a string is wrapped in ``Column``.
        col2: Second column or value; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation for ``equal_null`` named ``equal_null(<col1>, <col2>)``.
    """
    left = Column(col1) if isinstance(col1, str) else col1
    right = Column(col2) if isinstance(col2, str) else col2

    left_label = left.name
    # Operands without a name (plain values) are rendered directly.
    right_label = right.name if hasattr(right, "name") else right
    return ColumnOperation(
        left,
        "equal_null",
        right,
        name=f"equal_null({left_label}, {right_label})",
    )
[docs]
@staticmethod
def nullif(col1: Union[Column, str], col2: Any) -> ColumnOperation:
    """Return null if col1 equals col2, otherwise col1 (PySpark 3.5+).

    Args:
        col1: First column; a string is wrapped in ``Column``.
        col2: Column, column name, or literal value to compare. Strings are
            treated as column names; ints, floats, bools and ``None`` become
            ``Literal`` values; anything else is used unchanged.

    Returns:
        ColumnOperation for ``nullif`` named ``nullif(<col1>, <col2>)``.
    """
    from ..functions.core.literals import Literal

    column1 = Column(col1) if isinstance(col1, str) else col1

    column2: Union[Literal, Column, Any]
    if isinstance(col2, str):
        # A string names a column, never a literal.
        column2 = Column(col2)
    elif col2 is None or isinstance(col2, (int, float)):
        # Literal value (bool is covered because it subclasses int).
        column2 = Literal(col2)
    else:
        # Already a Column or ColumnOperation.
        column2 = col2

    first_label = column1.name
    second_label = column2.name if hasattr(column2, "name") else str(column2)
    return ColumnOperation(
        column1,
        "nullif",
        value=column2,
        name=f"nullif({first_label}, {second_label})",
    )
[docs]
@staticmethod
def case_when(*conditions: Tuple[Any, Any], else_value: Any = None) -> CaseWhen:
    """Create CASE WHEN expression with multiple conditions.

    Args:
        *conditions: Variable number of (condition, value) tuples, tried in
            the order given.
        else_value: Default value if no conditions match.

    Returns:
        CaseWhen object representing the CASE WHEN expression.

    Raises:
        ValueError: If no conditions are provided.

    Example:
        >>> F.case_when(
        ...     (F.col("age") > 18, "adult"),
        ...     (F.col("age") > 12, "teen"),
        ...     else_value="child"
        ... )
    """
    if not conditions:
        raise ValueError("At least one condition must be provided")

    # Every pair, including the first, is added through when().
    # CaseWhen.__init__ ignores a pair whose value is None, so seeding the
    # constructor with the first pair would silently drop a leading
    # (condition, None) branch while later ones were kept.
    expression = CaseWhen()
    for condition, value in conditions:
        expression.when(condition, value)

    # Set default value if provided
    if else_value is not None:
        expression.otherwise(else_value)
    return expression
[docs]
@staticmethod
def try_add(
    left: Union[Column, str, int, float], right: Union[Column, str, int, float]
) -> ColumnOperation:
    """Null-safe addition - returns NULL on error (PySpark 3.5+).

    Args:
        left: Left operand (column or literal).
        right: Right operand (column or literal).

    Returns:
        ColumnOperation for ``try_add`` named ``try_add(<left>, <right>)``.

    Note:
        Numeric operands are wrapped as ``Column(str(value))``.
        NOTE(review): this looks like a column reference rather than a
        literal; confirm how the evaluator treats it.
    """
    if isinstance(left, str):
        left = Column(left)
    elif isinstance(left, (int, float)):
        left = Column(str(left))

    if isinstance(right, str):
        right = Column(right)
    elif isinstance(right, (int, float)):
        right = Column(str(right))

    left_label = left.name
    right_label = right.name if hasattr(right, "name") else right
    return ColumnOperation(
        left,
        "try_add",
        value=right,
        name=f"try_add({left_label}, {right_label})",
    )
[docs]
@staticmethod
def try_subtract(
    left: Union[Column, str, int, float], right: Union[Column, str, int, float]
) -> ColumnOperation:
    """Null-safe subtraction - returns NULL on error (PySpark 3.5+).

    Args:
        left: Left operand (column or literal).
        right: Right operand (column or literal).

    Returns:
        ColumnOperation for ``try_subtract`` named
        ``try_subtract(<left>, <right>)``.

    Note:
        Numeric operands are wrapped as ``Column(str(value))``.
        NOTE(review): this looks like a column reference rather than a
        literal; confirm how the evaluator treats it.
    """
    if isinstance(left, str):
        left = Column(left)
    elif isinstance(left, (int, float)):
        left = Column(str(left))

    if isinstance(right, str):
        right = Column(right)
    elif isinstance(right, (int, float)):
        right = Column(str(right))

    left_label = left.name
    right_label = right.name if hasattr(right, "name") else right
    return ColumnOperation(
        left,
        "try_subtract",
        value=right,
        name=f"try_subtract({left_label}, {right_label})",
    )
[docs]
@staticmethod
def try_multiply(
    left: Union[Column, str, int, float], right: Union[Column, str, int, float]
) -> ColumnOperation:
    """Null-safe multiplication - returns NULL on error (PySpark 3.5+).

    Args:
        left: Left operand (column or literal).
        right: Right operand (column or literal).

    Returns:
        ColumnOperation for ``try_multiply`` named
        ``try_multiply(<left>, <right>)``.

    Note:
        Numeric operands are wrapped as ``Column(str(value))``.
        NOTE(review): this looks like a column reference rather than a
        literal; confirm how the evaluator treats it.
    """
    if isinstance(left, str):
        left = Column(left)
    elif isinstance(left, (int, float)):
        left = Column(str(left))

    if isinstance(right, str):
        right = Column(right)
    elif isinstance(right, (int, float)):
        right = Column(str(right))

    left_label = left.name
    right_label = right.name if hasattr(right, "name") else right
    return ColumnOperation(
        left,
        "try_multiply",
        value=right,
        name=f"try_multiply({left_label}, {right_label})",
    )
[docs]
@staticmethod
def try_divide(
    left: Union[Column, str, int, float], right: Union[Column, str, int, float]
) -> ColumnOperation:
    """Null-safe division - returns NULL on error (PySpark 3.5+).

    Args:
        left: Left operand (column or literal).
        right: Right operand (column or literal).

    Returns:
        ColumnOperation for ``try_divide`` named ``try_divide(<left>, <right>)``.

    Note:
        Numeric operands are wrapped as ``Column(str(value))``.
        NOTE(review): this looks like a column reference rather than a
        literal; confirm how the evaluator treats it.
    """
    if isinstance(left, str):
        left = Column(left)
    elif isinstance(left, (int, float)):
        left = Column(str(left))

    if isinstance(right, str):
        right = Column(right)
    elif isinstance(right, (int, float)):
        right = Column(str(right))

    left_label = left.name
    right_label = right.name if hasattr(right, "name") else right
    return ColumnOperation(
        left,
        "try_divide",
        value=right,
        name=f"try_divide({left_label}, {right_label})",
    )
[docs]
@staticmethod
def try_sum(column: Union[Column, str]) -> "AggregateFunction":
    """Null-safe sum aggregate - returns NULL on error (PySpark 3.5+).

    Args:
        column: The column to sum; a string is wrapped in ``Column``.

    Returns:
        AggregateFunction wrapping a ``try_sum`` operation, typed as
        ``DoubleType``.
    """
    from sparkless.functions.base import AggregateFunction
    from sparkless.spark_types import DoubleType

    target = Column(column) if isinstance(column, str) else column
    inner = ColumnOperation(target, "try_sum", name=f"try_sum({target.name})")
    return AggregateFunction(inner, "try_sum", DoubleType())
[docs]
@staticmethod
def try_avg(column: Union[Column, str]) -> "AggregateFunction":
    """Null-safe average aggregate - returns NULL on error (PySpark 3.5+).

    Args:
        column: The column to average; a string is wrapped in ``Column``.

    Returns:
        AggregateFunction wrapping a ``try_avg`` operation, typed as
        ``DoubleType``.
    """
    from sparkless.functions.base import AggregateFunction
    from sparkless.spark_types import DoubleType

    target = Column(column) if isinstance(column, str) else column
    inner = ColumnOperation(target, "try_avg", name=f"try_avg({target.name})")
    return AggregateFunction(inner, "try_avg", DoubleType())
[docs]
@staticmethod
def try_element_at(
    column: Union[Column, str], index: Union[Column, str, int]
) -> ColumnOperation:
    """Null-safe element_at - returns NULL on error (PySpark 3.5+).

    Args:
        column: The column containing array or map; a string is wrapped in
            ``Column``.
        index: The index or key to access.

    Returns:
        ColumnOperation for ``try_element_at`` named
        ``try_element_at(<column>, <index>)``.

    Note:
        String and integer indexes are both wrapped in ``Column`` (integers
        via ``str(index)``).
        NOTE(review): this looks like a column reference rather than a
        literal index; confirm how the evaluator treats it.
    """
    target = Column(column) if isinstance(column, str) else column

    if isinstance(index, str):
        index = Column(index)
    elif isinstance(index, int):
        index = Column(str(index))

    target_label = target.name
    index_label = index.name if hasattr(index, "name") else index
    return ColumnOperation(
        target,
        "try_element_at",
        value=index,
        name=f"try_element_at({target_label}, {index_label})",
    )
[docs]
@staticmethod
def try_to_binary(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Null-safe to_binary - returns NULL on error (PySpark 3.5+).

    Args:
        column: The column to convert to binary; a string is wrapped in
            ``Column``.
        format: Optional format ('hex', 'base64', 'utf-8').

    Returns:
        ColumnOperation for ``try_to_binary``. The format, when given, is
        carried as the operation value and appears quoted in the name.
    """
    target = Column(column) if isinstance(column, str) else column

    if format is None:
        # No format: the operation is built without a value.
        return ColumnOperation(
            target, "try_to_binary", name=f"try_to_binary({target.name})"
        )
    return ColumnOperation(
        target,
        "try_to_binary",
        value=format,
        name=f"try_to_binary({target.name}, '{format}')",
    )
[docs]
@staticmethod
def try_to_number(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Null-safe to_number - returns NULL on error (PySpark 3.5+).

    Args:
        column: The column to convert to number; a string is wrapped in
            ``Column``.
        format: Optional format string.

    Returns:
        ColumnOperation for ``try_to_number``. The format, when given, is
        carried as the operation value and appears quoted in the name.
    """
    target = Column(column) if isinstance(column, str) else column

    if format is None:
        # No format: the operation is built without a value.
        return ColumnOperation(
            target, "try_to_number", name=f"try_to_number({target.name})"
        )
    return ColumnOperation(
        target,
        "try_to_number",
        value=format,
        name=f"try_to_number({target.name}, '{format}')",
    )
[docs]
@staticmethod
def try_to_timestamp(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Null-safe to_timestamp - returns NULL on error (PySpark 3.5+).

    Args:
        column: The column to convert to timestamp; a string is wrapped in
            ``Column``.
        format: Optional format string.

    Returns:
        ColumnOperation for ``try_to_timestamp``. The format, when given, is
        carried as the operation value and appears quoted in the name.
    """
    target = Column(column) if isinstance(column, str) else column

    if format is None:
        # No format: the operation is built without a value.
        return ColumnOperation(
            target, "try_to_timestamp", name=f"try_to_timestamp({target.name})"
        )
    return ColumnOperation(
        target,
        "try_to_timestamp",
        value=format,
        name=f"try_to_timestamp({target.name}, '{format}')",
    )