"""
Datetime functions for Sparkless.
This module provides comprehensive datetime functions that match PySpark's
datetime function API. Includes date/time conversion, extraction, and manipulation
operations for temporal data processing in DataFrames.
Key Features:
- Complete PySpark datetime function API compatibility
- Current date/time functions (current_timestamp, current_date)
- Date conversion (to_date, to_timestamp)
- Date extraction (year, month, day, hour, minute, second)
- Calendar field extraction (dayofweek, dayofyear, weekofyear, quarter)
- Type-safe operations with proper return types
- Support for various date formats and time zones
- Proper handling of date parsing and validation
Example:
>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"timestamp": "2024-01-15 10:30:00", "date_str": "2024-01-15"}]
>>> df = spark.createDataFrame(data)
>>> df.select(
... F.year(F.col("timestamp")),
... F.month(F.col("timestamp")),
... F.to_date(F.col("date_str"))
... ).show()
DataFrame[1 rows, 3 columns]
year(timestamp) month(timestamp) to_date(date_str)
2024 1 2024-01-15
"""
from typing import Optional, Union
from sparkless.functions.base import Column, ColumnOperation
from sparkless.functions.core.literals import Literal
from sparkless.core.type_utils import normalize_date_input, get_expression_name
[docs]
class DateTimeFunctions:
"""Collection of datetime functions."""
@staticmethod
def _require_active_session(operation_name: str) -> None:
    """Fail fast when no SparkSession is active.

    Args:
        operation_name: Label of the calling operation; it is interpolated
            into the error message.

    Raises:
        RuntimeError: If ``SparkSession._has_active_session()`` is falsy.
    """
    # Imported inside the function rather than at module load time.
    from sparkless.session import SparkSession

    if SparkSession._has_active_session():
        return
    message = (
        f"Cannot perform {operation_name}: "
        "No active SparkSession found. "
        "This operation requires an active SparkSession, similar to PySpark. "
        "Create a SparkSession first: spark = SparkSession('app_name')"
    )
    raise RuntimeError(message)
[docs]
@staticmethod
def current_timestamp() -> ColumnOperation:
    """Build the ``current_timestamp`` operation.

    Returns:
        ColumnOperation with no input column, named ``current_timestamp()``.

    Raises:
        RuntimeError: If no active SparkSession is available.
    """
    DateTimeFunctions._require_active_session("current_timestamp function")
    # This function takes no input, so None stands in for the source column.
    return ColumnOperation(None, "current_timestamp", name="current_timestamp()")
[docs]
@staticmethod
def current_date() -> ColumnOperation:
    """Build the ``current_date`` operation.

    Returns:
        ColumnOperation with no input column, named ``current_date()``.

    Raises:
        RuntimeError: If no active SparkSession is available.
    """
    DateTimeFunctions._require_active_session("current_date function")
    # This function takes no input, so None stands in for the source column.
    return ColumnOperation(None, "current_date", name="current_date()")
[docs]
@staticmethod
def now() -> ColumnOperation:
    """Alias that delegates to ``current_timestamp``.

    Returns:
        Whatever ``DateTimeFunctions.current_timestamp()`` returns.
    """
    delegated = DateTimeFunctions.current_timestamp()
    return delegated
[docs]
@staticmethod
def curdate() -> ColumnOperation:
    """Alias that delegates to ``current_date``.

    Returns:
        Whatever ``DateTimeFunctions.current_date()`` returns.
    """
    delegated = DateTimeFunctions.current_date()
    return delegated
[docs]
@staticmethod
def days(column: Union[Column, str, int]) -> ColumnOperation:
    """Build the ``days`` operation for a column or number.

    Args:
        column: A Column, a string (wrapped in ``Column``), or an int
            (stringified, then wrapped in ``Column``).

    Returns:
        ColumnOperation named ``days(<column name>)``.
    """
    from sparkless.functions.base import Column

    # The int check comes first; a plain string is wrapped as-is.
    if isinstance(column, int):
        column = Column(str(column))
    elif isinstance(column, str):
        column = Column(column)
    return ColumnOperation(column, "days", name=f"days({column.name})")
[docs]
@staticmethod
def hours(column: Union[Column, str, int]) -> ColumnOperation:
    """Build the ``hours`` operation for a column or number.

    Args:
        column: A Column, a string (wrapped in ``Column``), or an int
            (stringified, then wrapped in ``Column``).

    Returns:
        ColumnOperation named ``hours(<column name>)``.
    """
    from sparkless.functions.base import Column

    # The int check comes first; a plain string is wrapped as-is.
    if isinstance(column, int):
        column = Column(str(column))
    elif isinstance(column, str):
        column = Column(column)
    return ColumnOperation(column, "hours", name=f"hours({column.name})")
[docs]
@staticmethod
def months(column: Union[Column, str, int]) -> ColumnOperation:
    """Build the ``months`` operation for a column or number.

    Args:
        column: A Column, a string (wrapped in ``Column``), or an int
            (stringified, then wrapped in ``Column``).

    Returns:
        ColumnOperation named ``months(<column name>)``.
    """
    from sparkless.functions.base import Column

    # The int check comes first; a plain string is wrapped as-is.
    if isinstance(column, int):
        column = Column(str(column))
    elif isinstance(column, str):
        column = Column(column)
    return ColumnOperation(column, "months", name=f"months({column.name})")
