# Source code for sparkless.functions.udf

"""
User-Defined Function (UDF) implementation for Sparkless.

This module provides the UserDefinedFunction class for wrapping Python
functions to use in DataFrame transformations.
"""

from typing import Any, Callable, Optional, Union
from sparkless.functions.core.column import Column
from sparkless.functions.core.operations import ColumnOperation


class UserDefinedFunction:
    """Wrap a Python callable so it can be applied to DataFrame columns.

    User-defined function wrapper (all PySpark versions). An instance can be
    flagged as nondeterministic and is applied by calling it with column
    names or Column objects.

    Example:
        >>> def upper_case(s):
        ...     return s.upper()
        >>> udf_func = UserDefinedFunction(upper_case, StringType())
        >>> df.select(udf_func("name").alias("upper_name"))
    """

    def __init__(
        self,
        func: Callable[..., Any],
        returnType: Any,
        name: Optional[str] = None,
        evalType: str = "SQL",
    ):
        """Store the wrapped callable together with its metadata.

        Args:
            func: The Python callable being wrapped.
            returnType: Data type the callable is declared to return.
            name: Explicit function name. When omitted, a name is derived at
                call time from the first column argument.
            evalType: Evaluation type, "SQL" or "PANDAS".
        """
        self._name = name
        self._deterministic = True  # cleared by asNondeterministic()
        self._is_pandas_udf = evalType == "PANDAS"
        self.evalType = evalType
        self.returnType = returnType
        self.func = func

    def asNondeterministic(self) -> "UserDefinedFunction":
        """Flag this UDF as nondeterministic.

        A nondeterministic UDF may return different results for the same
        input. This method only records the flag on the instance.

        Returns:
            This same instance, so the call can be chained.
        """
        self._deterministic = False
        return self

    def __call__(self, *cols: Union[str, Column]) -> ColumnOperation:
        """Build the column operation that applies this UDF to ``cols``.

        Args:
            *cols: One or more column names or Column objects. Strings are
                wrapped in ``Column``; any other value is passed through
                unchanged.

        Returns:
            A "udf" ColumnOperation built on the first column, with the
            wrapped function, return type, all argument columns and the
            pandas flag attached as private attributes.

        Raises:
            ValueError: If no column argument is supplied.
        """
        # No arguments means no columns after conversion either, so the
        # check can run before any Column object is built.
        if not cols:
            raise ValueError("UDF requires at least one column argument")

        resolved = [Column(arg) if isinstance(arg, str) else arg for arg in cols]

        anchor = resolved[0]
        # Use the string form as the label when the anchor has no ``name``.
        anchor_label = getattr(anchor, "name", str(anchor))
        op_name = self._name if self._name else f"udf({anchor_label})"

        operation = ColumnOperation(anchor, "udf", name=op_name)
        operation._udf_func = self.func
        operation._udf_return_type = self.returnType
        operation._udf_cols = resolved
        operation._is_pandas_udf = self._is_pandas_udf
        return operation
class UserDefinedTableFunction:
    """Wrap a Python callable that produces multiple rows per input.

    User-defined table function wrapper (PySpark 3.5+). Works like
    UserDefinedFunction, but the wrapped callable is table-valued: it
    returns rows rather than a single value.

    Example:
        >>> def split_string(s):
        ...     return [(char,) for char in s]
        >>> table_udf = UserDefinedTableFunction(split_string, StructType([...]))
        >>> df.select(table_udf("name").alias("chars"))
    """

    def __init__(
        self,
        func: Callable[..., Any],
        returnType: Any,
        name: Optional[str] = None,
    ):
        """Store the wrapped callable together with its metadata.

        Args:
            func: The Python callable being wrapped; it should return an
                iterable of rows.
            returnType: Return schema (StructType).
            name: Explicit function name. When omitted, a name is derived at
                call time from the first column argument.
        """
        self._name = name
        self.returnType = returnType
        self.func = func

    def __call__(self, *cols: Union[str, Column]) -> ColumnOperation:
        """Build the column operation that applies this table UDF to ``cols``.

        Args:
            *cols: One or more column names or Column objects. Strings are
                wrapped in ``Column``; any other value is passed through
                unchanged.

        Returns:
            A "table_udf" ColumnOperation built on the first column, with
            the wrapped function, return schema and all argument columns
            attached as private attributes and ``_is_table_udf`` set to True.

        Raises:
            ValueError: If no column argument is supplied.
        """
        # No arguments means no columns after conversion either, so the
        # check can run before any Column object is built.
        if not cols:
            raise ValueError("Table UDF requires at least one column argument")

        resolved = [Column(arg) if isinstance(arg, str) else arg for arg in cols]

        anchor = resolved[0]
        # Use the string form as the label when the anchor has no ``name``.
        anchor_label = getattr(anchor, "name", str(anchor))
        op_name = self._name if self._name else f"table_udf({anchor_label})"

        operation = ColumnOperation(anchor, "table_udf", name=op_name)
        operation._udf_func = self.func
        operation._udf_return_type = self.returnType
        operation._udf_cols = resolved
        operation._is_table_udf = True
        return operation