"""
Core functions module for Sparkless.
This module provides the main F namespace and re-exports all function classes
for backward compatibility with the original functions.py structure. The Functions
class serves as the primary interface for all PySpark-compatible functions.
Key Features:
- Complete PySpark F namespace compatibility
- Column functions (col, lit, when, coalesce, isnull)
- String functions (upper, lower, length, trim, regexp_replace, split)
- Math functions (abs, round, ceil, floor, sqrt, exp, log, pow, sin, cos, tan)
- Aggregate functions (count, sum, avg, max, min, stddev, variance)
- DateTime functions (current_timestamp, current_date, to_date, to_timestamp)
- Window functions (row_number, rank, dense_rank, lag, lead)
Example:
>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"name": "Alice", "age": 25}]
>>> df = spark.createDataFrame(data)
>>> df.select(F.upper(F.col("name")), F.col("age") * 2).show()
DataFrame[1 rows, 2 columns]
<BLANKLINE>
upper(name) (age * 2)
ALICE 50.0
"""
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Tuple, Union
from .core.column import Column, ColumnOperation
if TYPE_CHECKING:
from .conditional import CaseWhen
from .core.literals import Literal
else:
CaseWhen = Any
Literal = Any
from .core.literals import Literal
from .base import AggregateFunction
from .conditional import CaseWhen, ConditionalFunctions
from .window_execution import WindowFunction
from .string import StringFunctions
from .math import MathFunctions
from .aggregate import AggregateFunctions
from .datetime import DateTimeFunctions
from .array import ArrayFunctions
from .map import MapFunctions
from .bitwise import BitwiseFunctions
from .xml import XMLFunctions
from .crypto import CryptoFunctions
from ..errors import PySparkTypeError, PySparkValueError
if TYPE_CHECKING:
from ..session import SparkSession
[docs]
class Functions:
"""Main functions namespace (F) for Sparkless.
This class provides access to all functions in a PySpark-compatible way.
"""
# Session helpers and column functions
@staticmethod
def _resolve_session(session: Optional["SparkSession"]) -> "SparkSession":
    """Pick the SparkSession a session-aware function should use.

    Args:
        session: Explicitly supplied session, or None to fall back to the
            active session.

    Returns:
        ``session`` itself when one is given, otherwise whatever
        ``SparkSession.getActiveSession()`` reports.

    Raises:
        PySparkValueError: If no session was passed and none is active.
    """
    # Function-scope import; presumably avoids a circular import with the
    # session module (it is only imported under TYPE_CHECKING at file level).
    from ..session import SparkSession

    # getActiveSession() is only consulted when no explicit session is given.
    chosen = session if session is not None else SparkSession.getActiveSession()
    if chosen is None:
        raise PySparkValueError(
            "No active SparkSession found. Call SparkSession.builder.getOrCreate() "
            "or pass a session explicitly."
        )
    return chosen
@staticmethod
def _require_active_session(operation_name: str) -> None:
    """Fail fast when an operation is attempted without an active session.

    Args:
        operation_name: Human-readable name of the operation; it is
            embedded in the error message.

    Raises:
        RuntimeError: If ``SparkSession._has_active_session()`` is falsy.
    """
    from ..session import SparkSession

    if SparkSession._has_active_session():
        return

    message = (
        f"Cannot perform {operation_name}: "
        "No active SparkSession found. "
        "This operation requires an active SparkSession, similar to PySpark. "
        "Create a SparkSession first: spark = SparkSession('app_name')"
    )
    raise RuntimeError(message)
[docs]
@staticmethod
def col(name: str) -> Column:
    """Build a reference to the column called ``name``.

    No session check is performed here: as in PySpark, the reference is
    only evaluated later, when it is used with a DataFrame.
    """
    column_ref = Column(name)
    return column_ref
[docs]
@staticmethod
def lit(value: Any) -> Literal:
    """Wrap a Python value in a ``Literal`` expression.

    No session check is performed here: as in PySpark, the literal is
    only evaluated later, when it is used with a DataFrame.
    """
    wrapped = Literal(value)
    return wrapped
[docs]
@staticmethod
def cast(column: Union[Column, str], data_type: Any) -> ColumnOperation:
    """Build a cast of ``column`` to ``data_type``.

    Args:
        column: A Column, or a string that is wrapped in ``Column(...)``.
        data_type: Target type, handed to ``Column.cast`` as given.

    Returns:
        The result of ``Column.cast(data_type)``.

    Raises:
        RuntimeError: If no SparkSession is active.
    """
    # Unlike col()/lit(), this helper insists on an active session up front.
    Functions._require_active_session("cast operation")
    source = Column(column) if isinstance(column, str) else column
    return source.cast(data_type)
[docs]
@staticmethod
def current_catalog(session: Optional["SparkSession"] = None) -> Literal:
    """Build a lazy literal holding the current catalog name.

    Args:
        session: Session to query; when None the active session is looked
            up again each time the literal is resolved.

    Returns:
        A ``Literal`` named ``"current_catalog()"`` whose value is computed
        by a resolver callback rather than at construction time.

    Raises:
        PySparkValueError: If no session is given and none is active.
    """
    # Called only for its validation side effect; the result is discarded.
    Functions._resolve_session(session)

    def resolver() -> str:
        """Look up the catalog name when the literal is evaluated."""
        spark = session if session is not None else Functions._resolve_session(None)
        lookup = getattr(spark.catalog, "currentCatalog", None)
        if callable(lookup):
            return lookup()
        # Catalog object offers no currentCatalog(); use the fixed fallback.
        return "spark_catalog"

    # The empty string is a placeholder; resolver() is not invoked here.
    lazy_literal = Literal("", resolver=resolver)
    lazy_literal.name = "current_catalog()"
    return lazy_literal
[docs]
@staticmethod
def current_database(session: Optional["SparkSession"] = None) -> Literal:
    """Build a lazy literal holding the current database/schema name.

    Args:
        session: Session to query; when None the active session is looked
            up again each time the literal is resolved.

    Returns:
        A ``Literal`` named ``"current_database()"`` whose value is computed
        by a resolver callback rather than at construction time.

    Raises:
        PySparkValueError: If no session is given and none is active.
    """
    # Called only for its validation side effect; the result is discarded.
    Functions._resolve_session(session)

    def resolver() -> str:
        """Look up the database name when the literal is evaluated."""
        spark = session if session is not None else Functions._resolve_session(None)
        return spark.catalog.currentDatabase()

    # The empty string is a placeholder; resolver() is not invoked here.
    lazy_literal = Literal("", resolver=resolver)
    lazy_literal.name = "current_database()"
    return lazy_literal
[docs]
@staticmethod
def current_schema(session: Optional["SparkSession"] = None) -> Literal:
    """Return the current database/schema as a lazily resolved literal.

    This is an alias for ``current_database`` (Spark SQL compatibility).
    It reuses that function's session validation and resolver instead of
    duplicating them, and only overrides the display name.

    Args:
        session: Session to query; when None the active session is used.

    Returns:
        The literal built by ``current_database``, renamed to
        ``"current_schema()"``.

    Raises:
        PySparkValueError: If no session is given and none is active
            (raised by ``current_database`` during validation).
    """
    literal = Functions.current_database(session)
    # Same resolver as current_database; only the column name differs.
    literal.name = "current_schema()"
    return literal
[docs]
@staticmethod
def current_user(session: Optional["SparkSession"] = None) -> Literal:
    """Build a lazy literal holding the current Spark user name.

    Args:
        session: Session to query; when None the active session is looked
            up again each time the literal is resolved.

    Returns:
        A ``Literal`` named ``"current_user()"`` whose value is computed
        by a resolver callback rather than at construction time.

    Raises:
        PySparkValueError: If no session is given and none is active.
    """
    # Called only for its validation side effect; the result is discarded.
    Functions._resolve_session(session)

    def resolver() -> str:
        """Look up the user name when the literal is evaluated."""
        spark = session if session is not None else Functions._resolve_session(None)
        lookup = getattr(spark.sparkContext, "sparkUser", None)
        if callable(lookup):
            return lookup()
        # Context offers no sparkUser(); use the fixed fallback.
        return "mock_user"

    # The empty string is a placeholder; resolver() is not invoked here.
    lazy_literal = Literal("", resolver=resolver)
    lazy_literal.name = "current_user()"
    return lazy_literal
# String functions
[docs]
@staticmethod
def upper(column: Union[Column, str]) -> ColumnOperation:
    """Build an uppercase-conversion expression for a string column.

    Forwards ``column`` unchanged to ``StringFunctions.upper``.
    """
    expr = StringFunctions.upper(column)
    return expr
[docs]
@staticmethod
def lower(column: Union[Column, str]) -> ColumnOperation:
    """Build a lowercase-conversion expression for a string column.

    Forwards ``column`` unchanged to ``StringFunctions.lower``.
    """
    expr = StringFunctions.lower(column)
    return expr
[docs]
@staticmethod
def length(column: Union[Column, str]) -> ColumnOperation:
    """Build a string-length expression.

    Forwards ``column`` unchanged to ``StringFunctions.length``.
    """
    expr = StringFunctions.length(column)
    return expr
[docs]
@staticmethod
def char_length(column: Union[Column, str]) -> ColumnOperation:
    """Build a character-length expression (alias for length, PySpark 3.5+).

    Forwards ``column`` unchanged to ``StringFunctions.char_length``.
    """
    expr = StringFunctions.char_length(column)
    return expr
[docs]
@staticmethod
def character_length(column: Union[Column, str]) -> ColumnOperation:
    """Build a character-length expression (alias for length, PySpark 3.5+).

    Forwards ``column`` unchanged to ``StringFunctions.character_length``.
    """
    expr = StringFunctions.character_length(column)
    return expr
[docs]
@staticmethod
def trim(column: Union[Column, str]) -> ColumnOperation:
    """Build a whitespace-trimming expression.

    Forwards ``column`` unchanged to ``StringFunctions.trim``.
    """
    expr = StringFunctions.trim(column)
    return expr
[docs]
@staticmethod
def ltrim(column: Union[Column, str]) -> ColumnOperation:
    """Build an expression that trims whitespace on the left.

    Forwards ``column`` unchanged to ``StringFunctions.ltrim``.
    """
    expr = StringFunctions.ltrim(column)
    return expr
[docs]
@staticmethod
def rtrim(column: Union[Column, str]) -> ColumnOperation:
    """Build an expression that trims whitespace on the right.

    Forwards ``column`` unchanged to ``StringFunctions.rtrim``.
    """
    expr = StringFunctions.rtrim(column)
    return expr
[docs]
@staticmethod
def regexp_replace(
    column: Union[Column, str], pattern: str, replacement: str
) -> ColumnOperation:
    """Build a regex-replacement expression.

    Forwards ``column``, ``pattern`` and ``replacement`` positionally to
    ``StringFunctions.regexp_replace``.
    """
    expr = StringFunctions.regexp_replace(column, pattern, replacement)
    return expr
[docs]
@staticmethod
def split(
    column: Union[Column, str], delimiter: str, limit: Optional[int] = None
) -> ColumnOperation:
    """Build an expression that splits a string on a delimiter.

    Args:
        column: The column to split.
        delimiter: The delimiter to split on.
        limit: Optional cap on how many times the pattern is applied.
            None or -1 means no limit (the default PySpark behavior).

    Returns:
        The result of ``StringFunctions.split`` for the same arguments.
    """
    expr = StringFunctions.split(column, delimiter, limit)
    return expr
[docs]
@staticmethod
def substring(
    column: Union[Column, str], start: int, length: Optional[int] = None
) -> ColumnOperation:
    """Build a substring-extraction expression.

    Forwards ``column``, ``start`` and ``length`` positionally to
    ``StringFunctions.substring``.
    """
    expr = StringFunctions.substring(column, start, length)
    return expr
[docs]
@staticmethod
def concat(*columns: Union[Column, str]) -> ColumnOperation:
    """Build a string-concatenation expression over ``columns``.

    All positional arguments are forwarded to ``StringFunctions.concat``.
    """
    expr = StringFunctions.concat(*columns)
    return expr
[docs]
@staticmethod
def translate(
    column: Union[Column, str], matching_string: str, replace_string: str
) -> ColumnOperation:
    """Build a character-by-character translation expression.

    Forwards ``column``, ``matching_string`` and ``replace_string``
    positionally to ``StringFunctions.translate``.
    """
    expr = StringFunctions.translate(column, matching_string, replace_string)
    return expr
[docs]
@staticmethod
def ascii(column: Union[Column, str]) -> ColumnOperation:
    """Build an expression for the ASCII value of the first character.

    Forwards ``column`` unchanged to ``StringFunctions.ascii``.
    """
    expr = StringFunctions.ascii(column)
    return expr
[docs]
@staticmethod
def base64(column: Union[Column, str]) -> ColumnOperation:
    """Build a base64-encoding expression.

    Forwards ``column`` unchanged to ``StringFunctions.base64``.
    """
    expr = StringFunctions.base64(column)
    return expr
[docs]
@staticmethod
def btrim(
    column: Union[Column, str], trim_string: Optional[str] = None
) -> ColumnOperation:
    """Build an expression that trims characters from both ends.

    Forwards ``column`` and ``trim_string`` (None when omitted) to
    ``StringFunctions.btrim``.
    """
    expr = StringFunctions.btrim(column, trim_string)
    return expr
[docs]
@staticmethod
def contains(column: Union[Column, str], substring: str) -> ColumnOperation:
    """Build a substring-containment test.

    Forwards ``column`` and ``substring`` to ``StringFunctions.contains``.
    """
    expr = StringFunctions.contains(column, substring)
    return expr
[docs]
@staticmethod
def left(column: Union[Column, str], length: int) -> ColumnOperation:
    """Build an expression taking the leftmost ``length`` characters.

    Forwards ``column`` and ``length`` to ``StringFunctions.left``.
    """
    expr = StringFunctions.left(column, length)
    return expr
[docs]
@staticmethod
def right(column: Union[Column, str], length: int) -> ColumnOperation:
    """Build an expression taking the rightmost ``length`` characters.

    Forwards ``column`` and ``length`` to ``StringFunctions.right``.
    """
    expr = StringFunctions.right(column, length)
    return expr
[docs]
@staticmethod
def bit_length(column: Union[Column, str]) -> ColumnOperation:
    """Build a bit-length expression for a string.

    Forwards ``column`` unchanged to ``StringFunctions.bit_length``.
    """
    expr = StringFunctions.bit_length(column)
    return expr
[docs]
@staticmethod
def startswith(column: Union[Column, str], substring: str) -> ColumnOperation:
    """Build a prefix test: does the string start with ``substring``?

    Forwards both arguments to ``StringFunctions.startswith``.
    """
    expr = StringFunctions.startswith(column, substring)
    return expr
[docs]
@staticmethod
def endswith(column: Union[Column, str], substring: str) -> ColumnOperation:
    """Build a suffix test: does the string end with ``substring``?

    Forwards both arguments to ``StringFunctions.endswith``.
    """
    expr = StringFunctions.endswith(column, substring)
    return expr
[docs]
@staticmethod
def like(column: Union[Column, str], pattern: str) -> ColumnOperation:
    """Build a SQL LIKE pattern-matching expression.

    Forwards ``column`` and ``pattern`` to ``StringFunctions.like``.
    """
    expr = StringFunctions.like(column, pattern)
    return expr
[docs]
@staticmethod
def rlike(column: Union[Column, str], pattern: str) -> ColumnOperation:
    """Build a regular-expression matching expression.

    Forwards ``column`` and ``pattern`` to ``StringFunctions.rlike``.
    """
    expr = StringFunctions.rlike(column, pattern)
    return expr
[docs]
@staticmethod
def isin(column: Union[Column, str], *values: Any) -> ColumnOperation:
    """Build a membership test of a column against ``values``.

    Args:
        column: A Column, or a string that is wrapped in ``Column(...)``.
        *values: Candidate values, forwarded to ``Column.isin``.

    Returns:
        The result of ``Column.isin(*values)``.
    """
    target = Column(column) if isinstance(column, str) else column
    return target.isin(*values)
[docs]
@staticmethod
def replace(column: Union[Column, str], old: str, new: str) -> ColumnOperation:
    """Build an expression replacing occurrences of ``old`` with ``new``.

    Forwards all three arguments to ``StringFunctions.replace``.
    """
    expr = StringFunctions.replace(column, old, new)
    return expr
[docs]
@staticmethod
def substr(
    column: Union[Column, str], start: int, length: Optional[int] = None
) -> ColumnOperation:
    """Build a substring-extraction expression (alias for substring).

    Forwards ``column``, ``start`` and ``length`` positionally to
    ``StringFunctions.substr``.
    """
    expr = StringFunctions.substr(column, start, length)
    return expr
[docs]
@staticmethod
def split_part(
    column: Union[Column, str], delimiter: str, part: int
) -> ColumnOperation:
    """Build an expression selecting one part of a delimiter-split string.

    Forwards ``column``, ``delimiter`` and ``part`` positionally to
    ``StringFunctions.split_part``.
    """
    expr = StringFunctions.split_part(column, delimiter, part)
    return expr
[docs]
@staticmethod
def position(
    substring: Union[Column, str], column: Union[Column, str]
) -> ColumnOperation:
    """Build an expression locating ``substring`` in a string (1-indexed).

    Note the argument order: the substring comes first, then the column.
    Both are forwarded in that order to ``StringFunctions.position``.
    """
    expr = StringFunctions.position(substring, column)
    return expr
[docs]
@staticmethod
def octet_length(column: Union[Column, str]) -> ColumnOperation:
    """Build a byte-length (octet-length) expression for a string.

    Forwards ``column`` unchanged to ``StringFunctions.octet_length``.
    """
    expr = StringFunctions.octet_length(column)
    return expr
[docs]
@staticmethod
def char(column: Union[Column, str]) -> ColumnOperation:
    """Build an integer-to-character conversion expression.

    Forwards ``column`` unchanged to ``StringFunctions.char``.
    """
    expr = StringFunctions.char(column)
    return expr
[docs]
@staticmethod
def ucase(column: Union[Column, str]) -> ColumnOperation:
    """Build an uppercase-conversion expression (alias for upper).

    Forwards ``column`` unchanged to ``StringFunctions.ucase``.
    """
    expr = StringFunctions.ucase(column)
    return expr
[docs]
@staticmethod
def lcase(column: Union[Column, str]) -> ColumnOperation:
    """Build a lowercase-conversion expression (alias for lower).

    Forwards ``column`` unchanged to ``StringFunctions.lcase``.
    """
    expr = StringFunctions.lcase(column)
    return expr
