Source code for sparkless.functions.xml

"""XML functions for PySpark 3.2+ compatibility."""

from typing import Union, cast
from sparkless.functions.core.column import Column
from sparkless.functions.base import ColumnOperation


[docs] class XMLFunctions: """XML parsing and manipulation functions."""
[docs] @staticmethod def from_xml(col: Union[Column, str], schema: str) -> ColumnOperation: """Parse XML string to struct based on schema. Args: col: Column containing XML strings. schema: Schema definition string. Returns: ColumnOperation representing the from_xml function. Example: >>> df.select(F.from_xml(F.col("xml"), "name STRING, age INT")) """ if isinstance(col, str): col = Column(col) return ColumnOperation( col, "from_xml", schema, name=f"from_xml({col.name}, '{schema}')", )
[docs] @staticmethod def to_xml( col: Union[Column, ColumnOperation, str], ) -> ColumnOperation: # str may be passed at runtime """Convert struct column to XML string. Args: col: Struct column to convert. Returns: ColumnOperation representing the to_xml function. Example: >>> df.select(F.to_xml(F.struct(F.col("name"), F.col("age")))) """ if isinstance(col, str): col_obj: Union[Column, ColumnOperation] = Column(col) elif isinstance(col, (Column, ColumnOperation)): col_obj = col else: # For other types, convert to Column col_obj = Column(str(col)) # type: ignore[unreachable] if isinstance(col_obj, Column): base_col = col_obj else: # At this point, col_obj must be ColumnOperation base_col = cast("ColumnOperation", col_obj).column # type: ignore[unreachable] return ColumnOperation( base_col, "to_xml", col if isinstance(col, ColumnOperation) else None, name=f"to_xml({getattr(col, 'name', 'struct')})", )
[docs] @staticmethod def schema_of_xml(col: Union[Column, str]) -> ColumnOperation: """Infer schema from XML string. Args: col: Column containing XML strings. Returns: ColumnOperation representing the schema_of_xml function. Example: >>> df.select(F.schema_of_xml(F.col("xml"))) """ if isinstance(col, str): col = Column(col) return ColumnOperation( col, "schema_of_xml", name=f"schema_of_xml({col.name})", )
[docs] @staticmethod def xpath(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract array of values from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath function. Example: >>> df.select(F.xpath(F.col("xml"), "/root/item")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath", path, name=f"xpath({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_boolean(xml: Union[Column, str], path: str) -> ColumnOperation: """Evaluate XPath expression to boolean. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_boolean function. Example: >>> df.select(F.xpath_boolean(F.col("xml"), "/root/active='true'")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_boolean", path, name=f"xpath_boolean({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_double(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract double value from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_double function. Example: >>> df.select(F.xpath_double(F.col("xml"), "/root/value")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_double", path, name=f"xpath_double({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_float(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract float value from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_float function. Example: >>> df.select(F.xpath_float(F.col("xml"), "/root/price")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_float", path, name=f"xpath_float({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_int(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract integer value from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_int function. Example: >>> df.select(F.xpath_int(F.col("xml"), "/root/age")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_int", path, name=f"xpath_int({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_long(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract long value from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_long function. Example: >>> df.select(F.xpath_long(F.col("xml"), "/root/value")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_long", path, name=f"xpath_long({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_short(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract short value from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_short function. Example: >>> df.select(F.xpath_short(F.col("xml"), "/root/count")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_short", path, name=f"xpath_short({xml.name}, '{path}')", )
[docs] @staticmethod def xpath_string(xml: Union[Column, str], path: str) -> ColumnOperation: """Extract string value from XML using XPath. Args: xml: Column containing XML strings. path: XPath expression. Returns: ColumnOperation representing the xpath_string function. Example: >>> df.select(F.xpath_string(F.col("xml"), "/root/name")) """ if isinstance(xml, str): xml = Column(xml) return ColumnOperation( xml, "xpath_string", path, name=f"xpath_string({xml.name}, '{path}')", )