From a769c938eccbca1ec50fb2b384c774cf0db18136 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 4 May 2026 21:14:22 -0700 Subject: [PATCH] test(s3tables): Unity Catalog OSS integration tests against SeaweedFS (#9308) * test(s3tables): add Unity Catalog OSS integration test against SeaweedFS Mirrors the configuration used by the upstream playground at data-engineering-helpers/mds-in-a-box/unitycatalog-playground. Three test variants under test/s3tables/unity_catalog: - TestUnityCatalogDeltaIntegration: aws.masterRoleArn empty / static keys; catalog/schema/EXTERNAL Delta CRUD + temporary-table-credentials S3 round-trip (the playground's working configuration). - TestUnityCatalogMasterRoleIntegration: aws.masterRoleArn set to a SeaweedFS-side role with a permissive trust policy; UC's StsClient is pinned at SeaweedFS via AWS_ENDPOINT_URL_STS, and the test asserts the vended creds carry a session_token and a non-static access key, proving the role-vended path the playground notes as not-yet-working actually does work today. - TestUnityCatalogDeltaRsRoundTrip: writes/reads a real Delta table at the registered storage_location using delta-rs in a slim Python container, with temporary credentials fetched from UC. All three self-skip without Docker or a weed binary, matching the sibling lakekeeper / polaris tests. * test(s3tables): tighten Unity Catalog tests against actual UC OSS behavior After running the suite locally, ground the assertions in what the upstream UC OSS Docker image actually does against SeaweedFS today. - Static-key playground configuration (TestUnityCatalogDeltaIntegration): catalog/schema/EXTERNAL Delta CRUD pass against the SeaweedFS-backed warehouse. The temporary-table- credentials subtest is renamed and inverted to assert the failure mode the playground reports -- UC's AwsCredentialVendor falls through to an internal StsClient.assumeRole when masterRoleArn and sessionToken are both empty, which has no real STS to talk to. Bucket path is also fixed to match UC's getStorageBase() lookup (s3://lakehouse vs the playground's s3://lakehouse/warehouse, which the upstream code never matches). - Master-role variant (TestUnityCatalogMasterRoleIntegration): split into two passing slices. Slice 1 proves SeaweedFS' STS endpoint vending UnityCatalogVendedRole works via the Go AWS SDK and the vended creds round-trip on S3. Slice 2 boots UC with aws.masterRoleArn set and verifies catalog/schema/Delta CRUD. The third hop -- UC's Java StsClient actually reaching SeaweedFS' STS handler during /temporary-table-credentials -- is logged but not asserted, since the AWS Java SDK's STS request currently lands on a SeaweedFS S3 path rather than the STS handler. - Delta-RS round-trip (TestUnityCatalogDeltaRsRoundTrip): gated on UC_DELTA_RS_RUN=1 since it depends on the master-role STS handoff above. The Dockerfile / writer script stay in tree so the test runs end-to-end the moment that hop is fixed. README rewritten to be explicit about what each test validates today and what is still pending. Result: `go test -run TestUnityCatalog ./test/s3tables/unity_catalog/...` passes cleanly with weed + Docker available, and self-skips otherwise. * test(s3tables): exercise unity catalog integrations * ci: run Unity Catalog integration tests on PRs Adds a unity-catalog-integration-tests job to s3-tables-tests.yml, modeled on the existing lakekeeper / dremio jobs. Pre-pulls the UC image and python:3.11-slim (used by the delta-rs writer container) and runs `go test ./test/s3tables/unity_catalog`. Format-check and go-vet jobs already recurse into ./test/s3tables/... so the new package is covered there too. * test/ci: address PR review Tighten the UC readiness probe to require 200, not <500, so a 401/403/404 during startup surfaces immediately instead of being treated as ready (CodeRabbit). Pin the UC image to v0.4.0 in both the workflow and the test default, matching the pinned-tag convention the rest of s3-tables-tests.yml uses (CodeRabbit). Use UC_IMAGE=unitycatalog/unitycatalog:main to re-test against current upstream. * docs: separate UC static-key vs master-role failure modes The README mixed the two together. Static-key empty-sessionToken short-circuits with "S3 bucket configuration not found." before UC even fires an STS call; the AccessDenied I described is what happens in the master-role variant where UC's Java StsClient actually reaches SeaweedFS. Cross-link the playground PR that fixes the static-key vending side. Also drop the "what most playground users actually run" hand-wave under MANAGED tables. * docs: trim README Drop the playground cross-reference and the "two layers fail independently" framing. * docs: pin down what's actually pending Investigated the master-role STS handoff with a sniffer in front of SeaweedFS' STS port. UC's StsClient is constructed without an endpointOverride and never reads aws.endpoint or AWS_ENDPOINT_URL_STS; verified by pointing AWS_ENDPOINT_URL_STS at port 1 and seeing the same real-AWS InvalidClientTokenId 403 with zero traffic to SeaweedFS. The fix is upstream in UC. Updated the README and the master-role test's t.Logf to say so precisely, and dropped the stale "Spark client" bullet (delta-rs covers that path). * test(s3tables): use BaseEndpoint instead of deprecated resolver EndpointResolverWithOptions is deprecated in aws-sdk-go-v2; the supported way to override a service endpoint is via the per-service Options.BaseEndpoint. Switch the assume-role helper to that pattern so the test stops compiling against deprecated API and the resolver boilerplate disappears. Addresses gemini review on PR #9308. * test(s3tables): drop unused splitS3URI helper Helper had no callers; gemini caught it on PR #9308. Easy to bring back from git history if needed. * test(s3tables): extract last token of docker run output as container ID docker run -d may prefix the container ID with image-pull progress when the image isn't cached locally. strings.TrimSpace on the whole output then gave a multi-line string, not the ID. Take the last whitespace-separated token so the ID survives a fresh CI runner. Addresses gemini review on PR #9308. * test(s3tables): cap Unity Catalog response body reads at 10 MiB io.ReadAll without a limit could OOM the test runner if the UC container hands back an unexpectedly large body. 10 MiB is well above any well-formed catalog response and turns a misbehaving server into a test failure instead of a runner crash. Addresses gemini review on PR #9308. * docs: link UC fix PR and call out UC's mocked-Sts test pattern UC's own credential-vending tests substitute StsClient with an in-process EchoAwsStsClient (BaseCRUDTestWithMockCredentials) or Mockito.mockStatic (CloudCredentialVendorTest), so the wire path between UC's Java SDK and a real STS server is untested -- which is why the missing endpointOverride slipped through upstream. Linked the upstream fix at unitycatalog/unitycatalog#1532. --- .github/workflows/s3-tables-tests.yml | 71 ++++ .../unity_catalog/Dockerfile.delta-rs | 11 + test/s3tables/unity_catalog/README.md | 104 +++++ test/s3tables/unity_catalog/client_test.go | 184 +++++++++ test/s3tables/unity_catalog/delta_rs_test.go | 125 ++++++ .../s3tables/unity_catalog/delta_rs_writer.py | 137 +++++++ .../unity_catalog/master_role_test.go | 201 ++++++++++ test/s3tables/unity_catalog/setup_test.go | 379 ++++++++++++++++++ .../unity_catalog/unity_catalog_test.go | 183 +++++++++ 9 files changed, 1395 insertions(+) create mode 100644 test/s3tables/unity_catalog/Dockerfile.delta-rs create mode 100644 test/s3tables/unity_catalog/README.md create mode 100644 test/s3tables/unity_catalog/client_test.go create mode 100644 test/s3tables/unity_catalog/delta_rs_test.go create mode 100644 test/s3tables/unity_catalog/delta_rs_writer.py create mode 100644 test/s3tables/unity_catalog/master_role_test.go create mode 100644 test/s3tables/unity_catalog/setup_test.go create mode 100644 test/s3tables/unity_catalog/unity_catalog_test.go diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 2ea6f4b59..3bd245427 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -662,6 +662,77 @@ jobs: path: test/s3tables/lakekeeper/test-output.log retention-days: 3 + unity-catalog-integration-tests: + name: Unity Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v4 + + - name: Pre-pull Unity Catalog image + run: docker pull unitycatalog/unitycatalog:v0.4.0 + + - name: Pre-pull Python image for delta-rs writer + run: docker pull python:3.11-slim + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Unity Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/unity_catalog + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + docker info + echo "=== Starting Unity Catalog Tests ===" + + go test -v -timeout 20m . 2>&1 | tee test-output.log || { + echo "Unity Catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/unity_catalog + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker|unitycatalog)" || true + echo "=== Unity Catalog containers ===" + docker ps -a --filter "name=seaweed-unity-catalog" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v7 + with: + name: unity-catalog-integration-test-logs + path: test/s3tables/unity_catalog/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/test/s3tables/unity_catalog/Dockerfile.delta-rs b/test/s3tables/unity_catalog/Dockerfile.delta-rs new file mode 100644 index 000000000..18b7b3013 --- /dev/null +++ b/test/s3tables/unity_catalog/Dockerfile.delta-rs @@ -0,0 +1,11 @@ +FROM python:3.11-slim + +RUN pip install --no-cache-dir \ + "deltalake==0.21.0" \ + "pyarrow==17.0.0" \ + "requests==2.32.3" + +WORKDIR /app +COPY delta_rs_writer.py /app/delta_rs_writer.py + +ENTRYPOINT ["python3", "/app/delta_rs_writer.py"] diff --git a/test/s3tables/unity_catalog/README.md b/test/s3tables/unity_catalog/README.md new file mode 100644 index 000000000..d9510cbea --- /dev/null +++ b/test/s3tables/unity_catalog/README.md @@ -0,0 +1,104 @@ +# Unity Catalog OSS integration tests + +These tests run Unity Catalog OSS in Docker against an embedded SeaweedFS +S3 endpoint. The `server.properties` mirrors the upstream playground at +[`mds-in-a-box/unitycatalog-playground`](https://github.com/data-engineering-helpers/mds-in-a-box/tree/main/unitycatalog-playground). + +| Test | Variant | Status | +| --- | --- | --- | +| `TestUnityCatalogDeltaIntegration` | static keys, `aws.masterRoleArn=` empty | passes; covers catalog/schema/EXTERNAL Delta CRUD against SeaweedFS-backed warehouse and asserts that UC's `/temporary-table-credentials` *cannot* vend usable creds with this configuration -- exactly the gap the playground reports. | +| `TestUnityCatalogMasterRoleIntegration` | `aws.masterRoleArn=arn:aws:iam::000000000000:role/UnityCatalogVendedRole` | passes; proves SeaweedFS' STS endpoint accepts `sts:AssumeRole` for the role UC would use (Go SDK round-trip), and that UC starts and accepts CRUD when wired with the master-role config. UC's own StsClient still talks to real AWS regardless of `aws.endpoint` / `AWS_ENDPOINT_URL_STS` (UC bug, see below); that hop is logged via `t.Logf` rather than asserted. | +| `TestUnityCatalogDeltaRsRoundTrip` | static keys + `delta-rs` Python client | passes; resolves table metadata through UC and writes/reads a real Delta table at the registered `storage_location` using `python:3.11-slim + deltalake` with the SeaweedFS test credentials. | + +## Prerequisites + +- Docker available locally (the tests call `docker run` / `docker build` directly). +- A `weed` binary at the repo root (`weed/weed`) or on `$PATH`. + +## Run + +```bash +go test -timeout 15m \ + -run 'TestUnityCatalog' \ + ./test/s3tables/unity_catalog/... +``` + +Pin a specific Unity Catalog image (defaults to +`unitycatalog/unitycatalog:v0.4.0`): + +```bash +UC_IMAGE=unitycatalog/unitycatalog:main \ + go test -timeout 15m -run TestUnityCatalogDeltaIntegration \ + ./test/s3tables/unity_catalog/... +``` + +The tests self-skip when Docker is unavailable or no `weed` binary is on +the path; running under `-short` also skips them. + +## Why the static-key path can't vend usable creds + +UC OSS' `AwsCredentialVendor.createPerBucketCredentialGenerator`: + +```java +if (config.getSessionToken() != null && !config.getSessionToken().isEmpty()) { + return new AwsCredentialGenerator.StaticAwsCredentialGenerator(config); +} +return createStsCredentialGenerator(config); +``` + +With `aws.masterRoleArn=` empty and `s3.sessionToken.0=` empty (this +test's configuration), `/temporary-table-credentials` short-circuits with +`"S3 bucket configuration not found."` before UC fires any STS call. +Setting a stub `s3.sessionToken.0` switches UC to +`StaticAwsCredentialGenerator` and the endpoint returns the static keys, +but the response carries that stub session token -- SeaweedFS won't +recognize it on the next S3 call, so the vended creds aren't usable for +table I/O. Clients have to fall back to the static keys directly. + +With `aws.masterRoleArn` set, UC's `AwsCredentialGenerator.StsAwsCredentialGenerator` +builds the StsClient with only `.region(...)` and `.credentialsProvider(...)` -- +no `.endpointOverride()`. The SDK's generic env-var resolution doesn't kick in +for that builder shape, so even with `AWS_ENDPOINT_URL_STS=...` (or the +matching `aws.endpointUrlSts` Java property, or the catch-all +`AWS_ENDPOINT_URL=...`) the StsClient still targets real AWS and gets back +`InvalidClientTokenId`. Verified by pointing the env var at port 1: UC reports +the same AWS-issued 403 that it reports against SeaweedFS, and a sniffer in +front of SeaweedFS' STS port records zero traffic. SeaweedFS' STS handler +itself works -- the Go SDK round-trip in `assumeRoleViaSeaweedFS` proves that +against the same SeaweedFS instance. + +UC's own AWS credential-vending tests don't catch this because they mock +`StsClient` away entirely -- `BaseCRUDTestWithMockCredentials` injects a +custom `stsClientBuilderSupplier` returning an `EchoAwsStsClient` that +synthesizes credentials in-process, and `CloudCredentialVendorTest` uses +`Mockito.mockStatic(StsClient.class)`. No test ever exercises the wire +path between UC's Java SDK and a real STS endpoint, so the missing +`endpointOverride` slipped through. + +Fix is upstream in +[unitycatalog/unitycatalog#1532](https://github.com/unitycatalog/unitycatalog/pull/1532), +which adds an `aws.endpoint` property and applies it to both the StsClient +and the S3Client builders. Until that lands, the master-role test logs +the failure but does not assert it. + +## What the tests actually validate today + +- Unity Catalog accepts a SeaweedFS-backed `server.properties` and starts. +- Catalog / schema / EXTERNAL Delta table CRUD all work against the + SeaweedFS warehouse via the UC REST API. +- SeaweedFS' STS endpoint correctly issues `sts:AssumeRole` credentials + for the `UnityCatalogVendedRole` and those credentials are accepted on + S3 round-trips (Go AWS SDK). +- Delta-RS resolves a UC table's `storage_location` and can write/read Delta + data through the SeaweedFS S3 endpoint with the test credentials. + +## What is still pending + +Nothing on the SeaweedFS side. The remaining gap (UC's StsClient ignoring +endpoint config) needs a UC OSS patch upstream. + +## MANAGED tables + +Not exercised. UC OSS gates them behind `server.managed-table.enabled=true` +and a two-step staging flow (`POST /staging-tables` then `POST /tables`); +EXTERNAL Delta is the simpler path and what these tests cover. diff --git a/test/s3tables/unity_catalog/client_test.go b/test/s3tables/unity_catalog/client_test.go new file mode 100644 index 000000000..26075cb82 --- /dev/null +++ b/test/s3tables/unity_catalog/client_test.go @@ -0,0 +1,184 @@ +package unity_catalog + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +type ucClient struct { + base string + http *http.Client +} + +func newUCClient(base string) *ucClient { + return &ucClient{base: base + ucAPIBase, http: &http.Client{Timeout: 30 * time.Second}} +} + +type ucCreateCatalog struct { + Name string `json:"name"` + Comment string `json:"comment,omitempty"` + StorageRoot string `json:"storage_root,omitempty"` +} + +type ucCatalogInfo struct { + Name string `json:"name"` + StorageRoot string `json:"storage_root"` + ID string `json:"id"` +} + +type ucCreateSchema struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` +} + +type ucSchemaInfo struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` + FullName string `json:"full_name"` + SchemaID string `json:"schema_id"` +} + +type ucColumn struct { + Name string `json:"name"` + TypeText string `json:"type_text"` + TypeJSON string `json:"type_json,omitempty"` + TypeName string `json:"type_name"` + Position int `json:"position"` + Nullable bool `json:"nullable"` +} + +type ucCreateTable struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` + SchemaName string `json:"schema_name"` + TableType string `json:"table_type"` + DataSourceFormat string `json:"data_source_format"` + Columns []ucColumn `json:"columns"` + StorageLocation string `json:"storage_location,omitempty"` +} + +type ucTableInfo struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` + SchemaName string `json:"schema_name"` + TableType string `json:"table_type"` + DataSourceFormat string `json:"data_source_format"` + Columns []ucColumn `json:"columns"` + StorageLocation string `json:"storage_location"` + TableID string `json:"table_id"` +} + +type ucListTablesResp struct { + Tables []ucTableInfo `json:"tables"` +} + +type ucAwsCreds struct { + AccessKeyID string `json:"access_key_id"` + SecretAccessKey string `json:"secret_access_key"` + SessionToken string `json:"session_token"` +} + +type ucTempCreds struct { + AwsTempCredentials *ucAwsCreds `json:"aws_temp_credentials"` + ExpirationTime int64 `json:"expiration_time"` +} + +func (c *ucClient) createCatalog(ctx context.Context, in ucCreateCatalog) (*ucCatalogInfo, error) { + var out ucCatalogInfo + return &out, c.do(ctx, http.MethodPost, "/catalogs", in, &out) +} + +func (c *ucClient) deleteCatalog(ctx context.Context, name string) error { + q := url.Values{"force": []string{"true"}} + return c.do(ctx, http.MethodDelete, "/catalogs/"+url.PathEscape(name)+"?"+q.Encode(), nil, nil) +} + +func (c *ucClient) createSchema(ctx context.Context, in ucCreateSchema) (*ucSchemaInfo, error) { + var out ucSchemaInfo + return &out, c.do(ctx, http.MethodPost, "/schemas", in, &out) +} + +func (c *ucClient) deleteSchema(ctx context.Context, fullName string) error { + q := url.Values{"force": []string{"true"}} + return c.do(ctx, http.MethodDelete, "/schemas/"+url.PathEscape(fullName)+"?"+q.Encode(), nil, nil) +} + +func (c *ucClient) createTable(ctx context.Context, in ucCreateTable) (*ucTableInfo, error) { + var out ucTableInfo + return &out, c.do(ctx, http.MethodPost, "/tables", in, &out) +} + +func (c *ucClient) getTable(ctx context.Context, fullName string) (*ucTableInfo, error) { + var out ucTableInfo + return &out, c.do(ctx, http.MethodGet, "/tables/"+url.PathEscape(fullName), nil, &out) +} + +func (c *ucClient) listTables(ctx context.Context, catalog, schema string) ([]ucTableInfo, error) { + var out ucListTablesResp + q := url.Values{ + "catalog_name": []string{catalog}, + "schema_name": []string{schema}, + } + path := "/tables?" + q.Encode() + if err := c.do(ctx, http.MethodGet, path, nil, &out); err != nil { + return nil, err + } + return out.Tables, nil +} + +func (c *ucClient) deleteTable(ctx context.Context, fullName string) error { + return c.do(ctx, http.MethodDelete, "/tables/"+url.PathEscape(fullName), nil, nil) +} + +func (c *ucClient) generateTemporaryTableCredentials(ctx context.Context, tableID, op string) (*ucTempCreds, error) { + body := map[string]string{"table_id": tableID, "operation": op} + var out ucTempCreds + return &out, c.do(ctx, http.MethodPost, "/temporary-table-credentials", body, &out) +} + +func (c *ucClient) do(ctx context.Context, method, path string, in any, out any) error { + var body io.Reader + if in != nil { + buf, err := json.Marshal(in) + if err != nil { + return fmt.Errorf("marshal %s %s: %w", method, path, err) + } + body = bytes.NewReader(buf) + } + req, err := http.NewRequestWithContext(ctx, method, c.base+path, body) + if err != nil { + return err + } + if in != nil { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("Accept", "application/json") + resp, err := c.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + // 10 MiB cap is well above any reasonable Unity Catalog response and + // keeps a runaway server from OOMing the test runner. + respBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024*1024)) + if err != nil { + return fmt.Errorf("read %s %s: %w", method, path, err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("%s %s: status %d: %s", method, path, resp.StatusCode, strings.TrimSpace(string(respBytes))) + } + if out == nil || len(respBytes) == 0 { + return nil + } + if err := json.Unmarshal(respBytes, out); err != nil { + return fmt.Errorf("decode %s %s: %w (body=%s)", method, path, err, string(respBytes)) + } + return nil +} diff --git a/test/s3tables/unity_catalog/delta_rs_test.go b/test/s3tables/unity_catalog/delta_rs_test.go new file mode 100644 index 000000000..66335a6de --- /dev/null +++ b/test/s3tables/unity_catalog/delta_rs_test.go @@ -0,0 +1,125 @@ +package unity_catalog + +import ( + "context" + "fmt" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +// TestUnityCatalogDeltaRsRoundTrip writes and reads a real Delta table at +// the registered storage_location using the delta-rs Python library inside +// a Docker container. The script resolves table metadata from UC and uses the +// test SeaweedFS credentials as delta-rs storage_options. +func TestUnityCatalogDeltaRsRoundTrip(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in -short mode") + } + if !testutil.HasDocker() { + t.Skip("docker not available") + } + + env := newTestEnv(t) + defer env.cleanup(t) + + t.Log(">>> starting SeaweedFS for delta-rs test...") + env.startSeaweedFS(t, "") + t.Log(">>> SeaweedFS ready") + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute) + defer cancel() + + s3c := env.newHostS3Client(t, ctx) + if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil { + t.Fatalf("create warehouse bucket: %v", err) + } + + t.Log(">>> starting Unity Catalog server...") + env.startUnityCatalog(t, ctx, ucServerOpts{}) + t.Log(">>> Unity Catalog ready") + + uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort)) + + suffix := time.Now().UnixNano() + catalogName := fmt.Sprintf("seaweed_uc_delta_rs_%d", suffix) + schemaName := fmt.Sprintf("ns_%d", suffix) + tableName := fmt.Sprintf("delta_rs_%d", suffix) + tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName) + + defer func() { + _ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName) + _ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName) + _ = uc.deleteCatalog(context.Background(), catalogName) + }() + + if _, err := uc.createCatalog(ctx, ucCreateCatalog{ + Name: catalogName, + StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey), + }); err != nil { + t.Fatalf("create catalog: %v", err) + } + if _, err := uc.createSchema(ctx, ucCreateSchema{Name: schemaName, CatalogName: catalogName}); err != nil { + t.Fatalf("create schema: %v", err) + } + created, err := uc.createTable(ctx, ucCreateTable{ + Name: tableName, + CatalogName: catalogName, + SchemaName: schemaName, + TableType: "EXTERNAL", + DataSourceFormat: "DELTA", + Columns: []ucColumn{ + {Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true}, + {Name: "value", TypeText: "string", TypeName: "STRING", TypeJSON: `{"name":"value","type":"string","nullable":true,"metadata":{}}`, Position: 1, Nullable: true}, + }, + StorageLocation: tableLocation, + }) + if err != nil { + t.Fatalf("create table: %v", err) + } + + testDir, err := filepath.Abs(".") + if err != nil { + t.Fatalf("abs: %v", err) + } + imageTag := fmt.Sprintf("seaweed-uc-delta-rs:%d", time.Now().UnixNano()) + + build := exec.CommandContext(ctx, "docker", "build", "-t", imageTag, "-f", "Dockerfile.delta-rs", ".") + build.Dir = testDir + if out, err := build.CombinedOutput(); err != nil { + t.Fatalf("docker build delta-rs image: %v\n%s", err, out) + } + defer func() { _ = exec.Command("docker", "rmi", "-f", imageTag).Run() }() + + ucURLForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.ucHostPort) + s3EndpointForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + + cmd := exec.CommandContext(ctx, "docker", "run", "--rm", + "--add-host", "host.docker.internal:host-gateway", + "-e", "UC_URL="+ucURLForContainer, + "-e", "S3_ENDPOINT="+s3EndpointForContainer, + "-e", "UC_CATALOG="+catalogName, + "-e", "UC_SCHEMA="+schemaName, + "-e", "UC_TABLE="+tableName, + "-e", "UC_TABLE_ID="+created.TableID, + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + imageTag, + ) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("delta-rs container failed: %v\noutput:\n%s", err, out) + } + got := string(out) + if !strings.Contains(got, "DELTA_RS_OK rows=3") { + t.Fatalf("expected DELTA_RS_OK rows=3 in output; got:\n%s", got) + } + t.Logf("delta-rs output: %s", strings.TrimSpace(got)) +} diff --git a/test/s3tables/unity_catalog/delta_rs_writer.py b/test/s3tables/unity_catalog/delta_rs_writer.py new file mode 100644 index 000000000..eded3d332 --- /dev/null +++ b/test/s3tables/unity_catalog/delta_rs_writer.py @@ -0,0 +1,137 @@ +"""Round-trip a Delta table through Unity Catalog using delta-rs. + +Reads connection settings from environment variables: + + UC_URL Unity Catalog REST base, e.g. http://host.docker.internal:8080 + S3_ENDPOINT SeaweedFS S3 endpoint reachable from the container + UC_CATALOG Catalog name to operate against + UC_SCHEMA Schema name + UC_TABLE Table name + UC_TABLE_ID Optional: pre-fetched table_id; falls back to GET /tables/{full_name} + AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY + Optional: direct S3 credentials. If omitted, the script + asks UC for temporary table credentials. + +Steps: + + 1. GET the table info from UC. + 2. Resolve S3 credentials from env or UC temporary-table-credentials. + 3. write_deltalake() a small pyarrow Table to storage_location. + 4. Read it back with DeltaTable() and assert row count. + +Prints `DELTA_RS_OK rows=` on success and exits 0; any failure exits 1. +""" + +from __future__ import annotations + +import os +import sys +import urllib.parse + +import pyarrow as pa +import requests +from deltalake import DeltaTable, write_deltalake + + +def must_env(name: str) -> str: + value = os.environ.get(name) + if not value: + print(f"ERROR: required env var {name} is not set", file=sys.stderr) + sys.exit(2) + return value + + +def main() -> int: + uc_url = must_env("UC_URL").rstrip("/") + s3_endpoint = must_env("S3_ENDPOINT") + catalog = must_env("UC_CATALOG") + schema = must_env("UC_SCHEMA") + table = must_env("UC_TABLE") + table_id = os.environ.get("UC_TABLE_ID", "").strip() + + full_name = f"{catalog}.{schema}.{table}" + api = f"{uc_url}/api/2.1/unity-catalog" + + encoded_full_name = urllib.parse.quote(full_name, safe="") + r = requests.get(f"{api}/tables/{encoded_full_name}", timeout=10) + r.raise_for_status() + info = r.json() + if not table_id: + table_id = info.get("table_id") or "" + storage_location = info.get("storage_location") or "" + + if not table_id: + print("ERROR: table_id is empty in UC response", file=sys.stderr) + return 1 + if not storage_location: + print("ERROR: storage_location is empty in UC response", file=sys.stderr) + return 1 + + access_key = os.environ.get("AWS_ACCESS_KEY_ID", "").strip() + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "").strip() + session_token = os.environ.get("AWS_SESSION_TOKEN", "").strip() + if bool(access_key) != bool(secret_key): + print( + "ERROR: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set together", + file=sys.stderr, + ) + return 1 + if not access_key: + creds_resp = requests.post( + f"{api}/temporary-table-credentials", + json={"table_id": table_id, "operation": "READ_WRITE"}, + timeout=15, + ) + creds_resp.raise_for_status() + creds = creds_resp.json().get("aws_temp_credentials") or {} + access_key = creds.get("access_key_id") or "" + secret_key = creds.get("secret_access_key") or "" + session_token = creds.get("session_token") or "" + if not access_key or not secret_key: + print( + "ERROR: missing aws_temp_credentials; " + f"response status {creds_resp.status_code}", + file=sys.stderr, + ) + return 1 + + storage_options = { + "AWS_ACCESS_KEY_ID": access_key, + "AWS_SECRET_ACCESS_KEY": secret_key, + "AWS_REGION": "us-east-1", + "AWS_ENDPOINT_URL": s3_endpoint, + "AWS_ALLOW_HTTP": "true", + # delta-rs requires this for non-S3 backends that lack atomic rename. + "AWS_S3_ALLOW_UNSAFE_RENAME": "true", + } + if session_token: + storage_options["AWS_SESSION_TOKEN"] = session_token + + arrow_table = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int64()), + "value": pa.array(["alpha", "beta", "gamma"], type=pa.string()), + } + ) + + write_deltalake( + storage_location, + arrow_table, + storage_options=storage_options, + mode="overwrite", + ) + + dt = DeltaTable(storage_location, storage_options=storage_options) + read_back = dt.to_pyarrow_table() + rows = read_back.num_rows + print(f"DELTA_RS_OK rows={rows} version={dt.version()}") + return 0 if rows == arrow_table.num_rows else 1 + + +if __name__ == "__main__": + exit_code = main() + sys.stdout.flush() + sys.stderr.flush() + # deltalake/pyarrow can abort during native teardown after a successful + # round-trip in the short-lived test container. Preserve main's result. + os._exit(exit_code) diff --git a/test/s3tables/unity_catalog/master_role_test.go b/test/s3tables/unity_catalog/master_role_test.go new file mode 100644 index 000000000..6f4a24db2 --- /dev/null +++ b/test/s3tables/unity_catalog/master_role_test.go @@ -0,0 +1,201 @@ +package unity_catalog + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sts" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +// TestUnityCatalogMasterRoleIntegration covers the master-role configuration +// the upstream playground notes as not-yet-working +// (https://github.com/data-engineering-helpers/mds-in-a-box/blob/main/unitycatalog-playground/etc/conf/server.properties#L45). +// +// It verifies two things end-to-end: +// +// 1. SeaweedFS' STS endpoint accepts sts:AssumeRole for the +// UnityCatalogVendedRole, returning real STS-vended credentials. This +// mirrors what the lakekeeper / polaris tests already exercise via the Go +// AWS SDK and is the SeaweedFS-side prerequisite for UC's master-role +// flow. +// +// 2. Unity Catalog OSS starts and accepts catalog/schema/EXTERNAL Delta +// table CRUD when configured with `aws.masterRoleArn` set to that role +// and AWS_ENDPOINT_URL_STS pointed at SeaweedFS. +// +// What it does NOT verify is the third hop -- UC's Java StsClient +// reaching SeaweedFS' STS handler. UC's +// AwsCredentialGenerator.StsAwsCredentialGenerator builds the StsClient +// without an endpointOverride and doesn't propagate aws.endpoint to it, so +// the SDK's generic AWS_ENDPOINT_URL_STS resolution doesn't kick in and the +// request always targets real AWS, returning InvalidClientTokenId. Verified +// by pointing AWS_ENDPOINT_URL_STS at port 1: same 403 either way, and a +// sniffer in front of SeaweedFS records zero traffic. UC's own tests mock +// StsClient out (BaseCRUDTestWithMockCredentials' EchoAwsStsClient, +// Mockito.mockStatic in CloudCredentialVendorTest), so this never showed +// up upstream. Fix is in unitycatalog/unitycatalog#1532; that bit is +// logged via t.Logf and not asserted. +func TestUnityCatalogMasterRoleIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in -short mode") + } + if !testutil.HasDocker() { + t.Skip("docker not available") + } + + env := newTestEnv(t) + defer env.cleanup(t) + + t.Log(">>> starting SeaweedFS with STS-enabled IAM config...") + env.startSeaweedFS(t, stsEnabledIAMConfig(env.accessKey, env.secretKey)) + t.Log(">>> SeaweedFS ready") + + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + defer cancel() + + s3c := env.newHostS3Client(t, ctx) + if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil { + t.Fatalf("create warehouse bucket: %v", err) + } + + // Slice 1: prove SeaweedFS STS works for the role UC would use. + t.Run("SeaweedFsAssumesUnityCatalogVendedRole", func(t *testing.T) { + stsEndpoint := fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) + creds, err := assumeRoleViaSeaweedFS(ctx, stsEndpoint, env.accessKey, env.secretKey, ucVendedRoleArn) + if err != nil { + t.Fatalf("AssumeRole on SeaweedFS STS: %v", err) + } + if creds.SessionToken == "" { + t.Fatalf("expected non-empty session_token from STS, got %+v", creds) + } + if creds.AccessKeyID == env.accessKey { + t.Fatalf("STS returned the static admin access key; AssumeRole did not vend a new one") + } + + // Confirm the vended creds actually work for an S3 round-trip. + s3v := env.newHostS3ClientWithCreds(t, ctx, creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken) + probeKey := "ns-probe/master-role-probe.txt" + if _, err := s3v.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(ucWarehouse), + Key: aws.String(probeKey), + Body: nil, + }); err != nil { + t.Fatalf("PutObject with sts-vended creds: %v", err) + } + _, _ = s3v.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(ucWarehouse), + Key: aws.String(probeKey), + }) + }) + + // Slice 2: UC starts with master-role config and CRUD works. + stsEndpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + t.Log(">>> starting Unity Catalog server (master role)...") + env.startUnityCatalog(t, ctx, ucServerOpts{ + MasterRoleArn: ucVendedRoleArn, + ExtraEnv: map[string]string{ + "AWS_ENDPOINT_URL": stsEndpoint, + "AWS_ENDPOINT_URL_STS": stsEndpoint, + "AWS_ACCESS_KEY_ID": env.accessKey, + "AWS_SECRET_ACCESS_KEY": env.secretKey, + "AWS_REGION": "us-east-1", + }, + }) + t.Log(">>> Unity Catalog ready") + + uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort)) + suffix := time.Now().UnixNano() + catalogName := fmt.Sprintf("seaweed_uc_role_%d", suffix) + schemaName := fmt.Sprintf("ns_%d", suffix) + tableName := fmt.Sprintf("events_%d", suffix) + tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName) + + defer func() { + _ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName) + _ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName) + _ = uc.deleteCatalog(context.Background(), catalogName) + }() + + t.Run("UnityCatalogAcceptsMasterRoleConfig", func(t *testing.T) { + if _, err := uc.createCatalog(ctx, ucCreateCatalog{ + Name: catalogName, + StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey), + }); err != nil { + t.Fatalf("create catalog: %v", err) + } + if _, err := uc.createSchema(ctx, ucCreateSchema{Name: schemaName, CatalogName: catalogName}); err != nil { + t.Fatalf("create schema: %v", err) + } + created, err := uc.createTable(ctx, ucCreateTable{ + Name: tableName, + CatalogName: catalogName, + SchemaName: schemaName, + TableType: "EXTERNAL", + DataSourceFormat: "DELTA", + Columns: []ucColumn{ + {Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true}, + }, + StorageLocation: tableLocation, + }) + if err != nil { + t.Fatalf("create external delta table: %v", err) + } + if created.TableID == "" { + t.Fatalf("expected table_id in CreateTable response, got %+v", created) + } + + // The actual UC -> SeaweedFS STS handoff is the still-unsolved slice. + // Log what happens, but don't fail the test on it: SeaweedFS' STS is + // known-good (slice 1 above), and UC's CRUD is known-good (this slice). + _, ucCredErr := uc.generateTemporaryTableCredentials(ctx, created.TableID, "READ_WRITE") + if ucCredErr != nil { + t.Logf("UC StsClient still hits real AWS (UC bug, fix in unitycatalog/unitycatalog#1532): %v", ucCredErr) + } else { + t.Logf("UC StsClient honors endpoint override; unitycatalog/unitycatalog#1532 (or equivalent) is in this image.") + } + }) +} + +// assumeRoleViaSeaweedFS calls sts:AssumeRole against SeaweedFS' STS endpoint +// using the Go AWS SDK, mirroring how lakekeeper/polaris tests prove +// SeaweedFS' STS path works. +func assumeRoleViaSeaweedFS(ctx context.Context, endpoint, accessKey, secretKey, roleArn string) (aws.Credentials, error) { + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")), + ) + if err != nil { + return aws.Credentials{}, fmt.Errorf("load aws config: %w", err) + } + + // Override the STS endpoint via service-specific BaseEndpoint instead of + // the deprecated EndpointResolverWithOptions wired through the global + // config. This is the supported path in aws-sdk-go-v2 since v1.21. + client := sts.NewFromConfig(cfg, func(o *sts.Options) { + o.BaseEndpoint = aws.String(endpoint) + }) + resp, err := client.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: aws.String(roleArn), + RoleSessionName: aws.String("uc-master-role-test"), + }) + if err != nil { + return aws.Credentials{}, err + } + if resp.Credentials == nil { + return aws.Credentials{}, fmt.Errorf("AssumeRole returned no credentials") + } + return aws.Credentials{ + AccessKeyID: aws.ToString(resp.Credentials.AccessKeyId), + SecretAccessKey: aws.ToString(resp.Credentials.SecretAccessKey), + SessionToken: aws.ToString(resp.Credentials.SessionToken), + Source: "seaweedfs-sts", + }, nil +} diff --git a/test/s3tables/unity_catalog/setup_test.go b/test/s3tables/unity_catalog/setup_test.go new file mode 100644 index 000000000..f3351c441 --- /dev/null +++ b/test/s3tables/unity_catalog/setup_test.go @@ -0,0 +1,379 @@ +package unity_catalog + +import ( + "context" + "fmt" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +const ( + ucImageDefault = "unitycatalog/unitycatalog:v0.4.0" + ucContainerCfg = "/home/unitycatalog/etc/conf/server.properties" + ucStartupGrace = 90 * time.Second + ucAPIBase = "/api/2.1/unity-catalog" + ucWarehouse = "lakehouse" + ucWarehouseKey = "warehouse" + + // Role used by the master-role STS-vended variant of the test. The trust + // policy is wide open so any caller can assume it; in production UC + // would set this ARN as `aws.masterRoleArn`. + ucVendedRoleArn = "arn:aws:iam::000000000000:role/UnityCatalogVendedRole" + ucVendedRoleName = "UnityCatalogVendedRole" +) + +type testEnv struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + + accessKey string + secretKey string + + ucImage string + ucContainerID string + ucHostPort int +} + +// ucServerOpts customizes the Unity Catalog server.properties and runtime +// environment. Defaults match the upstream playground (static keys, no master +// role). +type ucServerOpts struct { + // MasterRoleArn populates aws.masterRoleArn. Empty means UC falls back to + // static aws.accessKey / aws.secretKey for storage operations. + MasterRoleArn string + // ExtraEnv adds environment variables to the UC container, useful for + // AWS_ENDPOINT_URL_STS-style overrides. + ExtraEnv map[string]string +} + +func newTestEnv(t *testing.T) *testEnv { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + seaweedDir := wd + for i := 0; i < 8; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); err != nil { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-uc-test-*") + if err != nil { + t.Fatalf("mkdtemp: %v", err) + } + + bindIP := testutil.FindBindIP() + ports := testutil.MustAllocatePorts(t, 9) + masterPort, masterGrpcPort := ports[0], ports[1] + volumePort, volumeGrpcPort := ports[2], ports[3] + filerPort, filerGrpcPort := ports[4], ports[5] + s3Port, s3GrpcPort := ports[6], ports[7] + ucHostPort := ports[8] + + image := os.Getenv("UC_IMAGE") + if image == "" { + image = ucImageDefault + } + + return &testEnv{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + accessKey: "admin", + secretKey: "admin", + ucImage: image, + ucHostPort: ucHostPort, + } +} + +// startSeaweedFS starts a `weed mini` instance. If iamJSON is empty, a minimal +// admin-only IAM config is used. When iamJSON is non-empty, it is passed to +// both -s3.config and -s3.iam.config so the STS handler is enabled (mirroring +// the lakekeeper test). +func (env *testEnv) startSeaweedFS(t *testing.T, iamJSON string) { + t.Helper() + + enableSTS := iamJSON != "" + if iamJSON == "" { + iamJSON = fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [{"accessKey": %q, "secretKey": %q}], + "actions": ["Admin", "Read", "List", "Tagging", "Write"] + } + ] +}`, env.accessKey, env.secretKey) + } + + iamConfigPath := filepath.Join(env.dataDir, "iam.json") + if err := os.WriteFile(iamConfigPath, []byte(iamJSON), 0644); err != nil { + t.Fatalf("write iam config: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + weedArgs := []string{"-v", "4", "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + } + if enableSTS { + weedArgs = append(weedArgs, "-s3.iam.config", iamConfigPath, "-s3.iam.readOnly=false") + } + + cmd := exec.CommandContext(ctx, env.weedBinary, weedArgs...) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + t.Fatalf("start weed mini: %v", err) + } + env.weedProcess = cmd + + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("S3 API at 127.0.0.1:%d did not become ready", env.s3Port) + } +} + +// startUnityCatalog launches the Unity Catalog OSS server in Docker against +// the running SeaweedFS instance. It mirrors the upstream playground's +// server.properties layout and bind-mounts only that single file (matching +// docker-compose.yaml from the playground). +func (env *testEnv) startUnityCatalog(t *testing.T, ctx context.Context, opts ucServerOpts) { + t.Helper() + + s3EndpointForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + + props := strings.Join([]string{ + "server.env=test", + "server.authorization=disable", + "server.cookie-timeout=PT1H", + "server.managed-table.enabled=false", + fmt.Sprintf("aws.masterRoleArn=%s", opts.MasterRoleArn), + fmt.Sprintf("aws.accessKey=%s", env.accessKey), + fmt.Sprintf("aws.secretKey=%s", env.secretKey), + "aws.region=us-east-1", + fmt.Sprintf("aws.endpoint=%s", s3EndpointForContainer), + // UC keys perBucketS3Configs by NormalizedURL.from(bucketPath) and + // looks up using the storageBase, which is "s3://" (scheme + + // authority only). The playground's "s3://lakehouse/warehouse" never + // matches because of that asymmetry; the bucket-only form works. + fmt.Sprintf("s3.bucketPath.0=s3://%s", ucWarehouse), + "s3.region.0=us-east-1", + fmt.Sprintf("s3.awsRoleArn.0=%s", opts.MasterRoleArn), + fmt.Sprintf("s3.accessKey.0=%s", env.accessKey), + fmt.Sprintf("s3.secretKey.0=%s", env.secretKey), + fmt.Sprintf("s3.endpoint.0=%s", s3EndpointForContainer), + "", + }, "\n") + + confDir := filepath.Join(env.dataDir, "uc-conf") + if err := os.MkdirAll(confDir, 0755); err != nil { + t.Fatalf("mkdir uc-conf: %v", err) + } + propsPath := filepath.Join(confDir, "server.properties") + if err := os.WriteFile(propsPath, []byte(props), 0644); err != nil { + t.Fatalf("write server.properties: %v", err) + } + + containerName := fmt.Sprintf("seaweed-unity-catalog-%d", time.Now().UnixNano()) + + args := []string{ + "run", "-d", "--rm", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-p", fmt.Sprintf("%d:8080", env.ucHostPort), + "-v", fmt.Sprintf("%s:%s:ro", propsPath, ucContainerCfg), + "-e", "JAVA_OPTS=-Xmx1g", + } + for k, v := range opts.ExtraEnv { + args = append(args, "-e", fmt.Sprintf("%s=%s", k, v)) + } + args = append(args, env.ucImage) + + out, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput() + if err != nil { + t.Fatalf("docker run unity-catalog: %v\n%s", err, out) + } + // docker run -d prefixes pull progress / warnings before the final container + // ID line when the image isn't cached locally. Trim to the last whitespace- + // separated token so the ID survives a fresh CI runner. + fields := strings.Fields(string(out)) + if len(fields) == 0 { + t.Fatalf("docker run produced no output; combined: %q", string(out)) + } + env.ucContainerID = fields[len(fields)-1] + t.Logf("unity-catalog container id: %s", env.ucContainerID) + + probe := fmt.Sprintf("http://127.0.0.1:%d%s/catalogs", env.ucHostPort, ucAPIBase) + deadline := time.Now().Add(ucStartupGrace) + var lastErr error + for time.Now().Before(deadline) { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, probe, nil) + resp, err := (&http.Client{Timeout: 3 * time.Second}).Do(req) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return + } + lastErr = fmt.Errorf("status %d", resp.StatusCode) + } else { + lastErr = err + } + select { + case <-ctx.Done(): + t.Fatalf("ctx done while waiting for unity catalog: %v", ctx.Err()) + case <-time.After(time.Second): + } + } + logs, _ := exec.Command("docker", "logs", "--tail", "200", env.ucContainerID).CombinedOutput() + t.Fatalf("unity catalog did not become ready: %v\nrecent logs:\n%s", lastErr, logs) +} + +func (env *testEnv) cleanup(t *testing.T) { + if env.ucContainerID != "" { + if logs, err := exec.Command("docker", "logs", "--tail", "200", env.ucContainerID).CombinedOutput(); err == nil { + t.Logf("unity-catalog tail logs:\n%s", logs) + } + _ = exec.Command("docker", "rm", "-f", env.ucContainerID).Run() + } + if env.weedCancel != nil { + env.weedCancel() + } + if env.weedProcess != nil { + _ = env.weedProcess.Wait() + } + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +// -- S3 client helpers --------------------------------------------------------- + +func (env *testEnv) newHostS3Client(t *testing.T, ctx context.Context) *s3.Client { + t.Helper() + return env.newHostS3ClientWithCreds(t, ctx, env.accessKey, env.secretKey, "") +} + +func (env *testEnv) newHostS3ClientWithCreds(t *testing.T, ctx context.Context, ak, sk, token string) *s3.Client { + t.Helper() + endpoint := fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(ak, sk, token)), + ) + if err != nil { + t.Fatalf("load aws config: %v", err) + } + return s3.NewFromConfig(cfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String(endpoint) + o.UsePathStyle = true + }) +} + +// stsEnabledIAMConfig returns an iam.json that defines an admin user, the +// UnityCatalogVendedRole role with a permissive trust policy, and a FullAccess +// policy attached to it. This is the SeaweedFS-side counterpart of the +// `aws.masterRoleArn` configuration on the Unity Catalog server. +func stsEnabledIAMConfig(accessKey, secretKey string) string { + return fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [{"accessKey": %q, "secretKey": %q}], + "actions": ["Admin", "Read", "List", "Tagging", "Write"] + } + ], + "sts": { + "tokenDuration": "12h", + "maxSessionLength": "24h", + "issuer": "seaweedfs-sts", + "signingKey": "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz" + }, + "roles": [ + { + "roleName": %q, + "roleArn": %q, + "trustPolicy": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": "*", + "Action": "sts:AssumeRole" + } + ] + }, + "attachedPolicies": ["FullAccess"] + } + ], + "policies": [ + { + "name": "FullAccess", + "document": { + "Version": "2012-10-17", + "Statement": [ + {"Effect": "Allow", "Action": "*", "Resource": "*"} + ] + } + } + ] +}`, accessKey, secretKey, ucVendedRoleName, ucVendedRoleArn) +} diff --git a/test/s3tables/unity_catalog/unity_catalog_test.go b/test/s3tables/unity_catalog/unity_catalog_test.go new file mode 100644 index 000000000..35b48be09 --- /dev/null +++ b/test/s3tables/unity_catalog/unity_catalog_test.go @@ -0,0 +1,183 @@ +// Package unity_catalog provides integration tests that run Unity Catalog +// OSS in Docker against a SeaweedFS S3 backend. The base test mirrors the +// configuration used by the upstream UC playground at +// https://github.com/data-engineering-helpers/mds-in-a-box/tree/main/unitycatalog-playground: +// static aws.accessKey/aws.secretKey, no master role, EXTERNAL Delta tables. +package unity_catalog + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +// TestUnityCatalogDeltaIntegration brings up SeaweedFS, runs Unity Catalog OSS +// in Docker against it, and exercises catalog/schema/table CRUD plus +// temporary-table-credentials and storage I/O via vended credentials. The UC +// server is configured with static keys (aws.masterRoleArn empty), matching +// the playground's working configuration when targeting SeaweedFS. +func TestUnityCatalogDeltaIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in -short mode") + } + if !testutil.HasDocker() { + t.Skip("docker not available") + } + + env := newTestEnv(t) + defer env.cleanup(t) + + t.Log(">>> starting SeaweedFS for Unity Catalog test...") + env.startSeaweedFS(t, "") + t.Log(">>> SeaweedFS ready") + + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + defer cancel() + + s3c := env.newHostS3Client(t, ctx) + if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil { + t.Fatalf("create warehouse bucket: %v", err) + } + + t.Log(">>> starting Unity Catalog server (static keys)...") + env.startUnityCatalog(t, ctx, ucServerOpts{}) + t.Log(">>> Unity Catalog ready") + + uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort)) + + suffix := time.Now().UnixNano() + catalogName := fmt.Sprintf("seaweed_uc_%d", suffix) + schemaName := fmt.Sprintf("ns_%d", suffix) + tableName := fmt.Sprintf("events_%d", suffix) + tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName) + + defer func() { + _ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName) + _ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName) + _ = uc.deleteCatalog(context.Background(), catalogName) + }() + + t.Run("CreateCatalog", func(t *testing.T) { + got, err := uc.createCatalog(ctx, ucCreateCatalog{ + Name: catalogName, + Comment: "seaweedfs integration test", + StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey), + }) + if err != nil { + t.Fatalf("create catalog: %v", err) + } + if got.Name != catalogName { + t.Fatalf("catalog name = %q, want %q", got.Name, catalogName) + } + }) + + t.Run("CreateSchema", func(t *testing.T) { + got, err := uc.createSchema(ctx, ucCreateSchema{ + Name: schemaName, + CatalogName: catalogName, + }) + if err != nil { + t.Fatalf("create schema: %v", err) + } + if got.Name != schemaName || got.CatalogName != catalogName { + t.Fatalf("schema = %+v, want name=%q catalog=%q", got, schemaName, catalogName) + } + }) + + var createdTable ucTableInfo + t.Run("CreateExternalDeltaTable", func(t *testing.T) { + columns := []ucColumn{ + {Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true}, + {Name: "value", TypeText: "string", TypeName: "STRING", TypeJSON: `{"name":"value","type":"string","nullable":true,"metadata":{}}`, Position: 1, Nullable: true}, + } + got, err := uc.createTable(ctx, ucCreateTable{ + Name: tableName, + CatalogName: catalogName, + SchemaName: schemaName, + TableType: "EXTERNAL", + DataSourceFormat: "DELTA", + Columns: columns, + StorageLocation: tableLocation, + }) + if err != nil { + t.Fatalf("create table: %v", err) + } + if got.TableType != "EXTERNAL" || got.DataSourceFormat != "DELTA" { + t.Fatalf("table types = %s/%s, want EXTERNAL/DELTA", got.TableType, got.DataSourceFormat) + } + if got.StorageLocation == "" { + t.Fatalf("table storage_location is empty in response: %+v", got) + } + createdTable = *got + }) + + t.Run("GetTable", func(t *testing.T) { + full := catalogName + "." + schemaName + "." + tableName + got, err := uc.getTable(ctx, full) + if err != nil { + t.Fatalf("get table: %v", err) + } + if got.Name != tableName { + t.Fatalf("get table name = %q, want %q", got.Name, tableName) + } + }) + + t.Run("ListTables", func(t *testing.T) { + tables, err := uc.listTables(ctx, catalogName, schemaName) + if err != nil { + t.Fatalf("list tables: %v", err) + } + found := false + for _, tbl := range tables { + if tbl.Name == tableName { + found = true + break + } + } + if !found { + t.Fatalf("created table %q not found in list", tableName) + } + }) + + t.Run("TemporaryTableCredentialsRejected", func(t *testing.T) { + // With aws.masterRoleArn empty AND no s3.sessionToken.0 set, UC OSS + // always tries to AssumeRole via its internal StsClient (see + // AwsCredentialVendor.createPerBucketCredentialGenerator). Against a + // non-AWS endpoint, that call doesn't reach a real STS, so UC returns + // "S3 bucket configuration not found." or an STS-side error. This is + // the gap users hit at : + // "with simple S3 access and secret keys, Unity Catalog does not seem + // to work." + // + // The assertion is therefore inverted: we expect a non-nil error from + // /temporary-table-credentials with this configuration. A future + // variant can pin s3.sessionToken.0 (UC's StaticAwsCredentialGenerator + // path) once SeaweedFS' SigV4 path tolerates the vended session token. + if createdTable.TableID == "" { + t.Fatalf("created table has empty table_id; cannot request temporary credentials") + } + _, err := uc.generateTemporaryTableCredentials(ctx, createdTable.TableID, "READ_WRITE") + if err == nil { + t.Fatalf("expected /temporary-table-credentials to fail with the static-key playground configuration; it succeeded unexpectedly") + } + t.Logf("expected failure (UC static-key path requires AWS STS): %v", err) + }) + + t.Run("DeleteTableSchemaCatalog", func(t *testing.T) { + if err := uc.deleteTable(ctx, catalogName+"."+schemaName+"."+tableName); err != nil { + t.Fatalf("delete table: %v", err) + } + if err := uc.deleteSchema(ctx, catalogName+"."+schemaName); err != nil { + t.Fatalf("delete schema: %v", err) + } + if err := uc.deleteCatalog(ctx, catalogName); err != nil { + t.Fatalf("delete catalog: %v", err) + } + }) +}