docs(parquet-design): add dev plan with milestones and open questions

Companion to PARQUET_PUSHDOWN_DESIGN.md. Maps the design's six phases
to thirteen shippable milestones, with the architectural decisions
needed to start coding (residency, wire protocol, trust mode,
predicate engine scope, side-index path, library choices) called out
as defaults that the open-questions section asks to confirm.

No code changes — this commit only adds the plan document. M0 does
not start until the open questions are resolved.
This commit is contained in:
Chris Lu
2026-04-25 01:58:49 -07:00
parent 89ed588b34
commit 06b4e46f4f

View File

@@ -0,0 +1,188 @@
# Parquet Pushdown Dev Plan
Companion to [PARQUET_PUSHDOWN_DESIGN.md](./PARQUET_PUSHDOWN_DESIGN.md). The design defines *what* the system does; this doc defines *how it lands in this codebase, in what order, with what milestones*.
## Status
Draft. Open questions remain — see [Open Questions](#open-questions). Coding does not start until those are resolved.
## Goals for the implementation
1. Ship the design's Phase 1 in production-quality form: a pushdown service that, given an Iceberg snapshot resolution, returns column-chunk byte ranges and prunes row groups using parsed-footer statistics.
2. Establish the package layout, wire protocol, and trust-mode plumbing so later phases (scalar indexes, page-level pruning, deletes, vector search) attach without reshaping the surface.
3. Avoid breaking any existing read path — pushdown is additive; standard S3 reads keep working.
## What already exists in the codebase
| Component | Path | Reuse |
|---|---|---|
| Iceberg-Go client and table/manifest types | `weed/s3api/iceberg/`, `weed/plugin/worker/iceberg/` | Catalog reads, manifest parsing, planning |
| Parquet reader (`parquet-go/parquet-go v0.28.0`) | `weed/mq/logstore/`, `weed/query/engine/parquet_scanner.go` | Footer parse, ColumnIndex/OffsetIndex |
| SQL/value type conversion | `weed/query/engine/parquet_scanner.go`, `weed/query/sqltypes/` | Predicate constant typing |
| Iceberg compaction worker (uses iceberg-go for planning) | `weed/plugin/worker/iceberg/planning_index.go` | Reference for snapshot → file-list resolution |
| Filer gRPC + protobuf scaffolding | `weed/pb/filer_pb/`, `weed/server/filer_grpc_server.go` | Wire-protocol pattern, auth |
| S3 IAM / bucket ACL | `weed/s3api/`, `weed/iam/` | Trust enforcement on data files |
| Substrait Go SDK (already a transitive dep) | `github.com/substrait-io/substrait-go/v7` | Predicate decode in v2 |
The dependencies needed for v1 (`parquet-go`, `iceberg-go`, `RoaringBitmap/roaring`, `substrait-go/v7`) are already in `go.mod` (some as indirect; will become direct).
## Architectural decisions for v1
These are commitments unless an open question below changes them.
- **Residency: filer-managed.** The pushdown service runs inside the filer process. Rationale: simplest operational story; reuses filer auth, replication, and metadata; matches Phase 1's I/O profile (read footers and small side indexes). Volume-server colocation and a separate index-serving tier are deferred to Phase 2/3 once we have measurements.
- **Wire protocol: gRPC.** New service definition `parquet_pushdown_pb.ParquetPushdown` in `weed/pb/parquet_pushdown_pb/`. REST/HTTP shim only if a connector requires it. JSON variant deferred.
- **Trust mode for v1: connector-trusted.** The pushdown endpoint is reachable only inside the trust boundary. `PushdownStats.TrustMode = "connector-trusted"`. Catalog-validated mode is built as Phase 1.5, behind a flag, before any external exposure. Manifest-signed is not on the roadmap.
- **Predicate engine for v1: built-in subset evaluator.** The wire format accepts Substrait `ExtendedExpression`, but v1 implements only: comparisons (`=, !=, <, <=, >, >=`), `IN`, `BETWEEN`, `IS NULL`, and boolean `AND/OR/NOT` over them. Anything outside this subset returns `unsupported predicate`, and the connector falls back to a standard scan. Full Substrait evaluation lands in Phase 2.
- **Side-index physical location:** under a separate filer namespace `/__pushdown__/<table_uuid>/...`, governed by the same filer ACLs. Out of any user-visible bucket prefix. Path layout matches the design's `<system-prefix>/<table_uuid>/.../<identity>/` shape.
- **Parquet library: `parquet-go/parquet-go`** (already in use elsewhere in the repo). Apache `arrow-go/v18/parquet` is *not* introduced.
- **Bitmap library: `RoaringBitmap/roaring`.** Used for position-delete bitmaps and DV decoding.
## Code layout
New code lives under `weed/parquet_pushdown/`:
```text
weed/parquet_pushdown/
service.go # gRPC service entry; dispatches to subsystems
request_validation.go # request-shape limits, trust-mode enforcement
catalog/ # Phase 1.5: catalog-validated mode
validator.go # cross-checks request DataFiles against manifest
footer/
cache.go # parsed-footer LRU, keyed by file identity
parser.go # footer + ColumnIndex/OffsetIndex extraction
index/
types.go # SideIndex interface, registry
bloom/ # Phase 2
bitmap/ # Phase 2
btree/ # Phase 2
page/ # Phase 3
vector/ivf/ # Phase 5
predicate/
substrait_decode.go # decode Substrait ExtendedExpression
iceberg_decode.go # decode Iceberg Expression JSON
eval_subset.go # v1 built-in evaluator (comparisons + boolean)
rowgroup_prune.go # zone-map pruning for v1
delete/
position_bitmap.go # Phase 4 — v2 position-delete merge + cache
deletion_vector.go # Phase 4 — v3 Puffin DV reader + cache
equality_eval.go # Phase 4 — equality-delete evaluator
storage/
filer_blob_store.go # read/write side-index blobs through filer
stats.go # PushdownStats accumulation
weed/pb/parquet_pushdown_pb/
parquet_pushdown.proto # request/response messages
parquet_pushdown.pb.go # generated
parquet_pushdown_grpc.pb.go # generated
weed/server/filer_grpc_pushdown.go # filer mounts the pushdown service
weed/command/ # `weed pushdown` subcommands (build/inspect indexes)
```
Tests live alongside the code (`*_test.go`). Integration tests for end-to-end paths land in `test/parquet_pushdown/`.
## Milestones
Each milestone is a shippable PR (or small chain of PRs). M0M2 together implement the design's Phase 1.
### M0 — Infrastructure (1 PR)
- `parquet_pushdown.proto` with `Pushdown` RPC, request/response messages mirroring [API Sketch](./PARQUET_PUSHDOWN_DESIGN.md#api-sketch). Generate Go bindings.
- Empty service implementation that validates request shape (size limits, MaxRowIds cap, trust-mode tag) and returns `unimplemented` for the actual work.
- Filer wiring: register the gRPC service on the filer's existing gRPC server. Add a feature flag `-pushdown.enabled` so the service is off by default in the filer binary.
- `weed pushdown ping` CLI command for smoke testing.
- Acceptance: integration test calls `Pushdown` over gRPC, gets the expected `unimplemented` error path with stats populated.
### M1 — Parsed-footer cache + file/range-level pushdown (1 PR)
- `footer/parser.go`: open Parquet file via the filer chunk reader, parse footer, ColumnIndex, OffsetIndex.
- `footer/cache.go`: LRU keyed by `(path, size, etag-or-recordcount)` per [Index Consistency](./PARQUET_PUSHDOWN_DESIGN.md#index-consistency).
- Service handles requests with `Predicate == nil`: return `FileRanges` covering the requested `Columns` for each `DataFile` (one entry per column chunk), no pruning yet.
- Acceptance:
- Round-trip integration test against a 3-file Iceberg table built in `test/parquet_pushdown/fixtures/`.
- Footer cache hit-rate test (second call avoids re-parse; verified via `PushdownStats.FooterCacheHits`).
### M2 — Row-group zone-map pruning (1 PR)
- `predicate/eval_subset.go`: evaluator for the v1 predicate subset.
- `predicate/rowgroup_prune.go`: read row-group statistics from the parsed footer, drop row groups where the predicate cannot be true, return per-data-file `RowGroupRef` list.
- `predicate/substrait_decode.go`: decode just enough Substrait to feed the subset evaluator; everything else returns `unsupported`.
- Acceptance:
- Predicate `WHERE timestamp BETWEEN t1 AND t2` against a 5M-row partitioned Iceberg table prunes to the expected row groups (verified by counting `RowGroupRef`).
- `unsupported predicate` returns a status the connector can recognize and fall back from.
**End of design Phase 1.**
### M3 — Catalog-validated trust mode (1 PR; optional before external exposure)
- `catalog/validator.go`: read the Iceberg snapshot, verify `(Path, SizeBytes, RecordCount, DataSequenceNumber, PartitionSpecId, PartitionValues)` for each `DataFile` and that the `Deletes` list matches the manifest's attached delete files.
- Configurable per-endpoint: `pushdown.trust=connector-trusted|catalog-validated`.
- Cache validated planning result keyed by `SnapshotId` to amortize manifest reads.
- Acceptance: a tampered request (omitted delete file, wrong sequence number, file not in manifest) is rejected with `permission_denied`.
### M4 — Bloom-filter side index (12 PRs; design Phase 2 starts)
- `index/bloom/`: builder reads Parquet column chunks, produces a SplitBlock Bloom Filter (Parquet's standard) per column chunk, stores via `storage/filer_blob_store.go` keyed by `bloom.fid_<id>`.
- Predicate evaluator consults bloom for `=` / `IN` predicates.
- Background builder triggered on new Iceberg snapshot (hooks into the existing iceberg compaction worker — TODO: confirm hook point).
- Acceptance: equality predicate with no matches against a 10M-row column reads zero data needles; cache miss and miss-build also tested.
### M5 — Bitmap and B-tree side indexes (1 PR each)
- `index/bitmap/` for low/medium-cardinality columns; integrates with `RoaringBitmap`.
- `index/btree/` for sortable columns; range and point-lookup predicates.
- Acceptance: micro-benchmarks vs M2's row-group-only pruning showing ≥10× row-group reduction on the standard test workload.
### M6 — Page-level pruning (1 PR; design Phase 3)
- `index/page/`: wraps Parquet's ColumnIndex + OffsetIndex; row-range translation across columns; dictionary-page byte-range inclusion per [Page-Level Index](./PARQUET_PUSHDOWN_DESIGN.md#3-page-level-index).
- Service returns `PageRef` entries with byte ranges for each projected column.
- Acceptance: predicate selectivity ≥0.99 returns ≤1 page per surviving row group across all projected columns.
### M7M9 — Iceberg deletes (3 PRs; design Phase 4)
- M7: position-delete bitmap caching, keyed per [Position-delete bitmap cache key](./PARQUET_PUSHDOWN_DESIGN.md#position-delete-bitmap-cache-key).
- M8: Puffin DV reader + cache (use `iceberg-go` Puffin support if present, otherwise vendor a minimal Puffin parser).
- M9: equality-delete evaluation accelerated by Phase 2 indexes.
- Acceptance: reference table with both delete forms returns identical row sets to a Spark scan of the same snapshot.
### M10M11 — Vector indexes (design Phase 5)
- M10: partitioned IVF index per [Filtered-ANN strategy](./PARQUET_PUSHDOWN_DESIGN.md#filtered-ann-strategy); per-tenant partitions, top-K merge, score correctness vs brute-force.
- M11: hybrid scalar+vector pushdown; planner selects partitioned-IVF vs post-filter+overscan based on selectivity estimate.
- Acceptance: recall ≥0.95 vs brute force at top-20, 10× speedup on the standard test set.
### M12M13 — Connector integrations (design Phase 6)
- M12: DuckDB extension as PoC; consumes the gRPC API directly.
- M13: Trino + Spark connectors; both use the same gRPC client.
## Test strategy
- **Unit tests:** every package, with parquet-go in-memory fixtures.
- **Integration tests:** `test/parquet_pushdown/` — spins up filer + a small Iceberg catalog (reuse `weed/s3api/iceberg/` infrastructure), generates Parquet data with `parquet-go` writers, compares pushdown results to a brute-force scan.
- **Correctness oracle:** every milestone validates that pushdown output is a *strict subset* of a brute-force Iceberg+Parquet scan with the same predicate, and that the projected-column byte ranges decode to the same rows. The oracle is the safety net that lets us refactor index internals freely.
- **Performance suite:** runs in CI weekly, not on every PR. Tracks footer-cache hit rate, row-group reduction, page reduction, end-to-end query latency vs a baseline standard scan.
## Open Questions
These need answers before M0 starts. Most have defaults proposed above; confirm or override.
1. **Residency for v1 — confirm filer-managed?** Alternative: a separate `weed pushdown` daemon. Filer-managed is simpler but couples pushdown availability to filer availability. *Default: filer-managed.*
2. **Service surface — extend `filer_pb` or new `parquet_pushdown_pb` package?** New package is cleaner but adds a discovery hop for clients. *Default: new package.*
3. **Trust mode for the very first integration tests — connector-trusted only, or do we need M3 (catalog-validated) before any test exercises the trust boundary?** *Default: connector-trusted for M0M2, M3 lands before any external connector PoC (M12).*
4. **Side-index path root.** `/__pushdown__/` is my proposal; is there an existing convention I should match? (I see `/.snapshots/`, `/.iam/`, etc. in other parts of the repo.)
5. **Predicate evaluator scope for v1 — confirm the subset listed above (`=, !=, <, <=, >, >=`, `IN`, `BETWEEN`, `IS NULL`, boolean ops)?** Anything else you want supported up-front (e.g. `LIKE`)? *Default: subset above; `LIKE` deferred to inverted-index milestone.*
6. **Iceberg snapshot integration in M3 — use `iceberg-go` directly, or call into the existing `weed/s3api/iceberg/` and `weed/plugin/worker/iceberg/` code?** The latter has table cache + lock support already. *Default: route through the existing internal package, do not add a parallel iceberg-go integration.*
7. **Auth surface for the gRPC endpoint — reuse the filer's existing auth (mTLS / JWT), or add a pushdown-specific token?** *Default: reuse filer auth; the request principal must have read access to every `DataFile` listed.*
8. **Test corpus for M1.** Is there an existing Iceberg-with-deletes fixture in `test/` we should standardize on, or do we generate one from scratch in `test/parquet_pushdown/fixtures/`? *Default: generate from scratch, parameterized over (with-position-deletes, with-equality-deletes, with-DVs, partitioned/unpartitioned).*
9. **Phase 1 acceptance bar.** What's the definition of done that lets us call Phase 1 shipped — passing tests in CI, a benchmark showing measurable I/O reduction on a reference query, or a connector PoC? *Default: tests + I/O-reduction benchmark; PoC slips to M12.*
10. **Performance regression budget.** Pushdown adds work on the read path. What overhead is acceptable when the predicate misses everything (worst case for pushdown)? *Default: ≤10% added latency vs a direct ranged GET on a 10 MiB single-file scan.*
## Risk register
- **Iceberg-Go maturity for v3 deletion vectors / Puffin.** If `iceberg-go v0.5.0` does not expose Puffin DV blobs, M8 needs a vendored Puffin parser. Spike during M0 to verify.
- **parquet-go ColumnIndex/OffsetIndex completeness.** M6 depends on full ColumnIndex/OffsetIndex support. If the library returns nil for some files, fall back to row-group-only pruning per the design.
- **Catalog read latency.** M3's per-request manifest read must be cacheable per snapshot, otherwise the catalog-validated mode adds >100ms per request. Cache eviction strategy is part of M3's spec.
- **Trust boundary leakage.** A pushdown response must not return data references that a direct read would deny. The auth check (open question 7) is load-bearing; integration tests in M0 must include negative cases.