SQL Operations Guide

This guide provides comprehensive documentation for Sparkless’s SQL operations, including parsing, validation, optimization, and execution.

Overview

Sparkless provides a SQL processing pipeline that mirrors PySpark’s SQL capabilities. Session-aware literals and schema tracking (see Getting started) apply to SQL execution: the active session’s current database and catalog are used when resolving table names and running queries. The pipeline has four components:

  • SQL Parser - Converts SQL queries into abstract syntax trees (ASTs)

  • SQL Validator - Validates query syntax and semantics

  • SQL Optimizer - Optimizes query execution plans

  • SQL Executor - Executes optimized queries against data

SQL Parser

The SQLParser class parses SQL query strings into SQLAST objects. It supports common SELECT, DDL, and DML statements.

Basic Usage

from sparkless.session.sql import SQLParser

# Create parser instance
parser = SQLParser()

# Parse a simple SELECT query
ast = parser.parse("SELECT name, age FROM users WHERE age > 18")
print(ast.query_type)  # SELECT
print(ast.components['select_columns'])  # ['name', 'age']
print(ast.components['from_tables'])  # ['users']
print(ast.components['where_conditions'])  # ['age > 18']
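To make the component keys above concrete, here is a minimal, hypothetical sketch of how a simple single-table SELECT could be split into select_columns, from_tables, where_conditions, and limit_value. The function parse_simple_select is an illustration only, not Sparkless’s parser: it ignores joins, GROUP BY, ORDER BY, and quoted identifiers, and rejects anything it does not recognize.

```python
import re
from typing import Any, Dict, List, Optional

# Hypothetical sketch: a regex-based splitter for simple single-table SELECTs.
# It is NOT Sparkless's parser; it only mimics the component keys shown above.
_SELECT_RE = re.compile(
    r"^\s*SELECT\s+(?P<cols>.+?)\s+FROM\s+(?P<table>\w+)"
    r"(?:\s+WHERE\s+(?P<where>.+?))?"
    r"(?:\s+LIMIT\s+(?P<limit>\d+))?\s*;?\s*$",
    re.IGNORECASE,
)


def parse_simple_select(query: str) -> Dict[str, Any]:
    match = _SELECT_RE.match(query)
    if match is None:
        # Anything outside SELECT ... FROM t [WHERE ...] [LIMIT n] is rejected
        raise ValueError(f"Unsupported query: {query!r}")
    columns: List[str] = [c.strip() for c in match["cols"].split(",")]
    limit: Optional[int] = int(match["limit"]) if match["limit"] else None
    return {
        "select_columns": columns,
        "from_tables": [match["table"]],
        "where_conditions": [match["where"]] if match["where"] else [],
        "limit_value": limit,
    }


components = parse_simple_select("SELECT name, age FROM users WHERE age > 18")
print(components["select_columns"])    # ['name', 'age']
print(components["where_conditions"])  # ['age > 18']
```

A real parser tokenizes the query instead of matching one pattern, which is why it can handle nesting and clauses this sketch refuses.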

Supported Query Types

The parser supports the following SQL query types:

SELECT Queries

# Basic SELECT
ast = parser.parse("SELECT * FROM employees")

# SELECT with WHERE clause
ast = parser.parse("SELECT name, salary FROM employees WHERE department = 'Engineering'")

# SELECT with GROUP BY
ast = parser.parse("SELECT department, COUNT(*) FROM employees GROUP BY department")

# SELECT with ORDER BY
ast = parser.parse("SELECT name, salary FROM employees ORDER BY salary DESC")

# SELECT with LIMIT
ast = parser.parse("SELECT * FROM employees LIMIT 10")

DDL Operations

# CREATE TABLE
ast = parser.parse("CREATE TABLE users (id INT, name STRING, age INT)")

# DROP TABLE
ast = parser.parse("DROP TABLE users")

# ALTER TABLE
ast = parser.parse("ALTER TABLE users ADD COLUMN email STRING")

DML Operations

# INSERT
ast = parser.parse("INSERT INTO users VALUES (1, 'Alice', 25)")

# UPDATE
ast = parser.parse("UPDATE users SET age = 26 WHERE name = 'Alice'")

# DELETE
ast = parser.parse("DELETE FROM users WHERE age < 18")
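The query types above are distinguished by their leading keywords. The sketch below shows one simple way to derive a query_type label from a statement; the function classify_statement and its label table are hypothetical illustrations, and the exact labels Sparkless assigns to DDL statements may differ.

```python
# Hypothetical sketch: classify a statement by its leading keyword(s).
# The keyword table below is illustrative, not Sparkless's implementation.
_STATEMENT_PREFIXES = {
    ("SELECT",): "SELECT",
    ("INSERT", "INTO"): "INSERT",
    ("UPDATE",): "UPDATE",
    ("DELETE", "FROM"): "DELETE",
    ("CREATE", "TABLE"): "CREATE",
    ("DROP", "TABLE"): "DROP",
    ("ALTER", "TABLE"): "ALTER",
}


def classify_statement(query: str) -> str:
    # Compare the first one or two upper-cased words against each prefix
    words = query.upper().split()
    for prefix, query_type in _STATEMENT_PREFIXES.items():
        if tuple(words[: len(prefix)]) == prefix:
            return query_type
    raise ValueError(f"Unrecognized statement: {query!r}")


print(classify_statement("DROP TABLE users"))  # DROP
```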

AST Structure

An SQLAST object holds the query type and a dictionary of parsed components:

from typing import Any, Dict, List, Optional

class SQLAST:
    def __init__(self, query_type: str, components: Dict[str, Any]):
        self.query_type = query_type  # Type of SQL query
        self.components = components  # Parsed query components

# Available components for SELECT queries (key -> value type):
components = {
    'original_query': str,      # Original SQL query
    'query_type': str,          # Query type (SELECT, INSERT, etc.)
    'tokens': List[str],        # Tokenized query
    'tables': List[str],        # Referenced tables
    'columns': List[str],       # Referenced columns
    'conditions': List[str],    # WHERE conditions
    'joins': List[Dict],        # JOIN information
    'group_by': List[str],      # GROUP BY columns
    'order_by': List[str],      # ORDER BY columns
    'limit': Optional[int],     # LIMIT value
    'offset': Optional[int],    # OFFSET value
    'select_columns': List[str], # SELECT columns
    'from_tables': List[str],   # FROM tables
    'where_conditions': List[str], # WHERE conditions
    'group_by_columns': List[str], # GROUP BY columns
    'having_conditions': List[str], # HAVING conditions
    'order_by_columns': List[str], # ORDER BY columns
    'limit_value': Optional[int]   # LIMIT value
}

Error Handling

The parser raises ParseException for invalid SQL:

from sparkless.core.exceptions.analysis import ParseException

try:
    ast = parser.parse("INVALID SQL SYNTAX")
except ParseException as e:
    print(f"Parse error: {e}")

SQL Validator

The SQLValidator class validates parsed SQL queries for syntax and semantic correctness.

Basic Usage

from sparkless.session.sql import SQLParser, SQLValidator

parser = SQLParser()
validator = SQLValidator()

# Validate a parsed AST
ast = parser.parse("SELECT name FROM users WHERE age > 18")
is_valid = validator.validate(ast)
print(is_valid)  # True or False

# Get validation errors
errors = validator.get_errors()
for error in errors:
    print(f"Validation error: {error}")

Validation Rules

The validator checks for:

  • Syntax correctness - Valid SQL syntax

  • Table existence - Referenced tables exist

  • Column existence - Referenced columns exist in tables

  • Type compatibility - Column types are compatible

  • Join validity - JOIN conditions are valid

  • Aggregate usage - Aggregates used correctly with GROUP BY

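The table- and column-existence checks can be pictured as lookups against a catalog. The following is a minimal sketch under stated assumptions: the catalog is a plain dict mapping hypothetical table names to column sets, and check_references is an illustrative helper that returns error strings in the spirit of get_errors(), not Sparkless’s validator.

```python
from typing import Dict, List, Set

# Hypothetical sketch of table- and column-existence checks. The catalog is a
# plain dict of table name -> column names; Sparkless keeps its own catalog.
def check_references(
    catalog: Dict[str, Set[str]],
    tables: List[str],
    columns: List[str],
) -> List[str]:
    errors: List[str] = []
    known_columns: Set[str] = set()
    for table in tables:
        if table not in catalog:
            errors.append(f"Table not found: {table}")
        else:
            # Columns of every referenced table are in scope
            known_columns |= catalog[table]
    for column in columns:
        if column != "*" and column not in known_columns:
            errors.append(f"Column not found: {column}")
    return errors


catalog = {"users": {"id", "name", "age"}}
print(check_references(catalog, ["users"], ["name", "email"]))
# ['Column not found: email']
```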
Custom Validation

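# Add a custom validation rule.
# check_custom_condition is a hypothetical example; this assumes a rule
# receives the parsed AST and returns True when the query passes.
def check_custom_condition(ast):
    # Example policy: reject SELECT * queries
    return "*" not in ast.components.get("select_columns", [])

validator.add_rule("custom_rule", check_custom_condition)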

# Validate with custom rules
is_valid = validator.validate(ast, custom_rules=True)
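One common way to implement named custom rules is a registry of predicates that are all run and whose failures are reported by name. The sketch below illustrates that idea with a hypothetical RuleSet class operating on a components dictionary; it is not how Sparkless stores or runs rules internally.

```python
from typing import Any, Callable, Dict, List

# Hypothetical sketch of a named-rule registry: each rule takes the parsed
# components and returns True when the query passes. Not Sparkless's code.
Rule = Callable[[Dict[str, Any]], bool]


class RuleSet:
    def __init__(self) -> None:
        self._rules: Dict[str, Rule] = {}

    def add_rule(self, name: str, rule: Rule) -> None:
        # Registering the same name twice replaces the earlier rule
        self._rules[name] = rule

    def failed_rules(self, components: Dict[str, Any]) -> List[str]:
        # Run every rule and report the names of those that returned False
        return [name for name, rule in self._rules.items() if not rule(components)]


rules = RuleSet()
rules.add_rule("no_select_star", lambda c: "*" not in c.get("select_columns", []))
rules.add_rule("has_limit", lambda c: c.get("limit_value") is not None)
print(rules.failed_rules({"select_columns": ["*"], "limit_value": 10}))
# ['no_select_star']
```

Running every rule, rather than stopping at the first failure, lets a caller report all problems at once.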

SQL Optimizer

The SQLQueryOptimizer class optimizes SQL queries for better performance.

Basic Usage

from sparkless.session.sql import SQLQueryOptimizer

optimizer = SQLQueryOptimizer()

# Optimize a parsed AST
ast = parser.parse("SELECT * FROM users WHERE age > 18 ORDER BY name")
optimized_ast = optimizer.optimize(ast)
print(optimized_ast.components)

Optimization Strategies

The optimizer applies various optimization strategies:

  • Predicate pushdown - Move WHERE conditions closer to data source

  • Column pruning - Remove unused columns

  • Join reordering - Optimize JOIN order

  • Limit pushdown - Move LIMIT closer to data source

  • Constant folding - Evaluate constant expressions

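Predicate pushdown is easiest to see on a logical plan tree. This minimal sketch uses three hypothetical node types (Scan, Project, Filter) and one rewrite: a Filter above a Project moves below it when the projection keeps every column the predicate reads, so rows are discarded earlier. Sparkless’s optimizer works on its own AST, so treat this purely as an illustration of the technique.

```python
from dataclasses import dataclass
from typing import FrozenSet, Union

# Hypothetical sketch: a three-node logical plan and one rewrite rule
# (predicate pushdown through a projection). Node names are illustrative.
@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Project:
    columns: tuple
    child: "Plan"


@dataclass(frozen=True)
class Filter:
    predicate: str              # e.g. "age > 18", kept as text for brevity
    referenced: FrozenSet[str]  # columns the predicate reads
    child: "Plan"


Plan = Union[Scan, Project, Filter]


def push_down_filters(plan: Plan) -> Plan:
    """Move a Filter below a Project when the Project keeps every column the
    predicate reads, so rows are discarded before columns are reshaped."""
    if isinstance(plan, Filter):
        child = push_down_filters(plan.child)
        if isinstance(child, Project) and plan.referenced <= set(child.columns):
            pushed = push_down_filters(Filter(plan.predicate, plan.referenced, child.child))
            return Project(child.columns, pushed)
        return Filter(plan.predicate, plan.referenced, child)
    if isinstance(plan, Project):
        return Project(plan.columns, push_down_filters(plan.child))
    return plan  # Scan: nothing to rewrite


plan = Filter("age > 18", frozenset({"age"}), Project(("name", "age"), Scan("users")))
print(push_down_filters(plan))  # the Filter now sits directly above the Scan
```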
Custom Optimization

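# Add a custom optimization rule.
# custom_optimization_function is a hypothetical placeholder; this assumes
# a rule receives an AST and returns a (possibly rewritten) AST.
def custom_optimization_function(ast):
    return ast  # No-op: return the AST unchanged

optimizer.add_rule("custom_optimization", custom_optimization_function)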

# Optimize with custom rules
optimized_ast = optimizer.optimize(ast, custom_rules=True)

SQL Executor

The SQLExecutor class executes optimized SQL queries against data.

Basic Usage

from sparkless.sql import SparkSession
from sparkless.session.sql import SQLExecutor

spark_session = SparkSession("SQLExample")
executor = SQLExecutor(spark_session)

# Execute a parsed and optimized AST
result = executor.execute(optimized_ast)
print(result.collect())

Execution Context

The executor maintains execution context including:

  • Session state - Current Spark session

  • Table registry - Available tables

  • Schema registry - Table schemas

  • Configuration - Execution configuration

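The execution context described above is essentially a set of registries. As a rough, hypothetical sketch, it could be modeled as a dataclass holding the session name, tables, schemas, and configuration; the class ExecutionContext and its fields are assumptions chosen to match the bullets, not Sparkless internals, and the schema here is just Python type names inferred from the first row.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Hypothetical sketch of an execution context. Field names are assumptions
# chosen to match the bullets above, not Sparkless internals.
@dataclass
class ExecutionContext:
    session_name: str
    tables: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict)
    schemas: Dict[str, Dict[str, str]] = field(default_factory=dict)
    config: Dict[str, str] = field(default_factory=dict)

    def register_table(self, name: str, rows: List[Dict[str, Any]]) -> None:
        # Store the rows and record a simple schema: column -> Python type name,
        # taken from the first row (assumes all rows share keys and types).
        self.tables[name] = rows
        self.schemas[name] = (
            {column: type(value).__name__ for column, value in rows[0].items()}
            if rows
            else {}
        )


ctx = ExecutionContext("SQLExample")
ctx.register_table("employees", [{"name": "Alice", "age": 25}])
print(ctx.schemas["employees"])  # {'name': 'str', 'age': 'int'}
```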
Custom Execution

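# Add a custom execution handler.
# custom_handler stands for your own callable; its expected signature is
# not covered in this guide.
executor.add_handler("custom_operation", custom_handler)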

# Execute with custom handlers
result = executor.execute(ast, custom_handlers=True)

Complete SQL Pipeline Example

Here’s a complete example of using the SQL pipeline:

from sparkless.sql import SparkSession
from sparkless.session.sql import SQLParser, SQLValidator, SQLQueryOptimizer, SQLExecutor

# Create Spark session
spark = SparkSession("SQLExample")

# Create sample data
data = [
    {"name": "Alice", "age": 25, "department": "Engineering"},
    {"name": "Bob", "age": 30, "department": "Marketing"},
    {"name": "Charlie", "age": 35, "department": "Engineering"}
]
df = spark.createDataFrame(data)
df.createOrReplaceTempView("employees")

# SQL Pipeline
parser = SQLParser()
validator = SQLValidator()
optimizer = SQLQueryOptimizer()
executor = SQLExecutor(spark)

# Parse SQL query
query = "SELECT department, COUNT(*) as count FROM employees WHERE age > 25 GROUP BY department ORDER BY count DESC"
ast = parser.parse(query)

# Validate query
if validator.validate(ast):
    # Optimize query
    optimized_ast = optimizer.optimize(ast)
    
    # Execute query
    result = executor.execute(optimized_ast)
    
    # Display results
    result.show()
else:
    print("Query validation failed:")
    for error in validator.get_errors():
        print(f"  - {error}")
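When testing a pipeline like this, it helps to know the expected result independently. The sketch below recomputes the same query in plain Python over the same sample rows. Note that Alice (age 25) is excluded by the strict age > 25 comparison, so each department ends up with a count of 1, and ORDER BY count DESC leaves the order of those tied rows unspecified.

```python
from collections import Counter

# Plain-Python cross-check of the pipeline query above, using the same rows.
data = [
    {"name": "Alice", "age": 25, "department": "Engineering"},
    {"name": "Bob", "age": 30, "department": "Marketing"},
    {"name": "Charlie", "age": 35, "department": "Engineering"},
]

# WHERE age > 25, then GROUP BY department with COUNT(*)
counts = Counter(row["department"] for row in data if row["age"] > 25)

# ORDER BY count DESC (ties have no guaranteed order in SQL either)
expected = sorted(counts.items(), key=lambda item: item[1], reverse=True)
print(expected)
```

Because tied rows may come back in any order, compare results as sets or dictionaries rather than as ordered lists.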

Advanced Features

Custom SQL Functions

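# Register a custom SQL function (UDF)
def custom_function(value):
    # SQL NULL arrives as None; return None instead of raising AttributeError
    return value.upper() if value is not None else None

spark.udf.register("custom_upper", custom_function)

# Use in SQL
result = spark.sql("SELECT custom_upper(name) FROM employees")
result.show()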

Query Hints

# Add query hints for optimization
query = """
SELECT /*+ BROADCAST(employees) */ 
    e.name, d.department_name 
FROM employees e 
JOIN departments d ON e.department_id = d.id
"""
ast = parser.parse(query)
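Hints live inside /*+ ... */ comments, so a parser has to pull them out before treating the rest as ordinary SQL. This hypothetical sketch, extract_hints, shows one way to collect NAME(arg, ...) hints with regular expressions; it does not say which hints Sparkless honors, and it ignores hints written without parentheses.

```python
import re
from typing import List, Tuple

# Hypothetical sketch: pull optimizer hints out of /*+ ... */ comments.
# Whether Sparkless acts on a given hint is not covered here.
_HINT_BLOCK = re.compile(r"/\*\+(.*?)\*/", re.DOTALL)
_HINT = re.compile(r"(\w+)\s*\(([^)]*)\)")


def extract_hints(query: str) -> List[Tuple[str, List[str]]]:
    hints = []
    for block in _HINT_BLOCK.findall(query):
        # Each block may hold several NAME(arg, ...) hints
        for name, args in _HINT.findall(block):
            arg_list = [a.strip() for a in args.split(",") if a.strip()]
            hints.append((name.upper(), arg_list))
    return hints


print(extract_hints("SELECT /*+ BROADCAST(employees) */ e.name FROM employees e"))
# [('BROADCAST', ['employees'])]
```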

Performance Monitoring

# Enable query performance monitoring
executor.enable_profiling()

# Execute query with profiling
result = executor.execute(ast)

# Get execution statistics
stats = executor.get_execution_stats()
print(f"Execution time: {stats['execution_time']}ms")
print(f"Rows processed: {stats['rows_processed']}")
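The statistics above boil down to timing a call and counting its output. As a hedged sketch of that idea, the hypothetical profile helper below wraps any zero-argument callable, for example one that collects a query result, and reports wall-clock milliseconds plus the number of rows returned. Rows returned can be far fewer than rows scanned, so this is not a drop-in replacement for get_execution_stats().

```python
import time
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical sketch: wall-clock time in milliseconds and rows returned.
def profile(run: Callable[[], List[Any]]) -> Tuple[List[Any], Dict[str, float]]:
    start = time.perf_counter()  # monotonic, high-resolution timer
    rows = run()
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return rows, {"execution_time": elapsed_ms, "rows_processed": len(rows)}


rows, stats = profile(lambda: [n for n in range(10) if n % 2 == 0])
print(stats["rows_processed"])  # 5
```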

Best Practices

  1. Always validate queries before execution

  2. Use parameterized queries to prevent SQL injection

  3. Optimize queries for better performance

  4. Monitor execution for performance bottlenecks

  5. Handle errors gracefully with proper exception handling
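Practice 2 is worth seeing in action. This guide does not describe parameter binding in Sparkless’s own sql() call, so the sketch below uses Python’s built-in sqlite3 module purely to demonstrate the principle: untrusted input is passed as a bound parameter, so an injection attempt is compared as a literal string instead of changing the query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [("Alice", 25), ("Bob", 30)])

# Untrusted input is bound with ?, never spliced into the SQL text
user_input = "Alice' OR '1'='1"
rows = conn.execute("SELECT name FROM users WHERE name = ?", (user_input,)).fetchall()
print(rows)  # [] because the whole string is treated as one literal value

safe_rows = conn.execute(
    "SELECT name, age FROM users WHERE age > ? ORDER BY name", (18,)
).fetchall()
print(safe_rows)  # [('Alice', 25), ('Bob', 30)]
conn.close()
```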

Troubleshooting

Common Issues

  1. Parse errors - Check SQL syntax

  2. Validation errors - Verify table/column existence

  3. Execution errors - Check data types and constraints

  4. Performance issues - Use query optimization

Debug Mode

# Enable debug mode for detailed logging
parser.set_debug(True)
validator.set_debug(True)
optimizer.set_debug(True)
executor.set_debug(True)

This guide covered each stage of Sparkless’s SQL pipeline: parsing, validation, optimization, and execution.