"""
Map functions for Sparkless.
This module provides comprehensive map manipulation functions that match PySpark's
map function API. Includes operations for extracting keys, values, entries, and
combining maps for working with map columns in DataFrames.
Key Features:
- Complete PySpark map function API compatibility
- Key/value extraction (map_keys, map_values)
- Entry operations (map_entries)
- Map combination (map_concat, map_from_arrays)
- Type-safe operations with proper return types
- Support for both column references and map literals
Example:
>>> from sparkless.sql import SparkSession, functions as F
>>> spark = SparkSession("test")
>>> data = [{"properties": {"key1": "val1", "key2": "val2"}}]
>>> df = spark.createDataFrame(data)
>>> df.select(F.map_keys(F.col("properties"))).show()
DataFrame[1 rows, 1 columns]
map_keys(properties)
['key1', 'key2']
"""
from typing import Any, Callable, Optional, Union
from sparkless.functions.base import (
Column,
ColumnOperation,
MockLambdaExpression,
)
[docs]
class MapFunctions:
"""Collection of map manipulation functions."""
[docs]
@staticmethod
def map_keys(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that yields an array of a map's keys.

    Args:
        column: The map column, as a ``Column`` or a column name. A name is
            wrapped in ``Column`` before the operation is built.

    Returns:
        ColumnOperation tagged ``"map_keys"`` and named
        ``map_keys(<column name>)``.

    Example:
        >>> df.select(F.map_keys(F.col("properties")))
    """
    # Normalise a bare column name into a Column so ``.name`` is available.
    column = Column(column) if isinstance(column, str) else column
    op_name = f"map_keys({column.name})"
    return ColumnOperation(column, "map_keys", name=op_name)
[docs]
@staticmethod
def map_values(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that yields an array of a map's values.

    Args:
        column: The map column, as a ``Column`` or a column name. A name is
            wrapped in ``Column`` before the operation is built.

    Returns:
        ColumnOperation tagged ``"map_values"`` and named
        ``map_values(<column name>)``.

    Example:
        >>> df.select(F.map_values(F.col("properties")))
    """
    # Normalise a bare column name into a Column so ``.name`` is available.
    column = Column(column) if isinstance(column, str) else column
    op_name = f"map_values({column.name})"
    return ColumnOperation(column, "map_values", name=op_name)
[docs]
@staticmethod
def map_entries(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that yields an array of key-value structs.

    Args:
        column: The map column, as a ``Column`` or a column name. A name is
            wrapped in ``Column`` before the operation is built.

    Returns:
        ColumnOperation tagged ``"map_entries"`` and named
        ``map_entries(<column name>)``.

    Example:
        >>> df.select(F.map_entries(F.col("properties")))
    """
    # Normalise a bare column name into a Column so ``.name`` is available.
    column = Column(column) if isinstance(column, str) else column
    op_name = f"map_entries({column.name})"
    return ColumnOperation(column, "map_entries", name=op_name)
[docs]
@staticmethod
def map_concat(*columns: Union[Column, str]) -> ColumnOperation:
    """Concatenate multiple maps into a single map.

    Args:
        *columns: Map columns to concatenate, each a ``Column`` or a column
            name. A single list or tuple of columns may be passed instead
            of separate arguments, as PySpark allows.

    Returns:
        ColumnOperation representing the map_concat function. The first
        column is the operation's base, the remaining columns are stored
        as its value, and its name is ``map_concat(<name1>, <name2>, ...)``.

    Raises:
        ValueError: If no columns are provided (including an empty
            list/tuple).

    Example:
        >>> df.select(F.map_concat(F.col("map1"), F.col("map2"), F.col("map3")))
        >>> df.select(F.map_concat([F.col("map1"), F.col("map2")]))
    """
    # Support map_concat([m1, m2, ...]): without this a lone list would be
    # used verbatim as the base column and produce a broken operation.
    if len(columns) == 1 and isinstance(columns[0], (list, tuple)):
        columns = tuple(columns[0])

    if not columns:
        raise ValueError("At least one column must be provided")

    first = columns[0]
    base_column = Column(first) if isinstance(first, str) else first

    # Column names given as plain strings have no ``.name``; use them as-is.
    column_names = [
        col.name if hasattr(col, "name") else str(col) for col in columns
    ]
    return ColumnOperation(
        base_column,
        "map_concat",
        columns[1:],
        name=f"map_concat({', '.join(column_names)})",
    )
[docs]
@staticmethod
def map_from_arrays(
    keys: Union[Column, str], values: Union[Column, str]
) -> ColumnOperation:
    """Build the operation that creates a map from a keys array and a values array.

    Args:
        keys: Array column containing the keys (``Column`` or column name).
        values: Array column containing the values (``Column`` or column
            name).

    Returns:
        ColumnOperation tagged ``"map_from_arrays"`` with ``keys`` as its
        base and ``values`` as its value, named
        ``map_from_arrays(<keys name>, <values name>)``.

    Example:
        >>> df.select(F.map_from_arrays(F.col("keys"), F.col("values")))
    """
    # Both operands may arrive as plain column names; wrap those in Column.
    keys = Column(keys) if isinstance(keys, str) else keys
    values = Column(values) if isinstance(values, str) else values
    op_name = f"map_from_arrays({keys.name}, {values.name})"
    return ColumnOperation(keys, "map_from_arrays", values, name=op_name)
# Advanced Map Functions (PySpark 3.2+)
[docs]
@staticmethod
def create_map(*cols: Union[Column, str, Any]) -> ColumnOperation:
    """Create a map from key-value pairs.

    Args:
        *cols: Alternating key-value columns/literals, or a single
            list/tuple holding them. With no arguments, or with a single
            empty list/tuple, the result describes an empty map ``{}``.

    Returns:
        ColumnOperation representing the create_map function. Its value is
        the full tuple of arguments (``()`` for an empty map) and its name
        is ``map(k1, v1, k2, v2, ...)`` (``map()`` for an empty map).

    Raises:
        ValueError: If a non-empty argument list has an odd length.

    Example:
        >>> df.select(F.create_map(F.col("k1"), F.col("v1"), F.col("k2"), F.col("v2")))
        >>> df.select(F.create_map())  # Returns empty map {}
    """
    # create_map([k1, v1, ...]) is treated like create_map(k1, v1, ...)
    # (Issue #440). Unpacking first also folds create_map([]) into the
    # empty-map case below (Issue #365).
    if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
        cols = tuple(cols[0])

    # PySpark returns {} for create_map() and create_map([]).
    if not cols:
        return ColumnOperation(
            Column("__create_map_base__"),
            "create_map",
            value=(),  # Empty tuple for no arguments
            name="map()",
        )

    if len(cols) % 2 != 0:
        raise ValueError(
            "create_map requires an even number of arguments (key-value pairs)"
        )

    # Function-local import, as in the original (presumably to avoid an
    # import cycle -- confirm); done once, after argument validation.
    from .core.literals import Literal

    # The operation needs a Column as its base. A Literal first argument
    # gets a placeholder column; any other non-Column value is used as a
    # column name.
    first = cols[0]
    if isinstance(first, Literal):
        base_col = Column("__create_map_base__")
    elif isinstance(first, Column):
        base_col = first
    else:
        base_col = Column(str(first))

    def _label(arg: Any) -> str:
        """Return the text shown for one argument in the operation name."""
        if isinstance(arg, Literal):
            return str(arg.value)
        if isinstance(arg, Column):
            return arg.name
        return str(arg)

    # PySpark-compatible name: map(key1, value1, key2, value2, ...)
    name = f"map({', '.join(_label(arg) for arg in cols)})"
    return ColumnOperation(
        base_col,
        "create_map",
        value=cols,  # Store all arguments, not just cols[1:]
        name=name,
    )
[docs]
@staticmethod
def map_contains_key(column: Union[Column, str], key: Any) -> ColumnOperation:
    """Build the operation that checks whether a map contains a key.

    Args:
        column: The map column, as a ``Column`` or a column name.
        key: The key to look for; stored unchanged as the operation's value.

    Returns:
        ColumnOperation tagged ``"map_contains_key"`` and named
        ``map_contains_key(<column name>, <repr of key>)``.

    Example:
        >>> df.select(F.map_contains_key(F.col("map"), "key"))
    """
    # Normalise a bare column name into a Column so ``.name`` is available.
    column = Column(column) if isinstance(column, str) else column
    # The key appears in the name via repr(), so string keys keep their quotes.
    op_name = f"map_contains_key({column.name}, {key!r})"
    return ColumnOperation(column, "map_contains_key", key, name=op_name)
[docs]
@staticmethod
def map_from_entries(column: Union[Column, str]) -> ColumnOperation:
    """Build the operation that converts an array of key-value structs to a map.

    Args:
        column: Array column containing structs with 'key' and 'value'
            fields, as a ``Column`` or a column name.

    Returns:
        ColumnOperation tagged ``"map_from_entries"`` and named
        ``map_from_entries(<column name>)``.

    Example:
        >>> df.select(F.map_from_entries(F.col("entries")))
    """
    # Normalise a bare column name into a Column so ``.name`` is available.
    column = Column(column) if isinstance(column, str) else column
    op_name = f"map_from_entries({column.name})"
    return ColumnOperation(column, "map_from_entries", name=op_name)
[docs]
@staticmethod
def map_filter(
    column: Union[Column, str], function: Callable[[Any, Any], bool]
) -> ColumnOperation:
    """Build the operation that filters map entries with a key-value predicate.

    This is a higher-order function: the predicate is wrapped in a
    ``MockLambdaExpression`` and stored as the operation's value.

    Args:
        column: The map column to filter, as a ``Column`` or a column name.
        function: Lambda function (key, value) -> bool that returns True for
            entries to keep.

    Returns:
        ColumnOperation tagged ``"map_filter"`` and named
        ``map_filter(<column name>, <lambda>)``.

    Example:
        >>> df.select(F.map_filter(F.col("map"), lambda k, v: v > 10))
    """
    # Normalise a bare column name into a Column so ``.name`` is available.
    column = Column(column) if isinstance(column, str) else column
    return ColumnOperation(
        column,
        "map_filter",
        value=MockLambdaExpression(function),
        name=f"map_filter({column.name}, <lambda>)",
    )
[docs]
@staticmethod
def map_zip_with(
    col1: Union[Column, str],
    col2: Union[Column, str],
    function: Callable[[Any, Any, Any], Any],
) -> ColumnOperation:
    """Build the operation that merges two maps with a function (PySpark 3.1+).

    This is a higher-order function: the combiner is wrapped in a
    ``MockLambdaExpression`` and applied to matching keys of the two maps.

    Args:
        col1: The first map column (``Column`` or column name).
        col2: The second map column (``Column`` or column name).
        function: Lambda function (key, value1, value2) -> new_value to
            combine values.

    Returns:
        ColumnOperation tagged ``"map_zip_with"`` with ``col1`` as its base
        and the pair ``(col2, wrapped function)`` as its value, named
        ``map_zip_with(<col1 name>, <col2 name>, <lambda>)``.

    Example:
        >>> df.select(F.map_zip_with(F.col("map1"), F.col("map2"), lambda k, v1, v2: v1 + v2))
    """
    # Both map operands may arrive as plain column names; wrap those in Column.
    col1 = Column(col1) if isinstance(col1, str) else col1
    col2 = Column(col2) if isinstance(col2, str) else col2

    # The second map travels together with the wrapped lambda as one tuple.
    payload = (col2, MockLambdaExpression(function))
    op_name = f"map_zip_with({col1.name}, {col2.name}, <lambda>)"
    return ColumnOperation(col1, "map_zip_with", value=payload, name=op_name)
[docs]
@staticmethod
def str_to_map(
    column: Union[Column, str],
    pair_delim: Optional[str] = ",",
    key_value_delim: Optional[str] = ":",
) -> ColumnOperation:
    """Convert string to map using delimiters.

    Args:
        column: The string column to convert, as a ``Column`` or a column
            name.
        pair_delim: Delimiter between key-value pairs (default ','). ``None``
            is treated as the default.
        key_value_delim: Delimiter between key and value (default ':').
            ``None`` is treated as the default.

    Returns:
        ColumnOperation representing the str_to_map function. Its value is
        the tuple ``(pair_delim, key_value_delim)`` and its name is
        ``str_to_map(<column name>, '<pair_delim>', '<key_value_delim>')``.
    """
    # The signature allows None for either delimiter; fall back to the
    # documented defaults so None is neither stored in the operation nor
    # rendered as 'None' in its name.
    if pair_delim is None:
        pair_delim = ","
    if key_value_delim is None:
        key_value_delim = ":"

    column = Column(column) if isinstance(column, str) else column
    return ColumnOperation(
        column,
        "str_to_map",
        (pair_delim, key_value_delim),
        name=f"str_to_map({column.name}, '{pair_delim}', '{key_value_delim}')",
    )