[docs]
@staticmethod
def elt(n: Union[Column, int], *columns: Union[Column, str]) -> ColumnOperation:
    """Build an expression picking the element at index ``n`` from ``columns``.

    Forwards ``n`` followed by all ``columns`` to ``StringFunctions.elt``.
    """
    expr = StringFunctions.elt(n, *columns)
    return expr
[docs]
@staticmethod
def unbase64(column: Union[Column, str]) -> ColumnOperation:
    """Build a base64-decoding expression.

    Forwards ``column`` unchanged to ``StringFunctions.unbase64``.
    """
    expr = StringFunctions.unbase64(column)
    return expr
[docs]
@staticmethod
def md5(column: Union[Column, str]) -> ColumnOperation:
    """Build an MD5 hash expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``StringFunctions.md5``.
    """
    expr = StringFunctions.md5(column)
    return expr
[docs]
@staticmethod
def sha1(column: Union[Column, str]) -> ColumnOperation:
    """Build a SHA-1 hash expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``StringFunctions.sha1``.
    """
    expr = StringFunctions.sha1(column)
    return expr
[docs]
@staticmethod
def sha2(column: Union[Column, str], numBits: int) -> ColumnOperation:
    """Build a SHA-2 family hash expression (PySpark 3.0+).

    Forwards ``column`` and ``numBits`` to ``StringFunctions.sha2``.
    """
    expr = StringFunctions.sha2(column, numBits)
    return expr
[docs]
@staticmethod
def sha(column: Union[Column, str]) -> ColumnOperation:
    """Build a SHA-1 hash expression (alias, PySpark 3.5+).

    Forwards ``column`` unchanged to ``StringFunctions.sha``.
    """
    expr = StringFunctions.sha(column)
    return expr
[docs]
@staticmethod
def mask(
    column: Union[Column, str],
    upperChar: Optional[str] = None,
    lowerChar: Optional[str] = None,
    digitChar: Optional[str] = None,
    otherChar: Optional[str] = None,
) -> ColumnOperation:
    """Build an expression masking sensitive data in a string (PySpark 3.5+).

    The four replacement-character options default to None and are
    forwarded positionally, after ``column``, to ``StringFunctions.mask``.
    """
    expr = StringFunctions.mask(column, upperChar, lowerChar, digitChar, otherChar)
    return expr
[docs]
@staticmethod
def json_array_length(
    column: Union[Column, str], path: Optional[str] = None
) -> ColumnOperation:
    """Build an expression for the length of a JSON array (PySpark 3.5+).

    Forwards ``column`` and ``path`` (None when omitted) to
    ``StringFunctions.json_array_length``.
    """
    expr = StringFunctions.json_array_length(column, path)
    return expr
[docs]
@staticmethod
def json_object_keys(
    column: Union[Column, str], path: Optional[str] = None
) -> ColumnOperation:
    """Build an expression for the keys of a JSON object (PySpark 3.5+).

    Forwards ``column`` and ``path`` (None when omitted) to
    ``StringFunctions.json_object_keys``.
    """
    expr = StringFunctions.json_object_keys(column, path)
    return expr
[docs]
@staticmethod
def xpath_number(column: Union[Column, str], path: str) -> ColumnOperation:
    """Build an expression extracting a number from XML via XPath (PySpark 3.5+).

    Forwards ``column`` and ``path`` to ``StringFunctions.xpath_number``.
    """
    expr = StringFunctions.xpath_number(column, path)
    return expr
[docs]
@staticmethod
def user() -> ColumnOperation:
    """Build an expression for the current user name (PySpark 3.5+).

    Takes no arguments; returns whatever ``StringFunctions.user()`` builds.
    """
    expr = StringFunctions.user()
    return expr
[docs]
@staticmethod
def crc32(column: Union[Column, str]) -> ColumnOperation:
    """Build a CRC32 checksum expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``StringFunctions.crc32``.
    """
    expr = StringFunctions.crc32(column)
    return expr
# Cryptographic functions
[docs]
@staticmethod
def aes_encrypt(
    data: Union[Column, str],
    key: Union[Column, str],
    mode: Optional[str] = None,
    padding: Optional[str] = None,
) -> ColumnOperation:
    """Build an AES encryption expression (PySpark 3.5+).

    Forwards ``data``, ``key``, ``mode`` and ``padding`` positionally to
    ``CryptoFunctions.aes_encrypt``; the last two are None when omitted.
    """
    expr = CryptoFunctions.aes_encrypt(data, key, mode, padding)
    return expr
[docs]
@staticmethod
def aes_decrypt(
    data: Union[Column, str],
    key: Union[Column, str],
    mode: Optional[str] = None,
    padding: Optional[str] = None,
) -> ColumnOperation:
    """Build an AES decryption expression (PySpark 3.5+).

    Forwards ``data``, ``key``, ``mode`` and ``padding`` positionally to
    ``CryptoFunctions.aes_decrypt``; the last two are None when omitted.
    """
    expr = CryptoFunctions.aes_decrypt(data, key, mode, padding)
    return expr
[docs]
@staticmethod
def try_aes_decrypt(
    data: Union[Column, str],
    key: Union[Column, str],
    mode: Optional[str] = None,
    padding: Optional[str] = None,
) -> ColumnOperation:
    """Build a null-safe AES decryption expression (PySpark 3.5+).

    Described as returning NULL on error instead of failing. Forwards
    ``data``, ``key``, ``mode`` and ``padding`` positionally to
    ``CryptoFunctions.try_aes_decrypt``.
    """
    expr = CryptoFunctions.try_aes_decrypt(data, key, mode, padding)
    return expr
[docs]
@staticmethod
def to_str(column: Union[Column, str]) -> ColumnOperation:
    """Build a to-string conversion expression (all PySpark versions).

    Forwards ``column`` unchanged to ``StringFunctions.to_str``.
    """
    expr = StringFunctions.to_str(column)
    return expr
[docs]
@staticmethod
def array_join(
    column: Union[Column, str],
    delimiter: str,
    null_replacement: Optional[str] = None,
) -> ColumnOperation:
    """Build an expression joining array elements with ``delimiter``.

    Forwards ``column``, ``delimiter`` and ``null_replacement`` (None when
    omitted) positionally to ``StringFunctions.array_join``.
    """
    expr = StringFunctions.array_join(column, delimiter, null_replacement)
    return expr
[docs]
@staticmethod
def repeat(column: Union[Column, str], n: int) -> ColumnOperation:
    """Build an expression repeating a string ``n`` times.

    Forwards ``column`` and ``n`` to ``StringFunctions.repeat``.
    """
    expr = StringFunctions.repeat(column, n)
    return expr
[docs]
@staticmethod
def concat_ws(sep: str, *cols: Union[Column, str]) -> ColumnOperation:
    """Build an expression concatenating columns with a separator.

    Forwards ``sep`` followed by all ``cols`` to
    ``StringFunctions.concat_ws``.
    """
    expr = StringFunctions.concat_ws(sep, *cols)
    return expr
[docs]
@staticmethod
def substring_index(
    column: Union[Column, str], delim: str, count: int
) -> ColumnOperation:
    """Build an expression for the substring before/after ``count`` delimiters.

    Forwards ``column``, ``delim`` and ``count`` positionally to
    ``StringFunctions.substring_index``.
    """
    expr = StringFunctions.substring_index(column, delim, count)
    return expr
[docs]
@staticmethod
def instr(column: Union[Column, str], substr: str) -> ColumnOperation:
    """Build an expression locating the first occurrence of ``substr``.

    Forwards ``column`` and ``substr`` to ``StringFunctions.instr``.
    """
    expr = StringFunctions.instr(column, substr)
    return expr
[docs]
@staticmethod
def locate(
    substr: str, column: Union[Column, str], pos: int = 1
) -> ColumnOperation:
    """Build an expression locating ``substr``, searching from ``pos``.

    Note the argument order: the substring comes first, then the column.
    ``pos`` defaults to 1. All three are forwarded positionally to
    ``StringFunctions.locate``.
    """
    expr = StringFunctions.locate(substr, column, pos)
    return expr
[docs]
@staticmethod
def lpad(column: Union[Column, str], len: int, pad: str) -> ColumnOperation:
    """Build an expression left-padding a string to length ``len`` with ``pad``.

    The parameter name ``len`` shadows the builtin inside this function;
    it is kept because callers may pass it by keyword. All arguments are
    forwarded positionally to ``StringFunctions.lpad``.
    """
    expr = StringFunctions.lpad(column, len, pad)
    return expr
[docs]
@staticmethod
def rpad(column: Union[Column, str], len: int, pad: str) -> ColumnOperation:
    """Build an expression right-padding a string to length ``len`` with ``pad``.

    The parameter name ``len`` shadows the builtin inside this function;
    it is kept because callers may pass it by keyword. All arguments are
    forwarded positionally to ``StringFunctions.rpad``.
    """
    expr = StringFunctions.rpad(column, len, pad)
    return expr
[docs]
@staticmethod
def levenshtein(
    left: Union[Column, str], right: Union[Column, str]
) -> ColumnOperation:
    """Build a Levenshtein-distance expression between two strings.

    Forwards ``left`` and ``right`` to ``StringFunctions.levenshtein``.
    """
    expr = StringFunctions.levenshtein(left, right)
    return expr
[docs]
@staticmethod
def bin(column: Union[Column, str]) -> ColumnOperation:
    """Build a conversion-to-binary-string expression.

    Forwards ``column`` unchanged to ``StringFunctions.bin``.
    """
    expr = StringFunctions.bin(column)
    return expr
[docs]
@staticmethod
def hex(column: Union[Column, str]) -> ColumnOperation:
    """Build a conversion-to-hexadecimal-string expression.

    Forwards ``column`` unchanged to ``StringFunctions.hex``.
    """
    expr = StringFunctions.hex(column)
    return expr
[docs]
@staticmethod
def unhex(column: Union[Column, str]) -> ColumnOperation:
    """Build a hex-string-to-binary conversion expression.

    Forwards ``column`` unchanged to ``StringFunctions.unhex``.
    """
    expr = StringFunctions.unhex(column)
    return expr
[docs]
@staticmethod
def hash(*cols: Union[Column, str]) -> ColumnOperation:
    """Build a hash-value expression over ``cols``.

    All positional arguments are forwarded to ``StringFunctions.hash``.
    """
    expr = StringFunctions.hash(*cols)
    return expr
[docs]
@staticmethod
def xxhash64(*cols: Union[Column, str]) -> ColumnOperation:
    """Build an xxHash64 expression over ``cols`` (all PySpark versions).

    All positional arguments are forwarded to ``StringFunctions.xxhash64``.
    """
    expr = StringFunctions.xxhash64(*cols)
    return expr
[docs]
@staticmethod
def encode(column: Union[Column, str], charset: str) -> ColumnOperation:
    """Build a string-to-binary encoding expression.

    Forwards ``column`` and ``charset`` to ``StringFunctions.encode``.
    """
    expr = StringFunctions.encode(column, charset)
    return expr
[docs]
@staticmethod
def decode(column: Union[Column, str], charset: str) -> ColumnOperation:
    """Build a binary-to-string decoding expression.

    Forwards ``column`` and ``charset`` to ``StringFunctions.decode``.
    """
    expr = StringFunctions.decode(column, charset)
    return expr
[docs]
@staticmethod
def conv(
    column: Union[Column, str], from_base: int, to_base: int
) -> ColumnOperation:
    """Build an expression converting a number between bases.

    Forwards ``column``, ``from_base`` and ``to_base`` positionally to
    ``StringFunctions.conv``.
    """
    expr = StringFunctions.conv(column, from_base, to_base)
    return expr
[docs]
@staticmethod
def initcap(column: Union[Column, str]) -> ColumnOperation:
    """Build an expression capitalizing the first letter of each word.

    Forwards ``column`` unchanged to ``StringFunctions.initcap``.
    """
    expr = StringFunctions.initcap(column)
    return expr
[docs]
@staticmethod
def soundex(column: Union[Column, str]) -> ColumnOperation:
    """Build a Soundex (phonetic matching) encoding expression.

    Forwards ``column`` unchanged to ``StringFunctions.soundex``.
    """
    expr = StringFunctions.soundex(column)
    return expr
# Math functions
[docs]
@staticmethod
def abs(column: Union[Column, str]) -> ColumnOperation:
    """Build an absolute-value expression.

    Forwards ``column`` unchanged to ``MathFunctions.abs``.
    """
    expr = MathFunctions.abs(column)
    return expr
[docs]
@staticmethod
def round(column: Union[Column, str], scale: int = 0) -> ColumnOperation:
    """Build an expression rounding to ``scale`` decimal places.

    ``scale`` defaults to 0. Both arguments are forwarded to
    ``MathFunctions.round``.
    """
    expr = MathFunctions.round(column, scale)
    return expr
[docs]
@staticmethod
def ceil(column: Union[Column, str]) -> ColumnOperation:
    """Build a round-up (ceiling) expression.

    Forwards ``column`` unchanged to ``MathFunctions.ceil``.
    """
    expr = MathFunctions.ceil(column)
    return expr
[docs]
@staticmethod
def ceiling(column: Union[Column, str]) -> ColumnOperation:
    """Build a round-up-to-nearest-integer expression (alias for ceil).

    Forwards ``column`` unchanged to ``MathFunctions.ceiling``.
    """
    expr = MathFunctions.ceiling(column)
    return expr
[docs]
@staticmethod
def floor(column: Union[Column, str]) -> ColumnOperation:
    """Build a round-down (floor) expression.

    Forwards ``column`` unchanged to ``MathFunctions.floor``.
    """
    expr = MathFunctions.floor(column)
    return expr
[docs]
@staticmethod
def sqrt(column: Union[Column, str]) -> ColumnOperation:
    """Build a square-root expression.

    Forwards ``column`` unchanged to ``MathFunctions.sqrt``.
    """
    expr = MathFunctions.sqrt(column)
    return expr
[docs]
@staticmethod
def exp(column: Union[Column, str]) -> ColumnOperation:
    """Build an exponential expression.

    Forwards ``column`` unchanged to ``MathFunctions.exp``.
    """
    expr = MathFunctions.exp(column)
    return expr
[docs]
@staticmethod
def log(
    base: Union[Column, str, float, int, None],
    column: Optional[Union[Column, str]] = None,
) -> ColumnOperation:
    """Build a logarithm expression.

    Mirrors PySpark's two calling conventions: ``log(base, column)`` and
    ``log(column)`` for the natural log. Both arguments are handed to
    ``MathFunctions.log`` as given; telling the two forms apart is left
    to that function.
    """
    expr = MathFunctions.log(base, column)
    return expr
[docs]
@staticmethod
def log10(column: Union[Column, str]) -> ColumnOperation:
    """Build a base-10 logarithm expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.log10``.
    """
    expr = MathFunctions.log10(column)
    return expr
[docs]
@staticmethod
def log2(column: Union[Column, str]) -> ColumnOperation:
    """Build a base-2 logarithm expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.log2``.
    """
    expr = MathFunctions.log2(column)
    return expr
[docs]
@staticmethod
def log1p(column: Union[Column, str]) -> ColumnOperation:
    """Build an expression for the natural log of (1 + x) (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.log1p``.
    """
    expr = MathFunctions.log1p(column)
    return expr
[docs]
@staticmethod
def expm1(column: Union[Column, str]) -> ColumnOperation:
    """Build an expression for exp(x) - 1 (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.expm1``.
    """
    expr = MathFunctions.expm1(column)
    return expr
[docs]
@staticmethod
def pow(
    column: Union[Column, str], exponent: Union[Column, float, int]
) -> ColumnOperation:
    """Build a power expression: ``column`` raised to ``exponent``.

    Forwards both arguments to ``MathFunctions.pow``.
    """
    expr = MathFunctions.pow(column, exponent)
    return expr
[docs]
@staticmethod
def power(
    column: Union[Column, str], exponent: Union[Column, float, int]
) -> ColumnOperation:
    """Build a power expression (alias for pow).

    Forwards both arguments to ``MathFunctions.power``.
    """
    expr = MathFunctions.power(column, exponent)
    return expr
[docs]
@staticmethod
def positive(column: Union[Column, str]) -> ColumnOperation:
    """Build a unary-plus expression (described as the identity function).

    Forwards ``column`` unchanged to ``MathFunctions.positive``.
    """
    expr = MathFunctions.positive(column)
    return expr
[docs]
@staticmethod
def negative(column: Union[Column, str]) -> ColumnOperation:
    """Build a negation expression.

    Forwards ``column`` unchanged to ``MathFunctions.negative``.
    """
    expr = MathFunctions.negative(column)
    return expr
[docs]
@staticmethod
def sin(column: Union[Column, str]) -> ColumnOperation:
    """Build a sine expression.

    Forwards ``column`` unchanged to ``MathFunctions.sin``.
    """
    expr = MathFunctions.sin(column)
    return expr
[docs]
@staticmethod
def cos(column: Union[Column, str]) -> ColumnOperation:
    """Build a cosine expression.

    Forwards ``column`` unchanged to ``MathFunctions.cos``.
    """
    expr = MathFunctions.cos(column)
    return expr
[docs]
@staticmethod
def tan(column: Union[Column, str]) -> ColumnOperation:
    """Build a tangent expression.

    Forwards ``column`` unchanged to ``MathFunctions.tan``.
    """
    expr = MathFunctions.tan(column)
    return expr
[docs]
@staticmethod
def acosh(column: Union[Column, str]) -> ColumnOperation:
    """Build an inverse hyperbolic cosine expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.acosh``.
    """
    expr = MathFunctions.acosh(column)
    return expr
[docs]
@staticmethod
def asinh(column: Union[Column, str]) -> ColumnOperation:
    """Build an inverse hyperbolic sine expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.asinh``.
    """
    expr = MathFunctions.asinh(column)
    return expr
[docs]
@staticmethod
def atanh(column: Union[Column, str]) -> ColumnOperation:
    """Build an inverse hyperbolic tangent expression (PySpark 3.0+).

    Forwards ``column`` unchanged to ``MathFunctions.atanh``.
    """
    expr = MathFunctions.atanh(column)
    return expr
[docs]
@staticmethod
def acos(column: Union[Column, str]) -> ColumnOperation:
    """Build an arc-cosine (inverse cosine) expression.

    Forwards ``column`` unchanged to ``MathFunctions.acos``.
    """
    expr = MathFunctions.acos(column)
    return expr
[docs]
@staticmethod
def asin(column: Union[Column, str]) -> ColumnOperation:
    """Build an arc-sine (inverse sine) expression.

    Forwards ``column`` unchanged to ``MathFunctions.asin``.
    """
    expr = MathFunctions.asin(column)
    return expr
[docs]
@staticmethod
def atan(column: Union[Column, str]) -> ColumnOperation:
    """Build an arc-tangent (inverse tangent) expression.

    Forwards ``column`` unchanged to ``MathFunctions.atan``.
    """
    expr = MathFunctions.atan(column)
    return expr
