Improving Sparkless for Robin-Sparkless
This document lists concrete ways to improve Sparkless so it works better with the Robin backend (robin-sparkless). It is based on the Robin mode failure analysis and test report.
1. Summary
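| Area | Priority | Owner | Status / notes |
|---|---|---|---|
| Join: support Column expression as the join condition | High | Sparkless | Done: materializer accepts Column expressions (e.g. `df.a == other.a`) via `_join_on_to_column_names()`. |
| Join: fix 0-row result (inner/left/right/outer) | High | Sparkless / robin-sparkless | Robin materializer runs the join but returns 0 rows; debug the robin-sparkless join call and our Row conversion (see 3.1). |
| Filter: support AND/OR and more expressions | High | Sparkless | Done: `_simple_filter_to_robin` translates `&` and `\|` recursively. |
| groupBy + agg in Robin materializer | High | Sparkless | Add `groupBy` to `RobinMaterializer.SUPPORTED_OPERATIONS` and translate it to robin `group_by(...).agg(...)` (see 3.3). |
| Semi/anti join support in Robin | Medium | Sparkless / robin-sparkless | Done: `_can_handle_join` accepts `left_semi`, `left_anti`, `semi`, `anti` and passes them to robin (which may still raise at runtime). |
| Column comparison / TypeError in fixtures | Medium | Sparkless | Avoid comparing or ordering raw … |
| Parquet/table append semantics for Robin | Medium | Sparkless | If append/visibility under Robin storage is wrong, fix the storage/session logic for the Robin delegate. |
| Test/fixture expectations (robin in backend list) | Low | Sparkless | Done: include `'robin'` in the backend list where needed. |
| Robin storage `db_path` | Low | Sparkless | Done: `RobinStorageManager` exposes `db_path`, so parquet teardown no longer raises. |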
2. Implemented
- Robin storage `db_path`: `RobinStorageManager` exposes `db_path`; parquet teardown ERRORs are fixed.
- Arbitrary schema: the materializer uses `create_dataframe_from_rows(data, schema)` and `_spark_type_to_robin_dtype` for any schema.
- orderBy, withColumn, join, union: the materializer supports these; join accepts both a string/list and a Column expression (`col == col`) via `_join_on_to_column_names()`.
- Join on Column expression: `_can_handle_join` and join materialization use `_join_on_to_column_names(on)`, so `df.join(other, df.a == other.a, "inner")` is handled by Robin instead of raising unsupported.
- Filter AND/OR: `_simple_filter_to_robin` handles `ColumnOperation("&", left, right)` and `ColumnOperation("|", left, right)` recursively, so combined conditions are translated to Robin.
- Semi/anti join: `_can_handle_join` accepts `left_semi`, `left_anti`, `semi`, `anti`; materialization passes them through to robin (robin may support them or raise at runtime).
- Join Row conversion: the materializer uses `SchemaManager.project_schema_with_operations` to build a final schema and passes it to `Row(d, schema=final_schema)`, so column order and duplicate names after a join are correct.
- Tests/fixtures: the backend list includes `'robin'` where needed (e.g. the unified infrastructure example).
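The join-on normalization described above can be sketched as follows. This is a hypothetical re-creation, not the actual `_join_on_to_column_names()`: `Column` and `ColumnOperation` are minimal stand-ins for Sparkless's expression classes, and the rule that an equality is only accepted when both sides use the same column name (so a name-based robin join is equivalent) is an assumption of the sketch.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for a Sparkless column reference such as df.a."""
    name: str


@dataclass(frozen=True)
class ColumnOperation:
    """Hypothetical stand-in for a binary expression such as df.a == other.a."""
    operation: str
    left: Any
    right: Any


def join_on_to_column_names(on: Any) -> List[str]:
    """Reduce a join condition to key names; [] means 'not handled, fall back'.

    Accepts a name, a list/tuple of names, an equality between same-named
    columns, or several such equalities combined with &.
    """
    if isinstance(on, str):
        return [on]
    if isinstance(on, (list, tuple)):
        return list(on) if all(isinstance(c, str) for c in on) else []
    if isinstance(on, ColumnOperation):
        if on.operation == "==" and isinstance(on.left, Column) and isinstance(on.right, Column):
            # Assumption: only same-named keys map onto a name-based robin join.
            return [on.left.name] if on.left.name == on.right.name else []
        if on.operation == "&":
            left = join_on_to_column_names(on.left)
            right = join_on_to_column_names(on.right)
            # Both halves must be translatable, otherwise the whole condition is not.
            return left + right if left and right else []
    return []
```

Returning an empty list gives the caller a single, cheap check for "Robin cannot handle this join condition".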
3. Recommended next steps (in order)
3.1 Fix join result (0 rows)
The Robin join path runs but returns 0 rows for inner/left/right/outer joins.

- Sparkless: check how we convert `df.collect()` to `List[Row]` (column names, schema, duplicate names after the join). Ensure we are not dropping or mis-mapping columns.
- robin-sparkless: verify that `join(other, on=["dept_id"], how="inner")` on two DataFrames that both have `dept_id` returns the expected rows; if not, report upstream or adapt our call (e.g. `left_on`/`right_on`, if the API supports it).
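To tell a Row-conversion bug from a robin-sparkless join bug, it helps to know how many rows each join type should produce for the test data. A minimal pure-Python sketch, assuming rows are plain dicts; `expected_join_counts` is a hypothetical helper, not part of Sparkless:

```python
from collections import Counter
from typing import Any, Dict, List, Optional, Sequence, Tuple

Row = Dict[str, Any]


def expected_join_counts(left: List[Row], right: List[Row], keys: Sequence[str]) -> Dict[str, int]:
    """Row counts Spark semantics predict for an equi-join of dict rows on `keys`."""

    def key_of(row: Row) -> Optional[Tuple[Any, ...]]:
        key = tuple(row.get(k) for k in keys)
        # A null in any key column never matches anything, as in Spark.
        return None if any(v is None for v in key) else key

    def unmatched(rows: List[Row], other_keys: Counter) -> int:
        # Rows with no partner (including null-key rows) appear once in outer joins.
        return sum(1 for r in rows if key_of(r) is None or other_keys[key_of(r)] == 0)

    left_keys = Counter(k for k in map(key_of, left) if k is not None)
    right_keys = Counter(k for k in map(key_of, right) if k is not None)

    # Each left row matches every right row with the same key.
    inner = sum(n * right_keys[k] for k, n in left_keys.items())
    unmatched_left = unmatched(left, right_keys)
    unmatched_right = unmatched(right, left_keys)
    return {
        "inner": inner,
        "left": inner + unmatched_left,
        "right": inner + unmatched_right,
        "outer": inner + unmatched_left + unmatched_right,
    }
```

In a Robin-mode test one could compare `len(df.join(other, on=["dept_id"], how=how).collect())` with `expected_join_counts(...)[how]`; a 0 from Robin where the sketch predicts a positive count points at the join call or the conversion rather than at the test data.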
3.2 Extend filter translation (AND/OR)
Many failures are `SparkUnsupportedOperationError` raised for filter.

In `sparkless/backend/robin/materializer.py`, extend `_simple_filter_to_robin` (or add a wrapper) to handle:

- `ColumnOperation("&", left, right)` → `_simple_filter_to_robin(left)` and `_simple_filter_to_robin(right)`, combined with robin's `&` or applied as chained `.filter()` calls.
- `ColumnOperation("|", left, right)` → the same for OR, combined with robin's `|` (chained `.filter()` calls can only express AND).

Support nested AND/OR recursively so more real-world filters are handled by Robin.
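A sketch of the recursive AND/OR translation, assuming stand-in `Column`/`ColumnOperation` classes. To keep it runnable without robin-sparkless, each node is translated into a Python predicate over a dict row instead of a robin expression; in the materializer the leaves would build robin column expressions and the `&`/`|` branches would combine them with robin's operators. `simple_filter_to_predicate` is a hypothetical name.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

Row = Dict[str, Any]
Predicate = Callable[[Row], bool]


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for a column reference."""
    name: str


@dataclass(frozen=True)
class ColumnOperation:
    """Hypothetical stand-in for Sparkless's ColumnOperation(op, left, right)."""
    operation: str
    left: Any
    right: Any


# Leaf comparisons (column <op> literal) the sketch knows how to translate.
_COMPARISONS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
                ">=": operator.ge, "<": operator.lt, "<=": operator.le}


def simple_filter_to_predicate(expr: Any) -> Optional[Predicate]:
    """Translate a filter tree recursively; return None if any node is unsupported."""
    if not isinstance(expr, ColumnOperation):
        return None
    if expr.operation in ("&", "|"):
        left = simple_filter_to_predicate(expr.left)
        right = simple_filter_to_predicate(expr.right)
        if left is None or right is None:
            return None  # one unsupported branch makes the whole tree unsupported
        if expr.operation == "&":
            return lambda row: left(row) and right(row)
        return lambda row: left(row) or right(row)
    cmp = _COMPARISONS.get(expr.operation)
    if cmp is None or not isinstance(expr.left, Column) or isinstance(expr.right, (Column, ColumnOperation)):
        return None
    name, literal = expr.left.name, expr.right
    # A comparison against null is treated as false, which matches what a
    # Spark filter keeps when only AND/OR are involved.
    return lambda row: row.get(name) is not None and cmp(row[name], literal)
```

Returning `None` for any unsupported subtree lets the caller raise `SparkUnsupportedOperationError` (or fall back) for the whole filter instead of translating only part of it.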
3.3 Add groupBy + agg to Robin materializer
- Add `"groupBy"` to `RobinMaterializer.SUPPORTED_OPERATIONS`.
- In the operations queue, groupBy is typically followed by a GroupedData agg; the payload may be `(group_by_columns, agg_exprs)` or similar (see how the Polars materializer receives it).
- Translate to robin_sparkless: `df.group_by([...]).agg(...)` (or the equivalent GroupedData API). Start with simple aggregations (count, sum, min, max, avg).
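The payload shape is not confirmed here; the sketch below assumes `(group_by_columns, [(func, column, alias), ...])`, which is hypothetical. It shows a "simple aggs first" gate (`can_handle_group_agg`) together with a pure-Python reference evaluator whose null handling follows Spark (nulls are skipped, `count(*)` counts rows, and every aggregate except count returns null over only nulls), which can be used to check robin results in tests.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]
# Hypothetical payload: (group_by_columns, [(func, column, alias), ...]).
Payload = Tuple[Sequence[str], List[Tuple[str, str, str]]]

# The simple aggregations to support first.
SIMPLE_AGGS = {"count", "sum", "min", "max", "avg"}


def can_handle_group_agg(payload: Payload) -> bool:
    """True if every aggregation is a simple one; otherwise the caller falls back."""
    _, aggs = payload
    return all(func in SIMPLE_AGGS for func, _, _ in aggs)


def reference_group_agg(rows: List[Row], payload: Payload) -> List[Row]:
    """Evaluate the payload on dict rows with Spark-like null handling."""
    if not can_handle_group_agg(payload):
        raise NotImplementedError("unsupported aggregation in payload")
    keys, aggs = payload
    groups: Dict[tuple, List[Row]] = {}
    for row in rows:
        groups.setdefault(tuple(row.get(k) for k in keys), []).append(row)

    out: List[Row] = []
    for key, members in groups.items():
        result: Row = dict(zip(keys, key))
        for func, column, alias in aggs:
            if func == "count" and column == "*":
                result[alias] = len(members)  # count(*) counts rows, nulls included
                continue
            values = [m[column] for m in members if m.get(column) is not None]
            if func == "count":
                result[alias] = len(values)  # count(col) skips nulls
            elif not values:
                result[alias] = None  # sum/min/max/avg over only nulls is null
            elif func == "sum":
                result[alias] = sum(values)
            elif func == "min":
                result[alias] = min(values)
            elif func == "max":
                result[alias] = max(values)
            else:  # avg
                result[alias] = sum(values) / len(values)
        out.append(result)
    return out
```

Spark does not guarantee group order, so a comparison against robin output should sort both sides first.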
3.4 Semi/anti join
Check the robin_sparkless API for left_semi / left_anti (or an equivalent). If present, extend `_can_handle_join` to accept `how in ("left_semi", "left_anti")` and translate them in the join branch. If absent, keep raising unsupported for semi/anti or document it as a limitation.
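If robin-sparkless does expose semi/anti joins, tests need a reference for what they should return. Below is a sketch of Spark's semantics plus normalization of the `how` spellings PySpark accepts (`semi`, `leftsemi`, `left_semi`, and so on). `normalize_how` and `reference_semi_anti` are hypothetical helpers, and rows are assumed to be dicts.

```python
from typing import Any, Dict, List, Optional, Sequence

Row = Dict[str, Any]

# PySpark accepts several spellings per join type; map each to one canonical name.
_HOW_ALIASES = {
    "inner": "inner",
    "left": "left", "leftouter": "left", "left_outer": "left",
    "right": "right", "rightouter": "right", "right_outer": "right",
    "outer": "outer", "full": "outer", "fullouter": "outer", "full_outer": "outer",
    "semi": "left_semi", "leftsemi": "left_semi", "left_semi": "left_semi",
    "anti": "left_anti", "leftanti": "left_anti", "left_anti": "left_anti",
}


def normalize_how(how: str) -> Optional[str]:
    """Canonical join type, or None if the sketch does not handle it (e.g. cross)."""
    return _HOW_ALIASES.get(how.lower())


def reference_semi_anti(left: List[Row], right: List[Row], keys: Sequence[str], how: str) -> List[Row]:
    """Semi keeps left rows that have a match; anti keeps left rows that have none.

    The output has only left columns and each left row appears at most once.
    Null keys never match, so such rows are dropped by semi and kept by anti.
    """
    kind = normalize_how(how)
    if kind not in ("left_semi", "left_anti"):
        raise ValueError(f"not a semi/anti join: {how!r}")
    right_keys = {
        tuple(r.get(k) for k in keys)
        for r in right
        if all(r.get(k) is not None for k in keys)
    }

    def matches(row: Row) -> bool:
        key = tuple(row.get(k) for k in keys)
        return None not in key and key in right_keys

    return [dict(r) for r in left if matches(r) == (kind == "left_semi")]
```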
4. Code locations
| Change | File(s) |
|---|---|
| Join on expression, filter AND/OR, groupBy | |
| Column comparison in tests | |
| Robin storage | |
5. How to validate
Run the full suite in Robin mode and compare the pass/fail counts with the last report:

```bash
SPARKLESS_TEST_BACKEND=robin SPARKLESS_BACKEND=robin pytest tests/ --ignore=tests/archive -n 10 --dist loadfile -v --tb=short 2>&1 | tee tests/robin_mode_test_results.txt
```

Run only the parity join tests to confirm the join path and row counts:

```bash
SPARKLESS_TEST_BACKEND=robin SPARKLESS_BACKEND=robin pytest tests/parity/dataframe/test_join.py -v --tb=short
```

After adding filter AND/OR or groupBy support, run the filter/groupBy parity tests in Robin mode and check that each test either passes or fails with a clear error rather than an unsupported-operation error.
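Comparing pass/fail counts by hand is error-prone. A small sketch that reads the final pytest summary line (e.g. `== 3 failed, 120 passed in 4.56s ==`) from two logs and reports the per-outcome difference. It assumes the standard pytest summary format and only looks at the last summary line of each log; the function names are hypothetical.

```python
import re
from typing import Dict

# One "<count> <outcome>" pair from a pytest summary line.
_COUNT = re.compile(r"(\d+) (passed|failed|skipped|errors?|xfailed|xpassed|warnings?|deselected)")
_PLURAL = {"error": "errors", "warning": "warnings"}


def parse_pytest_summary(text: str) -> Dict[str, int]:
    """Outcome counts from the last '=== ... in Ns ===' line of a pytest log."""
    for line in reversed(text.splitlines()):
        if line.startswith("=") and " in " in line:
            counts = {_PLURAL.get(word, word): int(number) for number, word in _COUNT.findall(line)}
            if counts:
                return counts
    return {}


def diff_counts(old: Dict[str, int], new: Dict[str, int]) -> Dict[str, int]:
    """new minus old per outcome, omitting outcomes that did not change."""
    outcomes = sorted(set(old) | set(new))
    return {k: new.get(k, 0) - old.get(k, 0) for k in outcomes if new.get(k, 0) != old.get(k, 0)}
```

Because the `tee` command above overwrites `tests/robin_mode_test_results.txt`, keep a copy of the previous report before re-running, then pass both files' contents (e.g. via `pathlib.Path(...).read_text()`) to `parse_pytest_summary` and compare the results with `diff_counts`.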
6. robin-sparkless (upstream)
Robin-sparkless already exposes the APIs we need (arbitrary schema, `filter`, `select`, `with_column`, `order_by`, `join`, `union`, `group_by`, `GroupedData`). No upstream feature request is required for "more operations" or "flexible schema". Concrete bugs (wrong result, wrong row count, missing behavior) should be reported upstream with a minimal repro; see `robin_sparkless_issues.md`.