test(s3tables): expand Dremio Iceberg catalog test coverage (#9303)

* test(s3tables): expand Dremio Iceberg catalog test coverage

Restructure TestDremioIcebergCatalog into subtests and add three new
checks that go beyond a connectivity smoke test:

- ColumnProjection: SELECT id, label proves Dremio parsed the schema
  served by the SeaweedFS REST catalog (the previous SELECT COUNT(*)
  passed without exercising any column metadata).
- InformationSchemaColumns: verifies the table's columns are listed in
  Dremio's INFORMATION_SCHEMA.COLUMNS in the expected ordinal order.
- InformationSchemaTables: verifies the table is registered in
  INFORMATION_SCHEMA.TABLES.

All subtests share a single Dremio container startup, so total
runtime is unchanged.

* test(s3tables): exercise multi-level Iceberg namespaces from Dremio

Seed a 2-level Iceberg namespace (and a table inside it) via the REST
catalog before bootstrapping Dremio, then add a MultiLevelNamespace
subtest that scans the nested table by its dot-separated reference.

This relies on isRecursiveAllowedNamespaces=true (already set in the
Dremio source config) to surface the nested levels as folders. A
regression in either the SeaweedFS namespace path encoding (#8959-style)
or Dremio's recursive-namespace discovery would surface here.

Adds two helpers to keep the existing single-level call sites unchanged:

- createIcebergNamespaceLevels: namespace creation with []string levels
- createIcebergTableInLevels: table creation with []string levels and
  unit-separator (0x1F) URL encoding for the namespace path component
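The unit-separator convention can be illustrated with a small stdlib-only sketch (the helper name and bucket path here are illustrative, not taken from the test code):

```python
from urllib.parse import quote

def encode_namespace_path(levels: list[str]) -> str:
    """Join namespace levels with the unit separator (0x1F), then
    percent-encode the result so a multi-level namespace travels as a
    single URL path component."""
    return quote("\x1f".join(levels), safe="")

# A 2-level namespace collapses into one path segment:
path = f"/v1/my-bucket/namespaces/{encode_namespace_path(['ml_parent', 'ml_child'])}/tables"
print(path)  # /v1/my-bucket/namespaces/ml_parent%1Fml_child/tables
```

A single-element list degenerates to the flat form (`solo` stays `solo`), which is why the existing single-level call sites can delegate to the same helper unchanged.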

* test(s3tables): verify Dremio reads PyIceberg-written rows

The previous Dremio subtests only scanned empty tables, so they did not
exercise the data path - just the catalog/metadata path. Add a
PyIceberg-based writer that materializes parquet files plus a snapshot
on a separate table before Dremio bootstraps, and two new subtests:

- ReadWrittenDataCount: SELECT COUNT(*) returns 3.
- ReadWrittenDataValues: SELECT id, label ORDER BY id returns the three
  written rows with the expected (id, label) pairs.

The writer runs in a small image (Dockerfile.writer) built locally on
demand. It pip-installs pyiceberg+pyarrow once and reuses the layer
cache on subsequent runs. The CI workflow pre-pulls python:3.11-slim
to keep cold runs predictable.

The writer authenticates via the OAuth2 client_credentials flow that
SeaweedFS already exposes at /v1/oauth/tokens, mirroring the Go-side
helper used for REST-API table creation.
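As a rough stdlib-only sketch of that flow: PyIceberg splits the `credential` value on `:` and posts a client_credentials form to the token endpoint. The form field names below follow the standard RFC 6749 shape and are an assumption about the wire format, not copied from either implementation:

```python
from urllib.parse import urlencode
from urllib.request import Request

def build_token_request(catalog_url: str, access_key: str, secret_key: str) -> Request:
    # Standard client_credentials form body; the access key and secret key
    # play the roles of OAuth2 client id and client secret.
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": access_key,
        "client_secret": secret_key,
    }).encode()
    return Request(
        f"{catalog_url}/v1/oauth/tokens",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )

req = build_token_request("http://localhost:8181", "AKIA_EXAMPLE", "secret")
print(req.full_url)  # http://localhost:8181/v1/oauth/tokens
```

The response's `access_token` would then be sent as a `Authorization: Bearer` header on subsequent catalog requests.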

* test(s3tables): fix Dremio writer required-field schema mismatch

PyIceberg's append() compatibility check rejects an arrow column whose
nullability does not match the Iceberg field. The table schema declares
id as `required long`, but the default pyarrow int64 column is nullable
- so the writer failed with:

    1: id: required long  vs.  1: id: optional long

Declare an explicit pyarrow schema with nullable=False on id and
nullable=True on label to match the Iceberg side.
This commit is contained in:
Chris Lu
2026-05-03 00:17:16 -07:00
committed by GitHub
parent f16353de0b
commit fc75f16c30
5 changed files with 380 additions and 14 deletions


@@ -211,6 +211,9 @@ jobs:
      - name: Pre-pull Dremio image
        run: docker pull dremio/dremio-oss:25.2.0
      - name: Pre-pull Python image for PyIceberg writer
        run: docker pull python:3.11-slim
      - name: Run go mod tidy
        run: go mod tidy


@@ -0,0 +1,12 @@
# Iceberg writer used by the Dremio integration test to populate a table with
# data files via PyIceberg, so the downstream Dremio SELECT verifies the read
# path against non-empty results rather than just COUNT(*) = 0.
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir "pyiceberg[s3fs]" pyarrow
COPY append_rows.py /app/
ENTRYPOINT ["python3", "/app/append_rows.py"]


@@ -13,7 +13,16 @@ This directory contains a Dremio integration smoke test for SeaweedFS's Iceberg
5. Bootstraps a Dremio admin user and logs in.
6. Creates a Dremio `RESTCATALOG` source that points at the SeaweedFS catalog.
7. Submits Dremio SQL through `/api/v3/sql`, polls the job API, and reads job results.
8. Runs subtests against the SeaweedFS-backed Iceberg table:
   - `BasicSelect`: Dremio is alive and answering SQL.
   - `CountEmptyTable`: catalog-to-table resolution and a scan of an empty table.
   - `ColumnProjection`: `SELECT id, label` succeeds and the response schema reports both columns. Failure here means Dremio could not parse the schema returned by the SeaweedFS catalog.
   - `InformationSchemaColumns`: the table's columns are exposed through Dremio's metadata layer with the expected ordinal order.
   - `InformationSchemaTables`: the table is registered in Dremio's `INFORMATION_SCHEMA`.
   - `MultiLevelNamespace`: a 2-level Iceberg namespace (created via the REST API) is exposed by Dremio as nested folders, and a table inside it is queryable with dot-separated identifiers.
   - `ReadWrittenDataCount` and `ReadWrittenDataValues`: a separate table is populated with three rows by a PyIceberg writer container (`Dockerfile.writer` + `append_rows.py`) before Dremio bootstraps; Dremio reads the data back and the values are verified. This exercises the actual data path, not just metadata.

The PyIceberg writer image is built on demand via Docker layer caching. The first build pulls `python:3.11-slim` and pip-installs PyIceberg + PyArrow (~1-2 min in CI); subsequent invocations are cheap.

## Running Locally


@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Append rows to an existing Iceberg table via the SeaweedFS REST catalog.

Used by the Dremio integration test to materialize data files so a downstream
SELECT from Dremio verifies the read path against non-empty results.

Usage:
    python3 append_rows.py \\
        --catalog-url http://localhost:8181 \\
        --warehouse s3://my-bucket \\
        --prefix my-bucket \\
        --s3-endpoint http://localhost:8333 \\
        --access-key AKIA... --secret-key wJalr... \\
        --namespace foo --namespace bar \\
        --table events
"""
import argparse
import sys

import pyarrow as pa
from pyiceberg.catalog import load_catalog


def main() -> int:
    p = argparse.ArgumentParser()
    p.add_argument("--catalog-url", required=True)
    p.add_argument("--warehouse", required=True, help="s3://<bucket-name>")
    p.add_argument("--prefix", required=True, help="REST catalog prefix (table bucket name)")
    p.add_argument("--s3-endpoint", required=True, help="http://host:port")
    p.add_argument("--access-key", required=True)
    p.add_argument("--secret-key", required=True)
    p.add_argument("--region", default="us-east-1")
    p.add_argument(
        "--namespace",
        action="append",
        required=True,
        help="One per level (e.g. --namespace foo --namespace bar for foo.bar).",
    )
    p.add_argument("--table", required=True)
    args = p.parse_args()

    # `credential` triggers OAuth2 client_credentials against
    # <catalog_uri>/v1/oauth/tokens, matching the helper the Go test uses for
    # REST-API table creation. The s3.* keys are needed for parquet writes.
    catalog = load_catalog(
        "rest",
        **{
            "type": "rest",
            "uri": args.catalog_url,
            "warehouse": args.warehouse,
            "prefix": args.prefix,
            "credential": f"{args.access_key}:{args.secret_key}",
            "s3.access-key-id": args.access_key,
            "s3.secret-access-key": args.secret_key,
            "s3.endpoint": args.s3_endpoint,
            "s3.region": args.region,
            "s3.path-style-access": "true",
        },
    )

    table_id = tuple(args.namespace) + (args.table,)
    table = catalog.load_table(table_id)

    # Match the Iceberg table schema: id is `required long`, label is
    # `optional string`. Default pyarrow columns are nullable, which fails
    # PyIceberg's required-field compatibility check.
    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64(), nullable=False),
            pa.field("label", pa.string(), nullable=True),
        ]
    )
    arrow_table = pa.Table.from_pydict(
        {
            "id": [1, 2, 3],
            "label": ["one", "two", "three"],
        },
        schema=arrow_schema,
    )
    table.append(arrow_table)
    print(f"appended {arrow_table.num_rows} rows to {'.'.join(table_id)}")
    return 0


if __name__ == "__main__":
    sys.exit(main())


@@ -53,8 +53,24 @@ type TestEnvironment struct {
}
// TestDremioIcebergCatalog starts Dremio, registers SeaweedFS as an Iceberg
// REST catalog source, and runs Dremio SQL against tables served by the
// SeaweedFS catalog. The table and namespace are seeded via the Iceberg REST
// API before Dremio bootstraps so they are visible on the source's first scan.
//
// Subtests cover:
// - BasicSelect: Dremio is alive and answering SQL.
// - CountEmptyTable: catalog→table resolution and a scan of an empty table.
// - ColumnProjection: the column names from the SeaweedFS-issued schema are
// usable in Dremio (failure here means Dremio could not parse the schema).
// - InformationSchemaColumns: the table's columns are exposed through
// Dremio's metadata layer with the expected name and ordinal positions.
// - InformationSchemaTables: the table is registered in Dremio's INFORMATION_SCHEMA.
// - MultiLevelNamespace: a 2-level Iceberg namespace is surfaced as nested
// folders and a table inside it is queryable with dot-separated identifiers.
// - ReadWrittenDataCount / ReadWrittenDataValues: a separate table populated
// via PyIceberg before Dremio bootstrap is read back through Dremio with
// correct row count and values, exercising the actual data path (not just
// metadata).
func TestDremioIcebergCatalog(t *testing.T) {
	requireDremioRuntime(t)
@@ -78,17 +94,122 @@ func TestDremioIcebergCatalog(t *testing.T) {
	createIcebergNamespace(t, env, icebergToken, tableBucket, namespace)
	createIcebergTable(t, env, icebergToken, tableBucket, namespace, tableName)

	// Seed a true multi-level namespace and a table inside it. Created before
	// Dremio bootstraps so the source's first scan with
	// isRecursiveAllowedNamespaces=true discovers both levels.
	multiLevelNs := []string{
		"ml_parent_" + randomString(4),
		"ml_child_" + randomString(4),
	}
	multiLevelTable := "nested_" + randomString(6)
	createIcebergNamespaceLevels(t, env, icebergToken, tableBucket, multiLevelNs[:1])
	createIcebergNamespaceLevels(t, env, icebergToken, tableBucket, multiLevelNs)
	createIcebergTableInLevels(t, env, icebergToken, tableBucket, multiLevelNs, multiLevelTable)

	// Seed a populated table by creating an empty one through the REST API
	// and then appending rows via PyIceberg. Done before Dremio bootstrap so
	// the snapshot is part of the source's first scan.
	populatedTable := "populated_" + randomString(6)
	createIcebergTable(t, env, icebergToken, tableBucket, namespace, populatedTable)
	buildDremioWriterImage(t)
	writeIcebergRows(t, env, tableBucket, []string{namespace}, populatedTable)

	configDir := env.writeDremioConfig(t, tableBucket)
	env.startDremioContainer(t, configDir)
	waitForDremio(t, env.dremioContainer, 180*time.Second)
	env.bootstrapDremio(t, tableBucket)

	tableRef := dremioObjectName(dremioSourceName, namespace, tableName)

	t.Run("BasicSelect", func(t *testing.T) {
		out := runDremioSQL(t, env, "SELECT 1 AS ok")
		assertSingleNumericValue(t, out, 1)
	})

	t.Run("CountEmptyTable", func(t *testing.T) {
		out := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableRef))
		assertSingleNumericValue(t, out, 0)
	})

	t.Run("ColumnProjection", func(t *testing.T) {
		// SELECT COUNT(*) does not exercise the schema. A projection by
		// column name fails fast with "column not found" if the schema
		// from the SeaweedFS catalog response was not parsed.
		out := runDremioSQL(t, env, fmt.Sprintf("SELECT id, label FROM %s", tableRef))
		schema, rows := parseDremioResponseSchemaRows(t, out)
		if len(rows) != 0 {
			t.Fatalf("Expected empty result set, got %d rows: %v", len(rows), rows)
		}
		assertSchemaContainsAll(t, schema, "id", "label")
	})

	t.Run("InformationSchemaColumns", func(t *testing.T) {
		// Filter only by TABLE_NAME (which is randomized and globally unique
		// in this run) to avoid depending on the exact TABLE_SCHEMA path
		// Dremio synthesizes for a nested REST-catalog folder.
		query := fmt.Sprintf(
			"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '%s' ORDER BY ORDINAL_POSITION",
			tableName,
		)
		out := runDremioSQL(t, env, query)
		_, rows := parseDremioResponseSchemaRows(t, out)
		columnNames := extractColumnNames(t, rows)
		expected := []string{"id", "label"}
		if !equalStringSlices(columnNames, expected) {
			t.Fatalf("INFORMATION_SCHEMA.COLUMNS for table %s = %v, want %v",
				tableName, columnNames, expected)
		}
	})

	t.Run("InformationSchemaTables", func(t *testing.T) {
		query := fmt.Sprintf(
			"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.\"TABLES\" WHERE TABLE_NAME = '%s'",
			tableName,
		)
		out := runDremioSQL(t, env, query)
		_, rows := parseDremioResponseSchemaRows(t, out)
		if len(rows) == 0 {
			t.Fatalf("INFORMATION_SCHEMA.TABLES did not list %s; raw response: %s", tableName, out)
		}
	})

	t.Run("MultiLevelNamespace", func(t *testing.T) {
		// Reference is "iceberg"."<level1>"."<level2>"."<table>", relying on
		// isRecursiveAllowedNamespaces=true in the Dremio source config to
		// surface nested namespaces as nested folders.
		parts := append([]string{dremioSourceName}, append(append([]string{}, multiLevelNs...), multiLevelTable)...)
		ref := dremioObjectName(parts...)
		out := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", ref))
		assertSingleNumericValue(t, out, 0)
	})

	populatedRef := dremioObjectName(dremioSourceName, namespace, populatedTable)

	t.Run("ReadWrittenDataCount", func(t *testing.T) {
		out := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", populatedRef))
		assertSingleNumericValue(t, out, 3)
	})

	t.Run("ReadWrittenDataValues", func(t *testing.T) {
		out := runDremioSQL(t, env, fmt.Sprintf("SELECT id, label FROM %s ORDER BY id", populatedRef))
		schema, rows := parseDremioResponseSchemaRows(t, out)
		assertSchemaContainsAll(t, schema, "id", "label")
		if len(rows) != 3 {
			t.Fatalf("expected 3 rows from %s, got %d: %v", populatedRef, len(rows), rows)
		}
		expected := [][2]string{{"1", "one"}, {"2", "two"}, {"3", "three"}}
		for i, row := range rows {
			if len(row) != 2 {
				t.Fatalf("row %d has %d columns, want 2: %v", i, len(row), row)
			}
			gotID := fmt.Sprintf("%v", row[0])
			gotLabel := fmt.Sprintf("%v", row[1])
			if gotID != expected[i][0] || gotLabel != expected[i][1] {
				t.Errorf("row %d = (%s, %s), want (%s, %s)",
					i, gotID, gotLabel, expected[i][0], expected[i][1])
			}
		}
	})
}
// NewTestEnvironment creates a new test environment with allocated ports and configuration.
@@ -648,6 +769,17 @@ func (env *TestEnvironment) waitForDremioJob(t *testing.T, jobID, sql string) {
// parseDremioResponse parses the JSON response from Dremio and extracts rows.
func parseDremioResponse(t *testing.T, output string) [][]interface{} {
	t.Helper()
	_, rows := parseDremioResponseSchemaRows(t, output)
	return rows
}

// parseDremioResponseSchemaRows parses Dremio's job-results JSON and returns
// both the response schema (column names in declaration order) and the rows.
// Tests that need to assert on column metadata (column projection,
// INFORMATION_SCHEMA queries) use the schema; tests that only need values use
// parseDremioResponse.
func parseDremioResponseSchemaRows(t *testing.T, output string) ([]string, [][]interface{}) {
	t.Helper()
	var response map[string]interface{}
	decoder := json.NewDecoder(strings.NewReader(output))
@@ -703,7 +835,59 @@ func parseDremioResponse(t *testing.T, output string) [][]interface{} {
			result = append(result, values)
		}
	}
	return schemaNames, result
}

// assertSchemaContainsAll fails the test if any expected column name is
// missing from the response schema. Order is not checked.
func assertSchemaContainsAll(t *testing.T, schema []string, expected ...string) {
	t.Helper()
	present := make(map[string]bool, len(schema))
	for _, name := range schema {
		present[strings.ToLower(name)] = true
	}
	var missing []string
	for _, name := range expected {
		if !present[strings.ToLower(name)] {
			missing = append(missing, name)
		}
	}
	if len(missing) > 0 {
		t.Fatalf("Dremio response schema %v missing expected columns %v", schema, missing)
	}
}

// extractColumnNames pulls the first value of each row as a string. Used to
// turn a `SELECT some_name FROM ...` result into a flat slice.
func extractColumnNames(t *testing.T, rows [][]interface{}) []string {
	t.Helper()
	names := make([]string, 0, len(rows))
	for _, row := range rows {
		if len(row) == 0 {
			t.Fatalf("Dremio result row is empty: %v", rows)
		}
		switch v := row[0].(type) {
		case string:
			names = append(names, v)
		default:
			names = append(names, fmt.Sprintf("%v", v))
		}
	}
	return names
}

func equalStringSlices(got, want []string) bool {
	if len(got) != len(want) {
		return false
	}
	for i := range got {
		if !strings.EqualFold(got[i], want[i]) {
			return false
		}
	}
	return true
}

func assertSingleNumericValue(t *testing.T, output string, expected float64) {
@@ -768,17 +952,34 @@ func requestIcebergOAuthToken(t *testing.T, env *TestEnvironment) string {
func createIcebergNamespace(t *testing.T, env *TestEnvironment, token, bucketName, namespace string) {
	t.Helper()
	createIcebergNamespaceLevels(t, env, token, bucketName, []string{namespace})
}

func createIcebergTable(t *testing.T, env *TestEnvironment, token, bucketName, namespace, tableName string) {
	t.Helper()
	createIcebergTableInLevels(t, env, token, bucketName, []string{namespace}, tableName)
}

// createIcebergNamespaceLevels creates a multi-level Iceberg namespace via the
// REST catalog. Single-element levels create a flat namespace; multi-element
// levels create the nested form (e.g. ["analytics", "daily"]).
func createIcebergNamespaceLevels(t *testing.T, env *TestEnvironment, token, bucketName string, levels []string) {
	t.Helper()
	doIcebergJSONRequest(t, env, token, http.MethodPost, fmt.Sprintf("/v1/%s/namespaces", url.PathEscape(bucketName)), map[string]any{
		"namespace": levels,
	}, http.StatusOK, http.StatusConflict)
}

// createIcebergTableInLevels creates a table in a (possibly multi-level)
// namespace. The namespace path component is encoded with the unit-separator
// (0x1F) convention used by SeaweedFS's Iceberg REST API.
func createIcebergTableInLevels(t *testing.T, env *TestEnvironment, token, bucketName string, levels []string, tableName string) {
	t.Helper()
	encodedNs := strings.Join(levels, "\x1F")
	doIcebergJSONRequest(t, env, token, http.MethodPost,
		fmt.Sprintf("/v1/%s/namespaces/%s/tables", url.PathEscape(bucketName), url.PathEscape(encodedNs)),
		map[string]any{
			"name": tableName,
			"schema": map[string]any{
@@ -792,6 +993,60 @@ func createIcebergTable(t *testing.T, env *TestEnvironment, token, bucketName, n
	}, http.StatusOK)
}

const dremioWriterImage = "seaweedfs-dremio-writer"

// buildDremioWriterImage builds the local PyIceberg writer image. Layer
// caching makes repeat invocations cheap; the first build pulls
// python:3.11-slim and pip-installs pyiceberg+pyarrow (~1-2 min in CI).
func buildDremioWriterImage(t *testing.T) {
	t.Helper()
	wd, err := os.Getwd()
	if err != nil {
		t.Fatalf("Failed to get working directory: %v", err)
	}
	cmd := exec.Command("docker", "build",
		"-t", dremioWriterImage,
		"-f", filepath.Join(wd, "Dockerfile.writer"),
		wd,
	)
	if out, err := cmd.CombinedOutput(); err != nil {
		t.Fatalf("Failed to build %s image: %v\n%s", dremioWriterImage, err, out)
	}
}

// writeIcebergRows runs the PyIceberg writer container, which loads the
// already-created table and appends three rows. The container reaches
// SeaweedFS via host.docker.internal (matching the Dremio container's path).
func writeIcebergRows(t *testing.T, env *TestEnvironment, bucketName string, namespace []string, tableName string) {
	t.Helper()
	args := []string{
		"run", "--rm",
		"--add-host", "host.docker.internal:host-gateway",
		dremioWriterImage,
		"--catalog-url", fmt.Sprintf("http://host.docker.internal:%d", env.icebergPort),
		"--warehouse", "s3://" + bucketName,
		"--prefix", bucketName,
		"--s3-endpoint", fmt.Sprintf("http://host.docker.internal:%d", env.s3Port),
		"--access-key", env.accessKey,
		"--secret-key", env.secretKey,
		"--region", "us-west-2",
		"--table", tableName,
	}
	for _, level := range namespace {
		args = append(args, "--namespace", level)
	}
	cmd := exec.Command("docker", args...)
	out, err := cmd.CombinedOutput()
	if err != nil {
		t.Fatalf("PyIceberg writer failed: %v\n%s", err, out)
	}
	t.Logf("PyIceberg writer output: %s", strings.TrimSpace(string(out)))
}

func doIcebergJSONRequest(t *testing.T, env *TestEnvironment, token, method, path string, payload any, expectedStatuses ...int) string {
	t.Helper()