[docs]
@staticmethod
def atan2(
    y: Union[Column, str, float, int], x: Union[Column, str, float, int]
) -> ColumnOperation:
    """Build a two-argument arctangent expression (PySpark 3.0+).

    Forwards ``y`` then ``x`` to ``MathFunctions.atan2``.
    """
    expr = MathFunctions.atan2(y, x)
    return expr
[docs]
@staticmethod
def cosh(column: Union[Column, str]) -> ColumnOperation:
    """Build a hyperbolic cosine expression.

    Forwards ``column`` unchanged to ``MathFunctions.cosh``.
    """
    expr = MathFunctions.cosh(column)
    return expr
[docs]
@staticmethod
def sinh(column: Union[Column, str]) -> ColumnOperation:
    """Build a hyperbolic sine expression.

    Forwards ``column`` unchanged to ``MathFunctions.sinh``.
    """
    expr = MathFunctions.sinh(column)
    return expr
[docs]
@staticmethod
def tanh(column: Union[Column, str]) -> ColumnOperation:
    """Build a hyperbolic tangent expression.

    Forwards ``column`` unchanged to ``MathFunctions.tanh``.
    """
    expr = MathFunctions.tanh(column)
    return expr
[docs]
@staticmethod
def degrees(column: Union[Column, str]) -> ColumnOperation:
    """Build a radians-to-degrees conversion expression.

    Forwards ``column`` unchanged to ``MathFunctions.degrees``.
    """
    expr = MathFunctions.degrees(column)
    return expr
[docs]
@staticmethod
def radians(column: Union[Column, str]) -> ColumnOperation:
    """Build a degrees-to-radians conversion expression.

    Forwards ``column`` unchanged to ``MathFunctions.radians``.
    """
    expr = MathFunctions.radians(column)
    return expr
[docs]
@staticmethod
def cbrt(column: Union[Column, str]) -> ColumnOperation:
    """Build a cube-root expression.

    Forwards ``column`` unchanged to ``MathFunctions.cbrt``.
    """
    expr = MathFunctions.cbrt(column)
    return expr
[docs]
@staticmethod
def factorial(column: Union[Column, str]) -> ColumnOperation:
    """Build a factorial expression for non-negative integers.

    Forwards ``column`` unchanged to ``MathFunctions.factorial``.
    """
    expr = MathFunctions.factorial(column)
    return expr
[docs]
@staticmethod
def rand(seed: Optional[int] = None) -> ColumnOperation:
    """Build a uniformly distributed random column over [0.0, 1.0].

    Forwards ``seed`` (None when omitted) to ``MathFunctions.rand``.

    NOTE(review): PySpark documents the half-open range [0.0, 1.0);
    confirm which upper bound the implementation actually has.
    """
    expr = MathFunctions.rand(seed)
    return expr
[docs]
@staticmethod
def randn(seed: Optional[int] = None) -> ColumnOperation:
    """Build a random column drawn from the standard normal distribution.

    Forwards ``seed`` (None when omitted) to ``MathFunctions.randn``.
    """
    expr = MathFunctions.randn(seed)
    return expr
[docs]
@staticmethod
def rint(column: Union[Column, str]) -> ColumnOperation:
    """Build a round-to-nearest-integer expression (banker's rounding).

    Forwards ``column`` unchanged to ``MathFunctions.rint``.
    """
    expr = MathFunctions.rint(column)
    return expr
[docs]
@staticmethod
def bround(column: Union[Column, str], scale: int = 0) -> ColumnOperation:
    """Build a rounding expression using HALF_EVEN mode.

    ``scale`` defaults to 0. Both arguments are forwarded to
    ``MathFunctions.bround``.
    """
    expr = MathFunctions.bround(column, scale)
    return expr
[docs]
@staticmethod
def sign(column: Union[Column, str]) -> ColumnOperation:
    """Build a sign-of-number expression (matches PySpark signum).

    Forwards ``column`` unchanged to ``MathFunctions.sign``.
    """
    expr = MathFunctions.sign(column)
    return expr
[docs]
@staticmethod
def hypot(col1: Union[Column, str], col2: Union[Column, str]) -> ColumnOperation:
    """Build a hypotenuse expression from two columns.

    Forwards ``col1`` and ``col2`` to ``MathFunctions.hypot``.
    """
    expr = MathFunctions.hypot(col1, col2)
    return expr
[docs]
@staticmethod
def nanvl(col1: Union[Column, str], col2: Union[Column, str]) -> ColumnOperation:
    """Build an expression yielding ``col1`` unless it is NaN, else ``col2``.

    Forwards ``col1`` and ``col2`` to ``MathFunctions.nanvl``.
    """
    expr = MathFunctions.nanvl(col1, col2)
    return expr
[docs]
@staticmethod
def signum(column: Union[Column, str]) -> ColumnOperation:
    """Build a signum (sign) expression.

    Forwards ``column`` unchanged to ``MathFunctions.signum``.
    """
    expr = MathFunctions.signum(column)
    return expr
[docs]
@staticmethod
def width_bucket(
    value: Union[Column, str],
    min_value: Union[Column, str, float],
    max_value: Union[Column, str, float],
    num_buckets: Union[Column, str, int],
) -> ColumnOperation:
    """Build a histogram-bucket-number expression for ``value`` (PySpark 3.5+).

    Forwards ``value``, ``min_value``, ``max_value`` and ``num_buckets``
    positionally to ``MathFunctions.width_bucket``.
    """
    expr = MathFunctions.width_bucket(value, min_value, max_value, num_buckets)
    return expr
[docs]
@staticmethod
def cot(column: Union[Column, str]) -> ColumnOperation:
    """Build a cotangent expression (PySpark 3.3+).

    Forwards ``column`` unchanged to ``MathFunctions.cot``.
    """
    expr = MathFunctions.cot(column)
    return expr
[docs]
@staticmethod
def csc(column: Union[Column, str]) -> ColumnOperation:
    """Build a cosecant expression (PySpark 3.3+).

    Forwards ``column`` unchanged to ``MathFunctions.csc``.
    """
    expr = MathFunctions.csc(column)
    return expr
[docs]
@staticmethod
def sec(column: Union[Column, str]) -> ColumnOperation:
    """Build a secant expression (PySpark 3.3+).

    Forwards ``column`` unchanged to ``MathFunctions.sec``.
    """
    expr = MathFunctions.sec(column)
    return expr
[docs]
@staticmethod
def e() -> ColumnOperation:
    """Build an expression for Euler's number e (PySpark 3.5+).

    Takes no arguments; returns whatever ``MathFunctions.e()`` builds.
    """
    expr = MathFunctions.e()
    return expr
[docs]
@staticmethod
def pi() -> ColumnOperation:
    """Build an expression for the constant pi (PySpark 3.5+).

    Takes no arguments; returns whatever ``MathFunctions.pi()`` builds.
    """
    expr = MathFunctions.pi()
    return expr
[docs]
@staticmethod
def ln(column: Union[Column, str]) -> ColumnOperation:
    """Build a natural-logarithm expression (PySpark 3.5+).

    Forwards ``column`` unchanged to ``MathFunctions.ln``.
    """
    expr = MathFunctions.ln(column)
    return expr
[docs]
@staticmethod
def greatest(*columns: Union[Column, str]) -> ColumnOperation:
    """Build an expression for the greatest value among ``columns``.

    All positional arguments are forwarded to ``MathFunctions.greatest``.
    """
    expr = MathFunctions.greatest(*columns)
    return expr
[docs]
@staticmethod
def least(*columns: Union[Column, str]) -> ColumnOperation:
    """Build an expression for the least value among ``columns``.

    All positional arguments are forwarded to ``MathFunctions.least``.
    """
    expr = MathFunctions.least(*columns)
    return expr
# Aggregate functions
[docs]
@staticmethod
def count(column: Union[Column, str, None] = None) -> ColumnOperation:
    """Build a count aggregate.

    ``column`` may be omitted; in that case None is forwarded to
    ``AggregateFunctions.count``, which decides what to count.
    """
    agg = AggregateFunctions.count(column)
    return agg
[docs]
@staticmethod
def sum(column: Union[Column, str]) -> ColumnOperation:
    """Build a sum aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.sum``.
    """
    agg = AggregateFunctions.sum(column)
    return agg
[docs]
@staticmethod
def avg(column: Union[Column, str]) -> ColumnOperation:
    """Build an average aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.avg``.
    """
    agg = AggregateFunctions.avg(column)
    return agg
[docs]
@staticmethod
def max(column: Union[Column, str]) -> ColumnOperation:
    """Build a maximum-value aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.max``.
    """
    agg = AggregateFunctions.max(column)
    return agg
[docs]
@staticmethod
def min(column: Union[Column, str]) -> ColumnOperation:
    """Build a minimum-value aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.min``.
    """
    agg = AggregateFunctions.min(column)
    return agg
[docs]
@staticmethod
def first(
    column: Union[Column, str], ignorenulls: bool = False
) -> AggregateFunction:
    """Build a first-value aggregate.

    ``ignorenulls`` defaults to False and is passed by keyword, after
    ``column``, to ``AggregateFunctions.first``.
    """
    agg = AggregateFunctions.first(column, ignorenulls=ignorenulls)
    return agg
[docs]
@staticmethod
def last(column: Union[Column, str]) -> AggregateFunction:
    """Build a last-value aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.last``.

    NOTE(review): ``first`` accepts and forwards ``ignorenulls`` but this
    wrapper does not, although PySpark's ``last`` has the same option;
    confirm whether ``AggregateFunctions.last`` supports it before adding.
    """
    agg = AggregateFunctions.last(column)
    return agg
[docs]
@staticmethod
def collect_list(column: Union[Column, str]) -> AggregateFunction:
    """Build an aggregate collecting values into a list.

    Forwards ``column`` unchanged to ``AggregateFunctions.collect_list``.
    """
    agg = AggregateFunctions.collect_list(column)
    return agg
[docs]
@staticmethod
def collect_set(column: Union[Column, str]) -> AggregateFunction:
    """Build an aggregate collecting unique values into a set.

    Forwards ``column`` unchanged to ``AggregateFunctions.collect_set``.
    """
    agg = AggregateFunctions.collect_set(column)
    return agg
[docs]
@staticmethod
def stddev(column: Union[Column, str]) -> "ColumnOperation":  # noqa: F821
    """Build a standard-deviation aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.stddev``.
    """
    agg = AggregateFunctions.stddev(column)
    return agg
[docs]
@staticmethod
def std(column: Union[Column, str]) -> "ColumnOperation":  # noqa: F821
    """Build a standard-deviation aggregate (alias for stddev).

    Forwards ``column`` unchanged to ``AggregateFunctions.std``.
    """
    agg = AggregateFunctions.std(column)
    return agg
[docs]
@staticmethod
def product(column: Union[Column, str]) -> AggregateFunction:
    """Build an aggregate multiplying all values in a column.

    Forwards ``column`` unchanged to ``AggregateFunctions.product``.
    """
    agg = AggregateFunctions.product(column)
    return agg
[docs]
@staticmethod
def sum_distinct(column: Union[Column, str]) -> AggregateFunction:
    """Build an aggregate summing distinct values.

    Forwards ``column`` unchanged to ``AggregateFunctions.sum_distinct``.
    """
    agg = AggregateFunctions.sum_distinct(column)
    return agg
[docs]
@staticmethod
def variance(column: Union[Column, str]) -> "ColumnOperation":  # noqa: F821
    """Build a variance aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.variance``.
    """
    agg = AggregateFunctions.variance(column)
    return agg
[docs]
@staticmethod
def skewness(column: Union[Column, str]) -> AggregateFunction:
    """Build a skewness aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.skewness``.
    """
    agg = AggregateFunctions.skewness(column)
    return agg
[docs]
@staticmethod
def kurtosis(column: Union[Column, str]) -> AggregateFunction:
    """Build a kurtosis aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.kurtosis``.
    """
    agg = AggregateFunctions.kurtosis(column)
    return agg
[docs]
@staticmethod
def countDistinct(column: Union[Column, str]) -> AggregateFunction:
    """Build a distinct-count aggregate.

    Forwards ``column`` unchanged to ``AggregateFunctions.countDistinct``.
    """
    agg = AggregateFunctions.countDistinct(column)
    return agg
[docs]
@staticmethod
def count_distinct(column: Union[Column, str]) -> AggregateFunction:
    """Build a distinct-count aggregate (alias for countDistinct).

    Forwards ``column`` unchanged to ``AggregateFunctions.count_distinct``.
    """
    agg = AggregateFunctions.count_distinct(column)
    return agg
[docs]
@staticmethod
def percentile_approx(
    column: Union[Column, str], percentage: float, accuracy: int = 10000
) -> AggregateFunction:
    """Build an approximate-percentile aggregate.

    ``accuracy`` defaults to 10000. ``column``, ``percentage`` and
    ``accuracy`` are forwarded positionally to
    ``AggregateFunctions.percentile_approx``.
    """
    agg = AggregateFunctions.percentile_approx(column, percentage, accuracy)
    return agg