[docs]
@staticmethod
def years(column: Union[Column, str, int]) -> ColumnOperation:
    """Build the ``years`` operation for a column or number.

    Args:
        column: A Column, a string (wrapped in ``Column``), or an int
            (stringified, then wrapped in ``Column``).

    Returns:
        ColumnOperation named ``years(<column name>)``.
    """
    from sparkless.functions.base import Column

    # The int check comes first; a plain string is wrapped as-is.
    if isinstance(column, int):
        column = Column(str(column))
    elif isinstance(column, str):
        column = Column(column)
    return ColumnOperation(column, "years", name=f"years({column.name})")
[docs]
@staticmethod
def localtimestamp() -> ColumnOperation:
    """Build the ``localtimestamp`` operation (local timestamp, no timezone).

    Returns:
        ColumnOperation with no input column, named ``localtimestamp()``.
    """
    # This function takes no input, so None stands in for the source column.
    return ColumnOperation(None, "localtimestamp", name="localtimestamp()")
[docs]
@staticmethod
def dateadd(
    date_part: str, value: Union[Column, str, int], date: Union[Column, str]
) -> ColumnOperation:
    """Build the SQL Server style ``dateadd`` operation.

    Args:
        date_part: The date part to add to (year, month, day, etc.).
        value: Amount to add. An int is stringified and wrapped in
            ``Column``; a string is wrapped in ``Column``.
        date: The date column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation on the date column carrying ``(date_part, value)``.
    """
    from sparkless.functions.base import Column

    date_col = Column(date) if isinstance(date, str) else date
    if isinstance(value, int):
        value = Column(str(value))
    elif isinstance(value, str):
        value = Column(value)
    # Use the value's name in the label when it has one, else the value itself.
    value_label = value.name if hasattr(value, "name") else value
    return ColumnOperation(
        date_col,
        "dateadd",
        (date_part, value),
        name=f"dateadd({date_part}, {value_label}, {date_col.name})",
    )
[docs]
@staticmethod
def datepart(date_part: str, date: Union[Column, str]) -> ColumnOperation:
    """Build the SQL Server style ``datepart`` operation.

    Args:
        date_part: The date part to extract (year, month, day, etc.).
        date: The date column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``datepart(<date_part>, <column name>)``.
    """
    source = Column(date) if isinstance(date, str) else date
    label = f"datepart({date_part}, {source.name})"
    return ColumnOperation(source, "datepart", date_part, name=label)
