diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 2ea6f4b59..3bd245427 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -662,6 +662,77 @@ jobs: path: test/s3tables/lakekeeper/test-output.log retention-days: 3 + unity-catalog-integration-tests: + name: Unity Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v4 + + - name: Pre-pull Unity Catalog image + run: docker pull unitycatalog/unitycatalog:v0.4.0 + + - name: Pre-pull Python image for delta-rs writer + run: docker pull python:3.11-slim + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Unity Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/unity_catalog + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + docker info + echo "=== Starting Unity Catalog Tests ===" + + go test -v -timeout 20m . 2>&1 | tee test-output.log || { + echo "Unity Catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/unity_catalog + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker|unitycatalog)" || true + echo "=== Unity Catalog containers ===" + docker ps -a --filter "name=seaweed-unity-catalog" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v7 + with: + name: unity-catalog-integration-test-logs + path: test/s3tables/unity_catalog/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/test/s3tables/unity_catalog/Dockerfile.delta-rs b/test/s3tables/unity_catalog/Dockerfile.delta-rs new file mode 100644 index 000000000..18b7b3013 --- /dev/null +++ b/test/s3tables/unity_catalog/Dockerfile.delta-rs @@ -0,0 +1,11 @@ +FROM python:3.11-slim + +RUN pip install --no-cache-dir \ + "deltalake==0.21.0" \ + "pyarrow==17.0.0" \ + "requests==2.32.3" + +WORKDIR /app +COPY delta_rs_writer.py /app/delta_rs_writer.py + +ENTRYPOINT ["python3", "/app/delta_rs_writer.py"] diff --git a/test/s3tables/unity_catalog/README.md b/test/s3tables/unity_catalog/README.md new file mode 100644 index 000000000..d9510cbea --- /dev/null +++ b/test/s3tables/unity_catalog/README.md @@ -0,0 +1,104 @@ +# Unity Catalog OSS integration tests + +These tests run Unity Catalog OSS in Docker against an embedded SeaweedFS +S3 endpoint. The `server.properties` mirrors the upstream playground at +[`mds-in-a-box/unitycatalog-playground`](https://github.com/data-engineering-helpers/mds-in-a-box/tree/main/unitycatalog-playground). + +| Test | Variant | Status | +| --- | --- | --- | +| `TestUnityCatalogDeltaIntegration` | static keys, `aws.masterRoleArn=` empty | passes; covers catalog/schema/EXTERNAL Delta CRUD against SeaweedFS-backed warehouse and asserts that UC's `/temporary-table-credentials` *cannot* vend usable creds with this configuration -- exactly the gap the playground reports. | +| `TestUnityCatalogMasterRoleIntegration` | `aws.masterRoleArn=arn:aws:iam::000000000000:role/UnityCatalogVendedRole` | passes; proves SeaweedFS' STS endpoint accepts `sts:AssumeRole` for the role UC would use (Go SDK round-trip), and that UC starts and accepts CRUD when wired with the master-role config. UC's own StsClient still talks to real AWS regardless of `aws.endpoint` / `AWS_ENDPOINT_URL_STS` (UC bug, see below); that hop is logged via `t.Logf` rather than asserted. | +| `TestUnityCatalogDeltaRsRoundTrip` | static keys + `delta-rs` Python client | passes; resolves table metadata through UC and writes/reads a real Delta table at the registered `storage_location` using `python:3.11-slim + deltalake` with the SeaweedFS test credentials. | + +## Prerequisites + +- Docker available locally (the tests call `docker run` / `docker build` directly). +- A `weed` binary at the repo root (`weed/weed`) or on `$PATH`. + +## Run + +```bash +go test -timeout 15m \ + -run 'TestUnityCatalog' \ + ./test/s3tables/unity_catalog/... +``` + +Pin a specific Unity Catalog image (defaults to +`unitycatalog/unitycatalog:v0.4.0`): + +```bash +UC_IMAGE=unitycatalog/unitycatalog:main \ + go test -timeout 15m -run TestUnityCatalogDeltaIntegration \ + ./test/s3tables/unity_catalog/... +``` + +The tests self-skip when Docker is unavailable or no `weed` binary is on +the path; running under `-short` also skips them. + +## Why the static-key path can't vend usable creds + +UC OSS' `AwsCredentialVendor.createPerBucketCredentialGenerator`: + +```java +if (config.getSessionToken() != null && !config.getSessionToken().isEmpty()) { + return new AwsCredentialGenerator.StaticAwsCredentialGenerator(config); +} +return createStsCredentialGenerator(config); +``` + +With `aws.masterRoleArn=` empty and `s3.sessionToken.0=` empty (this +test's configuration), `/temporary-table-credentials` short-circuits with +`"S3 bucket configuration not found."` before UC fires any STS call. +Setting a stub `s3.sessionToken.0` switches UC to +`StaticAwsCredentialGenerator` and the endpoint returns the static keys, +but the response carries that stub session token -- SeaweedFS won't +recognize it on the next S3 call, so the vended creds aren't usable for +table I/O. Clients have to fall back to the static keys directly. + +With `aws.masterRoleArn` set, UC's `AwsCredentialGenerator.StsAwsCredentialGenerator` +builds the StsClient with only `.region(...)` and `.credentialsProvider(...)` -- +no `.endpointOverride()`. The SDK's generic env-var resolution doesn't kick in +for that builder shape, so even with `AWS_ENDPOINT_URL_STS=...` (or the +matching `aws.endpointUrlSts` Java property, or the catch-all +`AWS_ENDPOINT_URL=...`) the StsClient still targets real AWS and gets back +`InvalidClientTokenId`. Verified by pointing the env var at port 1: UC reports +the same AWS-issued 403 that it reports against SeaweedFS, and a sniffer in +front of SeaweedFS' STS port records zero traffic. SeaweedFS' STS handler +itself works -- the Go SDK round-trip in `assumeRoleViaSeaweedFS` proves that +against the same SeaweedFS instance. + +UC's own AWS credential-vending tests don't catch this because they mock +`StsClient` away entirely -- `BaseCRUDTestWithMockCredentials` injects a +custom `stsClientBuilderSupplier` returning an `EchoAwsStsClient` that +synthesizes credentials in-process, and `CloudCredentialVendorTest` uses +`Mockito.mockStatic(StsClient.class)`. No test ever exercises the wire +path between UC's Java SDK and a real STS endpoint, so the missing +`endpointOverride` slipped through. + +Fix is upstream in +[unitycatalog/unitycatalog#1532](https://github.com/unitycatalog/unitycatalog/pull/1532), +which adds an `aws.endpoint` property and applies it to both the StsClient +and the S3Client builders. Until that lands, the master-role test logs +the failure but does not assert it. + +## What the tests actually validate today + +- Unity Catalog accepts a SeaweedFS-backed `server.properties` and starts. +- Catalog / schema / EXTERNAL Delta table CRUD all work against the + SeaweedFS warehouse via the UC REST API. +- SeaweedFS' STS endpoint correctly issues `sts:AssumeRole` credentials + for the `UnityCatalogVendedRole` and those credentials are accepted on + S3 round-trips (Go AWS SDK). +- Delta-RS resolves a UC table's `storage_location` and can write/read Delta + data through the SeaweedFS S3 endpoint with the test credentials. + +## What is still pending + +Nothing on the SeaweedFS side. The remaining gap (UC's StsClient ignoring +endpoint config) needs a UC OSS patch upstream. + +## MANAGED tables + +Not exercised. UC OSS gates them behind `server.managed-table.enabled=true` +and a two-step staging flow (`POST /staging-tables` then `POST /tables`); +EXTERNAL Delta is the simpler path and what these tests cover. diff --git a/test/s3tables/unity_catalog/client_test.go b/test/s3tables/unity_catalog/client_test.go new file mode 100644 index 000000000..26075cb82 --- /dev/null +++ b/test/s3tables/unity_catalog/client_test.go @@ -0,0 +1,184 @@ +package unity_catalog + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +type ucClient struct { + base string + http *http.Client +} + +func newUCClient(base string) *ucClient { + return &ucClient{base: base + ucAPIBase, http: &http.Client{Timeout: 30 * time.Second}} +} + +type ucCreateCatalog struct { + Name string `json:"name"` + Comment string `json:"comment,omitempty"` + StorageRoot string `json:"storage_root,omitempty"` +} + +type ucCatalogInfo struct { + Name string `json:"name"` + StorageRoot string `json:"storage_root"` + ID string `json:"id"` +} + +type ucCreateSchema struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` +} + +type ucSchemaInfo struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` + FullName string `json:"full_name"` + SchemaID string `json:"schema_id"` +} + +type ucColumn struct { + Name string `json:"name"` + TypeText string `json:"type_text"` + TypeJSON string `json:"type_json,omitempty"` + TypeName string `json:"type_name"` + Position int `json:"position"` + Nullable bool `json:"nullable"` +} + +type ucCreateTable struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` + SchemaName string `json:"schema_name"` + TableType string `json:"table_type"` + DataSourceFormat string `json:"data_source_format"` + Columns []ucColumn `json:"columns"` + StorageLocation string `json:"storage_location,omitempty"` +} + +type ucTableInfo struct { + Name string `json:"name"` + CatalogName string `json:"catalog_name"` + SchemaName string `json:"schema_name"` + TableType string `json:"table_type"` + DataSourceFormat string `json:"data_source_format"` + Columns []ucColumn `json:"columns"` + StorageLocation string `json:"storage_location"` + TableID string `json:"table_id"` +} + +type ucListTablesResp struct { + Tables []ucTableInfo `json:"tables"` +} + +type ucAwsCreds struct { + AccessKeyID string `json:"access_key_id"` + SecretAccessKey string `json:"secret_access_key"` + SessionToken string `json:"session_token"` +} + +type ucTempCreds struct { + AwsTempCredentials *ucAwsCreds `json:"aws_temp_credentials"` + ExpirationTime int64 `json:"expiration_time"` +} + +func (c *ucClient) createCatalog(ctx context.Context, in ucCreateCatalog) (*ucCatalogInfo, error) { + var out ucCatalogInfo + return &out, c.do(ctx, http.MethodPost, "/catalogs", in, &out) +} + +func (c *ucClient) deleteCatalog(ctx context.Context, name string) error { + q := url.Values{"force": []string{"true"}} + return c.do(ctx, http.MethodDelete, "/catalogs/"+url.PathEscape(name)+"?"+q.Encode(), nil, nil) +} + +func (c *ucClient) createSchema(ctx context.Context, in ucCreateSchema) (*ucSchemaInfo, error) { + var out ucSchemaInfo + return &out, c.do(ctx, http.MethodPost, "/schemas", in, &out) +} + +func (c *ucClient) deleteSchema(ctx context.Context, fullName string) error { + q := url.Values{"force": []string{"true"}} + return c.do(ctx, http.MethodDelete, "/schemas/"+url.PathEscape(fullName)+"?"+q.Encode(), nil, nil) +} + +func (c *ucClient) createTable(ctx context.Context, in ucCreateTable) (*ucTableInfo, error) { + var out ucTableInfo + return &out, c.do(ctx, http.MethodPost, "/tables", in, &out) +} + +func (c *ucClient) getTable(ctx context.Context, fullName string) (*ucTableInfo, error) { + var out ucTableInfo + return &out, c.do(ctx, http.MethodGet, "/tables/"+url.PathEscape(fullName), nil, &out) +} + +func (c *ucClient) listTables(ctx context.Context, catalog, schema string) ([]ucTableInfo, error) { + var out ucListTablesResp + q := url.Values{ + "catalog_name": []string{catalog}, + "schema_name": []string{schema}, + } + path := "/tables?" + q.Encode() + if err := c.do(ctx, http.MethodGet, path, nil, &out); err != nil { + return nil, err + } + return out.Tables, nil +} + +func (c *ucClient) deleteTable(ctx context.Context, fullName string) error { + return c.do(ctx, http.MethodDelete, "/tables/"+url.PathEscape(fullName), nil, nil) +} + +func (c *ucClient) generateTemporaryTableCredentials(ctx context.Context, tableID, op string) (*ucTempCreds, error) { + body := map[string]string{"table_id": tableID, "operation": op} + var out ucTempCreds + return &out, c.do(ctx, http.MethodPost, "/temporary-table-credentials", body, &out) +} + +func (c *ucClient) do(ctx context.Context, method, path string, in any, out any) error { + var body io.Reader + if in != nil { + buf, err := json.Marshal(in) + if err != nil { + return fmt.Errorf("marshal %s %s: %w", method, path, err) + } + body = bytes.NewReader(buf) + } + req, err := http.NewRequestWithContext(ctx, method, c.base+path, body) + if err != nil { + return err + } + if in != nil { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("Accept", "application/json") + resp, err := c.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + // 10 MiB cap is well above any reasonable Unity Catalog response and + // keeps a runaway server from OOMing the test runner. + respBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024*1024)) + if err != nil { + return fmt.Errorf("read %s %s: %w", method, path, err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("%s %s: status %d: %s", method, path, resp.StatusCode, strings.TrimSpace(string(respBytes))) + } + if out == nil || len(respBytes) == 0 { + return nil + } + if err := json.Unmarshal(respBytes, out); err != nil { + return fmt.Errorf("decode %s %s: %w (body=%s)", method, path, err, string(respBytes)) + } + return nil +} diff --git a/test/s3tables/unity_catalog/delta_rs_test.go b/test/s3tables/unity_catalog/delta_rs_test.go new file mode 100644 index 000000000..66335a6de --- /dev/null +++ b/test/s3tables/unity_catalog/delta_rs_test.go @@ -0,0 +1,125 @@ +package unity_catalog + +import ( + "context" + "fmt" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +// TestUnityCatalogDeltaRsRoundTrip writes and reads a real Delta table at +// the registered storage_location using the delta-rs Python library inside +// a Docker container. The script resolves table metadata from UC and uses the +// test SeaweedFS credentials as delta-rs storage_options. +func TestUnityCatalogDeltaRsRoundTrip(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in -short mode") + } + if !testutil.HasDocker() { + t.Skip("docker not available") + } + + env := newTestEnv(t) + defer env.cleanup(t) + + t.Log(">>> starting SeaweedFS for delta-rs test...") + env.startSeaweedFS(t, "") + t.Log(">>> SeaweedFS ready") + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute) + defer cancel() + + s3c := env.newHostS3Client(t, ctx) + if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil { + t.Fatalf("create warehouse bucket: %v", err) + } + + t.Log(">>> starting Unity Catalog server...") + env.startUnityCatalog(t, ctx, ucServerOpts{}) + t.Log(">>> Unity Catalog ready") + + uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort)) + + suffix := time.Now().UnixNano() + catalogName := fmt.Sprintf("seaweed_uc_delta_rs_%d", suffix) + schemaName := fmt.Sprintf("ns_%d", suffix) + tableName := fmt.Sprintf("delta_rs_%d", suffix) + tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName) + + defer func() { + _ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName) + _ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName) + _ = uc.deleteCatalog(context.Background(), catalogName) + }() + + if _, err := uc.createCatalog(ctx, ucCreateCatalog{ + Name: catalogName, + StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey), + }); err != nil { + t.Fatalf("create catalog: %v", err) + } + if _, err := uc.createSchema(ctx, ucCreateSchema{Name: schemaName, CatalogName: catalogName}); err != nil { + t.Fatalf("create schema: %v", err) + } + created, err := uc.createTable(ctx, ucCreateTable{ + Name: tableName, + CatalogName: catalogName, + SchemaName: schemaName, + TableType: "EXTERNAL", + DataSourceFormat: "DELTA", + Columns: []ucColumn{ + {Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true}, + {Name: "value", TypeText: "string", TypeName: "STRING", TypeJSON: `{"name":"value","type":"string","nullable":true,"metadata":{}}`, Position: 1, Nullable: true}, + }, + StorageLocation: tableLocation, + }) + if err != nil { + t.Fatalf("create table: %v", err) + } + + testDir, err := filepath.Abs(".") + if err != nil { + t.Fatalf("abs: %v", err) + } + imageTag := fmt.Sprintf("seaweed-uc-delta-rs:%d", time.Now().UnixNano()) + + build := exec.CommandContext(ctx, "docker", "build", "-t", imageTag, "-f", "Dockerfile.delta-rs", ".") + build.Dir = testDir + if out, err := build.CombinedOutput(); err != nil { + t.Fatalf("docker build delta-rs image: %v\n%s", err, out) + } + defer func() { _ = exec.Command("docker", "rmi", "-f", imageTag).Run() }() + + ucURLForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.ucHostPort) + s3EndpointForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + + cmd := exec.CommandContext(ctx, "docker", "run", "--rm", + "--add-host", "host.docker.internal:host-gateway", + "-e", "UC_URL="+ucURLForContainer, + "-e", "S3_ENDPOINT="+s3EndpointForContainer, + "-e", "UC_CATALOG="+catalogName, + "-e", "UC_SCHEMA="+schemaName, + "-e", "UC_TABLE="+tableName, + "-e", "UC_TABLE_ID="+created.TableID, + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + imageTag, + ) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("delta-rs container failed: %v\noutput:\n%s", err, out) + } + got := string(out) + if !strings.Contains(got, "DELTA_RS_OK rows=3") { + t.Fatalf("expected DELTA_RS_OK rows=3 in output; got:\n%s", got) + } + t.Logf("delta-rs output: %s", strings.TrimSpace(got)) +} diff --git a/test/s3tables/unity_catalog/delta_rs_writer.py b/test/s3tables/unity_catalog/delta_rs_writer.py new file mode 100644 index 000000000..eded3d332 --- /dev/null +++ b/test/s3tables/unity_catalog/delta_rs_writer.py @@ -0,0 +1,137 @@ +"""Round-trip a Delta table through Unity Catalog using delta-rs. + +Reads connection settings from environment variables: + + UC_URL Unity Catalog REST base, e.g. http://host.docker.internal:8080 + S3_ENDPOINT SeaweedFS S3 endpoint reachable from the container + UC_CATALOG Catalog name to operate against + UC_SCHEMA Schema name + UC_TABLE Table name + UC_TABLE_ID Optional: pre-fetched table_id; falls back to GET /tables/{full_name} + AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY + Optional: direct S3 credentials. If omitted, the script + asks UC for temporary table credentials. + +Steps: + + 1. GET the table info from UC. + 2. Resolve S3 credentials from env or UC temporary-table-credentials. + 3. write_deltalake() a small pyarrow Table to storage_location. + 4. Read it back with DeltaTable() and assert row count. + +Prints `DELTA_RS_OK rows=` on success and exits 0; any failure exits 1. +""" + +from __future__ import annotations + +import os +import sys +import urllib.parse + +import pyarrow as pa +import requests +from deltalake import DeltaTable, write_deltalake + + +def must_env(name: str) -> str: + value = os.environ.get(name) + if not value: + print(f"ERROR: required env var {name} is not set", file=sys.stderr) + sys.exit(2) + return value + + +def main() -> int: + uc_url = must_env("UC_URL").rstrip("/") + s3_endpoint = must_env("S3_ENDPOINT") + catalog = must_env("UC_CATALOG") + schema = must_env("UC_SCHEMA") + table = must_env("UC_TABLE") + table_id = os.environ.get("UC_TABLE_ID", "").strip() + + full_name = f"{catalog}.{schema}.{table}" + api = f"{uc_url}/api/2.1/unity-catalog" + + encoded_full_name = urllib.parse.quote(full_name, safe="") + r = requests.get(f"{api}/tables/{encoded_full_name}", timeout=10) + r.raise_for_status() + info = r.json() + if not table_id: + table_id = info.get("table_id") or "" + storage_location = info.get("storage_location") or "" + + if not table_id: + print("ERROR: table_id is empty in UC response", file=sys.stderr) + return 1 + if not storage_location: + print("ERROR: storage_location is empty in UC response", file=sys.stderr) + return 1 + + access_key = os.environ.get("AWS_ACCESS_KEY_ID", "").strip() + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "").strip() + session_token = os.environ.get("AWS_SESSION_TOKEN", "").strip() + if bool(access_key) != bool(secret_key): + print( + "ERROR: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set together", + file=sys.stderr, + ) + return 1 + if not access_key: + creds_resp = requests.post( + f"{api}/temporary-table-credentials", + json={"table_id": table_id, "operation": "READ_WRITE"}, + timeout=15, + ) + creds_resp.raise_for_status() + creds = creds_resp.json().get("aws_temp_credentials") or {} + access_key = creds.get("access_key_id") or "" + secret_key = creds.get("secret_access_key") or "" + session_token = creds.get("session_token") or "" + if not access_key or not secret_key: + print( + "ERROR: missing aws_temp_credentials; " + f"response status {creds_resp.status_code}", + file=sys.stderr, + ) + return 1 + + storage_options = { + "AWS_ACCESS_KEY_ID": access_key, + "AWS_SECRET_ACCESS_KEY": secret_key, + "AWS_REGION": "us-east-1", + "AWS_ENDPOINT_URL": s3_endpoint, + "AWS_ALLOW_HTTP": "true", + # delta-rs requires this for non-S3 backends that lack atomic rename. + "AWS_S3_ALLOW_UNSAFE_RENAME": "true", + } + if session_token: + storage_options["AWS_SESSION_TOKEN"] = session_token + + arrow_table = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int64()), + "value": pa.array(["alpha", "beta", "gamma"], type=pa.string()), + } + ) + + write_deltalake( + storage_location, + arrow_table, + storage_options=storage_options, + mode="overwrite", + ) + + dt = DeltaTable(storage_location, storage_options=storage_options) + read_back = dt.to_pyarrow_table() + rows = read_back.num_rows + print(f"DELTA_RS_OK rows={rows} version={dt.version()}") + return 0 if rows == arrow_table.num_rows else 1 + + +if __name__ == "__main__": + exit_code = main() + sys.stdout.flush() + sys.stderr.flush() + # deltalake/pyarrow can abort during native teardown after a successful + # round-trip in the short-lived test container. Preserve main's result. + os._exit(exit_code) diff --git a/test/s3tables/unity_catalog/master_role_test.go b/test/s3tables/unity_catalog/master_role_test.go new file mode 100644 index 000000000..6f4a24db2 --- /dev/null +++ b/test/s3tables/unity_catalog/master_role_test.go @@ -0,0 +1,201 @@ +package unity_catalog + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sts" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +// TestUnityCatalogMasterRoleIntegration covers the master-role configuration +// the upstream playground notes as not-yet-working +// (https://github.com/data-engineering-helpers/mds-in-a-box/blob/main/unitycatalog-playground/etc/conf/server.properties#L45). +// +// It verifies two things end-to-end: +// +// 1. SeaweedFS' STS endpoint accepts sts:AssumeRole for the +// UnityCatalogVendedRole, returning real STS-vended credentials. This +// mirrors what the lakekeeper / polaris tests already exercise via the Go +// AWS SDK and is the SeaweedFS-side prerequisite for UC's master-role +// flow. +// +// 2. Unity Catalog OSS starts and accepts catalog/schema/EXTERNAL Delta +// table CRUD when configured with `aws.masterRoleArn` set to that role +// and AWS_ENDPOINT_URL_STS pointed at SeaweedFS. +// +// What it does NOT verify is the third hop -- UC's Java StsClient +// reaching SeaweedFS' STS handler. UC's +// AwsCredentialGenerator.StsAwsCredentialGenerator builds the StsClient +// without an endpointOverride and doesn't propagate aws.endpoint to it, so +// the SDK's generic AWS_ENDPOINT_URL_STS resolution doesn't kick in and the +// request always targets real AWS, returning InvalidClientTokenId. Verified +// by pointing AWS_ENDPOINT_URL_STS at port 1: same 403 either way, and a +// sniffer in front of SeaweedFS records zero traffic. UC's own tests mock +// StsClient out (BaseCRUDTestWithMockCredentials' EchoAwsStsClient, +// Mockito.mockStatic in CloudCredentialVendorTest), so this never showed +// up upstream. Fix is in unitycatalog/unitycatalog#1532; that bit is +// logged via t.Logf and not asserted. +func TestUnityCatalogMasterRoleIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in -short mode") + } + if !testutil.HasDocker() { + t.Skip("docker not available") + } + + env := newTestEnv(t) + defer env.cleanup(t) + + t.Log(">>> starting SeaweedFS with STS-enabled IAM config...") + env.startSeaweedFS(t, stsEnabledIAMConfig(env.accessKey, env.secretKey)) + t.Log(">>> SeaweedFS ready") + + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + defer cancel() + + s3c := env.newHostS3Client(t, ctx) + if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil { + t.Fatalf("create warehouse bucket: %v", err) + } + + // Slice 1: prove SeaweedFS STS works for the role UC would use. + t.Run("SeaweedFsAssumesUnityCatalogVendedRole", func(t *testing.T) { + stsEndpoint := fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) + creds, err := assumeRoleViaSeaweedFS(ctx, stsEndpoint, env.accessKey, env.secretKey, ucVendedRoleArn) + if err != nil { + t.Fatalf("AssumeRole on SeaweedFS STS: %v", err) + } + if creds.SessionToken == "" { + t.Fatalf("expected non-empty session_token from STS, got %+v", creds) + } + if creds.AccessKeyID == env.accessKey { + t.Fatalf("STS returned the static admin access key; AssumeRole did not vend a new one") + } + + // Confirm the vended creds actually work for an S3 round-trip. + s3v := env.newHostS3ClientWithCreds(t, ctx, creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken) + probeKey := "ns-probe/master-role-probe.txt" + if _, err := s3v.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(ucWarehouse), + Key: aws.String(probeKey), + Body: nil, + }); err != nil { + t.Fatalf("PutObject with sts-vended creds: %v", err) + } + _, _ = s3v.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(ucWarehouse), + Key: aws.String(probeKey), + }) + }) + + // Slice 2: UC starts with master-role config and CRUD works. + stsEndpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + t.Log(">>> starting Unity Catalog server (master role)...") + env.startUnityCatalog(t, ctx, ucServerOpts{ + MasterRoleArn: ucVendedRoleArn, + ExtraEnv: map[string]string{ + "AWS_ENDPOINT_URL": stsEndpoint, + "AWS_ENDPOINT_URL_STS": stsEndpoint, + "AWS_ACCESS_KEY_ID": env.accessKey, + "AWS_SECRET_ACCESS_KEY": env.secretKey, + "AWS_REGION": "us-east-1", + }, + }) + t.Log(">>> Unity Catalog ready") + + uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort)) + suffix := time.Now().UnixNano() + catalogName := fmt.Sprintf("seaweed_uc_role_%d", suffix) + schemaName := fmt.Sprintf("ns_%d", suffix) + tableName := fmt.Sprintf("events_%d", suffix) + tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName) + + defer func() { + _ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName) + _ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName) + _ = uc.deleteCatalog(context.Background(), catalogName) + }() + + t.Run("UnityCatalogAcceptsMasterRoleConfig", func(t *testing.T) { + if _, err := uc.createCatalog(ctx, ucCreateCatalog{ + Name: catalogName, + StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey), + }); err != nil { + t.Fatalf("create catalog: %v", err) + } + if _, err := uc.createSchema(ctx, ucCreateSchema{Name: schemaName, CatalogName: catalogName}); err != nil { + t.Fatalf("create schema: %v", err) + } + created, err := uc.createTable(ctx, ucCreateTable{ + Name: tableName, + CatalogName: catalogName, + SchemaName: schemaName, + TableType: "EXTERNAL", + DataSourceFormat: "DELTA", + Columns: []ucColumn{ + {Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true}, + }, + StorageLocation: tableLocation, + }) + if err != nil { + t.Fatalf("create external delta table: %v", err) + } + if created.TableID == "" { + t.Fatalf("expected table_id in CreateTable response, got %+v", created) + } + + // The actual UC -> SeaweedFS STS handoff is the still-unsolved slice. + // Log what happens, but don't fail the test on it: SeaweedFS' STS is + // known-good (slice 1 above), and UC's CRUD is known-good (this slice). + _, ucCredErr := uc.generateTemporaryTableCredentials(ctx, created.TableID, "READ_WRITE") + if ucCredErr != nil { + t.Logf("UC StsClient still hits real AWS (UC bug, fix in unitycatalog/unitycatalog#1532): %v", ucCredErr) + } else { + t.Logf("UC StsClient honors endpoint override; unitycatalog/unitycatalog#1532 (or equivalent) is in this image.") + } + }) +} + +// assumeRoleViaSeaweedFS calls sts:AssumeRole against SeaweedFS' STS endpoint +// using the Go AWS SDK, mirroring how lakekeeper/polaris tests prove +// SeaweedFS' STS path works. +func assumeRoleViaSeaweedFS(ctx context.Context, endpoint, accessKey, secretKey, roleArn string) (aws.Credentials, error) { + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")), + ) + if err != nil { + return aws.Credentials{}, fmt.Errorf("load aws config: %w", err) + } + + // Override the STS endpoint via service-specific BaseEndpoint instead of + // the deprecated EndpointResolverWithOptions wired through the global + // config. This is the supported path in aws-sdk-go-v2 since v1.21. + client := sts.NewFromConfig(cfg, func(o *sts.Options) { + o.BaseEndpoint = aws.String(endpoint) + }) + resp, err := client.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: aws.String(roleArn), + RoleSessionName: aws.String("uc-master-role-test"), + }) + if err != nil { + return aws.Credentials{}, err + } + if resp.Credentials == nil { + return aws.Credentials{}, fmt.Errorf("AssumeRole returned no credentials") + } + return aws.Credentials{ + AccessKeyID: aws.ToString(resp.Credentials.AccessKeyId), + SecretAccessKey: aws.ToString(resp.Credentials.SecretAccessKey), + SessionToken: aws.ToString(resp.Credentials.SessionToken), + Source: "seaweedfs-sts", + }, nil +} diff --git a/test/s3tables/unity_catalog/setup_test.go b/test/s3tables/unity_catalog/setup_test.go new file mode 100644 index 000000000..f3351c441 --- /dev/null +++ b/test/s3tables/unity_catalog/setup_test.go @@ -0,0 +1,379 @@ +package unity_catalog + +import ( + "context" + "fmt" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +const ( + ucImageDefault = "unitycatalog/unitycatalog:v0.4.0" + ucContainerCfg = "/home/unitycatalog/etc/conf/server.properties" + ucStartupGrace = 90 * time.Second + ucAPIBase = "/api/2.1/unity-catalog" + ucWarehouse = "lakehouse" + ucWarehouseKey = "warehouse" + + // Role used by the master-role STS-vended variant of the test. The trust + // policy is wide open so any caller can assume it; in production UC + // would set this ARN as `aws.masterRoleArn`. + ucVendedRoleArn = "arn:aws:iam::000000000000:role/UnityCatalogVendedRole" + ucVendedRoleName = "UnityCatalogVendedRole" +) + +type testEnv struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + + accessKey string + secretKey string + + ucImage string + ucContainerID string + ucHostPort int +} + +// ucServerOpts customizes the Unity Catalog server.properties and runtime +// environment. Defaults match the upstream playground (static keys, no master +// role). +type ucServerOpts struct { + // MasterRoleArn populates aws.masterRoleArn. Empty means UC falls back to + // static aws.accessKey / aws.secretKey for storage operations. + MasterRoleArn string + // ExtraEnv adds environment variables to the UC container, useful for + // AWS_ENDPOINT_URL_STS-style overrides. + ExtraEnv map[string]string +} + +func newTestEnv(t *testing.T) *testEnv { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + seaweedDir := wd + for i := 0; i < 8; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); err != nil { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-uc-test-*") + if err != nil { + t.Fatalf("mkdtemp: %v", err) + } + + bindIP := testutil.FindBindIP() + ports := testutil.MustAllocatePorts(t, 9) + masterPort, masterGrpcPort := ports[0], ports[1] + volumePort, volumeGrpcPort := ports[2], ports[3] + filerPort, filerGrpcPort := ports[4], ports[5] + s3Port, s3GrpcPort := ports[6], ports[7] + ucHostPort := ports[8] + + image := os.Getenv("UC_IMAGE") + if image == "" { + image = ucImageDefault + } + + return &testEnv{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + accessKey: "admin", + secretKey: "admin", + ucImage: image, + ucHostPort: ucHostPort, + } +} + +// startSeaweedFS starts a `weed mini` instance. If iamJSON is empty, a minimal +// admin-only IAM config is used. When iamJSON is non-empty, it is passed to +// both -s3.config and -s3.iam.config so the STS handler is enabled (mirroring +// the lakekeeper test). +func (env *testEnv) startSeaweedFS(t *testing.T, iamJSON string) { + t.Helper() + + enableSTS := iamJSON != "" + if iamJSON == "" { + iamJSON = fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [{"accessKey": %q, "secretKey": %q}], + "actions": ["Admin", "Read", "List", "Tagging", "Write"] + } + ] +}`, env.accessKey, env.secretKey) + } + + iamConfigPath := filepath.Join(env.dataDir, "iam.json") + if err := os.WriteFile(iamConfigPath, []byte(iamJSON), 0644); err != nil { + t.Fatalf("write iam config: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + weedArgs := []string{"-v", "4", "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + } + if enableSTS { + weedArgs = append(weedArgs, "-s3.iam.config", iamConfigPath, "-s3.iam.readOnly=false") + } + + cmd := exec.CommandContext(ctx, env.weedBinary, weedArgs...) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + t.Fatalf("start weed mini: %v", err) + } + env.weedProcess = cmd + + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("S3 API at 127.0.0.1:%d did not become ready", env.s3Port) + } +} + +// startUnityCatalog launches the Unity Catalog OSS server in Docker against +// the running SeaweedFS instance. It mirrors the upstream playground's +// server.properties layout and bind-mounts only that single file (matching +// docker-compose.yaml from the playground). +func (env *testEnv) startUnityCatalog(t *testing.T, ctx context.Context, opts ucServerOpts) { + t.Helper() + + s3EndpointForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + + props := strings.Join([]string{ + "server.env=test", + "server.authorization=disable", + "server.cookie-timeout=PT1H", + "server.managed-table.enabled=false", + fmt.Sprintf("aws.masterRoleArn=%s", opts.MasterRoleArn), + fmt.Sprintf("aws.accessKey=%s", env.accessKey), + fmt.Sprintf("aws.secretKey=%s", env.secretKey), + "aws.region=us-east-1", + fmt.Sprintf("aws.endpoint=%s", s3EndpointForContainer), + // UC keys perBucketS3Configs by NormalizedURL.from(bucketPath) and + // looks up using the storageBase, which is "s3://" (scheme + + // authority only). The playground's "s3://lakehouse/warehouse" never + // matches because of that asymmetry; the bucket-only form works. + fmt.Sprintf("s3.bucketPath.0=s3://%s", ucWarehouse), + "s3.region.0=us-east-1", + fmt.Sprintf("s3.awsRoleArn.0=%s", opts.MasterRoleArn), + fmt.Sprintf("s3.accessKey.0=%s", env.accessKey), + fmt.Sprintf("s3.secretKey.0=%s", env.secretKey), + fmt.Sprintf("s3.endpoint.0=%s", s3EndpointForContainer), + "", + }, "\n") + + confDir := filepath.Join(env.dataDir, "uc-conf") + if err := os.MkdirAll(confDir, 0755); err != nil { + t.Fatalf("mkdir uc-conf: %v", err) + } + propsPath := filepath.Join(confDir, "server.properties") + if err := os.WriteFile(propsPath, []byte(props), 0644); err != nil { + t.Fatalf("write server.properties: %v", err) + } + + containerName := fmt.Sprintf("seaweed-unity-catalog-%d", time.Now().UnixNano()) + + args := []string{ + "run", "-d", "--rm", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-p", fmt.Sprintf("%d:8080", env.ucHostPort), + "-v", fmt.Sprintf("%s:%s:ro", propsPath, ucContainerCfg), + "-e", "JAVA_OPTS=-Xmx1g", + } + for k, v := range opts.ExtraEnv { + args = append(args, "-e", fmt.Sprintf("%s=%s", k, v)) + } + args = append(args, env.ucImage) + + out, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput() + if err != nil { + t.Fatalf("docker run unity-catalog: %v\n%s", err, out) + } + // docker run -d prefixes pull progress / warnings before the final container + // ID line when the image isn't cached locally. Trim to the last whitespace- + // separated token so the ID survives a fresh CI runner. + fields := strings.Fields(string(out)) + if len(fields) == 0 { + t.Fatalf("docker run produced no output; combined: %q", string(out)) + } + env.ucContainerID = fields[len(fields)-1] + t.Logf("unity-catalog container id: %s", env.ucContainerID) + + probe := fmt.Sprintf("http://127.0.0.1:%d%s/catalogs", env.ucHostPort, ucAPIBase) + deadline := time.Now().Add(ucStartupGrace) + var lastErr error + for time.Now().Before(deadline) { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, probe, nil) + resp, err := (&http.Client{Timeout: 3 * time.Second}).Do(req) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return + } + lastErr = fmt.Errorf("status %d", resp.StatusCode) + } else { + lastErr = err + } + select { + case <-ctx.Done(): + t.Fatalf("ctx done while waiting for unity catalog: %v", ctx.Err()) + case <-time.After(time.Second): + } + } + logs, _ := exec.Command("docker", "logs", "--tail", "200", env.ucContainerID).CombinedOutput() + t.Fatalf("unity catalog did not become ready: %v\nrecent logs:\n%s", lastErr, logs) +} + +func (env *testEnv) cleanup(t *testing.T) { + if env.ucContainerID != "" { + if logs, err := exec.Command("docker", "logs", "--tail", "200", env.ucContainerID).CombinedOutput(); err == nil { + t.Logf("unity-catalog tail logs:\n%s", logs) + } + _ = exec.Command("docker", "rm", "-f", env.ucContainerID).Run() + } + if env.weedCancel != nil { + env.weedCancel() + } + if env.weedProcess != nil { + _ = env.weedProcess.Wait() + } + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +// -- S3 client helpers --------------------------------------------------------- + +func (env *testEnv) newHostS3Client(t *testing.T, ctx context.Context) *s3.Client { + t.Helper() + return env.newHostS3ClientWithCreds(t, ctx, env.accessKey, env.secretKey, "") +} + +func (env *testEnv) newHostS3ClientWithCreds(t *testing.T, ctx context.Context, ak, sk, token string) *s3.Client { + t.Helper() + endpoint := fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(ak, sk, token)), + ) + if err != nil { + t.Fatalf("load aws config: %v", err) + } + return s3.NewFromConfig(cfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String(endpoint) + o.UsePathStyle = true + }) +} + +// stsEnabledIAMConfig returns an iam.json that defines an admin user, the +// UnityCatalogVendedRole role with a permissive trust policy, and a FullAccess +// policy attached to it. This is the SeaweedFS-side counterpart of the +// `aws.masterRoleArn` configuration on the Unity Catalog server. +func stsEnabledIAMConfig(accessKey, secretKey string) string { + return fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [{"accessKey": %q, "secretKey": %q}], + "actions": ["Admin", "Read", "List", "Tagging", "Write"] + } + ], + "sts": { + "tokenDuration": "12h", + "maxSessionLength": "24h", + "issuer": "seaweedfs-sts", + "signingKey": "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz" + }, + "roles": [ + { + "roleName": %q, + "roleArn": %q, + "trustPolicy": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": "*", + "Action": "sts:AssumeRole" + } + ] + }, + "attachedPolicies": ["FullAccess"] + } + ], + "policies": [ + { + "name": "FullAccess", + "document": { + "Version": "2012-10-17", + "Statement": [ + {"Effect": "Allow", "Action": "*", "Resource": "*"} + ] + } + } + ] +}`, accessKey, secretKey, ucVendedRoleName, ucVendedRoleArn) +} diff --git a/test/s3tables/unity_catalog/unity_catalog_test.go b/test/s3tables/unity_catalog/unity_catalog_test.go new file mode 100644 index 000000000..35b48be09 --- /dev/null +++ b/test/s3tables/unity_catalog/unity_catalog_test.go @@ -0,0 +1,183 @@ +// Package unity_catalog provides integration tests that run Unity Catalog +// OSS in Docker against a SeaweedFS S3 backend. The base test mirrors the +// configuration used by the upstream UC playground at +// https://github.com/data-engineering-helpers/mds-in-a-box/tree/main/unitycatalog-playground: +// static aws.accessKey/aws.secretKey, no master role, EXTERNAL Delta tables. +package unity_catalog + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +// TestUnityCatalogDeltaIntegration brings up SeaweedFS, runs Unity Catalog OSS +// in Docker against it, and exercises catalog/schema/table CRUD plus +// temporary-table-credentials and storage I/O via vended credentials. The UC +// server is configured with static keys (aws.masterRoleArn empty), matching +// the playground's working configuration when targeting SeaweedFS. +func TestUnityCatalogDeltaIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in -short mode") + } + if !testutil.HasDocker() { + t.Skip("docker not available") + } + + env := newTestEnv(t) + defer env.cleanup(t) + + t.Log(">>> starting SeaweedFS for Unity Catalog test...") + env.startSeaweedFS(t, "") + t.Log(">>> SeaweedFS ready") + + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + defer cancel() + + s3c := env.newHostS3Client(t, ctx) + if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil { + t.Fatalf("create warehouse bucket: %v", err) + } + + t.Log(">>> starting Unity Catalog server (static keys)...") + env.startUnityCatalog(t, ctx, ucServerOpts{}) + t.Log(">>> Unity Catalog ready") + + uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort)) + + suffix := time.Now().UnixNano() + catalogName := fmt.Sprintf("seaweed_uc_%d", suffix) + schemaName := fmt.Sprintf("ns_%d", suffix) + tableName := fmt.Sprintf("events_%d", suffix) + tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName) + + defer func() { + _ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName) + _ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName) + _ = uc.deleteCatalog(context.Background(), catalogName) + }() + + t.Run("CreateCatalog", func(t *testing.T) { + got, err := uc.createCatalog(ctx, ucCreateCatalog{ + Name: catalogName, + Comment: "seaweedfs integration test", + StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey), + }) + if err != nil { + t.Fatalf("create catalog: %v", err) + } + if got.Name != catalogName { + t.Fatalf("catalog name = %q, want %q", got.Name, catalogName) + } + }) + + t.Run("CreateSchema", func(t *testing.T) { + got, err := uc.createSchema(ctx, ucCreateSchema{ + Name: schemaName, + CatalogName: catalogName, + }) + if err != nil { + t.Fatalf("create schema: %v", err) + } + if got.Name != schemaName || got.CatalogName != catalogName { + t.Fatalf("schema = %+v, want name=%q catalog=%q", got, schemaName, catalogName) + } + }) + + var createdTable ucTableInfo + t.Run("CreateExternalDeltaTable", func(t *testing.T) { + columns := []ucColumn{ + {Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true}, + {Name: "value", TypeText: "string", TypeName: "STRING", TypeJSON: `{"name":"value","type":"string","nullable":true,"metadata":{}}`, Position: 1, Nullable: true}, + } + got, err := uc.createTable(ctx, ucCreateTable{ + Name: tableName, + CatalogName: catalogName, + SchemaName: schemaName, + TableType: "EXTERNAL", + DataSourceFormat: "DELTA", + Columns: columns, + StorageLocation: tableLocation, + }) + if err != nil { + t.Fatalf("create table: %v", err) + } + if got.TableType != "EXTERNAL" || got.DataSourceFormat != "DELTA" { + t.Fatalf("table types = %s/%s, want EXTERNAL/DELTA", got.TableType, got.DataSourceFormat) + } + if got.StorageLocation == "" { + t.Fatalf("table storage_location is empty in response: %+v", got) + } + createdTable = *got + }) + + t.Run("GetTable", func(t *testing.T) { + full := catalogName + "." + schemaName + "." + tableName + got, err := uc.getTable(ctx, full) + if err != nil { + t.Fatalf("get table: %v", err) + } + if got.Name != tableName { + t.Fatalf("get table name = %q, want %q", got.Name, tableName) + } + }) + + t.Run("ListTables", func(t *testing.T) { + tables, err := uc.listTables(ctx, catalogName, schemaName) + if err != nil { + t.Fatalf("list tables: %v", err) + } + found := false + for _, tbl := range tables { + if tbl.Name == tableName { + found = true + break + } + } + if !found { + t.Fatalf("created table %q not found in list", tableName) + } + }) + + t.Run("TemporaryTableCredentialsRejected", func(t *testing.T) { + // With aws.masterRoleArn empty AND no s3.sessionToken.0 set, UC OSS + // always tries to AssumeRole via its internal StsClient (see + // AwsCredentialVendor.createPerBucketCredentialGenerator). Against a + // non-AWS endpoint, that call doesn't reach a real STS, so UC returns + // "S3 bucket configuration not found." or an STS-side error. This is + // the gap users hit at : + // "with simple S3 access and secret keys, Unity Catalog does not seem + // to work." + // + // The assertion is therefore inverted: we expect a non-nil error from + // /temporary-table-credentials with this configuration. A future + // variant can pin s3.sessionToken.0 (UC's StaticAwsCredentialGenerator + // path) once SeaweedFS' SigV4 path tolerates the vended session token. + if createdTable.TableID == "" { + t.Fatalf("created table has empty table_id; cannot request temporary credentials") + } + _, err := uc.generateTemporaryTableCredentials(ctx, createdTable.TableID, "READ_WRITE") + if err == nil { + t.Fatalf("expected /temporary-table-credentials to fail with the static-key playground configuration; it succeeded unexpectedly") + } + t.Logf("expected failure (UC static-key path requires AWS STS): %v", err) + }) + + t.Run("DeleteTableSchemaCatalog", func(t *testing.T) { + if err := uc.deleteTable(ctx, catalogName+"."+schemaName+"."+tableName); err != nil { + t.Fatalf("delete table: %v", err) + } + if err := uc.deleteSchema(ctx, catalogName+"."+schemaName); err != nil { + t.Fatalf("delete schema: %v", err) + } + if err := uc.deleteCatalog(ctx, catalogName); err != nil { + t.Fatalf("delete catalog: %v", err) + } + }) +}