"""
Storage manager module.
This module provides a unified storage manager that can use different backends.
"""
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
import contextlib
import gc
import psutil
import os
from ..core.interfaces.storage import IStorageManager
from .backends.memory import MemoryStorageManager
from .backends.file import FileStorageManager
from sparkless.backend.polars import PolarsStorageManager
from sparkless.spark_types import StructType, StructField
[docs]
class StorageManagerFactory:
"""Factory for creating storage managers."""
[docs]
@staticmethod
def create_memory_manager() -> IStorageManager:
"""Create a memory storage manager.
Returns:
Memory storage manager instance.
"""
return MemoryStorageManager()
[docs]
@staticmethod
def create_file_manager(base_path: str = "sparkless_storage") -> IStorageManager:
"""Create a file storage manager.
Args:
base_path: Base path for storage files.
Returns:
File storage manager instance.
"""
return FileStorageManager(base_path)
[docs]
@staticmethod
def create_polars_manager(db_path: Optional[str] = None) -> IStorageManager:
"""Create a Polars storage manager (default in v3.0.0+).
Args:
db_path: Optional path for persistent storage. If None, uses in-memory storage.
Returns:
Polars storage manager instance.
"""
return PolarsStorageManager(db_path=db_path)
[docs]
class UnifiedStorageManager(IStorageManager):
"""Unified storage manager that can switch between backends."""
[docs]
def __init__(self, backend: IStorageManager):
"""Initialize unified storage manager.
Args:
backend: Storage backend to use.
"""
self.backend = backend
self._table_metadata: Dict[str, TableMetadata] = {}
self._current_schema: str = "default"
try:
self._current_schema = backend.get_current_schema()
except Exception: # pragma: no cover - backend without state support
self._current_schema = "default"
[docs]
def create_schema(self, schema: str) -> None:
"""Create a new schema.
Args:
schema: Name of the schema to create.
"""
self.backend.create_schema(schema)
[docs]
def schema_exists(self, schema: str) -> bool:
"""Check if schema exists.
Args:
schema: Name of the schema to check.
Returns:
True if schema exists, False otherwise.
"""
return self.backend.schema_exists(schema)
[docs]
def drop_schema(self, schema_name: str, cascade: bool = False) -> None:
"""Drop a schema.
Args:
schema_name: Name of the schema to drop.
cascade: Whether to cascade the drop operation.
"""
self.backend.drop_schema(schema_name, cascade)
if self._current_schema == schema_name:
self._current_schema = "default"
[docs]
def list_schemas(self) -> List[str]:
"""List all schemas.
Returns:
List of schema names.
"""
return self.backend.list_schemas()
[docs]
def get_current_schema(self) -> str:
"""Return the current schema used for unqualified table references."""
try:
backend_schema = self.backend.get_current_schema()
except Exception:
backend_schema = None
if backend_schema:
self._current_schema = backend_schema
return self._current_schema
[docs]
def set_current_schema(self, schema_name: str) -> None:
"""Set the current schema used for unqualified table references."""
if not self.schema_exists(schema_name):
raise ValueError(f"Schema '{schema_name}' does not exist")
with contextlib.suppress(Exception):
self.backend.set_current_schema(schema_name)
self._current_schema = schema_name
[docs]
def table_exists(self, schema: str, table: str) -> bool:
"""Check if table exists.
Args:
schema: Name of the schema.
table: Name of the table.
Returns:
True if table exists, False otherwise.
"""
return self.backend.table_exists(schema, table)
[docs]
def create_table(
self,
schema: str,
table: str,
columns: Union[List[StructField], StructType],
) -> None:
"""Create a new table.
Args:
schema: Name of the schema.
table: Name of the table.
columns: Table columns definition.
"""
self.backend.create_table(schema, table, columns)
[docs]
def drop_table(self, schema: str, table: str) -> None:
"""Drop a table.
Args:
schema: Name of the schema.
table: Name of the table.
"""
self.backend.drop_table(schema, table)
[docs]
def insert_data(
self, schema: str, table: str, data: List[Dict[str, Any]], mode: str = "append"
) -> None:
"""Insert data into table.
Args:
schema: Name of the schema.
table: Name of the table.
data: Data to insert.
mode: Insert mode ("append", "overwrite", "ignore").
"""
self.backend.insert_data(schema, table, data)
[docs]
def query_table(
self, schema: str, table: str, filter_expr: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Query data from table.
Args:
schema: Name of the schema.
table: Name of the table.
filter_expr: Optional filter expression.
Returns:
List of data rows.
"""
return self.backend.query_table(schema, table, filter_expr)
[docs]
def get_table_schema(
self, schema_name: str, table_name: str
) -> Union[Any, StructType]:
"""Get table schema.
Args:
schema_name: Name of the schema.
table_name: Name of the table.
Returns:
Table schema.
"""
return self.backend.get_table_schema(schema_name, table_name)
[docs]
def get_data(self, schema: str, table: str) -> List[Dict[str, Any]]:
"""Get all data from table.
Args:
schema: Name of the schema.
table: Name of the table.
Returns:
List of data rows.
"""
return self.backend.get_data(schema, table)
[docs]
def create_temp_view(self, name: str, dataframe: Any) -> None:
"""Create a temporary view from a DataFrame.
Args:
name: Name of the temporary view.
dataframe: DataFrame to create view from.
"""
self.backend.create_temp_view(name, dataframe)
[docs]
def list_tables(self, schema_name: Optional[str] = None) -> List[str]:
"""List tables in schema.
Args:
schema_name: Name of the schema. If None, list tables in all schemas.
Returns:
List of table names.
"""
return self.backend.list_tables(schema_name)
[docs]
def switch_backend(self, backend: IStorageManager) -> None:
"""Switch to a different storage backend.
Args:
backend: New storage backend to use.
"""
self.backend = backend
[docs]
def cleanup_temp_tables(self) -> None:
"""Clean up temporary tables to free memory."""
try:
if hasattr(self.backend, "cleanup_temp_tables"):
self.backend.cleanup_temp_tables()
except Exception:
# Ignore errors during cleanup
pass
[docs]
def optimize_storage(self) -> None:
"""Optimize storage by cleaning up and compacting data."""
try:
if hasattr(self.backend, "optimize_storage"):
self.backend.optimize_storage()
except Exception:
# Ignore errors during optimization
pass
[docs]
def get_memory_usage(self) -> Dict[str, Any]:
"""Get current memory usage statistics.
Returns:
Dictionary with memory usage information.
"""
try:
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return {
"rss": memory_info.rss, # Resident Set Size
"vms": memory_info.vms, # Virtual Memory Size
"percent": process.memory_percent(),
"available": psutil.virtual_memory().available,
"total": psutil.virtual_memory().total,
}
except Exception:
return {"rss": 0, "vms": 0, "percent": 0, "available": 0, "total": 0}
[docs]
def force_garbage_collection(self) -> None:
"""Force garbage collection to free memory."""
gc.collect()
[docs]
def get_table_sizes(self) -> Dict[str, int]:
"""Get estimated sizes of all tables.
Returns:
Dictionary mapping table names to estimated sizes.
"""
sizes = {}
for qualified_name, metadata in self._table_metadata.items():
try:
# Estimate size based on metadata
estimated_size = (
len(metadata.table_schema.fields) * 100
) # Rough estimate
sizes[qualified_name] = estimated_size
except Exception:
sizes[qualified_name] = 0
return sizes
[docs]
def cleanup_old_tables(self, max_age_hours: int = 24) -> int:
"""Clean up tables older than specified age.
Args:
max_age_hours: Maximum age in hours before cleanup.
Returns:
Number of tables cleaned up.
"""
from datetime import timedelta
cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
cleaned_count = 0
for qualified_name, metadata in list(self._table_metadata.items()):
if metadata.created_at < cutoff_time:
try:
# Remove from metadata
del self._table_metadata[qualified_name]
# Remove from backend if possible
if hasattr(self.backend, "drop_table"):
schema, table = qualified_name.split(".", 1)
self.backend.drop_table(schema, table)
cleaned_count += 1
except Exception:
# Ignore errors during cleanup
pass
return cleaned_count