SQL Operations Guide
This guide provides comprehensive documentation for Sparkless's SQL operations, including parsing, validation, optimization, and execution.
Overview
Sparkless provides a complete SQL processing pipeline that mirrors PySpark's SQL capabilities. Session-aware literals and schema tracking (see Getting started) also apply to SQL execution: the active session's current database and catalog are used when resolving table names and running queries. The pipeline consists of four components:
SQL Parser - Converts SQL queries into abstract syntax trees (ASTs)
SQL Validator - Validates query syntax and semantics
SQL Optimizer - Optimizes query execution plans
SQL Executor - Executes optimized queries against data
SQL Parser
The SQLParser class provides SQL parsing functionality with support for common SQL operations.
Basic Usage
from sparkless.session.sql import SQLParser
# Create parser instance
parser = SQLParser()
# Parse a simple SELECT query
ast = parser.parse("SELECT name, age FROM users WHERE age > 18")
print(ast.query_type) # 'SELECT'
print(ast.components['select_columns']) # ['name', 'age']
print(ast.components['from_tables']) # ['users']
print(ast.components['where_conditions']) # ['age > 18']
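To make the component names above concrete, here is a toy sketch of how a single-table SELECT could be split into `select_columns`, `from_tables`, and `where_conditions`. It uses a plain regular expression and is not Sparkless's parser; `toy_parse_select` and `SELECT_RE` are hypothetical names, and the naive comma split would break on expressions such as `COUNT(a, b)`.

```python
import re
from typing import Dict, List

# Toy pattern for "SELECT <cols> FROM <table> [WHERE <cond>]"; not Sparkless's grammar.
SELECT_RE = re.compile(
    r"^\s*SELECT\s+(?P<cols>.+?)\s+FROM\s+(?P<table>\w+)"
    r"(?:\s+WHERE\s+(?P<where>.+?))?\s*$",
    re.IGNORECASE,
)

def toy_parse_select(query: str) -> Dict[str, List[str]]:
    """Split a simple SELECT into the component keys used in the example above."""
    match = SELECT_RE.match(query)
    if match is None:
        raise ValueError(f"Unsupported query: {query!r}")
    where = match.group("where")
    return {
        # Naive split: fine for plain column lists, wrong for nested commas.
        "select_columns": [c.strip() for c in match.group("cols").split(",")],
        "from_tables": [match.group("table")],
        "where_conditions": [where] if where else [],
    }

components = toy_parse_select("SELECT name, age FROM users WHERE age > 18")
print(components["select_columns"])    # ['name', 'age']
print(components["where_conditions"])  # ['age > 18']
```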
Supported Query Types
The parser supports the following SQL query types:
SELECT Queries
# Basic SELECT
ast = parser.parse("SELECT * FROM employees")
# SELECT with WHERE clause
ast = parser.parse("SELECT name, salary FROM employees WHERE department = 'Engineering'")
# SELECT with GROUP BY
ast = parser.parse("SELECT department, COUNT(*) FROM employees GROUP BY department")
# SELECT with ORDER BY
ast = parser.parse("SELECT name, salary FROM employees ORDER BY salary DESC")
# SELECT with LIMIT
ast = parser.parse("SELECT * FROM employees LIMIT 10")
DDL Operations
# CREATE TABLE
ast = parser.parse("CREATE TABLE users (id INT, name STRING, age INT)")
# DROP TABLE
ast = parser.parse("DROP TABLE users")
# ALTER TABLE
ast = parser.parse("ALTER TABLE users ADD COLUMN email STRING")
DML Operations
# INSERT
ast = parser.parse("INSERT INTO users VALUES (1, 'Alice', 25)")
# UPDATE
ast = parser.parse("UPDATE users SET age = 26 WHERE name = 'Alice'")
# DELETE
ast = parser.parse("DELETE FROM users WHERE age < 18")
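The statement families above (queries, DDL, DML) are distinguished by their leading keyword. The sketch below shows that first dispatch step in isolation. The category labels and `ToyParseError` are hypothetical and do not reflect the exact `query_type` strings Sparkless assigns; the real parser raises its own `ParseException`.

```python
from typing import Dict

class ToyParseError(Exception):
    """Hypothetical stand-in for a parse error raised on unsupported input."""

# Leading keyword -> statement family (illustrative labels only).
STATEMENT_CATEGORIES: Dict[str, str] = {
    "SELECT": "query",
    "CREATE": "DDL", "DROP": "DDL", "ALTER": "DDL",
    "INSERT": "DML", "UPDATE": "DML", "DELETE": "DML",
}

def classify_statement(sql: str) -> str:
    """Return the statement family, based only on the first keyword (case-insensitive)."""
    tokens = sql.split()
    if not tokens:
        raise ToyParseError("empty query")
    keyword = tokens[0].upper()
    try:
        return STATEMENT_CATEGORIES[keyword]
    except KeyError:
        raise ToyParseError(f"unsupported statement: {keyword}") from None

print(classify_statement("CREATE TABLE users (id INT, name STRING, age INT)"))  # DDL
print(classify_statement("update users SET age = 26 WHERE name = 'Alice'"))    # DML
```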
AST Structure
The SQLAST object contains the following components:
from typing import Any, Dict, List, Optional
class SQLAST:
    def __init__(self, query_type: str, components: Dict[str, Any]):
        self.query_type = query_type  # Type of SQL query
        self.components = components  # Parsed query components
# Available components for SELECT queries (key -> value type):
components = {
    'original_query': str,  # Original SQL query
    'query_type': str,  # Query type (SELECT, INSERT, etc.)
    'tokens': List[str],  # Tokenized query
    'tables': List[str],  # Referenced tables
    'columns': List[str],  # Referenced columns
    'conditions': List[str],  # WHERE conditions
    'joins': List[Dict],  # JOIN information
    'group_by': List[str],  # GROUP BY columns
    'order_by': List[str],  # ORDER BY columns
    'limit': Optional[int],  # LIMIT value
    'offset': Optional[int],  # OFFSET value
    'select_columns': List[str],  # SELECT columns
    'from_tables': List[str],  # FROM tables
    'where_conditions': List[str],  # WHERE conditions
    'group_by_columns': List[str],  # GROUP BY columns
    'having_conditions': List[str],  # HAVING conditions
    'order_by_columns': List[str],  # ORDER BY columns
    'limit_value': Optional[int],  # LIMIT value
}
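Several entries in this listing carry the same description under two keys (for example `conditions` and `where_conditions`, or `limit` and `limit_value`). Which key a given query populates is not specified here, so code that reads components can try both. The helper below is a hypothetical convenience, not part of Sparkless, and the alias pairs are an assumption drawn only from the matching descriptions.

```python
from typing import Any, Dict

# Assumed alias pairs, taken from keys that share a description in the listing above.
ALIASES: Dict[str, str] = {
    "where_conditions": "conditions",
    "group_by_columns": "group_by",
    "order_by_columns": "order_by",
    "limit_value": "limit",
}

def get_component(components: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Return components[key], falling back to its alias if the key is missing or None."""
    value = components.get(key)
    if value is None and key in ALIASES:
        value = components.get(ALIASES[key])
    return default if value is None else value

parsed = {"select_columns": ["name"], "limit": 10}
print(get_component(parsed, "limit_value"))           # 10
print(get_component(parsed, "where_conditions", []))  # []
```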
Error Handling
The parser raises ParseException for invalid SQL:
from sparkless.core.exceptions.analysis import ParseException
from sparkless.session.sql import SQLParser
parser = SQLParser()
try:
    ast = parser.parse("INVALID SQL SYNTAX")
except ParseException as e:
    print(f"Parse error: {e}")
SQL Validator
The SQLValidator class validates parsed SQL queries for syntax and semantic correctness.
Basic Usage
from sparkless.session.sql import SQLValidator
validator = SQLValidator()
# Validate a parsed AST
ast = parser.parse("SELECT name FROM users WHERE age > 18")
is_valid = validator.validate(ast)
print(is_valid) # True or False
# Get validation errors
errors = validator.get_errors()
for error in errors:
    print(f"Validation error: {error}")
Validation Rules
The validator checks for:
Syntax correctness - Valid SQL syntax
Table existence - Referenced tables exist
Column existence - Referenced columns exist in tables
Type compatibility - Column types are compatible
Join validity - JOIN conditions are valid
Aggregate usage - Aggregates used correctly with GROUP BY
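Two of these rules, table existence and column existence, can be sketched against a simple catalog that maps table names to column sets. This is a toy illustration, not Sparkless's validator. `validate_components` and the `catalog` layout are hypothetical, and type, join, and aggregate checks are intentionally left out.

```python
from typing import Dict, List, Set

def validate_components(components: Dict[str, List[str]], catalog: Dict[str, Set[str]]) -> List[str]:
    """Return error messages for unknown tables/columns; an empty list means these checks passed."""
    errors: List[str] = []
    known_columns: Set[str] = set()
    for table in components.get("from_tables", []):
        if table in catalog:
            known_columns |= catalog[table]  # union of columns from all referenced tables
        else:
            errors.append(f"Table not found: {table}")
    for column in components.get("select_columns", []):
        if column != "*" and column not in known_columns:
            errors.append(f"Column not found: {column}")
    return errors

catalog = {"users": {"name", "age"}}
print(validate_components({"select_columns": ["name"], "from_tables": ["users"]}, catalog))   # []
print(validate_components({"select_columns": ["email"], "from_tables": ["users"]}, catalog))  # ['Column not found: email']
```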
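Custom Validation
# Hypothetical rule for illustration: require a WHERE clause.
# Returning a boolean is assumed to be the contract add_rule expects.
def check_custom_condition(ast):
    return bool(ast.components.get('where_conditions'))
# Add custom validation rules
validator.add_rule("custom_rule", check_custom_condition)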
# Validate with custom rules
is_valid = validator.validate(ast, custom_rules=True)
SQL Optimizer
The SQLQueryOptimizer class optimizes SQL queries for better performance.
Basic Usage
from sparkless.session.sql import SQLQueryOptimizer
optimizer = SQLQueryOptimizer()
# Optimize a parsed AST
ast = parser.parse("SELECT * FROM users WHERE age > 18 ORDER BY name")
optimized_ast = optimizer.optimize(ast)
print(optimized_ast.components)
Optimization Strategies
The optimizer applies various optimization strategies:
Predicate pushdown - Move WHERE conditions closer to data source
Column pruning - Remove unused columns
Join reordering - Optimize JOIN order
Limit pushdown - Move LIMIT closer to data source
Constant folding - Evaluate constant expressions
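Predicate pushdown, the first strategy above, is safe whenever the filter references only one side of a join. The toy sketch below runs the same query two ways over in-memory rows and counts row-pair comparisons as a crude cost metric. It illustrates the general technique and is not Sparkless's optimizer; the sample tables and `nested_loop_join` are hypothetical.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

def nested_loop_join(left: List[Row], right: List[Row], lkey: str, rkey: str) -> Tuple[List[Row], int]:
    """Inner join that also returns how many row pairs it compared."""
    out: List[Row] = []
    comparisons = 0
    for l in left:
        for r in right:
            comparisons += 1
            if l[lkey] == r[rkey]:
                out.append({**l, **r})
    return out, comparisons

employees = [
    {"name": "Alice", "age": 25, "department_id": 1},
    {"name": "Bob", "age": 30, "department_id": 2},
    {"name": "Charlie", "age": 35, "department_id": 1},
]
departments = [
    {"id": 1, "department_name": "Engineering"},
    {"id": 2, "department_name": "Marketing"},
]

def older_than_25(row: Row) -> bool:
    # Only references employees columns, so it may be evaluated below the join.
    return row["age"] > 25

# Plan A: join first, filter afterwards.
joined, cost_a = nested_loop_join(employees, departments, "department_id", "id")
plan_a = [row for row in joined if older_than_25(row)]

# Plan B: push the filter below the join, then join the smaller input.
pushed = [row for row in employees if older_than_25(row)]
plan_b, cost_b = nested_loop_join(pushed, departments, "department_id", "id")

print(plan_a == plan_b, cost_a, cost_b)  # True 6 4
```

Both plans return the same rows. Plan B compares fewer pairs because the filter shrinks the left input before the join runs.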
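Custom Optimization
# Hypothetical rule: assumed to take an AST and return a (possibly rewritten) AST.
def custom_optimization_function(ast):
    return ast  # no-op placeholder
# Add custom optimization rules
optimizer.add_rule("custom_optimization", custom_optimization_function)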
# Optimize with custom rules
optimized_ast = optimizer.optimize(ast, custom_rules=True)
SQL Executor
The SQLExecutor class executes optimized SQL queries against data.
Basic Usage
from sparkless.session.sql import SQLExecutor
# spark_session is an existing SparkSession instance
executor = SQLExecutor(spark_session)
# Execute a parsed and optimized AST
result = executor.execute(optimized_ast)
print(result.collect())
Execution Context
The executor maintains execution context including:
Session state - Current Spark session
Table registry - Available tables
Schema registry - Table schemas
Configuration - Execution configuration
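The four pieces of execution context above can be pictured as one container object. The dataclass below is a hypothetical sketch of that shape, not Sparkless's internal class: a session name stands in for the session, tables map to lists of row dicts, and the schema is naively inferred from the first row's Python types.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

Row = Dict[str, Any]

@dataclass
class ToyExecutionContext:
    """Hypothetical container for session state, table/schema registries, and config."""
    session_name: str
    tables: Dict[str, List[Row]] = field(default_factory=dict)
    schemas: Dict[str, Dict[str, str]] = field(default_factory=dict)
    config: Dict[str, Any] = field(default_factory=dict)

    def register_table(self, name: str, rows: List[Row]) -> None:
        # Store the rows and infer a column -> Python type-name schema from the first row.
        self.tables[name] = rows
        self.schemas[name] = {k: type(v).__name__ for k, v in rows[0].items()} if rows else {}

    def lookup(self, name: str) -> List[Row]:
        # Table resolution fails loudly, as a real executor would for an unknown view.
        if name not in self.tables:
            raise KeyError(f"Table or view not found: {name}")
        return self.tables[name]

ctx = ToyExecutionContext("SQLExample")
ctx.register_table("employees", [{"name": "Alice", "age": 25}])
print(ctx.schemas["employees"])  # {'name': 'str', 'age': 'int'}
```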
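Custom Execution
# Hypothetical handler; the arguments Sparkless passes to handlers are assumed here.
def custom_handler(ast):
    # Placeholder: inspect ast.components and return the operation's result.
    return None
# Add custom execution handlers
executor.add_handler("custom_operation", custom_handler)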
# Execute with custom handlers
result = executor.execute(ast, custom_handlers=True)
Complete SQL Pipeline Example
Here's a complete example of using the SQL pipeline:
from sparkless.sql import SparkSession
from sparkless.session.sql import SQLParser, SQLValidator, SQLQueryOptimizer, SQLExecutor
# Create Spark session
spark = SparkSession("SQLExample")
# Create sample data
data = [
{"name": "Alice", "age": 25, "department": "Engineering"},
{"name": "Bob", "age": 30, "department": "Marketing"},
{"name": "Charlie", "age": 35, "department": "Engineering"}
]
df = spark.createDataFrame(data)
df.createOrReplaceTempView("employees")
# SQL Pipeline
parser = SQLParser()
validator = SQLValidator()
optimizer = SQLQueryOptimizer()
executor = SQLExecutor(spark)
# Parse SQL query
query = "SELECT department, COUNT(*) as count FROM employees WHERE age > 25 GROUP BY department ORDER BY count DESC"
ast = parser.parse(query)
# Validate query
if validator.validate(ast):
    # Optimize query
    optimized_ast = optimizer.optimize(ast)
    # Execute query
    result = executor.execute(optimized_ast)
    # Display results
    result.show()
else:
    print("Query validation failed:")
    for error in validator.get_errors():
        print(f"  - {error}")
Advanced Features
Custom SQL Functions
# Register custom SQL function
def custom_function(value):
    # SQL NULLs arrive as None, so guard before calling string methods
    return value.upper() if value is not None else None
spark.udf.register("custom_upper", custom_function)
# Use in SQL
result = spark.sql("SELECT custom_upper(name) FROM employees")
Query Hints
# Add query hints for optimization
query = """
SELECT /*+ BROADCAST(employees) */
e.name, d.department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
"""
ast = parser.parse(query)
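Hints use Spark's `/*+ HINT(args) */` comment syntax, so they can be picked out of the query text before any other parsing. The sketch below extracts `(name, args)` pairs with regular expressions. It is a toy illustration under that assumption, not how Sparkless records hints in its AST, and `extract_hints` is a hypothetical name.

```python
import re
from typing import List, Tuple

HINT_BLOCK_RE = re.compile(r"/\*\+(.*?)\*/", re.DOTALL)   # contents of each /*+ ... */ block
HINT_RE = re.compile(r"(\w+)\s*(?:\(([^)]*)\))?")         # NAME or NAME(arg, ...)

def extract_hints(query: str) -> List[Tuple[str, List[str]]]:
    """Return (HINT_NAME, [args]) pairs found in hint comments, in order of appearance."""
    hints: List[Tuple[str, List[str]]] = []
    for block in HINT_BLOCK_RE.findall(query):
        for name, args in HINT_RE.findall(block):
            hints.append((name.upper(), [a.strip() for a in args.split(",") if a.strip()]))
    return hints

query = """
SELECT /*+ BROADCAST(employees) */
    e.name, d.department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
"""
print(extract_hints(query))  # [('BROADCAST', ['employees'])]
```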
Performance Monitoring
# Enable query performance monitoring
executor.enable_profiling()
# Execute query with profiling
result = executor.execute(ast)
# Get execution statistics
stats = executor.get_execution_stats()
print(f"Execution time: {stats['execution_time']}ms")
print(f"Rows processed: {stats['rows_processed']}")
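The kind of statistics shown above can be approximated for any query callable by timing it with a monotonic clock. The wrapper below is a hypothetical sketch, not Sparkless's profiler. It reports output rows as `rows_returned`, which may differ from what Sparkless counts as `rows_processed`.

```python
import time
from typing import Any, Callable, Dict, List, Tuple

def profile(run: Callable[[], List[Any]]) -> Tuple[List[Any], Dict[str, float]]:
    """Run a zero-argument query callable; report wall time in ms and the number of rows returned."""
    start = time.perf_counter()
    rows = run()
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return rows, {"execution_time": elapsed_ms, "rows_returned": len(rows)}

data = [{"age": a} for a in range(1000)]
rows, stats = profile(lambda: [r for r in data if r["age"] > 18])
print(f"Execution time: {stats['execution_time']:.2f}ms")
print(f"Rows returned: {stats['rows_returned']}")  # Rows returned: 981
```

With Sparkless, `run` could be something like `lambda: executor.execute(ast).collect()`, assuming `collect()` returns a list of rows as it does in PySpark.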
Best Practices
Always validate queries before execution
Use parameterized queries to prevent SQL injection
Optimize queries for better performance
Monitor execution for performance bottlenecks
Handle errors gracefully with proper exception handling
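The parameterized-query advice is easiest to see side by side with string interpolation. The sketch below uses Python's built-in `sqlite3` module purely to demonstrate the principle, since its `?` placeholders are well defined. It does not show Sparkless's own parameter-binding API.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [("Alice", 25), ("Bob", 30)])

user_input = "Alice' OR '1'='1"  # malicious value supplied by a caller

# Unsafe: interpolation lets the input rewrite the WHERE clause into an always-true condition.
unsafe_sql = f"SELECT name FROM users WHERE name = '{user_input}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the value is bound as a parameter and compared as a literal string.
safe_rows = conn.execute("SELECT name FROM users WHERE name = ?", (user_input,)).fetchall()

print(len(unsafe_rows), safe_rows)  # 2 []
```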
Troubleshooting
Common Issues
Parse errors - Check SQL syntax
Validation errors - Verify that referenced tables and columns exist
Execution errors - Check data types and constraints
Performance issues - Enable profiling and review the optimized AST
Debug Mode
# Enable debug mode for detailed logging
parser.set_debug(True)
validator.set_debug(True)
optimizer.set_debug(True)
executor.set_debug(True)
This guide covers the parser, validator, optimizer, and executor you need to work effectively with Sparkless's SQL capabilities.