From fc75f16c30c465ed3d2fd2d4e6846f4d021eac91 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 3 May 2026 00:17:16 -0700 Subject: [PATCH] test(s3tables): expand Dremio Iceberg catalog test coverage (#9303) * test(s3tables): expand Dremio Iceberg catalog test coverage Restructure TestDremioIcebergCatalog into subtests and add three new checks that go beyond a connectivity smoke test: - ColumnProjection: SELECT id, label proves Dremio parsed the schema served by the SeaweedFS REST catalog (the previous SELECT COUNT(*) passed without exercising any column metadata). - InformationSchemaColumns: verifies the table's columns are listed in Dremio's INFORMATION_SCHEMA.COLUMNS in the expected ordinal order. - InformationSchemaTables: verifies the table is registered in INFORMATION_SCHEMA.TABLES. All subtests share a single Dremio container startup, so total runtime is unchanged. * test(s3tables): exercise multi-level Iceberg namespaces from Dremio Seed a 2-level Iceberg namespace (and a table inside it) via the REST catalog before bootstrapping Dremio, then add a MultiLevelNamespace subtest that scans the nested table by its dot-separated reference. This relies on isRecursiveAllowedNamespaces=true (already set in the Dremio source config) to surface the nested levels as folders. A regression in either the SeaweedFS namespace path encoding (#8959-style) or Dremio's recursive-namespace discovery would surface here. Adds two helpers to keep the existing single-level call sites unchanged: - createIcebergNamespaceLevels: namespace creation with []string levels - createIcebergTableInLevels: table creation with []string levels and unit-separator (0x1F) URL encoding for the namespace path component * test(s3tables): verify Dremio reads PyIceberg-written rows The previous Dremio subtests only scanned empty tables, so they did not exercise the data path - just the catalog/metadata path. 
Add a PyIceberg-based writer that materializes parquet files plus a snapshot on a separate table before Dremio bootstraps, and two new subtests: - ReadWrittenDataCount: SELECT COUNT(*) returns 3. - ReadWrittenDataValues: SELECT id, label ORDER BY id returns the three written rows with the expected (id, label) pairs. The writer runs in a small image (Dockerfile.writer) built locally on demand. It pip-installs pyiceberg+pyarrow once and reuses the layer cache on subsequent runs. The CI workflow pre-pulls python:3.11-slim to keep cold runs predictable. The writer authenticates via the OAuth2 client_credentials flow that SeaweedFS already exposes at /v1/oauth/tokens, mirroring the Go-side helper used for REST-API table creation. * test(s3tables): fix Dremio writer required-field schema mismatch PyIceberg's append() compatibility check rejects an arrow column whose nullability does not match the Iceberg field. The table schema declares id as `required long`, but the default pyarrow int64 column is nullable - so the writer failed with: 1: id: required long vs. 1: id: optional long Declare an explicit pyarrow schema with nullable=False on id and nullable=True on label to match the Iceberg side. 
--- .github/workflows/s3-tables-tests.yml | 3 + .../s3tables/catalog_dremio/Dockerfile.writer | 12 + test/s3tables/catalog_dremio/README.md | 11 +- test/s3tables/catalog_dremio/append_rows.py | 87 ++++++ .../catalog_dremio/dremio_catalog_test.go | 281 +++++++++++++++++- 5 files changed, 380 insertions(+), 14 deletions(-) create mode 100644 test/s3tables/catalog_dremio/Dockerfile.writer create mode 100644 test/s3tables/catalog_dremio/append_rows.py diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 8f90dafc0..6dec28130 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -211,6 +211,9 @@ jobs: - name: Pre-pull Dremio image run: docker pull dremio/dremio-oss:25.2.0 + - name: Pre-pull Python image for PyIceberg writer + run: docker pull python:3.11-slim + - name: Run go mod tidy run: go mod tidy diff --git a/test/s3tables/catalog_dremio/Dockerfile.writer b/test/s3tables/catalog_dremio/Dockerfile.writer new file mode 100644 index 000000000..4e293de05 --- /dev/null +++ b/test/s3tables/catalog_dremio/Dockerfile.writer @@ -0,0 +1,12 @@ +# Iceberg writer used by the Dremio integration test to populate a table with +# data files via PyIceberg, so the downstream Dremio SELECT verifies the read +# path against non-empty results rather than just COUNT(*) = 0. +FROM python:3.11-slim + +WORKDIR /app + +RUN pip install --no-cache-dir "pyiceberg[s3fs]" pyarrow + +COPY append_rows.py /app/ + +ENTRYPOINT ["python3", "/app/append_rows.py"] diff --git a/test/s3tables/catalog_dremio/README.md b/test/s3tables/catalog_dremio/README.md index 1150db22f..fcec295bf 100644 --- a/test/s3tables/catalog_dremio/README.md +++ b/test/s3tables/catalog_dremio/README.md @@ -13,7 +13,16 @@ This directory contains a Dremio integration smoke test for SeaweedFS's Iceberg 5. Bootstraps a Dremio admin user and logs in. 6. Creates a Dremio `RESTCATALOG` source that points at the SeaweedFS catalog. 7. 
Submits Dremio SQL through `/api/v3/sql`, polls the job API, and reads job results. -8. Queries the SeaweedFS-backed Iceberg table from Dremio. +8. Runs subtests against the SeaweedFS-backed Iceberg table: + - `BasicSelect`: Dremio is alive and answering SQL. + - `CountEmptyTable`: catalog-to-table resolution and a scan of an empty table. + - `ColumnProjection`: `SELECT id, label` succeeds and the response schema reports both columns. Failure here means Dremio could not parse the schema returned by the SeaweedFS catalog. + - `InformationSchemaColumns`: the table's columns are exposed through Dremio's metadata layer with the expected ordinal order. + - `InformationSchemaTables`: the table is registered in Dremio's `INFORMATION_SCHEMA`. + - `MultiLevelNamespace`: a 2-level Iceberg namespace (created via the REST API) is exposed by Dremio as nested folders, and a table inside it is queryable with dot-separated identifiers. + - `ReadWrittenDataCount` and `ReadWrittenDataValues`: a separate table is populated with three rows by a PyIceberg writer container (`Dockerfile.writer` + `append_rows.py`) before Dremio bootstraps; Dremio reads the data back and the values are verified. This exercises the actual data path, not just metadata. + +The PyIceberg writer image is built on demand via Docker layer caching. The first build pulls `python:3.11-slim` and pip-installs PyIceberg + PyArrow (~1-2 min in CI); subsequent invocations are cheap. ## Running Locally diff --git a/test/s3tables/catalog_dremio/append_rows.py b/test/s3tables/catalog_dremio/append_rows.py new file mode 100644 index 000000000..643a6e397 --- /dev/null +++ b/test/s3tables/catalog_dremio/append_rows.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Append rows to an existing Iceberg table via the SeaweedFS REST catalog. + +Used by the Dremio integration test to materialize data files so a downstream +SELECT from Dremio verifies the read path against non-empty results. 
+ + Usage: + python3 append_rows.py \\ + --catalog-url http://localhost:8181 \\ + --warehouse s3://my-bucket \\ + --prefix my-bucket \\ + --s3-endpoint http://localhost:8333 \\ + --access-key AKIA... --secret-key wJalr... \\ + --namespace foo --namespace bar \\ + --table events +""" + +import argparse +import sys + +import pyarrow as pa +from pyiceberg.catalog import load_catalog + + +def main() -> int: + p = argparse.ArgumentParser() + p.add_argument("--catalog-url", required=True) + p.add_argument("--warehouse", required=True, help="s3://<bucket>") + p.add_argument("--prefix", required=True, help="REST catalog prefix (table bucket name)") + p.add_argument("--s3-endpoint", required=True, help="http://host:port") + p.add_argument("--access-key", required=True) + p.add_argument("--secret-key", required=True) + p.add_argument("--region", default="us-east-1") + p.add_argument( + "--namespace", + action="append", + required=True, + help="One per level (e.g. --namespace foo --namespace bar for foo.bar).", + ) + p.add_argument("--table", required=True) + args = p.parse_args() + + # `credential` triggers OAuth2 client_credentials against + # /v1/oauth/tokens, matching the helper Go test uses for + # REST-API table creation. The s3.* keys are needed for parquet writes. + catalog = load_catalog( + "rest", + **{ + "type": "rest", + "uri": args.catalog_url, + "warehouse": args.warehouse, + "prefix": args.prefix, + "credential": f"{args.access_key}:{args.secret_key}", + "s3.access-key-id": args.access_key, + "s3.secret-access-key": args.secret_key, + "s3.endpoint": args.s3_endpoint, + "s3.region": args.region, + "s3.path-style-access": "true", + }, + ) + + table_id = tuple(args.namespace) + (args.table,) + table = catalog.load_table(table_id) + + # Match the Iceberg table schema: id is `required long`, label is + # `optional string`. Default pyarrow columns are nullable, which fails + # PyIceberg's required-field compatibility check. 
+ arrow_schema = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("label", pa.string(), nullable=True), + ] + ) + arrow_table = pa.Table.from_pydict( + { + "id": [1, 2, 3], + "label": ["one", "two", "three"], + }, + schema=arrow_schema, + ) + table.append(arrow_table) + print(f"appended {arrow_table.num_rows} rows to {'.'.join(table_id)}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/test/s3tables/catalog_dremio/dremio_catalog_test.go b/test/s3tables/catalog_dremio/dremio_catalog_test.go index e5ff07566..09eed0055 100644 --- a/test/s3tables/catalog_dremio/dremio_catalog_test.go +++ b/test/s3tables/catalog_dremio/dremio_catalog_test.go @@ -53,8 +53,24 @@ type TestEnvironment struct { } // TestDremioIcebergCatalog starts Dremio, registers SeaweedFS as an Iceberg -// REST catalog source, and runs Dremio SQL against a table served by the -// SeaweedFS catalog. +// REST catalog source, and runs Dremio SQL against tables served by the +// SeaweedFS catalog. The table and namespace are seeded via the Iceberg REST +// API before Dremio bootstraps so they are visible on the source's first scan. +// +// Subtests cover: +// - BasicSelect: Dremio is alive and answering SQL. +// - CountEmptyTable: catalog→table resolution and a scan of an empty table. +// - ColumnProjection: the column names from the SeaweedFS-issued schema are +// usable in Dremio (failure here means Dremio could not parse the schema). +// - InformationSchemaColumns: the table's columns are exposed through +// Dremio's metadata layer with the expected name and ordinal positions. +// - InformationSchemaTables: the table is registered in Dremio's INFORMATION_SCHEMA. +// - MultiLevelNamespace: a 2-level Iceberg namespace is surfaced as nested +// folders and a table inside it is queryable with dot-separated identifiers. 
+// - ReadWrittenDataCount / ReadWrittenDataValues: a separate table populated +// via PyIceberg before Dremio bootstrap is read back through Dremio with +// correct row count and values, exercising the actual data path (not just +// metadata). func TestDremioIcebergCatalog(t *testing.T) { requireDremioRuntime(t) @@ -78,17 +94,122 @@ func TestDremioIcebergCatalog(t *testing.T) { createIcebergNamespace(t, env, icebergToken, tableBucket, namespace) createIcebergTable(t, env, icebergToken, tableBucket, namespace, tableName) + // Seed a true multi-level namespace and a table inside it. Created before + // Dremio bootstraps so the source's first scan with + // isRecursiveAllowedNamespaces=true discovers both levels. + multiLevelNs := []string{ + "ml_parent_" + randomString(4), + "ml_child_" + randomString(4), + } + multiLevelTable := "nested_" + randomString(6) + createIcebergNamespaceLevels(t, env, icebergToken, tableBucket, multiLevelNs[:1]) + createIcebergNamespaceLevels(t, env, icebergToken, tableBucket, multiLevelNs) + createIcebergTableInLevels(t, env, icebergToken, tableBucket, multiLevelNs, multiLevelTable) + + // Seed a populated table by creating an empty one through the REST API + // and then appending rows via PyIceberg. Done before Dremio bootstrap so + // the snapshot is part of the source's first scan. 
+ populatedTable := "populated_" + randomString(6) + createIcebergTable(t, env, icebergToken, tableBucket, namespace, populatedTable) + buildDremioWriterImage(t) + writeIcebergRows(t, env, tableBucket, []string{namespace}, populatedTable) + configDir := env.writeDremioConfig(t, tableBucket) env.startDremioContainer(t, configDir) waitForDremio(t, env.dremioContainer, 180*time.Second) env.bootstrapDremio(t, tableBucket) - selectOutput := runDremioSQL(t, env, "SELECT 1 AS ok") - assertSingleNumericValue(t, selectOutput, 1) - tableRef := dremioObjectName(dremioSourceName, namespace, tableName) - countOutput := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableRef)) - assertSingleNumericValue(t, countOutput, 0) + + t.Run("BasicSelect", func(t *testing.T) { + out := runDremioSQL(t, env, "SELECT 1 AS ok") + assertSingleNumericValue(t, out, 1) + }) + + t.Run("CountEmptyTable", func(t *testing.T) { + out := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableRef)) + assertSingleNumericValue(t, out, 0) + }) + + t.Run("ColumnProjection", func(t *testing.T) { + // SELECT COUNT(*) does not exercise the schema. A projection by + // column name fails fast with "column not found" if the schema + // from the SeaweedFS catalog response was not parsed. + out := runDremioSQL(t, env, fmt.Sprintf("SELECT id, label FROM %s", tableRef)) + schema, rows := parseDremioResponseSchemaRows(t, out) + if len(rows) != 0 { + t.Fatalf("Expected empty result set, got %d rows: %v", len(rows), rows) + } + assertSchemaContainsAll(t, schema, "id", "label") + }) + + t.Run("InformationSchemaColumns", func(t *testing.T) { + // Filter only by TABLE_NAME (which is randomized and globally unique + // in this run) to avoid depending on the exact TABLE_SCHEMA path + // Dremio synthesizes for a nested REST-catalog folder. 
+ query := fmt.Sprintf( + "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '%s' ORDER BY ORDINAL_POSITION", + tableName, + ) + out := runDremioSQL(t, env, query) + _, rows := parseDremioResponseSchemaRows(t, out) + columnNames := extractColumnNames(t, rows) + expected := []string{"id", "label"} + if !equalStringSlices(columnNames, expected) { + t.Fatalf("INFORMATION_SCHEMA.COLUMNS for table %s = %v, want %v", + tableName, columnNames, expected) + } + }) + + t.Run("InformationSchemaTables", func(t *testing.T) { + query := fmt.Sprintf( + "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.\"TABLES\" WHERE TABLE_NAME = '%s'", + tableName, + ) + out := runDremioSQL(t, env, query) + _, rows := parseDremioResponseSchemaRows(t, out) + if len(rows) == 0 { + t.Fatalf("INFORMATION_SCHEMA.TABLES did not list %s; raw response: %s", tableName, out) + } + }) + + t.Run("MultiLevelNamespace", func(t *testing.T) { + // Reference is "iceberg"."<ns-level-1>"."<ns-level-2>"."<table>", relying on + // isRecursiveAllowedNamespaces=true in the Dremio source config to + // surface nested namespaces as nested folders. + parts := append([]string{dremioSourceName}, append(append([]string{}, multiLevelNs...), multiLevelTable)...) + ref := dremioObjectName(parts...) 
+ out := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", ref)) + assertSingleNumericValue(t, out, 0) + }) + + populatedRef := dremioObjectName(dremioSourceName, namespace, populatedTable) + + t.Run("ReadWrittenDataCount", func(t *testing.T) { + out := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", populatedRef)) + assertSingleNumericValue(t, out, 3) + }) + + t.Run("ReadWrittenDataValues", func(t *testing.T) { + out := runDremioSQL(t, env, fmt.Sprintf("SELECT id, label FROM %s ORDER BY id", populatedRef)) + schema, rows := parseDremioResponseSchemaRows(t, out) + assertSchemaContainsAll(t, schema, "id", "label") + if len(rows) != 3 { + t.Fatalf("expected 3 rows from %s, got %d: %v", populatedRef, len(rows), rows) + } + expected := [][2]string{{"1", "one"}, {"2", "two"}, {"3", "three"}} + for i, row := range rows { + if len(row) != 2 { + t.Fatalf("row %d has %d columns, want 2: %v", i, len(row), row) + } + gotID := fmt.Sprintf("%v", row[0]) + gotLabel := fmt.Sprintf("%v", row[1]) + if gotID != expected[i][0] || gotLabel != expected[i][1] { + t.Errorf("row %d = (%s, %s), want (%s, %s)", + i, gotID, gotLabel, expected[i][0], expected[i][1]) + } + } + }) } // NewTestEnvironment creates a new test environment with allocated ports and configuration. @@ -648,6 +769,17 @@ func (env *TestEnvironment) waitForDremioJob(t *testing.T, jobID, sql string) { // parseDremioResponse parses the JSON response from Dremio and extracts rows. func parseDremioResponse(t *testing.T, output string) [][]interface{} { t.Helper() + _, rows := parseDremioResponseSchemaRows(t, output) + return rows +} + +// parseDremioResponseSchemaRows parses Dremio's job-results JSON and returns +// both the response schema (column names in declaration order) and the rows. +// Tests that need to assert on column metadata (column projection, +// INFORMATION_SCHEMA queries) use the schema; tests that only need values use +// parseDremioResponse. 
+func parseDremioResponseSchemaRows(t *testing.T, output string) ([]string, [][]interface{}) { + t.Helper() var response map[string]interface{} decoder := json.NewDecoder(strings.NewReader(output)) @@ -703,7 +835,59 @@ func parseDremioResponse(t *testing.T, output string) [][]interface{} { result = append(result, values) } } - return result + return schemaNames, result +} + +// assertSchemaContainsAll fails the test if any expected column name is +// missing from the response schema. Order is not checked. +func assertSchemaContainsAll(t *testing.T, schema []string, expected ...string) { + t.Helper() + + present := make(map[string]bool, len(schema)) + for _, name := range schema { + present[strings.ToLower(name)] = true + } + var missing []string + for _, name := range expected { + if !present[strings.ToLower(name)] { + missing = append(missing, name) + } + } + if len(missing) > 0 { + t.Fatalf("Dremio response schema %v missing expected columns %v", schema, missing) + } +} + +// extractColumnNames pulls the first value of each row as a string. Used to +// turn a `SELECT some_name FROM ...` result into a flat slice. 
+func extractColumnNames(t *testing.T, rows [][]interface{}) []string { + t.Helper() + + names := make([]string, 0, len(rows)) + for _, row := range rows { + if len(row) == 0 { + t.Fatalf("Dremio result row is empty: %v", rows) + } + switch v := row[0].(type) { + case string: + names = append(names, v) + default: + names = append(names, fmt.Sprintf("%v", v)) + } + } + return names +} + +func equalStringSlices(got, want []string) bool { + if len(got) != len(want) { + return false + } + for i := range got { + if !strings.EqualFold(got[i], want[i]) { + return false + } + } + return true } func assertSingleNumericValue(t *testing.T, output string, expected float64) { @@ -768,17 +952,34 @@ func requestIcebergOAuthToken(t *testing.T, env *TestEnvironment) string { func createIcebergNamespace(t *testing.T, env *TestEnvironment, token, bucketName, namespace string) { t.Helper() - - doIcebergJSONRequest(t, env, token, http.MethodPost, fmt.Sprintf("/v1/%s/namespaces", url.PathEscape(bucketName)), map[string]any{ - "namespace": []string{namespace}, - }, http.StatusOK, http.StatusConflict) + createIcebergNamespaceLevels(t, env, token, bucketName, []string{namespace}) } func createIcebergTable(t *testing.T, env *TestEnvironment, token, bucketName, namespace, tableName string) { t.Helper() + createIcebergTableInLevels(t, env, token, bucketName, []string{namespace}, tableName) +} +// createIcebergNamespaceLevels creates a multi-level Iceberg namespace via the +// REST catalog. Single-element levels create a flat namespace; multi-element +// levels create the nested form (e.g. ["analytics", "daily"]). 
+func createIcebergNamespaceLevels(t *testing.T, env *TestEnvironment, token, bucketName string, levels []string) { + t.Helper() + + doIcebergJSONRequest(t, env, token, http.MethodPost, fmt.Sprintf("/v1/%s/namespaces", url.PathEscape(bucketName)), map[string]any{ + "namespace": levels, + }, http.StatusOK, http.StatusConflict) +} + +// createIcebergTableInLevels creates a table in a (possibly multi-level) +// namespace. The namespace path component is encoded with the unit-separator +// (0x1F) convention used by SeaweedFS's Iceberg REST API. +func createIcebergTableInLevels(t *testing.T, env *TestEnvironment, token, bucketName string, levels []string, tableName string) { + t.Helper() + + encodedNs := strings.Join(levels, "\x1F") doIcebergJSONRequest(t, env, token, http.MethodPost, - fmt.Sprintf("/v1/%s/namespaces/%s/tables", url.PathEscape(bucketName), url.PathEscape(namespace)), + fmt.Sprintf("/v1/%s/namespaces/%s/tables", url.PathEscape(bucketName), url.PathEscape(encodedNs)), map[string]any{ "name": tableName, "schema": map[string]any{ @@ -792,6 +993,60 @@ func createIcebergTable(t *testing.T, env *TestEnvironment, token, bucketName, n }, http.StatusOK) } +const dremioWriterImage = "seaweedfs-dremio-writer" + +// buildDremioWriterImage builds the local PyIceberg writer image. Layer +// caching makes repeat invocations cheap; the first build pulls +// python:3.11-slim and pip-installs pyiceberg+pyarrow (~1-2 min in CI). +func buildDremioWriterImage(t *testing.T) { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + cmd := exec.Command("docker", "build", + "-t", dremioWriterImage, + "-f", filepath.Join(wd, "Dockerfile.writer"), + wd, + ) + if out, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to build %s image: %v\n%s", dremioWriterImage, err, out) + } +} + +// writeIcebergRows runs the PyIceberg writer container, which loads the +// already-created table and appends three rows. 
The container reaches +// SeaweedFS via host.docker.internal (matching the Dremio container's path). +func writeIcebergRows(t *testing.T, env *TestEnvironment, bucketName string, namespace []string, tableName string) { + t.Helper() + + args := []string{ + "run", "--rm", + "--add-host", "host.docker.internal:host-gateway", + dremioWriterImage, + "--catalog-url", fmt.Sprintf("http://host.docker.internal:%d", env.icebergPort), + "--warehouse", "s3://" + bucketName, + "--prefix", bucketName, + "--s3-endpoint", fmt.Sprintf("http://host.docker.internal:%d", env.s3Port), + "--access-key", env.accessKey, + "--secret-key", env.secretKey, + "--region", "us-west-2", + "--table", tableName, + } + for _, level := range namespace { + args = append(args, "--namespace", level) + } + + cmd := exec.Command("docker", args...) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("PyIceberg writer failed: %v\n%s", err, out) + } + t.Logf("PyIceberg writer output: %s", strings.TrimSpace(string(out))) +} + func doIcebergJSONRequest(t *testing.T, env *TestEnvironment, token, method, path string, payload any, expectedStatuses ...int) string { t.Helper()