[docs]
@staticmethod
def corr(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Build a correlation aggregate between two columns.

    Forwards ``column1`` and ``column2`` to ``AggregateFunctions.corr``.
    """
    agg = AggregateFunctions.corr(column1, column2)
    return agg
[docs]
@staticmethod
def covar_samp(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Sample covariance between ``column1`` and ``column2``.

    Forwards to ``AggregateFunctions.covar_samp``.
    """
    covariance = AggregateFunctions.covar_samp(column1, column2)
    return covariance
[docs]
@staticmethod
def mean(column: Union[Column, str]) -> AggregateFunction:
    """Mean of the values in ``column`` (alias for avg).

    Forwards to ``AggregateFunctions.mean``.
    """
    average = AggregateFunctions.mean(column)
    return average
[docs]
@staticmethod
def approx_count_distinct(
    column: Union[Column, str], rsd: Optional[float] = None
) -> ColumnOperation:
    """Approximate number of distinct elements in ``column``.

    Forwards to ``AggregateFunctions.approx_count_distinct``, passing
    ``rsd`` by keyword.

    Args:
        column: Column to count distinct values.
        rsd: Optional relative standard deviation (default: None, which uses
            PySpark's default of 0.05). Controls the approximation accuracy.
            Lower values provide better accuracy but use more memory.
    """
    approx_count = AggregateFunctions.approx_count_distinct(column, rsd=rsd)
    return approx_count
[docs]
@staticmethod
def stddev_pop(column: Union[Column, str]) -> AggregateFunction:
    """Population standard deviation of ``column``.

    Forwards to ``AggregateFunctions.stddev_pop``.
    """
    deviation = AggregateFunctions.stddev_pop(column)
    return deviation
[docs]
@staticmethod
def stddev_samp(column: Union[Column, str]) -> AggregateFunction:
    """Sample standard deviation of ``column``.

    Forwards to ``AggregateFunctions.stddev_samp``.
    """
    deviation = AggregateFunctions.stddev_samp(column)
    return deviation
[docs]
@staticmethod
def var_pop(column: Union[Column, str]) -> AggregateFunction:
    """Population variance of ``column``.

    Forwards to ``AggregateFunctions.var_pop``.
    """
    spread = AggregateFunctions.var_pop(column)
    return spread
[docs]
@staticmethod
def var_samp(column: Union[Column, str]) -> AggregateFunction:
    """Sample variance of ``column``.

    Forwards to ``AggregateFunctions.var_samp``.
    """
    spread = AggregateFunctions.var_samp(column)
    return spread
[docs]
@staticmethod
def covar_pop(
    column1: Union[Column, str], column2: Union[Column, str]
) -> AggregateFunction:
    """Population covariance between ``column1`` and ``column2``.

    Forwards to ``AggregateFunctions.covar_pop``.
    """
    covariance = AggregateFunctions.covar_pop(column1, column2)
    return covariance
[docs]
@staticmethod
def mode(column: Union[Column, str]) -> AggregateFunction:
    """Most frequent value of ``column`` (PySpark 3.4+).

    Forwards to ``AggregateFunctions.mode``.
    """
    most_frequent = AggregateFunctions.mode(column)
    return most_frequent
[docs]
@staticmethod
def percentile(column: Union[Column, str], percentage: float) -> AggregateFunction:
    """Exact percentile of ``column`` (PySpark 3.5+).

    Forwards to ``AggregateFunctions.percentile``.
    """
    exact = AggregateFunctions.percentile(column, percentage)
    return exact
[docs]
@staticmethod
def approx_percentile(
    column: Union[Column, str],
    percentage: Union[float, Column, str],
    accuracy: Union[int, Column, str] = 10000,
) -> AggregateFunction:
    """Approximate percentile of ``column`` (PySpark 3.5+).

    Forwards ``column``, ``percentage`` and ``accuracy`` positionally to
    ``AggregateFunctions.approx_percentile``.
    """
    forwarded = (column, percentage, accuracy)
    return AggregateFunctions.approx_percentile(*forwarded)
[docs]
@staticmethod
def bool_and(column: Union[Column, str]) -> AggregateFunction:
    """Aggregate logical AND over ``column`` (PySpark 3.1+).

    Forwards to ``AggregateFunctions.bool_and``.
    """
    conjunction = AggregateFunctions.bool_and(column)
    return conjunction
[docs]
@staticmethod
def bool_or(column: Union[Column, str]) -> AggregateFunction:
    """Aggregate logical OR over ``column`` (PySpark 3.1+).

    Forwards to ``AggregateFunctions.bool_or``.
    """
    disjunction = AggregateFunctions.bool_or(column)
    return disjunction
[docs]
@staticmethod
def every(column: Union[Column, str]) -> AggregateFunction:
    """Alias for ``bool_and`` (PySpark 3.1+).

    Forwards to ``AggregateFunctions.every``.
    """
    conjunction = AggregateFunctions.every(column)
    return conjunction
[docs]
@staticmethod
def some(column: Union[Column, str]) -> AggregateFunction:
    """Alias for ``bool_or`` (PySpark 3.1+).

    Forwards to ``AggregateFunctions.some``.
    """
    disjunction = AggregateFunctions.some(column)
    return disjunction
[docs]
@staticmethod
def max_by(
    column: Union[Column, str], ord: Union[Column, str]
) -> AggregateFunction:
    """Value of ``column`` at the maximum of the ``ord`` column (PySpark 3.1+).

    Forwards to ``AggregateFunctions.max_by``.
    """
    picked = AggregateFunctions.max_by(column, ord)
    return picked
[docs]
@staticmethod
def min_by(
    column: Union[Column, str], ord: Union[Column, str]
) -> AggregateFunction:
    """Value of ``column`` at the minimum of the ``ord`` column (PySpark 3.1+).

    Forwards to ``AggregateFunctions.min_by``.
    """
    picked = AggregateFunctions.min_by(column, ord)
    return picked
[docs]
@staticmethod
def count_if(column: Union[Column, str]) -> AggregateFunction:
    """Count the rows where the condition is true (PySpark 3.1+).

    Forwards to ``AggregateFunctions.count_if``.
    """
    matching = AggregateFunctions.count_if(column)
    return matching
[docs]
@staticmethod
def any_value(column: Union[Column, str]) -> AggregateFunction:
    """Return any non-null value of ``column`` (PySpark 3.1+).

    Forwards to ``AggregateFunctions.any_value``.
    """
    chosen = AggregateFunctions.any_value(column)
    return chosen
# Datetime functions
[docs]
@staticmethod
def current_timestamp() -> ColumnOperation:
    """Current timestamp expression.

    Checks for an active session first, then forwards to
    ``DateTimeFunctions.current_timestamp``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("current_timestamp function")
    timestamp_op = DateTimeFunctions.current_timestamp()
    return timestamp_op
[docs]
@staticmethod
def current_date() -> ColumnOperation:
    """Current date expression.

    Checks for an active session first, then forwards to
    ``DateTimeFunctions.current_date``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("current_date function")
    date_op = DateTimeFunctions.current_date()
    return date_op
[docs]
@staticmethod
def version() -> Literal:
    """Return the version string as a literal (PySpark 3.0+).

    Returns:
        Literal holding ``"sparkless-<version>"``, where the version is
        read from ``sparkless.__version__``.
    """
    # Imported at call time rather than at module import.
    from sparkless import __version__

    # The version is exposed as a constant expression.
    version_string = f"sparkless-{__version__}"
    return Literal(version_string)
[docs]
@staticmethod
def to_date(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert ``column`` to a date, optionally using ``format``.

    Forwards to ``DateTimeFunctions.to_date``.
    """
    converted = DateTimeFunctions.to_date(column, format)
    return converted
[docs]
@staticmethod
def to_timestamp(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert ``column`` to a timestamp, optionally using ``format``.

    Forwards to ``DateTimeFunctions.to_timestamp``.
    """
    converted = DateTimeFunctions.to_timestamp(column, format)
    return converted
[docs]
@staticmethod
def date_from_unix_date(days: Union[Column, str, int]) -> ColumnOperation:
    """Convert a unix date (days since epoch) to a date (PySpark 3.5+).

    Forwards to ``DateTimeFunctions.date_from_unix_date``.
    """
    converted = DateTimeFunctions.date_from_unix_date(days)
    return converted
[docs]
@staticmethod
def to_timestamp_ltz(
    timestamp_str: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert a string to a timestamp with local timezone (PySpark 3.5+).

    Forwards to ``DateTimeFunctions.to_timestamp_ltz``.
    """
    converted = DateTimeFunctions.to_timestamp_ltz(timestamp_str, format)
    return converted
[docs]
@staticmethod
def to_timestamp_ntz(
    timestamp_str: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert a string to a timestamp with no timezone (PySpark 3.5+).

    Forwards to ``DateTimeFunctions.to_timestamp_ntz``.
    """
    converted = DateTimeFunctions.to_timestamp_ntz(timestamp_str, format)
    return converted
[docs]
@staticmethod
def hour(column: Union[Column, str]) -> ColumnOperation:
    """Extract the hour from ``column``.

    Forwards to ``DateTimeFunctions.hour``.
    """
    hour_op = DateTimeFunctions.hour(column)
    return hour_op
[docs]
@staticmethod
def day(column: Union[Column, str]) -> ColumnOperation:
    """Extract the day from ``column``.

    Forwards to ``DateTimeFunctions.day``.
    """
    day_op = DateTimeFunctions.day(column)
    return day_op
[docs]
@staticmethod
def dayofmonth(column: Union[Column, str]) -> ColumnOperation:
    """Extract the day of month from ``column`` (alias for day).

    Forwards to ``DateTimeFunctions.dayofmonth``.
    """
    day_op = DateTimeFunctions.dayofmonth(column)
    return day_op
[docs]
@staticmethod
def month(column: Union[Column, str]) -> ColumnOperation:
    """Extract the month from ``column``.

    Forwards to ``DateTimeFunctions.month``.
    """
    month_op = DateTimeFunctions.month(column)
    return month_op
[docs]
@staticmethod
def year(column: Union[Column, str]) -> ColumnOperation:
    """Extract the year from ``column``.

    Forwards to ``DateTimeFunctions.year``.
    """
    year_op = DateTimeFunctions.year(column)
    return year_op
# Conditional functions
[docs]
@staticmethod
def coalesce(*columns: Union[Column, str, Any]) -> ColumnOperation:
    """Return the first non-null value among ``columns``.

    Forwards all positional arguments to ``ConditionalFunctions.coalesce``.
    """
    first_non_null = ConditionalFunctions.coalesce(*columns)
    return first_non_null
[docs]
@staticmethod
def isnull(column: Union[Column, str]) -> ColumnOperation:
    """Check whether ``column`` is null.

    Forwards to ``ConditionalFunctions.isnull``.
    """
    null_check = ConditionalFunctions.isnull(column)
    return null_check
[docs]
@staticmethod
def isnotnull(column: Union[Column, str]) -> ColumnOperation:
    """Check whether ``column`` is not null.

    Forwards to ``ConditionalFunctions.isnotnull``.
    """
    not_null_check = ConditionalFunctions.isnotnull(column)
    return not_null_check
[docs]
@staticmethod
def isnan(column: Union[Column, str]) -> ColumnOperation:
    """Check whether ``column`` is NaN.

    Forwards to ``ConditionalFunctions.isnan``.
    """
    nan_check = ConditionalFunctions.isnan(column)
    return nan_check
[docs]
@staticmethod
def when(condition: Any, value: Any = None) -> CaseWhen:
    """Start a CASE WHEN expression.

    Checks for an active session, then forwards to
    ``ConditionalFunctions.when``. ``value`` is only passed along when it
    is not ``None``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("CASE WHEN expression")
    # NOTE(review): an explicit ``value=None`` cannot be told apart from an
    # omitted value here, so both take the condition-only call - confirm
    # this is the intended handling of a NULL result value.
    if value is None:
        return ConditionalFunctions.when(condition)
    return ConditionalFunctions.when(condition, value)
[docs]
@staticmethod
def case_when(*conditions: Tuple[Any, Any], else_value: Any = None) -> CaseWhen:
    """Create a CASE WHEN expression from multiple conditions.

    Forwards the positional ``conditions`` and the keyword ``else_value``
    to ``ConditionalFunctions.case_when``.
    """
    case_expr = ConditionalFunctions.case_when(*conditions, else_value=else_value)
    return case_expr
[docs]
@staticmethod
def dayofweek(column: Union[Column, str]) -> ColumnOperation:
    """Extract the day of week from ``column``.

    Forwards to ``DateTimeFunctions.dayofweek``.
    """
    weekday_op = DateTimeFunctions.dayofweek(column)
    return weekday_op
[docs]
@staticmethod
def dayofyear(column: Union[Column, str]) -> ColumnOperation:
    """Extract the day of year from ``column``.

    Forwards to ``DateTimeFunctions.dayofyear``.
    """
    yearday_op = DateTimeFunctions.dayofyear(column)
    return yearday_op
[docs]
@staticmethod
def weekofyear(column: Union[Column, str]) -> ColumnOperation:
    """Extract the week of year from ``column``.

    Forwards to ``DateTimeFunctions.weekofyear``.
    """
    week_op = DateTimeFunctions.weekofyear(column)
    return week_op
[docs]
@staticmethod
def quarter(column: Union[Column, str]) -> ColumnOperation:
    """Extract the quarter from ``column``.

    Forwards to ``DateTimeFunctions.quarter``.
    """
    quarter_op = DateTimeFunctions.quarter(column)
    return quarter_op
[docs]
@staticmethod
def now() -> ColumnOperation:
    """Get the current timestamp (alias for ``current_timestamp``).

    Forwards to ``DateTimeFunctions.now``.
    """
    timestamp_op = DateTimeFunctions.now()
    return timestamp_op
[docs]
@staticmethod
def curdate() -> ColumnOperation:
    """Get the current date (alias for ``current_date``).

    Forwards to ``DateTimeFunctions.curdate``.
    """
    date_op = DateTimeFunctions.curdate()
    return date_op
[docs]
@staticmethod
def days(column: Union[Column, str, int]) -> ColumnOperation:
    """Convert a number to a days interval.

    Forwards to ``DateTimeFunctions.days``.
    """
    interval = DateTimeFunctions.days(column)
    return interval
[docs]
@staticmethod
def hours(column: Union[Column, str, int]) -> ColumnOperation:
    """Convert a number to an hours interval.

    Forwards to ``DateTimeFunctions.hours``.
    """
    interval = DateTimeFunctions.hours(column)
    return interval
[docs]
@staticmethod
def months(column: Union[Column, str, int]) -> ColumnOperation:
    """Convert a number to a months interval.

    Forwards to ``DateTimeFunctions.months``.
    """
    interval = DateTimeFunctions.months(column)
    return interval
# SQL expression function
[docs]
@staticmethod
def expr(expression: str) -> Union[ColumnOperation, Column, "CaseWhen", "Literal"]:
    """Parse a SQL expression string into a column expression.

    The string is parsed with ``SQLExprParser.parse`` and the result is
    normalised by type:

    * ``ColumnOperation`` - returned as parsed.
    * ``Column`` - wrapped in an ``"expr"`` ColumnOperation.
    * ``Literal`` - wrapped in a ``"lit"`` ColumnOperation.
    * object with ``evaluate`` and ``conditions`` attributes (CaseWhen) -
      returned directly, untagged.
    * anything else - wrapped as ``Literal`` inside a ``"lit"``
      ColumnOperation.

    Every ColumnOperation returned from the parse path carries
    ``_from_expr = True``. If anything other than a ``ParseException`` is
    raised while parsing or wrapping, a ``UserWarning`` is emitted and a raw
    ``"expr"`` operation holding the original string is returned instead.

    Args:
        expression: SQL expression string (e.g., "id IS NOT NULL", "age > 18").
            Must use SQL syntax, not Python expressions.

    Returns:
        The parsed expression, normally a ColumnOperation (see above).

    Raises:
        RuntimeError: If no active SparkSession is available
        ParseException: If SQL syntax is invalid
    """
    Functions._require_active_session(f"expression '{expression}'")
    # The expression is parsed as SQL rather than stored as a raw string.
    from ..functions.core.sql_expr_parser import SQLExprParser
    from ..functions.core.column import ColumnOperation, Column
    from ..core.exceptions.analysis import ParseException

    try:
        parsed = SQLExprParser.parse(expression)
        if isinstance(parsed, ColumnOperation):
            tagged = parsed
        elif isinstance(parsed, Column):
            # Plain column reference: wrap it in a placeholder "expr"
            # operation so callers consistently get a ColumnOperation.
            tagged = ColumnOperation(parsed, "expr", expression)
        else:
            from ..functions.core.literals import Literal

            if isinstance(parsed, Literal):
                tagged = ColumnOperation(None, "lit", parsed)
            elif hasattr(parsed, "evaluate") and hasattr(parsed, "conditions"):
                # CaseWhen-like object: it evaluates itself, so hand it back as is.
                return parsed
            else:
                # Unknown result type: treat it as a literal value.
                tagged = ColumnOperation(None, "lit", Literal(parsed))
        # Marks the operation as coming from F.expr() for detection in
        # materialization.
        setattr(tagged, "_from_expr", True)
        return tagged
    except ParseException:
        # Invalid SQL syntax is reported to the caller unchanged.
        raise
    except Exception as e:
        # Any other failure falls back to the old raw-string behaviour for
        # backward compatibility, with a warning that it may not work correctly.
        import warnings

        warnings.warn(
            f"Failed to parse SQL expression '{expression}'. "
            f"F.expr() should use SQL syntax (e.g., 'id IS NOT NULL'), "
            f"not Python expressions (e.g., \"col('id').isNotNull()\"). "
            f"Error: {str(e)}",
            UserWarning,
            stacklevel=2,
        )
        from sparkless.functions.base import Column

        raw_operation = ColumnOperation(
            Column("__expr__"), "expr", expression, name=expression
        )
        raw_operation.function_name = "expr"
        return raw_operation
[docs]
@staticmethod
def minute(column: Union[Column, str]) -> ColumnOperation:
    """Extract the minute from ``column``.

    Forwards to ``DateTimeFunctions.minute``.
    """
    minute_op = DateTimeFunctions.minute(column)
    return minute_op
[docs]
@staticmethod
def second(column: Union[Column, str]) -> ColumnOperation:
    """Extract the second from ``column``.

    Forwards to ``DateTimeFunctions.second``.
    """
    second_op = DateTimeFunctions.second(column)
    return second_op
[docs]
@staticmethod
def add_months(column: Union[Column, str], num_months: int) -> ColumnOperation:
    """Add ``num_months`` months to the date in ``column``.

    Forwards to ``DateTimeFunctions.add_months``.
    """
    shifted = DateTimeFunctions.add_months(column, num_months)
    return shifted
[docs]
@staticmethod
def months_between(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Calculate the months between two dates.

    Forwards to ``DateTimeFunctions.months_between``.
    """
    month_span = DateTimeFunctions.months_between(column1, column2)
    return month_span
[docs]
@staticmethod
def date_add(column: Union[Column, str], days: int) -> ColumnOperation:
    """Add ``days`` days to the date in ``column``.

    Forwards to ``DateTimeFunctions.date_add``.
    """
    shifted = DateTimeFunctions.date_add(column, days)
    return shifted
[docs]
@staticmethod
def date_sub(column: Union[Column, str], days: int) -> ColumnOperation:
    """Subtract ``days`` days from the date in ``column``.

    Forwards to ``DateTimeFunctions.date_sub``.
    """
    shifted = DateTimeFunctions.date_sub(column, days)
    return shifted
[docs]
@staticmethod
def make_date(
    year: Union[Column, int],
    month: Union[Column, int],
    day: Union[Column, int],
) -> ColumnOperation:
    """Construct a date from ``year``, ``month`` and ``day`` (PySpark 3.0+).

    Forwards to ``DateTimeFunctions.make_date``.
    """
    built_date = DateTimeFunctions.make_date(year, month, day)
    return built_date
[docs]
@staticmethod
def date_trunc(format: str, timestamp: Union[Column, str]) -> ColumnOperation:
    """Truncate ``timestamp`` to the unit named by ``format``.

    Forwards to ``DateTimeFunctions.date_trunc``.
    """
    truncated = DateTimeFunctions.date_trunc(format, timestamp)
    return truncated
[docs]
@staticmethod
def datediff(end: Union[Column, str], start: Union[Column, str]) -> ColumnOperation:
    """Number of days between two dates.

    Forwards ``end`` and ``start`` to ``DateTimeFunctions.datediff``.
    """
    day_span = DateTimeFunctions.datediff(end, start)
    return day_span
[docs]
@staticmethod
def date_diff(
    end: Union[Column, str], start: Union[Column, str]
) -> ColumnOperation:
    """Number of days between two dates (alias for ``datediff``).

    Forwards ``end`` and ``start`` to ``DateTimeFunctions.date_diff``.
    """
    day_span = DateTimeFunctions.date_diff(end, start)
    return day_span
[docs]
@staticmethod
def unix_timestamp(
    timestamp: Optional[Union[Column, str]] = None,
    format: str = "yyyy-MM-dd HH:mm:ss",
) -> ColumnOperation:
    """Convert a timestamp to a Unix timestamp.

    Forwards ``timestamp`` and ``format`` to
    ``DateTimeFunctions.unix_timestamp``.
    """
    epoch_op = DateTimeFunctions.unix_timestamp(timestamp, format)
    return epoch_op
[docs]
@staticmethod
def last_day(date: Union[Column, str]) -> ColumnOperation:
    """Last day of the month containing ``date``.

    Forwards to ``DateTimeFunctions.last_day``.
    """
    month_end = DateTimeFunctions.last_day(date)
    return month_end
[docs]
@staticmethod
def next_day(date: Union[Column, str], dayOfWeek: str) -> ColumnOperation:
    """First date later than ``date`` that falls on ``dayOfWeek``.

    Forwards to ``DateTimeFunctions.next_day``.
    """
    following = DateTimeFunctions.next_day(date, dayOfWeek)
    return following
[docs]
@staticmethod
def trunc(date: Union[Column, str], format: str) -> ColumnOperation:
    """Truncate ``date`` to the unit named by ``format``.

    Forwards to ``DateTimeFunctions.trunc``.
    """
    truncated = DateTimeFunctions.trunc(date, format)
    return truncated
[docs]
@staticmethod
def timestamp_seconds(col: Union[Column, str, int]) -> ColumnOperation:
    """Convert seconds since epoch to a timestamp (PySpark 3.1+).

    Forwards to ``DateTimeFunctions.timestamp_seconds``.
    """
    timestamp_op = DateTimeFunctions.timestamp_seconds(col)
    return timestamp_op
[docs]
@staticmethod
def weekday(col: Union[Column, str]) -> ColumnOperation:
    """Day of week as an integer (0=Monday, 6=Sunday) (PySpark 3.5+).

    Forwards to ``DateTimeFunctions.weekday``.
    """
    weekday_op = DateTimeFunctions.weekday(col)
    return weekday_op
[docs]
@staticmethod
def raise_error(msg: Union[Column, str]) -> ColumnOperation:
    """Build an expression that raises an error with ``msg`` (PySpark 3.1+).

    A plain string message is wrapped in a ``Literal`` first; any other
    value is used as given.

    Args:
        msg: Error message

    Returns:
        ColumnOperation representing the raise_error function
    """
    message = msg
    if isinstance(message, str):
        from sparkless.functions.core.literals import Literal

        message = Literal(message)  # type: ignore[assignment]
    # The name is rendered from the (possibly wrapped) message.
    operation_name = f"raise_error({message})"
    return ColumnOperation(message, "raise_error", name=operation_name)
[docs]
@staticmethod
def from_unixtime(
    column: Union[Column, str], format: str = "yyyy-MM-dd HH:mm:ss"
) -> ColumnOperation:
    """Convert a unix timestamp to a string.

    Forwards ``column`` and ``format`` to ``DateTimeFunctions.from_unixtime``.
    """
    formatted = DateTimeFunctions.from_unixtime(column, format)
    return formatted
[docs]
@staticmethod
def timestampadd(
    unit: str, quantity: Union[int, Column], timestamp: Union[str, Column]
) -> ColumnOperation:
    """Add ``quantity`` time units of kind ``unit`` to ``timestamp``.

    Forwards to ``DateTimeFunctions.timestampadd``.
    """
    shifted = DateTimeFunctions.timestampadd(unit, quantity, timestamp)
    return shifted
[docs]
@staticmethod
def timestampdiff(
    unit: str, start: Union[str, Column], end: Union[str, Column]
) -> ColumnOperation:
    """Calculate the difference between two timestamps in ``unit``.

    Forwards to ``DateTimeFunctions.timestampdiff``.
    """
    difference = DateTimeFunctions.timestampdiff(unit, start, end)
    return difference
[docs]
@staticmethod
def nvl(column: Union[Column, str], default_value: Any) -> ColumnOperation:
    """Return ``default_value`` when ``column`` is null.

    Implemented as ``coalesce(column, default_value)``; PySpark uses
    coalesce internally.
    """
    # coalesce is used for SQL generation compatibility.
    from .conditional import ConditionalFunctions

    fallback_chain = (column, default_value)
    return ConditionalFunctions.coalesce(*fallback_chain)
[docs]
@staticmethod
def nvl2(
    column: Union[Column, str], value_if_not_null: Any, value_if_null: Any
) -> Any:
    """Pick a value depending on whether ``column`` is null.

    Built as ``when(column IS NULL, value_if_null).otherwise(value_if_not_null)``;
    PySpark uses when/otherwise internally.
    """
    # when/otherwise is used for SQL generation compatibility.
    from .conditional import ConditionalFunctions
    from sparkless.functions.base import Column

    # A column name is promoted to a Column so that isNull() is available.
    target = column if not isinstance(column, str) else Column(column)
    # The test is an explicit IS NULL check, not a truthiness check.
    null_branch = ConditionalFunctions.when(target.isNull(), value_if_null)
    return null_branch.otherwise(value_if_not_null)
[docs]
@staticmethod
def equal_null(
    col1: Union[Column, str], col2: Union[Column, str, Any]
) -> ColumnOperation:
    """Equality check that treats NULL as equal.

    Forwards to ``ConditionalFunctions.equal_null``.
    """
    from .conditional import ConditionalFunctions

    null_safe_equal = ConditionalFunctions.equal_null(col1, col2)
    return null_safe_equal
# Window functions
[docs]
@staticmethod
def row_number() -> ColumnOperation:
    """Build the ``row_number`` window function expression.

    Returns:
        ColumnOperation named ``row_number()`` whose ``return_type`` is a
        non-nullable ``IntegerType``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("row_number window function")
    # row_number() has no input column, so a placeholder column is used.
    from sparkless.functions.base import Column
    from sparkless.spark_types import IntegerType

    window_op = ColumnOperation(Column("__row_number__"), "row_number")
    window_op.name = "row_number()"
    window_op.function_name = "row_number"
    window_op.return_type = IntegerType(nullable=False)
    return window_op
[docs]
@staticmethod
def rank() -> ColumnOperation:
    """Build the ``rank`` window function expression.

    Returns:
        ColumnOperation named ``rank()`` built on a placeholder column.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("rank window function")
    # rank() has no input column, so a placeholder column is used.
    from sparkless.functions.base import Column

    window_op = ColumnOperation(Column("__rank__"), "rank")
    window_op.name = "rank()"
    window_op.function_name = "rank"
    return window_op
[docs]
@staticmethod
def dense_rank() -> ColumnOperation:
    """Build the ``dense_rank`` window function expression.

    Returns:
        ColumnOperation named ``dense_rank()`` built on a placeholder column.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("dense_rank window function")
    # dense_rank() has no input column, so a placeholder column is used.
    from sparkless.functions.base import Column

    window_op = ColumnOperation(Column("__dense_rank__"), "dense_rank")
    window_op.name = "dense_rank()"
    window_op.function_name = "dense_rank"
    return window_op
[docs]
@staticmethod
def lag(
    column: Union[Column, str], offset: int = 1, default: Any = None
) -> ColumnOperation:
    """Build the ``lag`` window function expression.

    The operation stores ``(offset, default)`` as its value and is named
    ``lag(<column name>, <offset>)``.

    Args:
        column: The column to lag.
        offset: Number of rows to look back. Default is 1.
        default: Default value if offset goes beyond partition. Default is None.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("lag window function")
    source = Column(column) if isinstance(column, str) else column
    window_op = ColumnOperation(source, "lag", (offset, default))
    window_op.name = f"lag({source.name}, {offset})"
    window_op.function_name = "lag"
    return window_op
[docs]
@staticmethod
def lead(
    column: Union[Column, str], offset: int = 1, default: Any = None
) -> ColumnOperation:
    """Build the ``lead`` window function expression.

    The operation stores ``(offset, default)`` as its value and is named
    ``lead(<column name>, <offset>)``.

    Args:
        column: The column to lead.
        offset: Number of rows to look forward. Default is 1.
        default: Default value if offset goes beyond partition. Default is None.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("lead window function")
    source = Column(column) if isinstance(column, str) else column
    window_op = ColumnOperation(source, "lead", (offset, default))
    window_op.name = f"lead({source.name}, {offset})"
    window_op.function_name = "lead"
    return window_op
[docs]
@staticmethod
def nth_value(column: Union[Column, str], n: int) -> ColumnOperation:
    """Build the ``nth_value`` window function expression.

    The operation stores ``n`` as its value and is named
    ``nth_value(<column name>, <n>)``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("nth_value window function")
    source = Column(column) if isinstance(column, str) else column
    window_op = ColumnOperation(source, "nth_value", n)
    window_op.name = f"nth_value({source.name}, {n})"
    window_op.function_name = "nth_value"
    return window_op
[docs]
@staticmethod
def ntile(n: int) -> ColumnOperation:
    """Build the NTILE window function expression.

    The operation stores ``n`` as its value and is named ``ntile(<n>)``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("ntile window function")
    # ntile() has no input column, so a placeholder column is used.
    from sparkless.functions.base import Column

    window_op = ColumnOperation(Column("__ntile__"), "ntile", n)
    window_op.name = f"ntile({n})"
    window_op.function_name = "ntile"
    return window_op
[docs]
@staticmethod
def cume_dist() -> ColumnOperation:
    """Build the cumulative distribution window function expression.

    Returns:
        ColumnOperation named ``cume_dist()`` built on a placeholder column.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("cume_dist window function")
    # cume_dist() has no input column, so a placeholder column is used.
    from sparkless.functions.base import Column

    window_op = ColumnOperation(Column("__cume_dist__"), "cume_dist")
    window_op.name = "cume_dist()"
    window_op.function_name = "cume_dist"
    return window_op
[docs]
@staticmethod
def percent_rank() -> ColumnOperation:
    """Build the percent rank window function expression.

    Returns:
        ColumnOperation named ``percent_rank()`` built on a placeholder column.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("percent_rank window function")
    # percent_rank() has no input column, so a placeholder column is used.
    from sparkless.functions.base import Column

    window_op = ColumnOperation(Column("__percent_rank__"), "percent_rank")
    window_op.name = "percent_rank()"
    window_op.function_name = "percent_rank"
    return window_op
[docs]
@staticmethod
def first_value(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``first_value`` window function expression.

    The operation is named ``first_value(<column name>)``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("first_value window function")
    source = Column(column) if isinstance(column, str) else column
    window_op = ColumnOperation(source, "first_value")
    window_op.name = f"first_value({source.name})"
    window_op.function_name = "first_value"
    return window_op
[docs]
@staticmethod
def last_value(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``last_value`` window function expression.

    The operation is named ``last_value(<column name>)``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("last_value window function")
    source = Column(column) if isinstance(column, str) else column
    window_op = ColumnOperation(source, "last_value")
    window_op.name = f"last_value({source.name})"
    window_op.function_name = "last_value"
    return window_op
[docs]
@staticmethod
def desc(column: Union[Column, str]) -> ColumnOperation:
    """Create a descending-order column.

    A column name is promoted to a ``Column``; the resulting operation is
    named ``<column name> DESC``.
    """
    source = Column(column) if isinstance(column, str) else column
    ordering_name = f"{source.name} DESC"
    ordering = ColumnOperation(source, "desc", None, name=ordering_name)
    ordering.function_name = "desc"
    return ordering
# Array functions
[docs]
@staticmethod
def array(*cols: Union[Column, str]) -> ColumnOperation:
    """Create an array from the given columns (PySpark 3.0+).

    Forwards all positional arguments to ``ArrayFunctions.array``.
    """
    array_op = ArrayFunctions.array(*cols)
    return array_op
[docs]
@staticmethod
def array_repeat(col: Union[Column, str], count: int) -> ColumnOperation:
    """Repeat a value ``count`` times to create an array (PySpark 3.0+).

    Forwards to ``ArrayFunctions.array_repeat``.
    """
    repeated = ArrayFunctions.array_repeat(col, count)
    return repeated
[docs]
@staticmethod
def sort_array(col: Union[Column, str], asc: bool = True) -> ColumnOperation:
    """Sort the elements of an array (PySpark 3.0+).

    Forwards ``col`` and ``asc`` to ``ArrayFunctions.sort_array``.
    """
    ordered = ArrayFunctions.sort_array(col, asc)
    return ordered
[docs]
@staticmethod
def array_agg(col: Union[Column, str]) -> AggregateFunction:
    """Aggregate values into an array (PySpark 3.5+).

    Forwards to ``ArrayFunctions.array_agg``.
    """
    aggregated = ArrayFunctions.array_agg(col)
    return aggregated
[docs]
@staticmethod
def cardinality(col: Union[Column, str]) -> ColumnOperation:
    """Return the size of an array or map (PySpark 3.5+).

    Forwards to ``ArrayFunctions.cardinality``.
    """
    element_count = ArrayFunctions.cardinality(col)
    return element_count
[docs]
@staticmethod
def array_distinct(column: Union[Column, str]) -> ColumnOperation:
    """Remove duplicate elements from an array.

    Forwards to ``ArrayFunctions.array_distinct``.
    """
    deduplicated = ArrayFunctions.array_distinct(column)
    return deduplicated
[docs]
@staticmethod
def array_intersect(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Intersection of two arrays.

    Forwards to ``ArrayFunctions.array_intersect``.
    """
    common = ArrayFunctions.array_intersect(column1, column2)
    return common
[docs]
@staticmethod
def array_union(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Union of two arrays.

    Forwards to ``ArrayFunctions.array_union``.
    """
    combined = ArrayFunctions.array_union(column1, column2)
    return combined
[docs]
@staticmethod
def array_except(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Elements present in the first array but not in the second.

    Forwards to ``ArrayFunctions.array_except``.
    """
    remainder = ArrayFunctions.array_except(column1, column2)
    return remainder
[docs]
@staticmethod
def array_position(column: Union[Column, str], value: Any) -> ColumnOperation:
    """Position of ``value`` in an array.

    Forwards to ``ArrayFunctions.array_position``.
    """
    position = ArrayFunctions.array_position(column, value)
    return position
[docs]
@staticmethod
def array_remove(column: Union[Column, str], value: Any) -> ColumnOperation:
    """Remove all occurrences of ``value`` from an array.

    Forwards to ``ArrayFunctions.array_remove``.
    """
    pruned = ArrayFunctions.array_remove(column, value)
    return pruned
# Higher-order array functions (PySpark 3.2+)
[docs]
@staticmethod
def filter(
    column: Union[Column, str], function: Callable[[Any], bool]
) -> ColumnOperation:
    """Filter array elements with a predicate.

    Forwards ``column`` and ``function`` to ``ArrayFunctions.filter``.
    """
    filtered = ArrayFunctions.filter(column, function)
    return filtered
[docs]
@staticmethod
def exists(
    column: Union[Column, str], function: Callable[[Any], bool]
) -> ColumnOperation:
    """Check whether any array element satisfies a predicate.

    Forwards ``column`` and ``function`` to ``ArrayFunctions.exists``.
    """
    any_match = ArrayFunctions.exists(column, function)
    return any_match
[docs]
@staticmethod
def forall(
    column: Union[Column, str], function: Callable[[Any], bool]
) -> ColumnOperation:
    """Check whether all array elements satisfy a predicate.

    Forwards ``column`` and ``function`` to ``ArrayFunctions.forall``.
    """
    all_match = ArrayFunctions.forall(column, function)
    return all_match
[docs]
@staticmethod
def aggregate(
    column: Union[Column, str],
    initial_value: Any,
    merge: Callable[[Any, Any], Any],
    finish: Optional[Callable[[Any], Any]] = None,
) -> ColumnOperation:
    """Aggregate array elements to a single value.

    Forwards ``column``, ``initial_value``, ``merge`` and ``finish``
    positionally to ``ArrayFunctions.aggregate``.
    """
    forwarded = (column, initial_value, merge, finish)
    return ArrayFunctions.aggregate(*forwarded)
[docs]
@staticmethod
def zip_with(
    left: Union[Column, str],
    right: Union[Column, str],
    function: Callable[[Any, Any], Any],
) -> ColumnOperation:
    """Merge two arrays element-wise using ``function``.

    Forwards to ``ArrayFunctions.zip_with``.
    """
    merged = ArrayFunctions.zip_with(left, right, function)
    return merged
# Basic array functions (PySpark 3.2+)
[docs]
@staticmethod
def array_compact(column: Union[Column, str]) -> ColumnOperation:
    """Remove null values from an array.

    Forwards to ``ArrayFunctions.array_compact``.
    """
    compacted = ArrayFunctions.array_compact(column)
    return compacted
[docs]
@staticmethod
def slice(column: Union[Column, str], start: int, length: int) -> ColumnOperation:
    """Extract a slice of an array.

    Forwards ``column``, ``start`` and ``length`` to ``ArrayFunctions.slice``.
    """
    sub_array = ArrayFunctions.slice(column, start, length)
    return sub_array
[docs]
@staticmethod
def element_at(column: Union[Column, str], index: int) -> ColumnOperation:
    """Get the element at ``index``.

    Forwards to ``ArrayFunctions.element_at``.
    """
    element = ArrayFunctions.element_at(column, index)
    return element
[docs]
@staticmethod
def array_append(column: Union[Column, str], element: Any) -> ColumnOperation:
    """Append ``element`` to an array.

    Forwards to ``ArrayFunctions.array_append``.
    """
    extended = ArrayFunctions.array_append(column, element)
    return extended
[docs]
@staticmethod
def array_prepend(column: Union[Column, str], element: Any) -> ColumnOperation:
    """Prepend ``element`` to an array.

    Forwards to ``ArrayFunctions.array_prepend``.
    """
    extended = ArrayFunctions.array_prepend(column, element)
    return extended
[docs]
@staticmethod
def array_insert(
    column: Union[Column, str], pos: int, value: Any
) -> ColumnOperation:
    """Insert ``value`` at position ``pos``.

    Forwards to ``ArrayFunctions.array_insert``.
    """
    extended = ArrayFunctions.array_insert(column, pos, value)
    return extended
[docs]
@staticmethod
def array_size(column: Union[Column, str]) -> ColumnOperation:
    """Get the length of an array.

    Forwards to ``ArrayFunctions.array_size``.
    """
    length_op = ArrayFunctions.array_size(column)
    return length_op
[docs]
@staticmethod
def array_sort(column: Union[Column, str]) -> ColumnOperation:
    """Sort the elements of an array.

    Forwards to ``ArrayFunctions.array_sort``.
    """
    ordered = ArrayFunctions.array_sort(column)
    return ordered
[docs]
@staticmethod
def arrays_overlap(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Check whether two arrays have common elements.

    Forwards to ``ArrayFunctions.arrays_overlap``.
    """
    overlap_check = ArrayFunctions.arrays_overlap(column1, column2)
    return overlap_check
[docs]
@staticmethod
def array_contains(column: Union[Column, str], value: Any) -> ColumnOperation:
    """Check whether an array contains ``value``.

    Forwards to ``ArrayFunctions.array_contains``.
    """
    membership = ArrayFunctions.array_contains(column, value)
    return membership
[docs]
@staticmethod
def array_max(column: Union[Column, str]) -> ColumnOperation:
    """Return the maximum value of an array.

    Forwards to ``ArrayFunctions.array_max``.
    """
    largest = ArrayFunctions.array_max(column)
    return largest
[docs]
@staticmethod
def array_min(column: Union[Column, str]) -> ColumnOperation:
    """Return the minimum value of an array.

    Forwards to ``ArrayFunctions.array_min``.
    """
    smallest = ArrayFunctions.array_min(column)
    return smallest
[docs]
@staticmethod
def explode(column: Union[Column, str]) -> ColumnOperation:
    """Return a new row for each element of an array or map.

    Forwards to ``ArrayFunctions.explode``.
    """
    exploded = ArrayFunctions.explode(column)
    return exploded
[docs]
@staticmethod
def size(column: Union[Column, str]) -> ColumnOperation:
    """Return the size of an array or map.

    Forwards to ``ArrayFunctions.size``.
    """
    element_count = ArrayFunctions.size(column)
    return element_count
[docs]
@staticmethod
def flatten(column: Union[Column, str]) -> ColumnOperation:
    """Flatten an array of arrays into a single array.

    Forwards to ``ArrayFunctions.flatten``.
    """
    flattened = ArrayFunctions.flatten(column)
    return flattened
[docs]
@staticmethod
def reverse(column: Union[Column, str]) -> ColumnOperation:
    """Reverse a string or array; this entry point uses string reverse.

    Forwards to ``StringFunctions.reverse``.
    """
    # String reverse is the default because it is the more common use case.
    # For array reverse, call ArrayFunctions.reverse() directly.
    reversed_op = StringFunctions.reverse(column)
    return reversed_op
[docs]
@staticmethod
def explode_outer(column: Union[Column, str]) -> ColumnOperation:
    """Explode an array, including null/empty arrays.

    Forwards to ``ArrayFunctions.explode_outer``.
    """
    exploded = ArrayFunctions.explode_outer(column)
    return exploded
[docs]
@staticmethod
def posexplode(column: Union[Column, str]) -> ColumnOperation:
    """Explode an array together with element positions.

    Forwards to ``ArrayFunctions.posexplode``.
    """
    exploded = ArrayFunctions.posexplode(column)
    return exploded
[docs]
@staticmethod
def posexplode_outer(column: Union[Column, str]) -> ColumnOperation:
    """Explode an array with positions, including null/empty arrays.

    Forwards to ``ArrayFunctions.posexplode_outer``.
    """
    exploded = ArrayFunctions.posexplode_outer(column)
    return exploded
[docs]
@staticmethod
def arrays_zip(*columns: Union[Column, str]) -> ColumnOperation:
    """Merge arrays into an array of structs.

    Forwards all positional arguments to ``ArrayFunctions.arrays_zip``.
    """
    zipped = ArrayFunctions.arrays_zip(*columns)
    return zipped
[docs]
@staticmethod
def sequence(
    start: Union[Column, str, int],
    stop: Union[Column, str, int],
    step: Union[Column, str, int] = 1,
) -> ColumnOperation:
    """Generate an array sequence from ``start`` to ``stop``.

    Forwards ``start``, ``stop`` and ``step`` to ``ArrayFunctions.sequence``.
    """
    generated = ArrayFunctions.sequence(start, stop, step)
    return generated
[docs]
@staticmethod
def shuffle(column: Union[Column, str]) -> ColumnOperation:
    """Randomly shuffle the elements of an array.

    Forwards to ``ArrayFunctions.shuffle``.
    """
    shuffled = ArrayFunctions.shuffle(column)
    return shuffled
# Map functions
[docs]
@staticmethod
def map_keys(column: Union[Column, str]) -> ColumnOperation:
    """Get all keys of a map.

    Forwards to ``MapFunctions.map_keys``.
    """
    keys_op = MapFunctions.map_keys(column)
    return keys_op
[docs]
@staticmethod
def map_values(column: Union[Column, str]) -> ColumnOperation:
    """Get all values of a map.

    Forwards to ``MapFunctions.map_values``.
    """
    values_op = MapFunctions.map_values(column)
    return values_op
[docs]
@staticmethod
def map_entries(column: Union[Column, str]) -> ColumnOperation:
    """Get the key-value pairs of a map as an array of structs.

    Forwards to ``MapFunctions.map_entries``.
    """
    entries_op = MapFunctions.map_entries(column)
    return entries_op
[docs]
@staticmethod
def map_concat(*columns: Union[Column, str]) -> ColumnOperation:
    """Concatenate multiple maps.

    Forwards all positional arguments to ``MapFunctions.map_concat``.
    """
    merged = MapFunctions.map_concat(*columns)
    return merged
[docs]
@staticmethod
def map_from_arrays(
    keys: Union[Column, str], values: Union[Column, str]
) -> ColumnOperation:
    """Create a map from an array of keys and an array of values.

    Forwards to ``MapFunctions.map_from_arrays``.
    """
    built_map = MapFunctions.map_from_arrays(keys, values)
    return built_map
# Advanced map functions (PySpark 3.2+)
[docs]
@staticmethod
def create_map(*cols: Union[Column, str, Any]) -> ColumnOperation:
    """Create a map from key-value pairs.

    Forwards all positional arguments to ``MapFunctions.create_map``.
    """
    built_map = MapFunctions.create_map(*cols)
    return built_map
[docs]
@staticmethod
def map_contains_key(column: Union[Column, str], key: Any) -> ColumnOperation:
    """Check whether a map contains ``key``.

    Forwards to ``MapFunctions.map_contains_key``.
    """
    membership = MapFunctions.map_contains_key(column, key)
    return membership
[docs]
@staticmethod
def map_from_entries(column: Union[Column, str]) -> ColumnOperation:
    """Convert an array of structs to a map.

    Forwards to ``MapFunctions.map_from_entries``.
    """
    built_map = MapFunctions.map_from_entries(column)
    return built_map
[docs]
@staticmethod
def map_filter(
    column: Union[Column, str], function: Callable[[Any, Any], bool]
) -> ColumnOperation:
    """Filter map entries with a predicate.

    Forwards ``column`` and ``function`` to ``MapFunctions.map_filter``.
    """
    filtered = MapFunctions.map_filter(column, function)
    return filtered
[docs]
@staticmethod
def map_zip_with(
    col1: Union[Column, str],
    col2: Union[Column, str],
    function: Callable[[Any, Any, Any], Any],
) -> ColumnOperation:
    """Merge two maps with a three-argument ``function`` (PySpark 3.1+).

    Forwards to ``MapFunctions.map_zip_with``.
    """
    zipped_op = MapFunctions.map_zip_with(col1, col2, function)
    return zipped_op
# Struct functions (PySpark 3.2+)
[docs]
@staticmethod
def struct(*cols: Union[Column, str]) -> ColumnOperation:
    """Create a struct column from the given columns.

    Args:
        *cols: Column objects, column names or literals to pack into the
            struct. As in PySpark, a single ``list`` or ``set`` of columns
            may be passed instead of separate positional arguments.

    Returns:
        A ColumnOperation with operation ``"struct"`` whose name is
        ``struct(<expr>, <expr>, ...)``.

    Raises:
        ValueError: If no columns are supplied.
    """
    # PySpark accepts F.struct(["a", "b"]); unpack the single container so it
    # is not mistaken for one column named after the container's repr.
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        cols = tuple(cols[0])
    if not cols:
        raise ValueError("struct requires at least one column")

    from sparkless.core.type_utils import get_expression_name, is_literal

    # Name the result after the actual column/literal expressions.
    col_names = [get_expression_name(col) for col in cols]
    struct_name = f"struct({', '.join(col_names)})"

    # Runtime check: a literal cannot serve as the base column, so use a
    # placeholder column and carry every member (including the first) in value.
    if is_literal(cols[0]):
        return ColumnOperation(
            Column("__struct_dummy__"),
            "struct",
            value=cols,
            name=struct_name,
        )

    # Otherwise the first member is the base and the remainder go in value.
    first = cols[0]
    base_col = first if isinstance(first, Column) else Column(str(first))
    return ColumnOperation(
        base_col,
        "struct",
        value=cols[1:] if len(cols) > 1 else None,
        name=struct_name,
    )
[docs]
@staticmethod
def named_struct(*cols: Any) -> ColumnOperation:
    """Create a struct column whose fields carry explicit names.

    Args:
        *cols: Alternating field names (strings) and column values.

    Returns:
        A ColumnOperation with operation ``"named_struct"``; the full
        argument tuple is stored as its value.

    Raises:
        ValueError: If fewer than two arguments or an odd number of
            arguments is given.
    """
    arg_count = len(cols)
    if arg_count < 2 or arg_count % 2:
        raise ValueError("named_struct requires alternating field names and values")

    # The first *value* (index 1, right after the first name) acts as the base.
    first_value = cols[1]
    if isinstance(first_value, Column):
        base_col = first_value
    else:
        base_col = Column(str(first_value))

    return ColumnOperation(base_col, "named_struct", value=cols, name="named_struct(...)")
# Bitwise functions (PySpark 3.2+)
[docs]
@staticmethod
def bit_count(column: Union[Column, str]) -> ColumnOperation:
    """Count the set bits of a value (forwards to ``BitwiseFunctions.bit_count``)."""
    count_op = BitwiseFunctions.bit_count(column)
    return count_op
[docs]
@staticmethod
def bit_get(column: Union[Column, str], pos: int) -> ColumnOperation:
    """Return the bit at position ``pos`` (forwards to ``BitwiseFunctions.bit_get``)."""
    bit_op = BitwiseFunctions.bit_get(column, pos)
    return bit_op
[docs]
@staticmethod
def getbit(column: Union[Column, str], pos: int) -> ColumnOperation:
    """Return the bit at position ``pos``; alias of ``bit_get`` (PySpark 3.5+).

    Forwards to ``BitwiseFunctions.getbit``.
    """
    bit_op = BitwiseFunctions.getbit(column, pos)
    return bit_op
# Bitmap Functions (PySpark 3.5+)
[docs]
@staticmethod
def bitmap_bit_position(column: Union[Column, str]) -> ColumnOperation:
    """Return the bit position within a bitmap (PySpark 3.5+).

    Forwards to ``BitwiseFunctions.bitmap_bit_position``.
    """
    position_op = BitwiseFunctions.bitmap_bit_position(column)
    return position_op
[docs]
@staticmethod
def bitmap_bucket_number(column: Union[Column, str]) -> ColumnOperation:
    """Return the bucket number within a bitmap (PySpark 3.5+).

    Forwards to ``BitwiseFunctions.bitmap_bucket_number``.
    """
    bucket_op = BitwiseFunctions.bitmap_bucket_number(column)
    return bucket_op
[docs]
@staticmethod
def bitmap_construct_agg(column: Union[Column, str]) -> AggregateFunction:
    """Aggregate that constructs a bitmap from values (PySpark 3.5+).

    Forwards to ``BitwiseFunctions.bitmap_construct_agg``.
    """
    bitmap_agg = BitwiseFunctions.bitmap_construct_agg(column)
    return bitmap_agg
[docs]
@staticmethod
def bitmap_count(column: Union[Column, str]) -> ColumnOperation:
    """Count the set bits in a bitmap (PySpark 3.5+).

    Forwards to ``BitwiseFunctions.bitmap_count``.
    """
    count_op = BitwiseFunctions.bitmap_count(column)
    return count_op
[docs]
@staticmethod
def bitmap_or_agg(column: Union[Column, str]) -> AggregateFunction:
    """Aggregate that ORs bitmaps together bitwise (PySpark 3.5+).

    Forwards to ``BitwiseFunctions.bitmap_or_agg``.
    """
    or_agg = BitwiseFunctions.bitmap_or_agg(column)
    return or_agg
[docs]
@staticmethod
def bitwise_not(column: Union[Column, str]) -> ColumnOperation:
    """Apply bitwise NOT (forwards to ``BitwiseFunctions.bitwise_not``)."""
    inverted_op = BitwiseFunctions.bitwise_not(column)
    return inverted_op
[docs]
@staticmethod
def bit_and(column: Union[Column, str]) -> AggregateFunction:
    """Bitwise-AND aggregate (PySpark 3.5+); forwards to ``BitwiseFunctions.bit_and``."""
    and_agg = BitwiseFunctions.bit_and(column)
    return and_agg
[docs]
@staticmethod
def bit_or(column: Union[Column, str]) -> AggregateFunction:
    """Bitwise-OR aggregate (PySpark 3.5+); forwards to ``BitwiseFunctions.bit_or``."""
    or_agg = BitwiseFunctions.bit_or(column)
    return or_agg
[docs]
@staticmethod
def bit_xor(column: Union[Column, str]) -> AggregateFunction:
    """Bitwise-XOR aggregate (PySpark 3.5+); forwards to ``BitwiseFunctions.bit_xor``."""
    xor_agg = BitwiseFunctions.bit_xor(column)
    return xor_agg
# Timezone functions (PySpark 3.2+)
[docs]
@staticmethod
def convert_timezone(
    sourceTz: str, targetTz: str, sourceTs: Union[Column, str]
) -> ColumnOperation:
    """Convert a timestamp from ``sourceTz`` to ``targetTz``.

    Forwards all three arguments to ``DateTimeFunctions.convert_timezone``.
    """
    converted_op = DateTimeFunctions.convert_timezone(sourceTz, targetTz, sourceTs)
    return converted_op
[docs]
@staticmethod
def current_timezone() -> ColumnOperation:
    """Return the current timezone.

    The active-session check runs first, via
    ``Functions._require_active_session``.

    Raises:
        RuntimeError: If no active SparkSession is available
    """
    Functions._require_active_session("current_timezone function")
    timezone_op = DateTimeFunctions.current_timezone()
    return timezone_op
[docs]
@staticmethod
def from_utc_timestamp(ts: Union[Column, str], tz: str) -> ColumnOperation:
    """Convert a UTC timestamp into timezone ``tz``.

    Forwards to ``DateTimeFunctions.from_utc_timestamp``.
    """
    local_op = DateTimeFunctions.from_utc_timestamp(ts, tz)
    return local_op
[docs]
@staticmethod
def to_utc_timestamp(ts: Union[Column, str], tz: str) -> ColumnOperation:
    """Convert a timestamp in timezone ``tz`` to UTC.

    Forwards to ``DateTimeFunctions.to_utc_timestamp``.
    """
    utc_op = DateTimeFunctions.to_utc_timestamp(ts, tz)
    return utc_op
# URL functions (PySpark 3.2+)
[docs]
@staticmethod
def parse_url(url: Union[Column, str], part: str) -> ColumnOperation:
    """Extract the requested ``part`` of a URL (forwards to ``StringFunctions.parse_url``)."""
    part_op = StringFunctions.parse_url(url, part)
    return part_op
[docs]
@staticmethod
def url_encode(url: Union[Column, str]) -> ColumnOperation:
    """URL-encode a string (forwards to ``StringFunctions.url_encode``)."""
    encoded_op = StringFunctions.url_encode(url)
    return encoded_op
[docs]
@staticmethod
def url_decode(url: Union[Column, str]) -> ColumnOperation:
    """URL-decode a string (forwards to ``StringFunctions.url_decode``)."""
    decoded_op = StringFunctions.url_decode(url)
    return decoded_op
[docs]
@staticmethod
def overlay(
    src: Union[Column, str],
    replace: Union[Column, str],
    pos: Union[Column, int],
    len: Union[Column, int] = -1,
) -> ColumnOperation:
    """Replace part of ``src`` with ``replace`` starting at ``pos`` (PySpark 3.0+).

    The ``len`` parameter name shadows the builtin but is part of the public
    signature, so it is kept. Forwards to ``StringFunctions.overlay``.
    """
    overlay_op = StringFunctions.overlay(src, replace, pos, len)
    return overlay_op
# Miscellaneous functions (PySpark 3.2+)
[docs]
@staticmethod
def date_part(field: str, source: Union[Column, str]) -> ColumnOperation:
    """Extract the date/time part named ``field`` from ``source``.

    Forwards to ``DateTimeFunctions.date_part``.
    """
    part_op = DateTimeFunctions.date_part(field, source)
    return part_op
[docs]
@staticmethod
def dayname(date: Union[Column, str]) -> ColumnOperation:
    """Return the day-of-week name (forwards to ``DateTimeFunctions.dayname``)."""
    name_op = DateTimeFunctions.dayname(date)
    return name_op
[docs]
@staticmethod
def assert_true(
    condition: Union[Column, ColumnOperation],
) -> ColumnOperation:
    """Assert that ``condition`` is true.

    Forwards to ``ConditionalFunctions.assert_true``.
    """
    assertion_op = ConditionalFunctions.assert_true(condition)
    return assertion_op
[docs]
@staticmethod
def ifnull(col1: Union[Column, str], col2: Union[Column, str]) -> ColumnOperation:
    """Return ``col2`` when ``col1`` is null (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.ifnull``.
    """
    fallback_op = ConditionalFunctions.ifnull(col1, col2)
    return fallback_op
[docs]
@staticmethod
def nullif(col1: Union[Column, str], col2: Union[Column, str]) -> ColumnOperation:
    """Return null when ``col1`` equals ``col2`` (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.nullif``.
    """
    nullif_op = ConditionalFunctions.nullif(col1, col2)
    return nullif_op
# Null-safe try_* functions (PySpark 3.5+)
[docs]
@staticmethod
def try_add(
    left: Union[Column, str, int, float],
    right: Union[Column, str, int, float],
) -> ColumnOperation:
    """Null-safe addition - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_add``, using the module-level
    import of ``ConditionalFunctions`` rather than re-importing it per call.
    """
    return ConditionalFunctions.try_add(left, right)
[docs]
@staticmethod
def try_subtract(
    left: Union[Column, str, int, float],
    right: Union[Column, str, int, float],
) -> ColumnOperation:
    """Null-safe subtraction - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_subtract``, using the module-level
    import of ``ConditionalFunctions`` rather than re-importing it per call.
    """
    return ConditionalFunctions.try_subtract(left, right)
[docs]
@staticmethod
def try_multiply(
    left: Union[Column, str, int, float],
    right: Union[Column, str, int, float],
) -> ColumnOperation:
    """Null-safe multiplication - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_multiply``, using the module-level
    import of ``ConditionalFunctions`` rather than re-importing it per call.
    """
    return ConditionalFunctions.try_multiply(left, right)
[docs]
@staticmethod
def try_divide(
    left: Union[Column, str, int, float],
    right: Union[Column, str, int, float],
) -> ColumnOperation:
    """Null-safe division - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_divide``, using the module-level
    import of ``ConditionalFunctions`` rather than re-importing it per call.
    """
    return ConditionalFunctions.try_divide(left, right)
[docs]
@staticmethod
def try_sum(column: Union[Column, str]) -> AggregateFunction:
    """Null-safe sum aggregate - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_sum``, using the module-level
    import of ``ConditionalFunctions`` rather than re-importing it per call.
    """
    return ConditionalFunctions.try_sum(column)
[docs]
@staticmethod
def try_avg(column: Union[Column, str]) -> AggregateFunction:
    """Null-safe average aggregate - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_avg``, using the module-level
    import of ``ConditionalFunctions`` rather than re-importing it per call.
    """
    return ConditionalFunctions.try_avg(column)
[docs]
@staticmethod
def try_element_at(
    column: Union[Column, str], index: Union[Column, str, int]
) -> ColumnOperation:
    """Null-safe element_at - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_element_at``, using the
    module-level import of ``ConditionalFunctions`` rather than re-importing
    it per call.
    """
    return ConditionalFunctions.try_element_at(column, index)
[docs]
@staticmethod
def try_to_binary(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Null-safe to_binary - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_to_binary``, using the
    module-level import of ``ConditionalFunctions`` rather than re-importing
    it per call.
    """
    return ConditionalFunctions.try_to_binary(column, format)
[docs]
@staticmethod
def try_to_number(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Null-safe to_number - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_to_number``, using the
    module-level import of ``ConditionalFunctions`` rather than re-importing
    it per call.
    """
    return ConditionalFunctions.try_to_number(column, format)
[docs]
@staticmethod
def try_to_timestamp(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Null-safe to_timestamp - returns NULL on error (PySpark 3.5+).

    Forwards to ``ConditionalFunctions.try_to_timestamp``, using the
    module-level import of ``ConditionalFunctions`` rather than re-importing
    it per call.
    """
    return ConditionalFunctions.try_to_timestamp(column, format)
# XML functions (PySpark 3.2+)
[docs]
@staticmethod
def from_xml(col: Union[Column, str], schema: str) -> ColumnOperation:
    """Parse an XML string into a struct (forwards to ``XMLFunctions.from_xml``)."""
    parsed_op = XMLFunctions.from_xml(col, schema)
    return parsed_op
[docs]
@staticmethod
def to_xml(col: Union[Column, ColumnOperation]) -> ColumnOperation:
    """Convert a struct to an XML string (forwards to ``XMLFunctions.to_xml``)."""
    xml_op = XMLFunctions.to_xml(col)
    return xml_op
[docs]
@staticmethod
def schema_of_xml(col: Union[Column, str]) -> ColumnOperation:
    """Infer a schema from XML (forwards to ``XMLFunctions.schema_of_xml``)."""
    schema_op = XMLFunctions.schema_of_xml(col)
    return schema_op
[docs]
@staticmethod
def xpath(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract an array from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath``.
    """
    nodes_op = XMLFunctions.xpath(xml, path)
    return nodes_op
[docs]
@staticmethod
def xpath_boolean(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract a boolean from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_boolean``.
    """
    bool_op = XMLFunctions.xpath_boolean(xml, path)
    return bool_op
[docs]
@staticmethod
def xpath_double(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract a double from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_double``.
    """
    double_op = XMLFunctions.xpath_double(xml, path)
    return double_op
[docs]
@staticmethod
def xpath_float(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract a float from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_float``.
    """
    float_op = XMLFunctions.xpath_float(xml, path)
    return float_op
[docs]
@staticmethod
def xpath_int(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract an integer from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_int``.
    """
    int_op = XMLFunctions.xpath_int(xml, path)
    return int_op
[docs]
@staticmethod
def xpath_long(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract a long from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_long``.
    """
    long_op = XMLFunctions.xpath_long(xml, path)
    return long_op
[docs]
@staticmethod
def xpath_short(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract a short from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_short``.
    """
    short_op = XMLFunctions.xpath_short(xml, path)
    return short_op
[docs]
@staticmethod
def xpath_string(xml: Union[Column, str], path: str) -> ColumnOperation:
    """Extract a string from XML with an XPath expression.

    Forwards to ``XMLFunctions.xpath_string``.
    """
    string_op = XMLFunctions.xpath_string(xml, path)
    return string_op
# JSON/CSV functions
[docs]
@staticmethod
def from_json(
    column: Union[Column, str],
    schema: Any,
    options: Optional[Dict[str, Any]] = None,
) -> ColumnOperation:
    """Parse a JSON string into a struct/array.

    ``JSONCSVFunctions`` is imported at call time and receives all three
    arguments unchanged.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    parsed_op = JSONCSVFunctions.from_json(column, schema, options)
    return parsed_op
[docs]
@staticmethod
def to_json(column: Union[Column, str]) -> ColumnOperation:
    """Serialize a struct/array to a JSON string.

    ``JSONCSVFunctions`` is imported at call time.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    json_op = JSONCSVFunctions.to_json(column)
    return json_op
[docs]
@staticmethod
def get_json_object(column: Union[Column, str], path: str) -> ColumnOperation:
    """Extract the JSON object found at ``path``.

    ``JSONCSVFunctions`` is imported at call time.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    object_op = JSONCSVFunctions.get_json_object(column, path)
    return object_op
[docs]
@staticmethod
def json_tuple(column: Union[Column, str], *fields: str) -> ColumnOperation:
    """Extract several named fields from JSON.

    ``JSONCSVFunctions`` is imported at call time.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    tuple_op = JSONCSVFunctions.json_tuple(column, *fields)
    return tuple_op
[docs]
@staticmethod
def schema_of_json(json_string: str) -> ColumnOperation:
    """Infer a schema from a JSON string.

    ``JSONCSVFunctions`` is imported at call time.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    schema_op = JSONCSVFunctions.schema_of_json(json_string)
    return schema_op
[docs]
@staticmethod
def from_csv(
    column: Union[Column, str],
    schema: Any,
    options: Optional[Dict[str, Any]] = None,
) -> ColumnOperation:
    """Parse a CSV string into a struct.

    ``JSONCSVFunctions`` is imported at call time and receives all three
    arguments unchanged.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    parsed_op = JSONCSVFunctions.from_csv(column, schema, options)
    return parsed_op
[docs]
@staticmethod
def to_csv(column: Union[Column, str]) -> ColumnOperation:
    """Serialize a struct to a CSV string.

    ``JSONCSVFunctions`` is imported at call time.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    csv_op = JSONCSVFunctions.to_csv(column)
    return csv_op
[docs]
@staticmethod
def schema_of_csv(csv_string: str) -> ColumnOperation:
    """Infer a schema from a CSV string.

    ``JSONCSVFunctions`` is imported at call time.
    """
    from sparkless.functions.json_csv import JSONCSVFunctions

    schema_op = JSONCSVFunctions.schema_of_csv(csv_string)
    return schema_op
# Column ordering functions
[docs]
@staticmethod
def asc(column: Union[Column, str]) -> ColumnOperation:
    """Ascending sort order for ``column``.

    ``OrderingFunctions`` is imported at call time.
    """
    from sparkless.functions.ordering import OrderingFunctions

    ordering_op = OrderingFunctions.asc(column)
    return ordering_op
[docs]
@staticmethod
def asc_nulls_first(column: Union[Column, str]) -> ColumnOperation:
    """Ascending sort order with nulls placed first.

    ``OrderingFunctions`` is imported at call time.
    """
    from sparkless.functions.ordering import OrderingFunctions

    ordering_op = OrderingFunctions.asc_nulls_first(column)
    return ordering_op
[docs]
@staticmethod
def asc_nulls_last(column: Union[Column, str]) -> ColumnOperation:
    """Ascending sort order with nulls placed last.

    ``OrderingFunctions`` is imported at call time.
    """
    from sparkless.functions.ordering import OrderingFunctions

    ordering_op = OrderingFunctions.asc_nulls_last(column)
    return ordering_op
[docs]
@staticmethod
def desc_nulls_first(column: Union[Column, str]) -> ColumnOperation:
    """Descending sort order with nulls placed first.

    ``OrderingFunctions`` is imported at call time.
    """
    from sparkless.functions.ordering import OrderingFunctions

    ordering_op = OrderingFunctions.desc_nulls_first(column)
    return ordering_op
[docs]
@staticmethod
def desc_nulls_last(column: Union[Column, str]) -> ColumnOperation:
    """Descending sort order with nulls placed last.

    ``OrderingFunctions`` is imported at call time.
    """
    from sparkless.functions.ordering import OrderingFunctions

    ordering_op = OrderingFunctions.desc_nulls_last(column)
    return ordering_op
# Metadata/utility functions
[docs]
@staticmethod
def monotonically_increasing_id() -> ColumnOperation:
    """Generate a monotonically increasing ID column.

    ``MetadataFunctions`` is imported at call time.
    """
    from sparkless.functions.metadata import MetadataFunctions

    id_op = MetadataFunctions.monotonically_increasing_id()
    return id_op
[docs]
@staticmethod
def spark_partition_id() -> ColumnOperation:
    """Return the partition ID.

    ``MetadataFunctions`` is imported at call time.
    """
    from sparkless.functions.metadata import MetadataFunctions

    partition_op = MetadataFunctions.spark_partition_id()
    return partition_op
[docs]
@staticmethod
def broadcast(df: Any) -> Any:
    """Mark a DataFrame for broadcast (a hint).

    ``MetadataFunctions`` is imported at call time; its result is returned
    as-is.
    """
    from sparkless.functions.metadata import MetadataFunctions

    hinted = MetadataFunctions.broadcast(df)
    return hinted
[docs]
@staticmethod
def column(col_name: str) -> Column:
    """Create a column reference; alias for ``col``.

    ``MetadataFunctions`` is imported at call time.
    """
    from sparkless.functions.metadata import MetadataFunctions

    column_ref = MetadataFunctions.column(col_name)
    return column_ref
[docs]
@staticmethod
def grouping(column: Union[Column, str]) -> ColumnOperation:
    """Grouping indicator for CUBE/ROLLUP.

    ``GroupingFunctions`` is imported at call time.
    """
    from sparkless.functions.metadata import GroupingFunctions

    grouping_op = GroupingFunctions.grouping(column)
    return grouping_op
[docs]
@staticmethod
def grouping_id(*cols: Union[Column, str]) -> ColumnOperation:
    """Grouping ID for CUBE/ROLLUP.

    ``GroupingFunctions`` is imported at call time.
    """
    from sparkless.functions.metadata import GroupingFunctions

    grouping_op = GroupingFunctions.grouping_id(*cols)
    return grouping_op
[docs]
@staticmethod
def udf(
    f: Optional[Callable[..., Any]] = None, returnType: Any = None
) -> Callable[..., Any]:
    """Create a user-defined function (all PySpark versions).

    Supported call patterns:
        * ``F.udf(func, returnType)`` - wrap ``func`` directly.
        * ``@F.udf`` - bare decorator.
        * ``@F.udf(IntegerType())`` or ``@F.udf("int")`` - decorator with the
          return type passed positionally.
        * ``@F.udf(returnType=IntegerType())`` - decorator with keyword type.

    Args:
        f: Python function to wrap, or the return type (a DataType instance
            or a type-name string) when used as a decorator with an argument.
        returnType: Return type of the function (defaults to StringType).
            A string is stored unchanged on the resulting operation.

    Returns:
        Wrapped function that can be used in DataFrame operations, or a
        decorator producing one when no function was supplied.

    Raises:
        ValueError: When the wrapped function is applied to no columns.

    Example:
        >>> from sparkless.sql import SparkSession, functions as F
        >>> from sparkless.spark_types import IntegerType
        >>> spark = SparkSession("test")
        >>> square = F.udf(lambda x: x * x, IntegerType())
        >>> df = spark.createDataFrame([{"value": 5}])
        >>> df.select(square("value").alias("squared")).show()

        Decorator pattern:

        >>> @F.udf(IntegerType())
        ... def square(x):
        ...     return x * x
        >>> df.select(square("value")).show()
    """
    from sparkless.spark_types import DataType, StringType

    # Decorator-with-argument form: the first positional argument is the
    # return type, not a function. Strings are covered as well as DataType
    # instances; a str is never callable, so it cannot be the function.
    if isinstance(f, (str, DataType)):
        returnType = f
        f = None
    if returnType is None:
        returnType = StringType()

    def udf_wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
        """Wrap function to create ColumnOperation."""

        def apply_udf(*cols: Union[Column, str]) -> ColumnOperation:
            if not cols:
                raise ValueError("UDF requires at least one column argument")

            # Plain strings are column names; everything else passes through.
            column_objs = [Column(c) if isinstance(c, str) else c for c in cols]

            # Name the operation after every argument, falling back to str()
            # for objects that have no ``name`` attribute.
            arg_names = [getattr(c, "name", str(c)) for c in column_objs]
            udf_name = f"udf({', '.join(arg_names)})"

            # The first column is the base; the function, return type and
            # full argument list are stored on the operation.
            op = ColumnOperation(column_objs[0], "udf", name=udf_name)
            op._udf_func = func
            op._udf_return_type = returnType
            op._udf_cols = column_objs
            return op

        return apply_udf

    # No function yet -> hand back the decorator; otherwise wrap immediately.
    if f is None:
        return udf_wrapper
    return udf_wrapper(f)
[docs]
@staticmethod
def pandas_udf(
    f: Optional[Any] = None, returnType: Any = None, functionType: Any = None
) -> Any:
    """Create a Pandas UDF (vectorized UDF) (all PySpark versions).

    Pandas UDFs are user-defined functions that execute vectorized operations
    using Pandas Series/DataFrame, providing better performance than
    row-at-a-time UDFs.

    Supported call patterns:
        1. ``@pandas_udf(IntegerType())`` - ``f`` carries the type.
        2. ``@pandas_udf(returnType=IntegerType())`` - ``f`` is None.
        3. ``pandas_udf(func, IntegerType())`` - ``f`` is the function.

    Args:
        f: Python function to wrap OR return type (if used as decorator)
        returnType: Return type of the function (defaults to StringType)
        functionType: Type of Pandas UDF (optional, for compatibility);
            not read by this function.

    Returns:
        A ``UserDefinedFunction`` with ``evalType="PANDAS"`` when a function
        was supplied, otherwise a decorator that produces one.

    Example:
        >>> from sparkless.sql import SparkSession, functions as F
        >>> from sparkless.spark_types import IntegerType
        >>> spark = SparkSession("test")
        >>> @F.pandas_udf(IntegerType())
        ... def multiply_by_two(s):
        ...     return s * 2
        >>> df = spark.createDataFrame([{"value": 5}])
        >>> df.select(multiply_by_two("value").alias("doubled")).show()
    """
    from sparkless.spark_types import StringType
    from sparkless.functions.udf import UserDefinedFunction

    # A non-callable first argument is the return type, not a function.
    type_given_first = f is not None and not callable(f)
    if type_given_first:
        resolved_type = f
        f = None
    elif returnType is not None:
        resolved_type = returnType
    else:
        resolved_type = StringType()

    def pandas_udf_wrapper(func: Callable[..., Any]) -> UserDefinedFunction:
        """Build the UserDefinedFunction using the Pandas eval type."""
        return UserDefinedFunction(func, resolved_type, evalType="PANDAS")

    # Without a function, return the decorator itself.
    return pandas_udf_wrapper if f is None else pandas_udf_wrapper(f)
[docs]
@staticmethod
def window(
    timeColumn: Union[Column, str],
    windowDuration: str,
    slideDuration: Optional[str] = None,
    startTime: Optional[str] = None,
) -> ColumnOperation:
    """Create a time-based window for grouping operations (all PySpark versions).

    Args:
        timeColumn: Timestamp column to window (a name is wrapped in Column)
        windowDuration: Duration string (e.g., "10 seconds", "1 minute", "2 hours")
        slideDuration: Slide duration for sliding windows; a falsy value
            falls back to ``windowDuration``
        startTime: Offset for window alignment; a falsy value falls back to
            "0 seconds"

    Returns:
        ColumnOperation named ``window(<column name>)`` carrying the
        duration, slide and start settings as private attributes.

    Example:
        >>> df.groupBy(F.window("timestamp", "10 minutes")).count()
        >>> df.groupBy(F.window("timestamp", "10 minutes", "5 minutes")).agg(F.sum("value"))
    """
    if isinstance(timeColumn, str):
        time_col = Column(timeColumn)
    else:
        time_col = timeColumn

    window_op = ColumnOperation(time_col, "window", name=f"window({time_col.name})")
    # The window parameters travel with the operation object.
    window_op._window_duration = windowDuration
    window_op._window_slide = slideDuration if slideDuration else windowDuration
    window_op._window_start = startTime if startTime else "0 seconds"
    return window_op
[docs]
@staticmethod
def window_time(windowColumn: Union[Column, str]) -> ColumnOperation:
    """Extract window start time from window column (PySpark 3.4+).

    Args:
        windowColumn: Window column (or its name) to extract the time from

    Returns:
        ColumnOperation with operation ``"window_time"`` named
        ``window_time(<column name>)``.

    Example:
        >>> df.groupBy(F.window("timestamp", "1 hour")).agg(
        ...     F.window_time(F.col("window")).alias("window_start")
        ... )
    """
    if isinstance(windowColumn, str):
        source_col = Column(windowColumn)
    else:
        source_col = windowColumn
    return ColumnOperation(
        source_col, "window_time", name=f"window_time({source_col.name})"
    )
# New string functions (Phase 1)
[docs]
@staticmethod
def ilike(column: Union[Column, str], pattern: str) -> ColumnOperation:
    """Case-insensitive LIKE match (forwards to ``StringFunctions.ilike``)."""
    match_op = StringFunctions.ilike(column, pattern)
    return match_op
[docs]
@staticmethod
def find_in_set(
    column: Union[Column, str], str_list: Union[Column, str]
) -> ColumnOperation:
    """Find the position of a value in a comma-separated string list.

    Forwards to ``StringFunctions.find_in_set``.
    """
    position_op = StringFunctions.find_in_set(column, str_list)
    return position_op
[docs]
@staticmethod
def regexp_count(column: Union[Column, str], pattern: str) -> ColumnOperation:
    """Count the occurrences of a regex pattern in a string.

    Forwards to ``StringFunctions.regexp_count``.
    """
    count_op = StringFunctions.regexp_count(column, pattern)
    return count_op
[docs]
@staticmethod
def regexp_like(column: Union[Column, str], pattern: str) -> ColumnOperation:
    """Regex pattern match, similar to ``rlike``.

    Forwards to ``StringFunctions.regexp_like``.
    """
    match_op = StringFunctions.regexp_like(column, pattern)
    return match_op
[docs]
@staticmethod
def regexp_substr(
    column: Union[Column, str], pattern: str, pos: int = 1, occurrence: int = 1
) -> ColumnOperation:
    """Extract the substring that matches a regex pattern.

    Forwards all four arguments to ``StringFunctions.regexp_substr``.
    """
    substr_op = StringFunctions.regexp_substr(column, pattern, pos, occurrence)
    return substr_op
[docs]
@staticmethod
def regexp_instr(
    column: Union[Column, str], pattern: str, pos: int = 1, occurrence: int = 1
) -> ColumnOperation:
    """Find the position of a regex pattern match.

    Forwards all four arguments to ``StringFunctions.regexp_instr``.
    """
    position_op = StringFunctions.regexp_instr(column, pattern, pos, occurrence)
    return position_op
[docs]
@staticmethod
def regexp(column: Union[Column, str], pattern: str) -> ColumnOperation:
    """Regex pattern match; alias for ``rlike``.

    Forwards to ``StringFunctions.regexp``.
    """
    match_op = StringFunctions.regexp(column, pattern)
    return match_op
[docs]
@staticmethod
def sentences(
    column: Union[Column, str],
    language: Optional[str] = None,
    country: Optional[str] = None,
) -> ColumnOperation:
    """Split text into sentences.

    Forwards ``column``, ``language`` and ``country`` to
    ``StringFunctions.sentences``.
    """
    sentences_op = StringFunctions.sentences(column, language, country)
    return sentences_op
[docs]
@staticmethod
def printf(format_str: str, *columns: Union[Column, str]) -> ColumnOperation:
    """Build a formatted string, sprintf-style.

    Forwards to ``StringFunctions.printf``.
    """
    formatted_op = StringFunctions.printf(format_str, *columns)
    return formatted_op
[docs]
@staticmethod
def to_char(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert a number/date to a character string.

    Forwards to ``StringFunctions.to_char``.
    """
    char_op = StringFunctions.to_char(column, format)
    return char_op
[docs]
@staticmethod
def to_varchar(
    column: Union[Column, str], length: Optional[int] = None
) -> ColumnOperation:
    """Convert a value to the varchar type.

    Forwards to ``StringFunctions.to_varchar``.
    """
    varchar_op = StringFunctions.to_varchar(column, length)
    return varchar_op
[docs]
@staticmethod
def typeof(column: Union[Column, str]) -> ColumnOperation:
    """Return the type of a value as a string (forwards to ``StringFunctions.typeof``)."""
    type_op = StringFunctions.typeof(column)
    return type_op
[docs]
@staticmethod
def stack(n: int, *cols: Union[Column, str, Any]) -> ColumnOperation:
    """Stack multiple columns into rows (forwards to ``StringFunctions.stack``)."""
    stacked_op = StringFunctions.stack(n, *cols)
    return stacked_op
# New math/bitwise functions (Phase 2)
[docs]
@staticmethod
def pmod(
    dividend: Union[Column, str, int], divisor: Union[Column, str, int]
) -> ColumnOperation:
    """Positive modulo - always returns a positive remainder.

    Forwards to ``MathFunctions.pmod``.
    """
    remainder_op = MathFunctions.pmod(dividend, divisor)
    return remainder_op
[docs]
@staticmethod
def negate(column: Union[Column, str]) -> ColumnOperation:
    """Negate a value; alias for ``negative``.

    Forwards to ``MathFunctions.negate``.
    """
    negated_op = MathFunctions.negate(column)
    return negated_op
[docs]
@staticmethod
def shiftleft(
    column: Union[Column, str], num_bits: Union[Column, str, int]
) -> ColumnOperation:
    """Bitwise left shift (forwards to ``BitwiseFunctions.shiftleft``)."""
    shifted_op = BitwiseFunctions.shiftleft(column, num_bits)
    return shifted_op
[docs]
@staticmethod
def shiftright(
    column: Union[Column, str], num_bits: Union[Column, str, int]
) -> ColumnOperation:
    """Signed bitwise right shift (forwards to ``BitwiseFunctions.shiftright``)."""
    shifted_op = BitwiseFunctions.shiftright(column, num_bits)
    return shifted_op
[docs]
@staticmethod
def shiftrightunsigned(
    column: Union[Column, str], num_bits: Union[Column, str, int]
) -> ColumnOperation:
    """Unsigned bitwise right shift.

    Forwards to ``BitwiseFunctions.shiftrightunsigned``.
    """
    shifted_op = BitwiseFunctions.shiftrightunsigned(column, num_bits)
    return shifted_op
# Deprecated camelCase aliases (PySpark 3.0-3.1)
[docs]
@staticmethod
def shiftLeft(
    column: Union[Column, str], num_bits: Union[Column, str, int]
) -> ColumnOperation:
    """Deprecated camelCase alias for ``shiftleft`` (PySpark 3.0-3.1).

    Forwards to ``BitwiseFunctions.shiftLeft``.
    """
    shifted_op = BitwiseFunctions.shiftLeft(column, num_bits)
    return shifted_op
[docs]
@staticmethod
def shiftRight(
    column: Union[Column, str], num_bits: Union[Column, str, int]
) -> ColumnOperation:
    """Deprecated camelCase alias for ``shiftright`` (PySpark 3.0-3.1).

    Forwards to ``BitwiseFunctions.shiftRight``.
    """
    shifted_op = BitwiseFunctions.shiftRight(column, num_bits)
    return shifted_op
[docs]
@staticmethod
def shiftRightUnsigned(
    column: Union[Column, str], num_bits: Union[Column, str, int]
) -> ColumnOperation:
    """Deprecated camelCase alias for ``shiftrightunsigned`` (PySpark 3.0-3.1).

    Forwards to ``BitwiseFunctions.shiftRightUnsigned``.
    """
    shifted_op = BitwiseFunctions.shiftRightUnsigned(column, num_bits)
    return shifted_op
# New datetime functions (Phase 3)
[docs]
@staticmethod
def years(column: Union[Column, str, int]) -> ColumnOperation:
    """Convert a number to a years interval.

    Forwards to ``DateTimeFunctions.years``.
    """
    years_op = DateTimeFunctions.years(column)
    return years_op
[docs]
@staticmethod
def localtimestamp() -> ColumnOperation:
    """Return the local timestamp (without timezone).

    Forwards to ``DateTimeFunctions.localtimestamp``.
    """
    timestamp_op = DateTimeFunctions.localtimestamp()
    return timestamp_op
[docs]
@staticmethod
def dateadd(
    date_part: str, value: Union[Column, str, int], date: Union[Column, str]
) -> ColumnOperation:
    """SQL Server style date addition: add ``value`` units of ``date_part`` to ``date``.

    Forwards to ``DateTimeFunctions.dateadd``.
    """
    added_op = DateTimeFunctions.dateadd(date_part, value, date)
    return added_op
[docs]
@staticmethod
def datepart(date_part: str, date: Union[Column, str]) -> ColumnOperation:
    """SQL Server style extraction of ``date_part`` from ``date``.

    Forwards to ``DateTimeFunctions.datepart``.
    """
    part_op = DateTimeFunctions.datepart(date_part, date)
    return part_op
[docs]
@staticmethod
def make_timestamp(
    year: Union[Column, str, int],
    month: Union[Column, str, int],
    day: Union[Column, str, int],
    hour: Union[Column, str, int] = 0,
    minute: Union[Column, str, int] = 0,
    second: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build a timestamp from its components.

    Forwards the six components, in order, to
    ``DateTimeFunctions.make_timestamp``.
    """
    components = (year, month, day, hour, minute, second)
    return DateTimeFunctions.make_timestamp(*components)
[docs]
@staticmethod
def make_timestamp_ltz(
    year: Union[Column, str, int],
    month: Union[Column, str, int],
    day: Union[Column, str, int],
    hour: Union[Column, str, int] = 0,
    minute: Union[Column, str, int] = 0,
    second: Union[Column, str, int] = 0,
    timezone: Optional[str] = None,
) -> ColumnOperation:
    """Build a timestamp with local timezone from its components.

    Forwards the six components followed by ``timezone`` to
    ``DateTimeFunctions.make_timestamp_ltz``.
    """
    components = (year, month, day, hour, minute, second, timezone)
    return DateTimeFunctions.make_timestamp_ltz(*components)
[docs]
@staticmethod
def make_timestamp_ntz(
    year: Union[Column, str, int],
    month: Union[Column, str, int],
    day: Union[Column, str, int],
    hour: Union[Column, str, int] = 0,
    minute: Union[Column, str, int] = 0,
    second: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build a timestamp with no timezone from its components.

    Forwards the six components, in order, to
    ``DateTimeFunctions.make_timestamp_ntz``.
    """
    components = (year, month, day, hour, minute, second)
    return DateTimeFunctions.make_timestamp_ntz(*components)
[docs]
@staticmethod
def make_interval(
    years: Union[Column, str, int] = 0,
    months: Union[Column, str, int] = 0,
    weeks: Union[Column, str, int] = 0,
    days: Union[Column, str, int] = 0,
    hours: Union[Column, str, int] = 0,
    mins: Union[Column, str, int] = 0,
    secs: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build an interval from its components.

    Forwards the seven components, in order, to
    ``DateTimeFunctions.make_interval``.
    """
    components = (years, months, weeks, days, hours, mins, secs)
    return DateTimeFunctions.make_interval(*components)
[docs]
@staticmethod
def make_dt_interval(
    days: Union[Column, str, int] = 0,
    hours: Union[Column, str, int] = 0,
    mins: Union[Column, str, int] = 0,
    secs: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build a day-time interval.

    Forwards to ``DateTimeFunctions.make_dt_interval``.
    """
    interval_op = DateTimeFunctions.make_dt_interval(days, hours, mins, secs)
    return interval_op
[docs]
@staticmethod
def make_ym_interval(
    years: Union[Column, str, int] = 0, months: Union[Column, str, int] = 0
) -> ColumnOperation:
    """Build a year-month interval.

    Forwards to ``DateTimeFunctions.make_ym_interval``.
    """
    interval_op = DateTimeFunctions.make_ym_interval(years, months)
    return interval_op
[docs]
@staticmethod
def to_number(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert a string to a number.

    Forwards to ``DateTimeFunctions.to_number``.
    """
    number_op = DateTimeFunctions.to_number(column, format)
    return number_op
[docs]
@staticmethod
def to_binary(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert a value to binary format.

    Forwards to ``DateTimeFunctions.to_binary``.
    """
    binary_op = DateTimeFunctions.to_binary(column, format)
    return binary_op
[docs]
@staticmethod
def to_unix_timestamp(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Convert a value to a unix timestamp.

    Forwards to ``DateTimeFunctions.to_unix_timestamp``.
    """
    unix_op = DateTimeFunctions.to_unix_timestamp(column, format)
    return unix_op
[docs]
@staticmethod
def unix_date(column: Union[Column, str]) -> ColumnOperation:
    """Convert unix timestamp to date (forwards to ``DateTimeFunctions.unix_date``).

    NOTE(review): in PySpark, ``unix_date`` returns the number of days since
    1970-01-01 for a date column - confirm which direction the delegate
    implements.
    """
    unix_date_op = DateTimeFunctions.unix_date(column)
    return unix_date_op
[docs]
@staticmethod
def unix_seconds(column: Union[Column, str]) -> ColumnOperation:
    """Convert a timestamp to unix seconds.

    Forwards to ``DateTimeFunctions.unix_seconds``.
    """
    seconds_op = DateTimeFunctions.unix_seconds(column)
    return seconds_op
[docs]
@staticmethod
def unix_millis(column: Union[Column, str]) -> ColumnOperation:
    """Convert a timestamp to unix milliseconds.

    Forwards to ``DateTimeFunctions.unix_millis``.
    """
    millis_op = DateTimeFunctions.unix_millis(column)
    return millis_op
[docs]
@staticmethod
def unix_micros(column: Union[Column, str]) -> ColumnOperation:
    """Convert a timestamp to unix microseconds.

    Forwards to ``DateTimeFunctions.unix_micros``.
    """
    micros_op = DateTimeFunctions.unix_micros(column)
    return micros_op
[docs]
@staticmethod
def timestamp_millis(column: Union[Column, str]) -> ColumnOperation:
    """Build a timestamp from unix milliseconds.

    Forwards to ``DateTimeFunctions.timestamp_millis``.
    """
    timestamp_op = DateTimeFunctions.timestamp_millis(column)
    return timestamp_op
[docs]
@staticmethod
def timestamp_micros(column: Union[Column, str]) -> ColumnOperation:
    """Build a timestamp from unix microseconds.

    Forwards to ``DateTimeFunctions.timestamp_micros``.
    """
    timestamp_op = DateTimeFunctions.timestamp_micros(column)
    return timestamp_op
# New aggregate functions (Phase 4)
[docs]
@staticmethod
def regr_avgx(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression: average of x (forwards to ``AggregateFunctions.regr_avgx``)."""
    regression_agg = AggregateFunctions.regr_avgx(y, x)
    return regression_agg
[docs]
@staticmethod
def regr_avgy(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression: average of y (forwards to ``AggregateFunctions.regr_avgy``)."""
    regression_agg = AggregateFunctions.regr_avgy(y, x)
    return regression_agg
[docs]
@staticmethod
def regr_count(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression: count (forwards to ``AggregateFunctions.regr_count``)."""
    regression_agg = AggregateFunctions.regr_count(y, x)
    return regression_agg
[docs]
@staticmethod
def regr_intercept(
    y: Union[Column, str], x: Union[Column, str]
) -> AggregateFunction:
    """Linear regression: intercept.

    Forwards to ``AggregateFunctions.regr_intercept``.
    """
    regression_agg = AggregateFunctions.regr_intercept(y, x)
    return regression_agg
[docs]
@staticmethod
def regr_r2(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression: R-squared (forwards to ``AggregateFunctions.regr_r2``)."""
    regression_agg = AggregateFunctions.regr_r2(y, x)
    return regression_agg
[docs]
@staticmethod
def regr_slope(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression aggregate: slope.

    Args:
        y: First regression operand, forwarded unchanged.
        x: Second regression operand, forwarded unchanged.

    Returns:
        The result of ``AggregateFunctions.regr_slope(y, x)``.
    """
    aggregate = AggregateFunctions.regr_slope(y, x)
    return aggregate
[docs]
@staticmethod
def regr_sxx(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression aggregate: sum of squares of ``x``.

    Args:
        y: First regression operand, forwarded unchanged.
        x: Second regression operand, forwarded unchanged.

    Returns:
        The result of ``AggregateFunctions.regr_sxx(y, x)``.
    """
    aggregate = AggregateFunctions.regr_sxx(y, x)
    return aggregate
[docs]
@staticmethod
def regr_sxy(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression aggregate: sum of products.

    Args:
        y: First regression operand, forwarded unchanged.
        x: Second regression operand, forwarded unchanged.

    Returns:
        The result of ``AggregateFunctions.regr_sxy(y, x)``.
    """
    aggregate = AggregateFunctions.regr_sxy(y, x)
    return aggregate
[docs]
@staticmethod
def regr_syy(y: Union[Column, str], x: Union[Column, str]) -> AggregateFunction:
    """Linear regression aggregate: sum of squares of ``y``.

    Args:
        y: First regression operand, forwarded unchanged.
        x: Second regression operand, forwarded unchanged.

    Returns:
        The result of ``AggregateFunctions.regr_syy(y, x)``.
    """
    aggregate = AggregateFunctions.regr_syy(y, x)
    return aggregate
# New utility functions (Phase 5)
[docs]
@staticmethod
def get(
    col: Union[Column, str], key: Union[Column, str, int, Any]
) -> ColumnOperation:
    """Fetch an element from an array by index, or from a map by key.

    Args:
        col: Column expression or string, forwarded unchanged.
        key: Index or key to look up, forwarded unchanged.

    Returns:
        The result of ``ArrayFunctions.get(col, key)``.
    """
    lookup = ArrayFunctions.get
    return lookup(col, key)
[docs]
@staticmethod
def inline(col: Union[Column, str]) -> ColumnOperation:
    """Explode an array of structs into rows.

    Args:
        col: Column expression or string, forwarded unchanged.

    Returns:
        The result of ``ArrayFunctions.inline(col)``.
    """
    exploder = ArrayFunctions.inline
    return exploder(col)
[docs]
@staticmethod
def inline_outer(col: Union[Column, str]) -> ColumnOperation:
    """Explode an array of structs into rows (outer join style).

    Args:
        col: Column expression or string, forwarded unchanged.

    Returns:
        The result of ``ArrayFunctions.inline_outer(col)``.
    """
    exploder = ArrayFunctions.inline_outer
    return exploder(col)
[docs]
@staticmethod
def str_to_map(
    column: Union[Column, str],
    pair_delim: Optional[str] = ",",
    key_value_delim: Optional[str] = ":",
) -> ColumnOperation:
    """Convert a string to a map using the given delimiters.

    Args:
        column: Column expression or string, forwarded unchanged.
        pair_delim: Delimiter argument forwarded second; defaults to ``","``.
        key_value_delim: Delimiter argument forwarded third; defaults to
            ``":"``.

    Returns:
        The result of ``MapFunctions.str_to_map`` called with the three
        arguments in the order received.
    """
    forwarded = (column, pair_delim, key_value_delim)
    return MapFunctions.str_to_map(*forwarded)
# Deprecated Aliases
[docs]
@staticmethod
def approxCountDistinct(*cols: Union[Column, str]) -> AggregateFunction:
    """Deprecated camelCase alias for approx_count_distinct (all PySpark versions).

    Args:
        *cols: Positional arguments forwarded unchanged.

    Returns:
        The result of ``AggregateFunctions.approxCountDistinct(*cols)``.
    """
    legacy = AggregateFunctions.approxCountDistinct
    return legacy(*cols)
[docs]
@staticmethod
def sumDistinct(column: Union[Column, str]) -> AggregateFunction:
    """Deprecated camelCase alias for sum_distinct (all PySpark versions).

    Args:
        column: Column expression or string, forwarded unchanged.

    Returns:
        The result of ``AggregateFunctions.sumDistinct(column)``.
    """
    legacy = AggregateFunctions.sumDistinct
    return legacy(column)
[docs]
@staticmethod
def bitwiseNOT(column: Union[Column, str]) -> ColumnOperation:
    """Deprecated camelCase alias for bitwise_not (all PySpark versions).

    Args:
        column: Column expression or string, forwarded unchanged.

    Returns:
        The result of ``BitwiseFunctions.bitwiseNOT(column)``.
    """
    legacy = BitwiseFunctions.bitwiseNOT
    return legacy(column)
[docs]
@staticmethod
def toDegrees(column: Union[Column, str]) -> ColumnOperation:
    """Deprecated camelCase alias for degrees (all PySpark versions).

    Args:
        column: Column expression or string, forwarded unchanged.

    Returns:
        The result of ``MathFunctions.toDegrees(column)``.
    """
    legacy = MathFunctions.toDegrees
    return legacy(column)
[docs]
@staticmethod
def toRadians(column: Union[Column, str]) -> ColumnOperation:
    """Deprecated camelCase alias for radians (all PySpark versions).

    Args:
        column: Column expression or string, forwarded unchanged.

    Returns:
        The result of ``MathFunctions.toRadians(column)``.
    """
    legacy = MathFunctions.toRadians
    return legacy(column)
# Dynamic dispatch helpers
[docs]
@staticmethod
def call_function(function_name: str, *columns: Any) -> Any:
    """
    Dynamically invoke a function from the sparkless functions namespace.

    Args:
        function_name: Name of the function to invoke (e.g. ``"upper"``).
        *columns: Positional arguments passed to the resolved function.

    Returns:
        Whatever the resolved function returns (typically a ColumnOperation).

    Raises:
        PySparkValueError: If the requested function is not registered, if
            the name starts with an underscore (private or dunder
            attribute), or if the name is ``"call_function"`` itself.
        PySparkTypeError: If ``function_name`` is not a non-empty string, or
            if the supplied arguments are incompatible with the resolved
            function signature.
    """
    if not isinstance(function_name, str) or not function_name:
        raise PySparkTypeError(
            "call_function() expects a non-empty string as function name"
        )
    if function_name == "call_function":
        raise PySparkValueError("Function 'call_function' cannot be dispatched")

    # Underscore-prefixed names (``__class__``, ``__init__``, ``__new__``,
    # private helpers) are Python internals of the namespace class, not
    # registered functions. ``getattr`` would resolve them and most are
    # callable, so they are treated as unregistered instead of dispatched.
    if function_name.startswith("_"):
        candidate = None
    else:
        candidate = getattr(Functions, function_name, None)

    if candidate is None or not callable(candidate):
        raise PySparkValueError(
            f"Function {function_name!r} is not registered in sparkless"
        )

    try:
        return candidate(*columns)
    except TypeError as exc:
        # Any TypeError raised by the call (including one raised inside the
        # resolved function) is reported as an argument mismatch; the
        # original exception stays attached as ``__cause__``.
        raise PySparkTypeError(
            f"Function {function_name!r} does not accept the provided arguments: {exc}"
        ) from exc
# Create the module-level ``F`` namespace: a single ``Functions`` instance.
F = Functions()
# Re-export all the main classes for backward compatibility.
# ``__all__`` is the list of names exported by ``from <module> import *``.
# NOTE(review): ArrayFunctions, MapFunctions, BitwiseFunctions, XMLFunctions,
# CryptoFunctions and ConditionalFunctions are imported at the top of this
# module but are not listed here -- confirm whether that omission is intended.
__all__ = [
"Column",
"ColumnOperation",
"Literal",
"AggregateFunction",
"CaseWhen",
"WindowFunction",
"Functions",
"F",
"StringFunctions",
"MathFunctions",
"AggregateFunctions",
"DateTimeFunctions",
]