[docs]
@staticmethod
def make_timestamp(
    year: Union[Column, str, int],
    month: Union[Column, str, int],
    day: Union[Column, str, int],
    hour: Union[Column, str, int] = 0,
    minute: Union[Column, str, int] = 0,
    second: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build the ``make_timestamp`` operation from its components.

    Each component may be a Column, a string (wrapped in ``Column``), or an
    int (stringified, then wrapped in ``Column``); other values pass through.

    Args:
        year: Year component.
        month: Month component.
        day: Day component.
        hour: Hour component (default 0).
        minute: Minute component (default 0).
        second: Second component (default 0).

    Returns:
        ColumnOperation on the year column; the remaining five components
        are carried as a tuple in (month, day, hour, minute, second) order.
    """
    from sparkless.functions.base import Column

    def _to_col(part):
        # The int check comes before the str check.
        if isinstance(part, int):
            return Column(str(part))
        if isinstance(part, str):
            return Column(part)
        return part

    year_col, *others = (
        _to_col(part) for part in (year, month, day, hour, minute, second)
    )
    return ColumnOperation(
        year_col, "make_timestamp", tuple(others), name="make_timestamp(...)"
    )
[docs]
@staticmethod
def make_timestamp_ltz(
    year: Union[Column, str, int],
    month: Union[Column, str, int],
    day: Union[Column, str, int],
    hour: Union[Column, str, int] = 0,
    minute: Union[Column, str, int] = 0,
    second: Union[Column, str, int] = 0,
    timezone: Optional[str] = None,
) -> ColumnOperation:
    """Build the ``make_timestamp_ltz`` operation (timestamp with local timezone).

    Each date/time component may be a Column, a string (wrapped in
    ``Column``), or an int (stringified, then wrapped in ``Column``).

    Args:
        year: Year component.
        month: Month component.
        day: Day component.
        hour: Hour component (default 0).
        minute: Minute component (default 0).
        second: Second component (default 0).
        timezone: Optional timezone string, forwarded unchanged.

    Returns:
        ColumnOperation on the year column; the value tuple is
        (month, day, hour, minute, second, timezone).
    """
    from sparkless.functions.base import Column

    def _to_col(part):
        # The int check comes before the str check.
        if isinstance(part, int):
            return Column(str(part))
        if isinstance(part, str):
            return Column(part)
        return part

    year_col, *others = (
        _to_col(part) for part in (year, month, day, hour, minute, second)
    )
    # timezone is appended as-is; it is not wrapped in a Column.
    return ColumnOperation(
        year_col,
        "make_timestamp_ltz",
        (*others, timezone),
        name="make_timestamp_ltz(...)",
    )
[docs]
@staticmethod
def make_timestamp_ntz(
    year: Union[Column, str, int],
    month: Union[Column, str, int],
    day: Union[Column, str, int],
    hour: Union[Column, str, int] = 0,
    minute: Union[Column, str, int] = 0,
    second: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build the ``make_timestamp_ntz`` operation (timestamp with no timezone).

    Each component may be a Column, a string (wrapped in ``Column``), or an
    int (stringified, then wrapped in ``Column``); other values pass through.

    Args:
        year: Year component.
        month: Month component.
        day: Day component.
        hour: Hour component (default 0).
        minute: Minute component (default 0).
        second: Second component (default 0).

    Returns:
        ColumnOperation on the year column; the remaining five components
        are carried as a tuple in (month, day, hour, minute, second) order.
    """
    from sparkless.functions.base import Column

    def _to_col(part):
        # The int check comes before the str check.
        if isinstance(part, int):
            return Column(str(part))
        if isinstance(part, str):
            return Column(part)
        return part

    year_col, *others = (
        _to_col(part) for part in (year, month, day, hour, minute, second)
    )
    return ColumnOperation(
        year_col,
        "make_timestamp_ntz",
        tuple(others),
        name="make_timestamp_ntz(...)",
    )
[docs]
@staticmethod
def make_interval(
    years: Union[Column, str, int] = 0,
    months: Union[Column, str, int] = 0,
    weeks: Union[Column, str, int] = 0,
    days: Union[Column, str, int] = 0,
    hours: Union[Column, str, int] = 0,
    mins: Union[Column, str, int] = 0,
    secs: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build the ``make_interval`` operation from its components.

    Each component may be a Column, a string (wrapped in ``Column``), or an
    int (stringified, then wrapped in ``Column``); other values pass through.

    Args:
        years: Years component (default 0).
        months: Months component (default 0).
        weeks: Weeks component (default 0).
        days: Days component (default 0).
        hours: Hours component (default 0).
        mins: Minutes component (default 0).
        secs: Seconds component (default 0).

    Returns:
        ColumnOperation on the years column; the value tuple is
        (months, weeks, days, hours, mins, secs).
    """
    from sparkless.functions.base import Column

    def _to_col(part):
        # The int check comes before the str check.
        if isinstance(part, int):
            return Column(str(part))
        if isinstance(part, str):
            return Column(part)
        return part

    years_col, *others = (
        _to_col(part) for part in (years, months, weeks, days, hours, mins, secs)
    )
    return ColumnOperation(
        years_col, "make_interval", tuple(others), name="make_interval(...)"
    )
[docs]
@staticmethod
def make_dt_interval(
    days: Union[Column, str, int] = 0,
    hours: Union[Column, str, int] = 0,
    mins: Union[Column, str, int] = 0,
    secs: Union[Column, str, int] = 0,
) -> ColumnOperation:
    """Build the ``make_dt_interval`` (day-time interval) operation.

    Each component may be a Column, a string (wrapped in ``Column``), or an
    int (stringified, then wrapped in ``Column``); other values pass through.

    Args:
        days: Days component (default 0).
        hours: Hours component (default 0).
        mins: Minutes component (default 0).
        secs: Seconds component (default 0).

    Returns:
        ColumnOperation on the days column; the value tuple is
        (hours, mins, secs).
    """
    from sparkless.functions.base import Column

    def _to_col(part):
        # The int check comes before the str check.
        if isinstance(part, int):
            return Column(str(part))
        if isinstance(part, str):
            return Column(part)
        return part

    days_col, *others = (_to_col(part) for part in (days, hours, mins, secs))
    return ColumnOperation(
        days_col, "make_dt_interval", tuple(others), name="make_dt_interval(...)"
    )
[docs]
@staticmethod
def make_ym_interval(
    years: Union[Column, str, int] = 0, months: Union[Column, str, int] = 0
) -> ColumnOperation:
    """Build the ``make_ym_interval`` (year-month interval) operation.

    Each component may be a Column, a string (wrapped in ``Column``), or an
    int (stringified, then wrapped in ``Column``); other values pass through.

    Args:
        years: Years component (default 0).
        months: Months component (default 0).

    Returns:
        ColumnOperation on the years column with the months column as its
        value (a single object, not a tuple).
    """
    from sparkless.functions.base import Column

    def _to_col(part):
        # The int check comes before the str check.
        if isinstance(part, int):
            return Column(str(part))
        if isinstance(part, str):
            return Column(part)
        return part

    years_col, months_col = (_to_col(part) for part in (years, months))
    return ColumnOperation(
        years_col, "make_ym_interval", months_col, name="make_ym_interval(...)"
    )
[docs]
@staticmethod
def to_number(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build the ``to_number`` (string to number) operation.

    Args:
        column: The column to convert; a string is wrapped in ``Column``.
        format: Optional format string, forwarded as the operation value.

    Returns:
        ColumnOperation named ``to_number(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(
        source, "to_number", format, name=f"to_number({source.name})"
    )
[docs]
@staticmethod
def to_binary(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build the ``to_binary`` conversion operation.

    Args:
        column: The column to convert; a string is wrapped in ``Column``.
        format: Optional format string, forwarded as the operation value.

    Returns:
        ColumnOperation named ``to_binary(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(
        source, "to_binary", format, name=f"to_binary({source.name})"
    )
[docs]
@staticmethod
def to_unix_timestamp(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build the ``to_unix_timestamp`` conversion operation.

    Args:
        column: The column to convert; a string is wrapped in ``Column``.
        format: Optional format string, forwarded as the operation value.

    Returns:
        ColumnOperation named ``to_unix_timestamp(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"to_unix_timestamp({source.name})"
    return ColumnOperation(source, "to_unix_timestamp", format, name=label)
[docs]
@staticmethod
def unix_date(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``unix_date`` operation for a column.

    NOTE(review): the previous docstring described this as converting a unix
    timestamp to a date, whereas PySpark documents ``unix_date`` as the
    number of days since 1970-01-01 -- confirm which the backend implements.

    Args:
        column: The input column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``unix_date(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "unix_date", name=f"unix_date({source.name})")
[docs]
@staticmethod
def unix_seconds(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``unix_seconds`` (timestamp to unix seconds) operation.

    Args:
        column: The timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``unix_seconds(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"unix_seconds({source.name})"
    return ColumnOperation(source, "unix_seconds", name=label)
[docs]
@staticmethod
def unix_millis(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``unix_millis`` (timestamp to unix milliseconds) operation.

    Args:
        column: The timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``unix_millis(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"unix_millis({source.name})"
    return ColumnOperation(source, "unix_millis", name=label)
[docs]
@staticmethod
def unix_micros(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``unix_micros`` (timestamp to unix microseconds) operation.

    Args:
        column: The timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``unix_micros(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"unix_micros({source.name})"
    return ColumnOperation(source, "unix_micros", name=label)
[docs]
@staticmethod
def timestamp_millis(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``timestamp_millis`` (unix milliseconds to timestamp) operation.

    Args:
        column: The unix-milliseconds column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``timestamp_millis(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"timestamp_millis({source.name})"
    return ColumnOperation(source, "timestamp_millis", name=label)
[docs]
@staticmethod
def timestamp_micros(column: Union[Column, str]) -> ColumnOperation:
    """Build the ``timestamp_micros`` (unix microseconds to timestamp) operation.

    Args:
        column: The unix-microseconds column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``timestamp_micros(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"timestamp_micros({source.name})"
    return ColumnOperation(source, "timestamp_micros", name=label)
[docs]
@staticmethod
def to_date(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build the ``to_date`` conversion operation.

    Args:
        column: The column to convert; a string is wrapped in ``Column``.
        format: Optional date format string. When given, it is included
            (single-quoted) in the operation name.

    Returns:
        ColumnOperation carrying ``format`` as its value.

    Raises:
        TypeError: If the column exposes a ``column_type`` that is not a
            StringType, TimestampType, or DateType instance.
    """
    from sparkless.spark_types import StringType, TimestampType, DateType

    if isinstance(column, str):
        column = Column(column)

    # The type is validated only when the column advertises one; a column
    # without a ``column_type`` attribute (or with None) is accepted as-is.
    accepted = (StringType, TimestampType, DateType)
    input_type = getattr(column, "column_type", None)
    if not (input_type is None or isinstance(input_type, accepted)):
        raise TypeError(
            f"to_date() requires StringType, TimestampType, or DateType input, got {input_type}. "
            f"Cast the column to string first: F.col('{column.name}').cast('string')"
        )

    if format is None:
        name = f"to_date({column.name})"
    else:
        name = f"to_date({column.name}, '{format}')"
    return ColumnOperation(column, "to_date", format, name=name)
[docs]
@staticmethod
def to_timestamp(
    column: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build the ``to_timestamp`` conversion operation.

    Args:
        column: The column to convert; a string is wrapped in ``Column``.
        format: Optional timestamp format string, forwarded as the
            operation value.

    Returns:
        ColumnOperation named ``to_timestamp_<column name>``.

    Raises:
        TypeError: If the column exposes a ``column_type`` that is not a
            StringType, TimestampType, IntegerType, LongType, DateType, or
            DoubleType instance.
    """
    from sparkless.spark_types import (
        StringType,
        TimestampType,
        IntegerType,
        LongType,
        DateType,
        DoubleType,
    )

    if isinstance(column, str):
        column = Column(column)

    # The type is validated only when the column advertises one; a column
    # without a ``column_type`` attribute (or with None) is accepted as-is.
    accepted = (
        StringType,
        TimestampType,
        IntegerType,
        LongType,
        DateType,
        DoubleType,
    )
    input_type = getattr(column, "column_type", None)
    if not (input_type is None or isinstance(input_type, accepted)):
        raise TypeError(
            f"to_timestamp() requires StringType, TimestampType, IntegerType, "
            f"LongType, DateType, or DoubleType input, got {input_type}."
        )

    # The name uses an underscore form, not a call-style "to_timestamp(...)".
    return ColumnOperation(
        column, "to_timestamp", format, name=f"to_timestamp_{column.name}"
    )
[docs]
@staticmethod
def hour(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the hour from a timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``hour(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "hour", name=f"hour({source.name})")
[docs]
@staticmethod
def day(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the day from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``day(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "day", name=f"day({source.name})")
[docs]
@staticmethod
def dayofmonth(column: Union[Column, str]) -> ColumnOperation:
    """Build the day-of-month extraction operation (alias for ``day``).

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation whose operation is ``day`` but whose name is
        ``dayofmonth(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    # Reuses the "day" operation; only the display name differs.
    return ColumnOperation(source, "day", name=f"dayofmonth({source.name})")
[docs]
@staticmethod
def month(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the month from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``month(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "month", name=f"month({source.name})")
[docs]
@staticmethod
def year(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the year from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``year(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "year", name=f"year({source.name})")
[docs]
@staticmethod
def dayofweek(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the day of week from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``dayofweek(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "dayofweek", name=f"dayofweek({source.name})")
[docs]
@staticmethod
def dayofyear(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the day of year from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``dayofyear(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "dayofyear", name=f"dayofyear({source.name})")
[docs]
@staticmethod
def weekofyear(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the week of year from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``weekofyear(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"weekofyear({source.name})"
    return ColumnOperation(source, "weekofyear", name=label)
[docs]
@staticmethod
def quarter(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the quarter from a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``quarter(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "quarter", name=f"quarter({source.name})")
[docs]
@staticmethod
def minute(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the minute from a timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``minute(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "minute", name=f"minute({source.name})")
[docs]
@staticmethod
def second(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts the second from a timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``second(<column name>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    return ColumnOperation(source, "second", name=f"second({source.name})")
[docs]
@staticmethod
def add_months(column: Union[Column, str], num_months: int) -> ColumnOperation:
    """Build the operation that adds months to a date/timestamp.

    Args:
        column: The source column; a string is wrapped in ``Column``.
        num_months: Number of months to add, forwarded as the operation value.

    Returns:
        ColumnOperation named ``add_months(<column name>, <num_months>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"add_months({source.name}, {num_months})"
    return ColumnOperation(source, "add_months", num_months, name=label)
[docs]
@staticmethod
def months_between(
    column1: Union[Column, str], column2: Union[Column, str]
) -> ColumnOperation:
    """Build the operation that computes the months between two dates.

    Args:
        column1: The first date column; a string is wrapped in ``Column``.
        column2: The second date column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation on the first column with the second as its value,
        named ``months_between(<name1>, <name2>, true)``.
    """
    first = Column(column1) if isinstance(column1, str) else column1
    other = Column(column2) if isinstance(column2, str) else column2
    # The trailing "true" in the label is a fixed literal.
    label = f"months_between({first.name}, {other.name}, true)"
    return ColumnOperation(first, "months_between", other, name=label)
[docs]
@staticmethod
def date_add(column: Union[Column, str], days: int) -> ColumnOperation:
    """Build the operation that adds days to a date.

    Args:
        column: The source column; a string is wrapped in ``Column``.
        days: Number of days to add, forwarded as the operation value.

    Returns:
        ColumnOperation named ``date_add(<column name>, <days>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"date_add({source.name}, {days})"
    return ColumnOperation(source, "date_add", days, name=label)
[docs]
@staticmethod
def date_sub(column: Union[Column, str], days: int) -> ColumnOperation:
    """Build the operation that subtracts days from a date.

    Args:
        column: The source column; a string is wrapped in ``Column``.
        days: Number of days to subtract, forwarded as the operation value.

    Returns:
        ColumnOperation named ``date_sub(<column name>, <days>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    label = f"date_sub({source.name}, {days})"
    return ColumnOperation(source, "date_sub", days, name=label)
[docs]
@staticmethod
def from_unixtime(
    column: Union[Column, str], format: str = "yyyy-MM-dd HH:mm:ss"
) -> ColumnOperation:
    """Build the operation that converts a unix timestamp to a string.

    Args:
        column: The unix-timestamp column; a string is wrapped in ``Column``.
        format: Date format string (default: 'yyyy-MM-dd HH:mm:ss').

    Returns:
        ColumnOperation named ``from_unixtime(<column name>, <format>)``.
    """
    source = Column(column) if isinstance(column, str) else column
    # The format is left unquoted in the label, as PySpark does.
    label = f"from_unixtime({source.name}, {format})"
    return ColumnOperation(source, "from_unixtime", format, name=label)
[docs]
@staticmethod
def timestampadd(
    unit: str, quantity: Union[int, Column], timestamp: Union[str, Column]
) -> ColumnOperation:
    """Build the operation that adds time units to a timestamp.

    Args:
        unit: Time unit (YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND).
        quantity: Number of units to add, as an integer or a Column. It is
            forwarded unchanged in the operation value.
        timestamp: Timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation on the timestamp column carrying ``(unit, quantity)``.

    Example:
        >>> df.select(F.timestampadd("DAY", 7, F.col("created_at")))
    """
    source = Column(timestamp) if isinstance(timestamp, str) else timestamp

    # The label shows a Column quantity by name and anything else via str().
    if isinstance(quantity, Column):
        quantity_label = quantity.name
    else:
        quantity_label = str(quantity)

    return ColumnOperation(
        source,
        "timestampadd",
        (unit, quantity),
        name=f"timestampadd('{unit}', {quantity_label}, {source.name})",
    )
[docs]
@staticmethod
def timestampdiff(
    unit: str, start: Union[str, Column], end: Union[str, Column]
) -> ColumnOperation:
    """Build the operation that computes the difference between two timestamps.

    Args:
        unit: Time unit (YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND).
        start: Start timestamp column; a string is wrapped in ``Column``.
        end: End timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation on the start column carrying ``(unit, end)``.

    Example:
        >>> df.select(F.timestampdiff("DAY", F.col("start_date"), F.col("end_date")))
    """
    start_col = Column(start) if isinstance(start, str) else start
    end_col = Column(end) if isinstance(end, str) else end
    label = f"timestampdiff('{unit}', {start_col.name}, {end_col.name})"
    return ColumnOperation(start_col, "timestampdiff", (unit, end_col), name=label)
# Timezone Functions (PySpark 3.2+)
[docs]
@staticmethod
def convert_timezone(
    sourceTz: str, targetTz: str, sourceTs: Union[Column, str]
) -> ColumnOperation:
    """Build the operation that converts a timestamp between timezones.

    Args:
        sourceTz: Source timezone.
        targetTz: Target timezone.
        sourceTs: Timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation on the timestamp column. The value tuple is
        ``(sourceTz, targetTz, <timestamp column>)``, so the timestamp
        column appears both as the source and inside the value.
    """
    ts_col = Column(sourceTs) if isinstance(sourceTs, str) else sourceTs
    label = f"convert_timezone('{sourceTz}', '{targetTz}', {ts_col.name})"
    return ColumnOperation(
        ts_col, "convert_timezone", (sourceTz, targetTz, ts_col), name=label
    )
[docs]
@staticmethod
def current_timezone() -> ColumnOperation:
    """Build the ``current_timezone`` operation.

    Returns:
        ColumnOperation named ``current_timezone()``.

    Raises:
        RuntimeError: If no active SparkSession is available.
    """
    DateTimeFunctions._require_active_session("current_timezone function")
    from sparkless.functions.core.literals import Literal

    # This function takes no column input, so a literal 1 serves as a
    # placeholder source.
    placeholder = Literal(1)
    return ColumnOperation(placeholder, "current_timezone", name="current_timezone()")
[docs]
@staticmethod
def from_utc_timestamp(ts: Union[Column, str], tz: str) -> ColumnOperation:
    """Build the operation that converts a UTC timestamp to a given timezone.

    Args:
        ts: Timestamp column; a string is wrapped in ``Column``.
        tz: Timezone. It is forwarded unchanged as the operation value. An
            object exposing both ``value`` and ``data_type`` is shown in the
            label as ``str(tz.value)``.

    Returns:
        ColumnOperation named ``from_utc_timestamp(<ts name>, <tz>)``.
    """
    ts_col = Column(ts) if isinstance(ts, str) else ts
    # The timezone is left unquoted in the label, as PySpark does.
    literal_like = hasattr(tz, "value") and hasattr(tz, "data_type")
    tz_label = str(tz.value) if literal_like else tz
    return ColumnOperation(
        ts_col,
        "from_utc_timestamp",
        tz,
        name=f"from_utc_timestamp({ts_col.name}, {tz_label})",
    )
[docs]
@staticmethod
def to_utc_timestamp(ts: Union[Column, str], tz: str) -> ColumnOperation:
    """Build the operation that converts a timestamp from a timezone to UTC.

    Args:
        ts: Timestamp column; a string is wrapped in ``Column``.
        tz: Timezone. It is forwarded unchanged as the operation value. An
            object exposing both ``value`` and ``data_type`` is shown in the
            label as ``str(tz.value)``.

    Returns:
        ColumnOperation named ``to_utc_timestamp(<ts name>, <tz>)``.
    """
    ts_col = Column(ts) if isinstance(ts, str) else ts
    # The timezone is left unquoted in the label, as PySpark does.
    literal_like = hasattr(tz, "value") and hasattr(tz, "data_type")
    tz_label = str(tz.value) if literal_like else tz
    return ColumnOperation(
        ts_col,
        "to_utc_timestamp",
        tz,
        name=f"to_utc_timestamp({ts_col.name}, {tz_label})",
    )
# Date/Time Part Functions (PySpark 3.2+)
[docs]
@staticmethod
def date_part(field: str, source: Union[Column, str]) -> ColumnOperation:
    """Build the operation that extracts a field from a date/timestamp.

    Args:
        field: Field to extract (YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, etc.).
        source: Date/timestamp column; a string is wrapped in ``Column``.

    Returns:
        ColumnOperation named ``date_part('<field>', <column name>)``.

    Example:
        >>> df.select(F.date_part("YEAR", F.col("date")))
    """
    source_col = Column(source) if isinstance(source, str) else source
    label = f"date_part('{field}', {source_col.name})"
    return ColumnOperation(source_col, "date_part", field, name=label)
[docs]
@staticmethod
def dayname(date: Union[Column, str]) -> ColumnOperation:
    """Build a ``dayname`` operation (name of the day of the week).

    Args:
        date: Date column, or a column name (wrapped in ``Column``).

    Returns:
        ColumnOperation named ``dayname(<date name>)``.

    Example:
        >>> df.select(F.dayname(F.col("date")))
    """
    column = Column(date) if isinstance(date, str) else date
    label = f"dayname({column.name})"
    return ColumnOperation(column, "dayname", name=label)
[docs]
@staticmethod
def make_date(
    year: Union[
        Column, int, str, Literal
    ],  # str and Literal may be passed at runtime
    month: Union[Column, int, str, Literal],
    day: Union[Column, int, str, Literal],
) -> ColumnOperation:
    """Build a ``make_date`` operation from year, month and day parts (PySpark 3.0+).

    Each argument is passed through the module-level ``normalize_date_input``
    helper; the normalized year is the operation's base column and the
    normalized (month, day) pair is its value.

    Args:
        year: Year column or integer.
        month: Month column or integer (1-12).
        day: Day column or integer (1-31).

    Returns:
        ColumnOperation named ``make_date(<year>, <month>, <day>)``, where each
        part is rendered by ``get_expression_name``.

    Example:
        >>> df.select(F.make_date(F.lit(2024), F.lit(3), F.lit(15)))
    """
    year_col = normalize_date_input(year)
    month_col = normalize_date_input(month)
    day_col = normalize_date_input(day)

    year_name = get_expression_name(year_col)
    month_name = get_expression_name(month_col)
    day_name = get_expression_name(day_col)
    label = f"make_date({year_name}, {month_name}, {day_name})"

    return ColumnOperation(
        year_col, "make_date", value=(month_col, day_col), name=label
    )
[docs]
@staticmethod
def date_trunc(format: str, timestamp: Union[Column, str]) -> ColumnOperation:
    """Build a ``date_trunc`` operation truncating a timestamp to a unit.

    Args:
        format: Truncation unit ('year', 'month', 'day', 'hour', 'minute',
            'second'). Stored as the operation's value and shown unquoted in
            the generated name.
        timestamp: Timestamp column, or a column name (wrapped in ``Column``).

    Returns:
        ColumnOperation named ``date_trunc(<format>, <timestamp name>)``.

    Example:
        >>> df.select(F.date_trunc('month', F.col('timestamp')))
    """
    column = Column(timestamp) if isinstance(timestamp, str) else timestamp
    label = f"date_trunc({format}, {column.name})"
    return ColumnOperation(column, "date_trunc", value=format, name=label)
[docs]
@staticmethod
def datediff(
    end: Union[Column, str, Literal], start: Union[Column, str, Literal]
) -> ColumnOperation:
    """Return the number of days between two dates.

    Both operands are coerced by the same rules:

    * ``Literal`` and ``Column`` instances are used as-is.
    * A string shaped exactly like ``YYYY-MM-DD`` (ASCII digits with dashes
      at positions 4 and 7) is treated as a date literal.
    * Any other string is treated as a column name. This includes
      10-character names that merely contain a dash, such as
      ``"start-date"``, which a looser length-and-dash check would
      mistake for a date.
    * Any other object is wrapped as ``Column(str(obj))``.

    Args:
        end: End date column, column name, date string, or literal.
        start: Start date column, column name, date string, or literal.

    Returns:
        ColumnOperation named ``datediff(<end>, <start>)`` with the coerced
        ``end`` as base column and the coerced ``start`` as value.

    Example:
        >>> df.select(F.datediff(F.col('end_date'), F.lit('2024-01-01')))
    """

    def _is_date_text(text: str) -> bool:
        # Shape check only (no calendar validation), so out-of-range values
        # such as "2024-13-45" are still passed on as literals.
        return (
            len(text) == 10
            and text[4] == "-"
            and text[7] == "-"
            and text.isascii()
            and (text[:4] + text[5:7] + text[8:]).isdigit()
        )

    def _coerce(operand: Union[Column, str, Literal]) -> Union[Literal, Column]:
        # Literal is tested before Column so the check order of the
        # original implementation is preserved.
        if isinstance(operand, Literal):
            return operand
        if isinstance(operand, str):
            return Literal(operand) if _is_date_text(operand) else Column(operand)
        if isinstance(operand, Column):
            return operand
        # Fallback for anything outside the annotated types: use its string
        # form as a column name.
        return Column(str(operand))  # type: ignore[unreachable]

    def _label(operand: Union[Literal, Column]) -> str:
        # Prefer an explicit name; fall back to the literal's value, then
        # to the object's string form.
        if hasattr(operand, "name"):
            return operand.name
        if isinstance(operand, Literal):
            return str(operand.value)
        return str(operand)

    end_col = _coerce(end)
    start_col = _coerce(start)
    end_name = _label(end_col)
    start_name = _label(start_col)

    return ColumnOperation(
        end_col,
        "datediff",
        value=start_col,
        name=f"datediff({end_name}, {start_name})",
    )
[docs]
@staticmethod
def date_diff(
    end: Union[Column, str], start: Union[Column, str]
) -> ColumnOperation:
    """Alias-style variant of ``datediff``: days between two dates.

    The operation key is ``"datediff"``; only the display name uses
    ``date_diff``. String arguments are always wrapped as column names here
    (no date-literal detection is performed in this method).

    Args:
        end: End date column, or a column name.
        start: Start date column, or a column name.

    Returns:
        ColumnOperation named ``date_diff(<end name>, <start name>)``.

    Example:
        >>> df.select(F.date_diff(F.col('end_date'), F.col('start_date')))
    """
    end_col = Column(end) if isinstance(end, str) else end
    start_col = Column(start) if isinstance(start, str) else start
    label = f"date_diff({end_col.name}, {start_col.name})"
    return ColumnOperation(end_col, "datediff", value=start_col, name=label)
[docs]
@staticmethod
def unix_timestamp(
    timestamp: Optional[Union[Column, str]] = None,
    format: str = "yyyy-MM-dd HH:mm:ss",
) -> ColumnOperation:
    """Build a ``unix_timestamp`` operation (seconds since epoch).

    Args:
        timestamp: Timestamp column or column name. When omitted, a
            ``Literal("current_timestamp")`` placeholder is used as the base.
        format: Date/time format string, stored as the operation's value.

    Returns:
        ColumnOperation named ``unix_timestamp(<timestamp name>, <format>)``.

    Example:
        >>> df.select(F.unix_timestamp(F.col('timestamp'), 'yyyy-MM-dd'))
    """
    operand: Union[Literal, Column]
    if timestamp is None:
        operand = Literal("current_timestamp")
    elif isinstance(timestamp, (Column, Literal)):
        operand = timestamp
    elif isinstance(timestamp, str):
        operand = Column(timestamp)
    else:
        # Anything outside the annotated types is wrapped by its string form.
        operand = Column(str(timestamp))  # type: ignore[unreachable]

    # Objects without a ``name`` attribute are labelled "current_timestamp".
    operand_name = getattr(operand, "name", "current_timestamp")
    label = f"unix_timestamp({operand_name}, {format})"
    return ColumnOperation(operand, "unix_timestamp", value=format, name=label)
[docs]
@staticmethod
def last_day(date: Union[Column, str]) -> ColumnOperation:
    """Build a ``last_day`` operation (last day of the month of a date).

    Args:
        date: Date column, or a column name (wrapped in ``Column``).

    Returns:
        ColumnOperation named ``last_day(<date name>)``.

    Example:
        >>> df.select(F.last_day(F.col('date')))
    """
    column = Column(date) if isinstance(date, str) else date
    label = f"last_day({column.name})"
    return ColumnOperation(column, "last_day", name=label)
[docs]
@staticmethod
def next_day(date: Union[Column, str], dayOfWeek: str) -> ColumnOperation:
    """Build a ``next_day`` operation.

    Represents the first date later than the given date that falls on the
    specified day of the week.

    Args:
        date: Date column, or a column name (wrapped in ``Column``).
        dayOfWeek: Day of week string (e.g., 'Mon', 'Monday'); stored as the
            operation's value.

    Returns:
        ColumnOperation named ``next_day(<date name>, <dayOfWeek>)``.

    Example:
        >>> df.select(F.next_day(F.col('date'), 'Monday'))
    """
    column = Column(date) if isinstance(date, str) else date
    label = f"next_day({column.name}, {dayOfWeek})"
    return ColumnOperation(column, "next_day", value=dayOfWeek, name=label)
[docs]
@staticmethod
def trunc(date: Union[Column, str], format: str) -> ColumnOperation:
    """Build a ``trunc`` operation truncating a date to a unit.

    Args:
        date: Date column, or a column name (wrapped in ``Column``).
        format: Truncation format ('year', 'yyyy', 'yy', 'month', 'mon',
            'mm'); stored as the operation's value.

    Returns:
        ColumnOperation named ``trunc(<date name>, <format>)``.

    Example:
        >>> df.select(F.trunc(F.col('date'), 'year'))
    """
    column = Column(date) if isinstance(date, str) else date
    label = f"trunc({column.name}, {format})"
    return ColumnOperation(column, "trunc", value=format, name=label)
[docs]
@staticmethod
def timestamp_seconds(col: Union[Column, str, int]) -> ColumnOperation:
    """Build a ``timestamp_seconds`` operation (epoch seconds to timestamp, PySpark 3.1+).

    Args:
        col: Column, column name, or integer representing seconds since
            epoch. The argument is normalized by ``normalize_date_input``.

    Returns:
        ColumnOperation named ``timestamp_seconds(<expr>)``, where ``<expr>``
        is produced by ``get_expression_name`` on the normalized input.

    Example:
        >>> df.select(F.timestamp_seconds(F.col("seconds")))
    """
    operand = normalize_date_input(col)
    operand_name = get_expression_name(operand)
    label = f"timestamp_seconds({operand_name})"
    return ColumnOperation(operand, "timestamp_seconds", name=label)
[docs]
@staticmethod
def weekday(col: Union[Column, str]) -> ColumnOperation:
    """Build a ``weekday`` operation (day of week as an integer, PySpark 3.5+).

    Args:
        col: Column or column name containing date/timestamp values.

    Returns:
        ColumnOperation named ``weekday(<col name>)``.

    Note:
        Documented numbering is 0 for Monday through 6 for Sunday.
    """
    if isinstance(col, str):
        col = Column(col)
    label = f"weekday({col.name})"
    return ColumnOperation(col, "weekday", name=label)
[docs]
@staticmethod
def date_from_unix_date(days: Union[Column, str, int]) -> ColumnOperation:
    """Build a ``date_from_unix_date`` operation (days since epoch to date, PySpark 3.5+).

    Args:
        days: Column, column name, or integer representing days since epoch
            (1970-01-01).

    Returns:
        ColumnOperation named ``date_from_unix_date(<days name>)``.

    Example:
        >>> df.select(F.date_from_unix_date(F.col("days")))
    """
    # NOTE(review): an int is wrapped as a Column whose name is its decimal
    # text, not as a literal value - confirm this is the intended handling.
    if isinstance(days, int):
        days = Column(str(days))
    elif isinstance(days, str):
        days = Column(days)
    label = f"date_from_unix_date({days.name})"
    return ColumnOperation(days, "date_from_unix_date", name=label)
[docs]
@staticmethod
def to_timestamp_ltz(
    timestamp_str: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build a ``to_timestamp_ltz`` operation (timestamp with local timezone, PySpark 3.5+).

    Args:
        timestamp_str: Column, or column name, containing the timestamp text.
        format: Optional format string for parsing. When given it becomes the
            operation's value and appears single-quoted in the name; when
            omitted no value is passed at all.

    Returns:
        ColumnOperation named ``to_timestamp_ltz(<name>)`` or
        ``to_timestamp_ltz(<name>, '<format>')``.

    Example:
        >>> df.select(F.to_timestamp_ltz(F.col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
    """
    source = (
        Column(timestamp_str) if isinstance(timestamp_str, str) else timestamp_str
    )
    if format is None:
        return ColumnOperation(
            source, "to_timestamp_ltz", name=f"to_timestamp_ltz({source.name})"
        )
    return ColumnOperation(
        source,
        "to_timestamp_ltz",
        value=format,
        name=f"to_timestamp_ltz({source.name}, '{format}')",
    )
[docs]
@staticmethod
def to_timestamp_ntz(
    timestamp_str: Union[Column, str], format: Optional[str] = None
) -> ColumnOperation:
    """Build a ``to_timestamp_ntz`` operation (timestamp with no timezone, PySpark 3.5+).

    Args:
        timestamp_str: Column, or column name, containing the timestamp text.
        format: Optional format string for parsing. When given it becomes the
            operation's value and appears single-quoted in the name; when
            omitted no value is passed at all.

    Returns:
        ColumnOperation named ``to_timestamp_ntz(<name>)`` or
        ``to_timestamp_ntz(<name>, '<format>')``.

    Example:
        >>> df.select(F.to_timestamp_ntz(F.col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
    """
    source = (
        Column(timestamp_str) if isinstance(timestamp_str, str) else timestamp_str
    )
    if format is None:
        return ColumnOperation(
            source, "to_timestamp_ntz", name=f"to_timestamp_ntz({source.name})"
        )
    return ColumnOperation(
        source,
        "to_timestamp_ntz",
        value=format,
        name=f"to_timestamp_ntz({source.name}, '{format}')",
    )