test(s3tables): Unity Catalog OSS integration tests against SeaweedFS (#9308)

* test(s3tables): add Unity Catalog OSS integration test against SeaweedFS

Mirrors the configuration used by the upstream playground at
data-engineering-helpers/mds-in-a-box/unitycatalog-playground.

Three test variants under test/s3tables/unity_catalog:

- TestUnityCatalogDeltaIntegration: aws.masterRoleArn empty / static
  keys; catalog/schema/EXTERNAL Delta CRUD + temporary-table-credentials
  S3 round-trip (the playground's working configuration).
- TestUnityCatalogMasterRoleIntegration: aws.masterRoleArn set to a
  SeaweedFS-side role with a permissive trust policy; UC's StsClient
  is pinned at SeaweedFS via AWS_ENDPOINT_URL_STS, and the test asserts
  the vended creds carry a session_token and a non-static access key,
  proving the role-vended path the playground notes as not-yet-working
  actually does work today.
- TestUnityCatalogDeltaRsRoundTrip: writes/reads a real Delta table at
  the registered storage_location using delta-rs in a slim Python
  container, with temporary credentials fetched from UC.

All three self-skip without Docker or a weed binary, matching the
sibling lakekeeper / polaris tests.

* test(s3tables): tighten Unity Catalog tests against actual UC OSS behavior

After running the suite locally, ground the assertions in what the
upstream UC OSS Docker image actually does against SeaweedFS today.

- Static-key playground configuration
  (TestUnityCatalogDeltaIntegration): catalog/schema/EXTERNAL Delta CRUD
  pass against the SeaweedFS-backed warehouse. The temporary-table-
  credentials subtest is renamed and inverted to assert the failure mode
  the playground reports -- UC's AwsCredentialVendor falls through to an
  internal StsClient.assumeRole when masterRoleArn and sessionToken are
  both empty, which has no real STS to talk to. Bucket path is also
  fixed to match UC's getStorageBase() lookup (s3://lakehouse vs the
  playground's s3://lakehouse/warehouse, which the upstream code never
  matches).

- Master-role variant (TestUnityCatalogMasterRoleIntegration): split
  into two passing slices. Slice 1 proves SeaweedFS' STS endpoint
  vending UnityCatalogVendedRole works via the Go AWS SDK and the vended
  creds round-trip on S3. Slice 2 boots UC with aws.masterRoleArn set
  and verifies catalog/schema/Delta CRUD. The third hop -- UC's Java
  StsClient actually reaching SeaweedFS' STS handler during
  /temporary-table-credentials -- is logged but not asserted, since the
  AWS Java SDK's STS request currently lands on a SeaweedFS S3 path
  rather than the STS handler.

- Delta-RS round-trip (TestUnityCatalogDeltaRsRoundTrip): gated on
  UC_DELTA_RS_RUN=1 since it depends on the master-role STS handoff
  above. The Dockerfile / writer script stay in tree so the test runs
  end-to-end the moment that hop is fixed.
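
The bucket-path fix in the static-key item above can be pictured with a
small Go sketch. It assumes, based only on the description in this commit,
that the lookup reduces a table location to scheme plus bucket before
matching it against the configured bucket paths. `storageBase` and `matches`
are hypothetical names for illustration, not UC's code.

```go
package main

import (
	"fmt"
	"net/url"
)

// storageBase reduces a storage location to "scheme://bucket".
// Assumption: this mirrors the lookup behaviour described above; it is
// not a port of UC's getStorageBase().
func storageBase(location string) (string, error) {
	u, err := url.Parse(location)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", fmt.Errorf("not a bucket URI: %q", location)
	}
	return u.Scheme + "://" + u.Host, nil
}

// matches reports whether a configured bucket path would be found when
// looking up the given table location.
func matches(configuredPath, location string) bool {
	base, err := storageBase(location)
	return err == nil && base == configuredPath
}

func main() {
	location := "s3://lakehouse/warehouse/ns/events"
	fmt.Println(matches("s3://lakehouse", location))
	fmt.Println(matches("s3://lakehouse/warehouse", location))
}
```

Under that assumption, a bucket path configured with a key prefix can never
equal the reduced location, which is consistent with the playground's
s3://lakehouse/warehouse value not being matched.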

README rewritten to be explicit about what each test validates today
and what is still pending.

Result: `go test -run TestUnityCatalog ./test/s3tables/unity_catalog/...`
passes cleanly with weed + Docker available, and self-skips otherwise.

* test(s3tables): exercise unity catalog integrations

* ci: run Unity Catalog integration tests on PRs

Adds a unity-catalog-integration-tests job to s3-tables-tests.yml,
modeled on the existing lakekeeper / dremio jobs. Pre-pulls the UC
image and python:3.11-slim (used by the delta-rs writer container) and
runs `go test ./test/s3tables/unity_catalog`.

Format-check and go-vet jobs already recurse into ./test/s3tables/...
so the new package is covered there too.

* test/ci: address PR review

Tighten the UC readiness probe to require 200, not <500, so a
401/403/404 during startup surfaces immediately instead of being
treated as ready (CodeRabbit).

Pin the UC image to v0.4.0 in both the workflow and the test default,
matching the pinned-tag convention the rest of s3-tables-tests.yml
uses (CodeRabbit). Use UC_IMAGE=unitycatalog/unitycatalog:main to
re-test against current upstream.

* docs: separate UC static-key vs master-role failure modes

The README mixed the two together. Static-key empty-sessionToken
short-circuits with "S3 bucket configuration not found." before UC
even fires an STS call; the AccessDenied I described is what happens
in the master-role variant where UC's Java StsClient actually reaches
SeaweedFS. Cross-link the playground PR that fixes the static-key
vending side.

Also drop the "what most playground users actually run" hand-wave
under MANAGED tables.

* docs: trim README

Drop the playground cross-reference and the "two layers fail
independently" framing.

* docs: pin down what's actually pending

Investigated the master-role STS handoff with a sniffer in front of
SeaweedFS' STS port. UC's StsClient is constructed without an
endpointOverride and never reads aws.endpoint or AWS_ENDPOINT_URL_STS;
verified by pointing AWS_ENDPOINT_URL_STS at port 1 and seeing the
same real-AWS InvalidClientTokenId 403 with zero traffic to SeaweedFS.

The fix is upstream in UC. Updated the README and the master-role
test's t.Logf to say so precisely, and dropped the stale "Spark client"
bullet (delta-rs covers that path).

* test(s3tables): use BaseEndpoint instead of deprecated resolver

EndpointResolverWithOptions is deprecated in aws-sdk-go-v2; the
supported way to override a service endpoint is via the per-service
Options.BaseEndpoint. Switch the assume-role helper to that pattern so
the test stops compiling against deprecated API and the resolver
boilerplate disappears.

Addresses gemini review on PR #9308.

* test(s3tables): drop unused splitS3URI helper

Helper had no callers; gemini caught it on PR #9308. Easy to bring
back from git history if needed.

* test(s3tables): extract last token of docker run output as container ID

docker run -d may prefix the container ID with image-pull progress
when the image isn't cached locally. strings.TrimSpace on the whole
output then gave a multi-line string, not the ID. Take the last
whitespace-separated token so the ID survives a fresh CI runner.
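
A minimal sketch of that extraction, using a made-up sample of pull
progress followed by an ID. `containerID` is a hypothetical name and may
differ from the helper in the test package.

```go
package main

import (
	"fmt"
	"strings"
)

// containerID returns the last whitespace-separated token of the output,
// or "" when the output contains no tokens at all.
func containerID(dockerRunOutput string) string {
	fields := strings.Fields(dockerRunOutput)
	if len(fields) == 0 {
		return ""
	}
	return fields[len(fields)-1]
}

func main() {
	// Illustrative output only; real pull progress lines vary.
	out := "Unable to find image 'example:tag' locally\n" +
		"tag: Pulling from example\n" +
		"Status: Downloaded newer image for example:tag\n" +
		"0123456789abcdef\n"
	fmt.Println(containerID(out))
}
```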

Addresses gemini review on PR #9308.

* test(s3tables): cap Unity Catalog response body reads at 10 MiB

io.ReadAll without a limit could OOM the test runner if the UC
container hands back an unexpectedly large body. 10 MiB is well
above any well-formed catalog response and turns a misbehaving
server into a test failure instead of a runner crash.

Addresses gemini review on PR #9308.

* docs: link UC fix PR and call out UC's mocked-Sts test pattern

UC's own credential-vending tests substitute StsClient with an in-process
EchoAwsStsClient (BaseCRUDTestWithMockCredentials) or Mockito.mockStatic
(CloudCredentialVendorTest), so the wire path between UC's Java SDK and
a real STS server is untested -- which is why the missing endpointOverride
slipped through upstream. Linked the upstream fix at
unitycatalog/unitycatalog#1532.
This commit is contained in:
Chris Lu
2026-05-04 21:14:22 -07:00
committed by GitHub
parent 6d95a5592a
commit a769c938ec
9 changed files with 1395 additions and 0 deletions


@@ -662,6 +662,77 @@ jobs:
          path: test/s3tables/lakekeeper/test-output.log
          retention-days: 3

  unity-catalog-integration-tests:
    name: Unity Catalog Integration Tests
    runs-on: ubuntu-22.04
    timeout-minutes: 30
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go
        uses: actions/setup-go@v6
        with:
          go-version-file: 'go.mod'
        id: go

      - name: Set up Docker
        uses: docker/setup-buildx-action@v4

      - name: Pre-pull Unity Catalog image
        run: docker pull unitycatalog/unitycatalog:v0.4.0

      - name: Pre-pull Python image for delta-rs writer
        run: docker pull python:3.11-slim

      - name: Run go mod tidy
        run: go mod tidy

      - name: Install SeaweedFS
        run: |
          go install -buildvcs=false ./weed

      - name: Run Unity Catalog Integration Tests
        timeout-minutes: 25
        working-directory: test/s3tables/unity_catalog
        run: |
          set -x
          set -o pipefail
          echo "=== System Information ==="
          uname -a
          free -h
          df -h
          docker info
          echo "=== Starting Unity Catalog Tests ==="
          go test -v -timeout 20m . 2>&1 | tee test-output.log || {
            echo "Unity Catalog integration tests failed"
            exit 1
          }

      - name: Show test output on failure
        if: failure()
        working-directory: test/s3tables/unity_catalog
        run: |
          echo "=== Test Output ==="
          if [ -f test-output.log ]; then
            tail -200 test-output.log
          fi
          echo "=== Process information ==="
          ps aux | grep -E "(weed|test|docker|unitycatalog)" || true
          echo "=== Unity Catalog containers ==="
          docker ps -a --filter "name=seaweed-unity-catalog" || true

      - name: Upload test logs on failure
        if: failure()
        uses: actions/upload-artifact@v7
        with:
          name: unity-catalog-integration-test-logs
          path: test/s3tables/unity_catalog/test-output.log
          retention-days: 3

  s3-tables-build-verification:
    name: S3 Tables Build Verification
    runs-on: ubuntu-22.04


@@ -0,0 +1,11 @@
FROM python:3.11-slim
RUN pip install --no-cache-dir \
"deltalake==0.21.0" \
"pyarrow==17.0.0" \
"requests==2.32.3"
WORKDIR /app
COPY delta_rs_writer.py /app/delta_rs_writer.py
ENTRYPOINT ["python3", "/app/delta_rs_writer.py"]


@@ -0,0 +1,104 @@
# Unity Catalog OSS integration tests
These tests run Unity Catalog OSS in Docker against an embedded SeaweedFS
S3 endpoint. The `server.properties` mirrors the upstream playground at
[`mds-in-a-box/unitycatalog-playground`](https://github.com/data-engineering-helpers/mds-in-a-box/tree/main/unitycatalog-playground).
| Test | Variant | Status |
| --- | --- | --- |
| `TestUnityCatalogDeltaIntegration` | static keys, `aws.masterRoleArn=` empty | passes; covers catalog/schema/EXTERNAL Delta CRUD against SeaweedFS-backed warehouse and asserts that UC's `/temporary-table-credentials` *cannot* vend usable creds with this configuration -- exactly the gap the playground reports. |
| `TestUnityCatalogMasterRoleIntegration` | `aws.masterRoleArn=arn:aws:iam::000000000000:role/UnityCatalogVendedRole` | passes; proves SeaweedFS' STS endpoint accepts `sts:AssumeRole` for the role UC would use (Go SDK round-trip), and that UC starts and accepts CRUD when wired with the master-role config. UC's own StsClient still talks to real AWS regardless of `aws.endpoint` / `AWS_ENDPOINT_URL_STS` (UC bug, see below); that hop is logged via `t.Logf` rather than asserted. |
| `TestUnityCatalogDeltaRsRoundTrip` | static keys + `delta-rs` Python client | passes; resolves table metadata through UC and writes/reads a real Delta table at the registered `storage_location` using `python:3.11-slim + deltalake` with the SeaweedFS test credentials. |
## Prerequisites
- Docker available locally (the tests call `docker run` / `docker build` directly).
- A `weed` binary at the repo root (`weed/weed`) or on `$PATH`.
## Run
```bash
go test -timeout 15m \
-run 'TestUnityCatalog' \
./test/s3tables/unity_catalog/...
```
Override the Unity Catalog image (the default is
`unitycatalog/unitycatalog:v0.4.0`), for example to re-test against
current upstream:
```bash
UC_IMAGE=unitycatalog/unitycatalog:main \
go test -timeout 15m -run TestUnityCatalogDeltaIntegration \
./test/s3tables/unity_catalog/...
```
The tests self-skip when Docker is unavailable or no `weed` binary is on
the path; running under `-short` also skips them.
## Why the static-key path can't vend usable creds
UC OSS' `AwsCredentialVendor.createPerBucketCredentialGenerator`:
```java
if (config.getSessionToken() != null && !config.getSessionToken().isEmpty()) {
return new AwsCredentialGenerator.StaticAwsCredentialGenerator(config);
}
return createStsCredentialGenerator(config);
```
With `aws.masterRoleArn=` empty and `s3.sessionToken.0=` empty (this
test's configuration), `/temporary-table-credentials` short-circuits with
`"S3 bucket configuration not found."` before UC fires any STS call.
Setting a stub `s3.sessionToken.0` switches UC to
`StaticAwsCredentialGenerator` and the endpoint returns the static keys,
but the response carries that stub session token -- SeaweedFS won't
recognize it on the next S3 call, so the vended creds aren't usable for
table I/O. Clients have to fall back to the static keys directly.

With `aws.masterRoleArn` set, UC's `AwsCredentialGenerator.StsAwsCredentialGenerator`
builds the StsClient with only `.region(...)` and `.credentialsProvider(...)` --
no `.endpointOverride()`. The SDK's generic env-var resolution doesn't kick in
for that builder shape, so even with `AWS_ENDPOINT_URL_STS=...` (or the
matching `aws.endpointUrlSts` Java property, or the catch-all
`AWS_ENDPOINT_URL=...`) the StsClient still targets real AWS and gets back
`InvalidClientTokenId`. Verified by pointing the env var at port 1: UC reports
the same AWS-issued 403 as when the variable points at SeaweedFS, and a sniffer
in front of SeaweedFS' STS port records zero traffic. SeaweedFS' STS handler
itself works -- the Go SDK round-trip in `assumeRoleViaSeaweedFS` proves that
against the same SeaweedFS instance.

UC's own AWS credential-vending tests don't catch this because they mock
`StsClient` away entirely -- `BaseCRUDTestWithMockCredentials` injects a
custom `stsClientBuilderSupplier` returning an `EchoAwsStsClient` that
synthesizes credentials in-process, and `CloudCredentialVendorTest` uses
`Mockito.mockStatic(StsClient.class)`. No test ever exercises the wire
path between UC's Java SDK and a real STS endpoint, so the missing
`endpointOverride` slipped through.

The fix is upstream in
[unitycatalog/unitycatalog#1532](https://github.com/unitycatalog/unitycatalog/pull/1532),
which adds an `aws.endpoint` property and applies it to both the StsClient
and the S3Client builders. Until that lands, the master-role test logs
the failure but does not assert it.
## What the tests actually validate today
- Unity Catalog accepts a SeaweedFS-backed `server.properties` and starts.
- Catalog / schema / EXTERNAL Delta table CRUD all work against the
SeaweedFS warehouse via the UC REST API.
- SeaweedFS' STS endpoint correctly issues `sts:AssumeRole` credentials
for the `UnityCatalogVendedRole` and those credentials are accepted on
S3 round-trips (Go AWS SDK).
- Delta-RS resolves a UC table's `storage_location` and can write/read Delta
data through the SeaweedFS S3 endpoint with the test credentials.
## What is still pending
Nothing on the SeaweedFS side. The remaining gap (UC's StsClient ignoring
endpoint config) needs a UC OSS patch upstream.
## MANAGED tables
Not exercised. UC OSS gates them behind `server.managed-table.enabled=true`
and a two-step staging flow (`POST /staging-tables` then `POST /tables`);
EXTERNAL Delta is the simpler path and what these tests cover.


@@ -0,0 +1,184 @@
package unity_catalog
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
type ucClient struct {
base string
http *http.Client
}
func newUCClient(base string) *ucClient {
return &ucClient{base: base + ucAPIBase, http: &http.Client{Timeout: 30 * time.Second}}
}
type ucCreateCatalog struct {
Name string `json:"name"`
Comment string `json:"comment,omitempty"`
StorageRoot string `json:"storage_root,omitempty"`
}
type ucCatalogInfo struct {
Name string `json:"name"`
StorageRoot string `json:"storage_root"`
ID string `json:"id"`
}
type ucCreateSchema struct {
Name string `json:"name"`
CatalogName string `json:"catalog_name"`
}
type ucSchemaInfo struct {
Name string `json:"name"`
CatalogName string `json:"catalog_name"`
FullName string `json:"full_name"`
SchemaID string `json:"schema_id"`
}
type ucColumn struct {
Name string `json:"name"`
TypeText string `json:"type_text"`
TypeJSON string `json:"type_json,omitempty"`
TypeName string `json:"type_name"`
Position int `json:"position"`
Nullable bool `json:"nullable"`
}
type ucCreateTable struct {
Name string `json:"name"`
CatalogName string `json:"catalog_name"`
SchemaName string `json:"schema_name"`
TableType string `json:"table_type"`
DataSourceFormat string `json:"data_source_format"`
Columns []ucColumn `json:"columns"`
StorageLocation string `json:"storage_location,omitempty"`
}
type ucTableInfo struct {
Name string `json:"name"`
CatalogName string `json:"catalog_name"`
SchemaName string `json:"schema_name"`
TableType string `json:"table_type"`
DataSourceFormat string `json:"data_source_format"`
Columns []ucColumn `json:"columns"`
StorageLocation string `json:"storage_location"`
TableID string `json:"table_id"`
}
type ucListTablesResp struct {
Tables []ucTableInfo `json:"tables"`
}
type ucAwsCreds struct {
AccessKeyID string `json:"access_key_id"`
SecretAccessKey string `json:"secret_access_key"`
SessionToken string `json:"session_token"`
}
type ucTempCreds struct {
AwsTempCredentials *ucAwsCreds `json:"aws_temp_credentials"`
ExpirationTime int64 `json:"expiration_time"`
}
func (c *ucClient) createCatalog(ctx context.Context, in ucCreateCatalog) (*ucCatalogInfo, error) {
var out ucCatalogInfo
return &out, c.do(ctx, http.MethodPost, "/catalogs", in, &out)
}
func (c *ucClient) deleteCatalog(ctx context.Context, name string) error {
q := url.Values{"force": []string{"true"}}
return c.do(ctx, http.MethodDelete, "/catalogs/"+url.PathEscape(name)+"?"+q.Encode(), nil, nil)
}
func (c *ucClient) createSchema(ctx context.Context, in ucCreateSchema) (*ucSchemaInfo, error) {
var out ucSchemaInfo
return &out, c.do(ctx, http.MethodPost, "/schemas", in, &out)
}
func (c *ucClient) deleteSchema(ctx context.Context, fullName string) error {
q := url.Values{"force": []string{"true"}}
return c.do(ctx, http.MethodDelete, "/schemas/"+url.PathEscape(fullName)+"?"+q.Encode(), nil, nil)
}
func (c *ucClient) createTable(ctx context.Context, in ucCreateTable) (*ucTableInfo, error) {
var out ucTableInfo
return &out, c.do(ctx, http.MethodPost, "/tables", in, &out)
}
func (c *ucClient) getTable(ctx context.Context, fullName string) (*ucTableInfo, error) {
var out ucTableInfo
return &out, c.do(ctx, http.MethodGet, "/tables/"+url.PathEscape(fullName), nil, &out)
}
func (c *ucClient) listTables(ctx context.Context, catalog, schema string) ([]ucTableInfo, error) {
var out ucListTablesResp
q := url.Values{
"catalog_name": []string{catalog},
"schema_name": []string{schema},
}
path := "/tables?" + q.Encode()
if err := c.do(ctx, http.MethodGet, path, nil, &out); err != nil {
return nil, err
}
return out.Tables, nil
}
func (c *ucClient) deleteTable(ctx context.Context, fullName string) error {
return c.do(ctx, http.MethodDelete, "/tables/"+url.PathEscape(fullName), nil, nil)
}
func (c *ucClient) generateTemporaryTableCredentials(ctx context.Context, tableID, op string) (*ucTempCreds, error) {
body := map[string]string{"table_id": tableID, "operation": op}
var out ucTempCreds
return &out, c.do(ctx, http.MethodPost, "/temporary-table-credentials", body, &out)
}
func (c *ucClient) do(ctx context.Context, method, path string, in any, out any) error {
var body io.Reader
if in != nil {
buf, err := json.Marshal(in)
if err != nil {
return fmt.Errorf("marshal %s %s: %w", method, path, err)
}
body = bytes.NewReader(buf)
}
req, err := http.NewRequestWithContext(ctx, method, c.base+path, body)
if err != nil {
return err
}
if in != nil {
req.Header.Set("Content-Type", "application/json")
}
req.Header.Set("Accept", "application/json")
resp, err := c.http.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 10 MiB cap is well above any reasonable Unity Catalog response and
// keeps a runaway server from OOMing the test runner.
respBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024*1024))
if err != nil {
return fmt.Errorf("read %s %s: %w", method, path, err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("%s %s: status %d: %s", method, path, resp.StatusCode, strings.TrimSpace(string(respBytes)))
}
if out == nil || len(respBytes) == 0 {
return nil
}
if err := json.Unmarshal(respBytes, out); err != nil {
return fmt.Errorf("decode %s %s: %w (body=%s)", method, path, err, string(respBytes))
}
return nil
}


@@ -0,0 +1,125 @@
package unity_catalog
import (
"context"
"fmt"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/seaweedfs/seaweedfs/test/testutil"
)
// TestUnityCatalogDeltaRsRoundTrip writes and reads a real Delta table at
// the registered storage_location using the delta-rs Python library inside
// a Docker container. The script resolves table metadata from UC and uses the
// test SeaweedFS credentials as delta-rs storage_options.
func TestUnityCatalogDeltaRsRoundTrip(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in -short mode")
}
if !testutil.HasDocker() {
t.Skip("docker not available")
}
env := newTestEnv(t)
defer env.cleanup(t)
t.Log(">>> starting SeaweedFS for delta-rs test...")
env.startSeaweedFS(t, "")
t.Log(">>> SeaweedFS ready")
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute)
defer cancel()
s3c := env.newHostS3Client(t, ctx)
if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil {
t.Fatalf("create warehouse bucket: %v", err)
}
t.Log(">>> starting Unity Catalog server...")
env.startUnityCatalog(t, ctx, ucServerOpts{})
t.Log(">>> Unity Catalog ready")
uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort))
suffix := time.Now().UnixNano()
catalogName := fmt.Sprintf("seaweed_uc_delta_rs_%d", suffix)
schemaName := fmt.Sprintf("ns_%d", suffix)
tableName := fmt.Sprintf("delta_rs_%d", suffix)
tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName)
defer func() {
_ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName)
_ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName)
_ = uc.deleteCatalog(context.Background(), catalogName)
}()
if _, err := uc.createCatalog(ctx, ucCreateCatalog{
Name: catalogName,
StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey),
}); err != nil {
t.Fatalf("create catalog: %v", err)
}
if _, err := uc.createSchema(ctx, ucCreateSchema{Name: schemaName, CatalogName: catalogName}); err != nil {
t.Fatalf("create schema: %v", err)
}
created, err := uc.createTable(ctx, ucCreateTable{
Name: tableName,
CatalogName: catalogName,
SchemaName: schemaName,
TableType: "EXTERNAL",
DataSourceFormat: "DELTA",
Columns: []ucColumn{
{Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true},
{Name: "value", TypeText: "string", TypeName: "STRING", TypeJSON: `{"name":"value","type":"string","nullable":true,"metadata":{}}`, Position: 1, Nullable: true},
},
StorageLocation: tableLocation,
})
if err != nil {
t.Fatalf("create table: %v", err)
}
testDir, err := filepath.Abs(".")
if err != nil {
t.Fatalf("abs: %v", err)
}
imageTag := fmt.Sprintf("seaweed-uc-delta-rs:%d", time.Now().UnixNano())
build := exec.CommandContext(ctx, "docker", "build", "-t", imageTag, "-f", "Dockerfile.delta-rs", ".")
build.Dir = testDir
if out, err := build.CombinedOutput(); err != nil {
t.Fatalf("docker build delta-rs image: %v\n%s", err, out)
}
defer func() { _ = exec.Command("docker", "rmi", "-f", imageTag).Run() }()
ucURLForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.ucHostPort)
s3EndpointForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port)
cmd := exec.CommandContext(ctx, "docker", "run", "--rm",
"--add-host", "host.docker.internal:host-gateway",
"-e", "UC_URL="+ucURLForContainer,
"-e", "S3_ENDPOINT="+s3EndpointForContainer,
"-e", "UC_CATALOG="+catalogName,
"-e", "UC_SCHEMA="+schemaName,
"-e", "UC_TABLE="+tableName,
"-e", "UC_TABLE_ID="+created.TableID,
"-e", "AWS_ACCESS_KEY_ID="+env.accessKey,
"-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey,
imageTag,
)
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("delta-rs container failed: %v\noutput:\n%s", err, out)
}
got := string(out)
if !strings.Contains(got, "DELTA_RS_OK rows=3") {
t.Fatalf("expected DELTA_RS_OK rows=3 in output; got:\n%s", got)
}
t.Logf("delta-rs output: %s", strings.TrimSpace(got))
}


@@ -0,0 +1,137 @@
"""Round-trip a Delta table through Unity Catalog using delta-rs.

Reads connection settings from environment variables:

  UC_URL           Unity Catalog REST base, e.g. http://host.docker.internal:8080
  S3_ENDPOINT      SeaweedFS S3 endpoint reachable from the container
  UC_CATALOG       Catalog name to operate against
  UC_SCHEMA        Schema name
  UC_TABLE         Table name
  UC_TABLE_ID      Optional: pre-fetched table_id; falls back to GET /tables/{full_name}
  AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
                   Optional: direct S3 credentials. If omitted, the script
                   asks UC for temporary table credentials.
  AWS_SESSION_TOKEN
                   Optional: session token to pass along with the direct
                   credentials.

Steps:
  1. GET the table info from UC.
  2. Resolve S3 credentials from env or UC temporary-table-credentials.
  3. write_deltalake() a small pyarrow Table to storage_location.
  4. Read it back with DeltaTable() and compare the row count.

Prints `DELTA_RS_OK rows=<n> version=<v>` after the read-back and exits 0 if
the row count matches. A missing required env var exits 2; a row-count
mismatch or any other handled failure exits 1.
"""
from __future__ import annotations

import os
import sys
import urllib.parse

import pyarrow as pa
import requests
from deltalake import DeltaTable, write_deltalake


def must_env(name: str) -> str:
    value = os.environ.get(name)
    if not value:
        print(f"ERROR: required env var {name} is not set", file=sys.stderr)
        sys.exit(2)
    return value


def main() -> int:
    uc_url = must_env("UC_URL").rstrip("/")
    s3_endpoint = must_env("S3_ENDPOINT")
    catalog = must_env("UC_CATALOG")
    schema = must_env("UC_SCHEMA")
    table = must_env("UC_TABLE")
    table_id = os.environ.get("UC_TABLE_ID", "").strip()

    full_name = f"{catalog}.{schema}.{table}"
    api = f"{uc_url}/api/2.1/unity-catalog"

    encoded_full_name = urllib.parse.quote(full_name, safe="")
    r = requests.get(f"{api}/tables/{encoded_full_name}", timeout=10)
    r.raise_for_status()
    info = r.json()
    if not table_id:
        table_id = info.get("table_id") or ""
    storage_location = info.get("storage_location") or ""
    if not table_id:
        print("ERROR: table_id is empty in UC response", file=sys.stderr)
        return 1
    if not storage_location:
        print("ERROR: storage_location is empty in UC response", file=sys.stderr)
        return 1

    access_key = os.environ.get("AWS_ACCESS_KEY_ID", "").strip()
    secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "").strip()
    session_token = os.environ.get("AWS_SESSION_TOKEN", "").strip()
    if bool(access_key) != bool(secret_key):
        print(
            "ERROR: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set together",
            file=sys.stderr,
        )
        return 1
    if not access_key:
        creds_resp = requests.post(
            f"{api}/temporary-table-credentials",
            json={"table_id": table_id, "operation": "READ_WRITE"},
            timeout=15,
        )
        creds_resp.raise_for_status()
        creds = creds_resp.json().get("aws_temp_credentials") or {}
        access_key = creds.get("access_key_id") or ""
        secret_key = creds.get("secret_access_key") or ""
        session_token = creds.get("session_token") or ""
        if not access_key or not secret_key:
            print(
                "ERROR: missing aws_temp_credentials; "
                f"response status {creds_resp.status_code}",
                file=sys.stderr,
            )
            return 1

    storage_options = {
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_REGION": "us-east-1",
        "AWS_ENDPOINT_URL": s3_endpoint,
        "AWS_ALLOW_HTTP": "true",
        # delta-rs requires this for non-S3 backends that lack atomic rename.
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    }
    if session_token:
        storage_options["AWS_SESSION_TOKEN"] = session_token

    arrow_table = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int64()),
            "value": pa.array(["alpha", "beta", "gamma"], type=pa.string()),
        }
    )

    write_deltalake(
        storage_location,
        arrow_table,
        storage_options=storage_options,
        mode="overwrite",
    )

    dt = DeltaTable(storage_location, storage_options=storage_options)
    read_back = dt.to_pyarrow_table()
    rows = read_back.num_rows
    print(f"DELTA_RS_OK rows={rows} version={dt.version()}")
    return 0 if rows == arrow_table.num_rows else 1


if __name__ == "__main__":
    exit_code = main()
    sys.stdout.flush()
    sys.stderr.flush()
    # deltalake/pyarrow can abort during native teardown after a successful
    # round-trip in the short-lived test container. Preserve main's result.
    os._exit(exit_code)


@@ -0,0 +1,201 @@
package unity_catalog
import (
"context"
"fmt"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/seaweedfs/seaweedfs/test/testutil"
)
// TestUnityCatalogMasterRoleIntegration covers the master-role configuration
// the upstream playground notes as not-yet-working
// (https://github.com/data-engineering-helpers/mds-in-a-box/blob/main/unitycatalog-playground/etc/conf/server.properties#L45).
//
// It verifies two things end-to-end:
//
// 1. SeaweedFS' STS endpoint accepts sts:AssumeRole for the
// UnityCatalogVendedRole, returning real STS-vended credentials. This
// mirrors what the lakekeeper / polaris tests already exercise via the Go
// AWS SDK and is the SeaweedFS-side prerequisite for UC's master-role
// flow.
//
// 2. Unity Catalog OSS starts and accepts catalog/schema/EXTERNAL Delta
// table CRUD when configured with `aws.masterRoleArn` set to that role
// and AWS_ENDPOINT_URL_STS pointed at SeaweedFS.
//
// What it does NOT verify is the third hop -- UC's Java StsClient
// reaching SeaweedFS' STS handler. UC's
// AwsCredentialGenerator.StsAwsCredentialGenerator builds the StsClient
// without an endpointOverride and doesn't propagate aws.endpoint to it, so
// the SDK's generic AWS_ENDPOINT_URL_STS resolution doesn't kick in and the
// request always targets real AWS, returning InvalidClientTokenId. Verified
// by pointing AWS_ENDPOINT_URL_STS at port 1: same 403 either way, and a
// sniffer in front of SeaweedFS records zero traffic. UC's own tests mock
// StsClient out (BaseCRUDTestWithMockCredentials' EchoAwsStsClient,
// Mockito.mockStatic in CloudCredentialVendorTest), so this never showed
// up upstream. Fix is in unitycatalog/unitycatalog#1532; that bit is
// logged via t.Logf and not asserted.
func TestUnityCatalogMasterRoleIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in -short mode")
}
if !testutil.HasDocker() {
t.Skip("docker not available")
}
env := newTestEnv(t)
defer env.cleanup(t)
t.Log(">>> starting SeaweedFS with STS-enabled IAM config...")
env.startSeaweedFS(t, stsEnabledIAMConfig(env.accessKey, env.secretKey))
t.Log(">>> SeaweedFS ready")
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute)
defer cancel()
s3c := env.newHostS3Client(t, ctx)
if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil {
t.Fatalf("create warehouse bucket: %v", err)
}
// Slice 1: prove SeaweedFS STS works for the role UC would use.
t.Run("SeaweedFsAssumesUnityCatalogVendedRole", func(t *testing.T) {
stsEndpoint := fmt.Sprintf("http://127.0.0.1:%d", env.s3Port)
creds, err := assumeRoleViaSeaweedFS(ctx, stsEndpoint, env.accessKey, env.secretKey, ucVendedRoleArn)
if err != nil {
t.Fatalf("AssumeRole on SeaweedFS STS: %v", err)
}
if creds.SessionToken == "" {
t.Fatalf("expected non-empty session_token from STS, got %+v", creds)
}
if creds.AccessKeyID == env.accessKey {
t.Fatalf("STS returned the static admin access key; AssumeRole did not vend a new one")
}
// Confirm the vended creds actually work for an S3 round-trip.
s3v := env.newHostS3ClientWithCreds(t, ctx, creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken)
probeKey := "ns-probe/master-role-probe.txt"
if _, err := s3v.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(ucWarehouse),
Key: aws.String(probeKey),
Body: nil,
}); err != nil {
t.Fatalf("PutObject with sts-vended creds: %v", err)
}
_, _ = s3v.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(ucWarehouse),
Key: aws.String(probeKey),
})
})
// Slice 2: UC starts with master-role config and CRUD works.
stsEndpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port)
t.Log(">>> starting Unity Catalog server (master role)...")
env.startUnityCatalog(t, ctx, ucServerOpts{
MasterRoleArn: ucVendedRoleArn,
ExtraEnv: map[string]string{
"AWS_ENDPOINT_URL": stsEndpoint,
"AWS_ENDPOINT_URL_STS": stsEndpoint,
"AWS_ACCESS_KEY_ID": env.accessKey,
"AWS_SECRET_ACCESS_KEY": env.secretKey,
"AWS_REGION": "us-east-1",
},
})
t.Log(">>> Unity Catalog ready")
uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort))
suffix := time.Now().UnixNano()
catalogName := fmt.Sprintf("seaweed_uc_role_%d", suffix)
schemaName := fmt.Sprintf("ns_%d", suffix)
tableName := fmt.Sprintf("events_%d", suffix)
tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName)
defer func() {
_ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName)
_ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName)
_ = uc.deleteCatalog(context.Background(), catalogName)
}()
t.Run("UnityCatalogAcceptsMasterRoleConfig", func(t *testing.T) {
if _, err := uc.createCatalog(ctx, ucCreateCatalog{
Name: catalogName,
StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey),
}); err != nil {
t.Fatalf("create catalog: %v", err)
}
if _, err := uc.createSchema(ctx, ucCreateSchema{Name: schemaName, CatalogName: catalogName}); err != nil {
t.Fatalf("create schema: %v", err)
}
created, err := uc.createTable(ctx, ucCreateTable{
Name: tableName,
CatalogName: catalogName,
SchemaName: schemaName,
TableType: "EXTERNAL",
DataSourceFormat: "DELTA",
Columns: []ucColumn{
{Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true},
},
StorageLocation: tableLocation,
})
if err != nil {
t.Fatalf("create external delta table: %v", err)
}
if created.TableID == "" {
t.Fatalf("expected table_id in CreateTable response, got %+v", created)
}
// The actual UC -> SeaweedFS STS handoff is the still-unsolved slice.
// Log what happens, but don't fail the test on it: SeaweedFS' STS is
// known-good (slice 1 above), and UC's CRUD is known-good (this slice).
_, ucCredErr := uc.generateTemporaryTableCredentials(ctx, created.TableID, "READ_WRITE")
if ucCredErr != nil {
t.Logf("UC StsClient still hits real AWS (UC bug, fix in unitycatalog/unitycatalog#1532): %v", ucCredErr)
} else {
t.Logf("UC StsClient honors endpoint override; unitycatalog/unitycatalog#1532 (or equivalent) is in this image.")
}
})
}
// assumeRoleViaSeaweedFS calls sts:AssumeRole against SeaweedFS' STS endpoint
// using the Go AWS SDK, mirroring how lakekeeper/polaris tests prove
// SeaweedFS' STS path works.
func assumeRoleViaSeaweedFS(ctx context.Context, endpoint, accessKey, secretKey, roleArn string) (aws.Credentials, error) {
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion("us-east-1"),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
)
if err != nil {
return aws.Credentials{}, fmt.Errorf("load aws config: %w", err)
}
// Override the STS endpoint via service-specific BaseEndpoint instead of
// the deprecated EndpointResolverWithOptions wired through the global
// config. This is the supported path in aws-sdk-go-v2 since v1.21.
client := sts.NewFromConfig(cfg, func(o *sts.Options) {
o.BaseEndpoint = aws.String(endpoint)
})
resp, err := client.AssumeRole(ctx, &sts.AssumeRoleInput{
RoleArn: aws.String(roleArn),
RoleSessionName: aws.String("uc-master-role-test"),
})
if err != nil {
return aws.Credentials{}, err
}
if resp.Credentials == nil {
return aws.Credentials{}, fmt.Errorf("AssumeRole returned no credentials")
}
return aws.Credentials{
AccessKeyID: aws.ToString(resp.Credentials.AccessKeyId),
SecretAccessKey: aws.ToString(resp.Credentials.SecretAccessKey),
SessionToken: aws.ToString(resp.Credentials.SessionToken),
Source: "seaweedfs-sts",
}, nil
}
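
The two checks the master-role test applies to the AssumeRole result (a non-empty session token, and an access key different from the static admin key) can be sketched as a single predicate. This is a minimal, SDK-free illustration: the `creds` type and the `checkVended` helper are hypothetical names that are not part of the test suite, and the key values are made up.

```go
package main

import (
	"errors"
	"fmt"
)

// creds is a hypothetical stand-in for aws.Credentials, kept local so the
// sketch has no SDK dependency.
type creds struct {
	AccessKeyID     string
	SecretAccessKey string
	SessionToken    string
}

// checkVended returns an error when c does not look STS-vended. A vended
// credential carries a session token and an access key that differs from the
// static key used to call AssumeRole.
func checkVended(c creds, staticAccessKey string) error {
	if c.SessionToken == "" {
		return errors.New("missing session token")
	}
	if c.AccessKeyID == staticAccessKey {
		return errors.New("access key equals the static admin key")
	}
	return nil
}

func main() {
	// Illustrative values only; real vended keys come from the STS response.
	vended := creds{AccessKeyID: "ASIAEXAMPLE", SecretAccessKey: "s", SessionToken: "tok"}
	fmt.Println("vended ok:", checkVended(vended, "admin") == nil)

	static := creds{AccessKeyID: "admin", SecretAccessKey: "admin"}
	fmt.Println("static rejected:", checkVended(static, "admin") != nil)
}
```

Checking both properties matters because either one alone could pass by accident: a server that echoes the caller's key with a token attached would satisfy only the first check.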

@@ -0,0 +1,379 @@
package unity_catalog
import (
"context"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/seaweedfs/seaweedfs/test/testutil"
)
const (
ucImageDefault = "unitycatalog/unitycatalog:v0.4.0"
ucContainerCfg = "/home/unitycatalog/etc/conf/server.properties"
ucStartupGrace = 90 * time.Second
ucAPIBase = "/api/2.1/unity-catalog"
ucWarehouse = "lakehouse"
ucWarehouseKey = "warehouse"
// Role used by the master-role (STS-vended) variant of the test. Its trust
// policy is wide open so that any caller can assume it. The test passes this
// ARN to the UC server as `aws.masterRoleArn`.
ucVendedRoleArn = "arn:aws:iam::000000000000:role/UnityCatalogVendedRole"
ucVendedRoleName = "UnityCatalogVendedRole"
)
type testEnv struct {
seaweedDir string
weedBinary string
dataDir string
bindIP string
s3Port int
s3GrpcPort int
masterPort int
masterGrpcPort int
filerPort int
filerGrpcPort int
volumePort int
volumeGrpcPort int
weedProcess *exec.Cmd
weedCancel context.CancelFunc
accessKey string
secretKey string
ucImage string
ucContainerID string
ucHostPort int
}
// ucServerOpts customizes the Unity Catalog server.properties and runtime
// environment. Defaults match the upstream playground (static keys, no master
// role).
type ucServerOpts struct {
// MasterRoleArn populates aws.masterRoleArn and s3.awsRoleArn.0. Empty means
// UC is configured with only the static aws.accessKey / aws.secretKey pair.
MasterRoleArn string
// ExtraEnv adds environment variables to the UC container, useful for
// AWS_ENDPOINT_URL_STS-style overrides.
ExtraEnv map[string]string
}
func newTestEnv(t *testing.T) *testEnv {
t.Helper()
wd, err := os.Getwd()
if err != nil {
t.Fatalf("getwd: %v", err)
}
seaweedDir := wd
for i := 0; i < 8; i++ {
if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
break
}
seaweedDir = filepath.Dir(seaweedDir)
}
weedBinary := filepath.Join(seaweedDir, "weed", "weed")
if _, err := os.Stat(weedBinary); err != nil {
weedBinary = "weed"
if _, err := exec.LookPath(weedBinary); err != nil {
t.Skip("weed binary not found, skipping integration test")
}
}
dataDir, err := os.MkdirTemp("", "seaweed-uc-test-*")
if err != nil {
t.Fatalf("mkdtemp: %v", err)
}
bindIP := testutil.FindBindIP()
ports := testutil.MustAllocatePorts(t, 9)
masterPort, masterGrpcPort := ports[0], ports[1]
volumePort, volumeGrpcPort := ports[2], ports[3]
filerPort, filerGrpcPort := ports[4], ports[5]
s3Port, s3GrpcPort := ports[6], ports[7]
ucHostPort := ports[8]
image := os.Getenv("UC_IMAGE")
if image == "" {
image = ucImageDefault
}
return &testEnv{
seaweedDir: seaweedDir,
weedBinary: weedBinary,
dataDir: dataDir,
bindIP: bindIP,
s3Port: s3Port,
s3GrpcPort: s3GrpcPort,
masterPort: masterPort,
masterGrpcPort: masterGrpcPort,
filerPort: filerPort,
filerGrpcPort: filerGrpcPort,
volumePort: volumePort,
volumeGrpcPort: volumeGrpcPort,
accessKey: "admin",
secretKey: "admin",
ucImage: image,
ucHostPort: ucHostPort,
}
}
// startSeaweedFS starts a `weed mini` instance. If iamJSON is empty, a minimal
// admin-only IAM config is used. When iamJSON is non-empty, it is passed to
// both -s3.config and -s3.iam.config so the STS handler is enabled (mirroring
// the lakekeeper test).
func (env *testEnv) startSeaweedFS(t *testing.T, iamJSON string) {
t.Helper()
enableSTS := iamJSON != ""
if iamJSON == "" {
iamJSON = fmt.Sprintf(`{
"identities": [
{
"name": "admin",
"credentials": [{"accessKey": %q, "secretKey": %q}],
"actions": ["Admin", "Read", "List", "Tagging", "Write"]
}
]
}`, env.accessKey, env.secretKey)
}
iamConfigPath := filepath.Join(env.dataDir, "iam.json")
if err := os.WriteFile(iamConfigPath, []byte(iamJSON), 0644); err != nil {
t.Fatalf("write iam config: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
env.weedCancel = cancel
weedArgs := []string{"-v", "4", "mini",
"-master.port", fmt.Sprintf("%d", env.masterPort),
"-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort),
"-volume.port", fmt.Sprintf("%d", env.volumePort),
"-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort),
"-filer.port", fmt.Sprintf("%d", env.filerPort),
"-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort),
"-s3.port", fmt.Sprintf("%d", env.s3Port),
"-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort),
"-s3.config", iamConfigPath,
"-ip", env.bindIP,
"-ip.bind", "0.0.0.0",
"-dir", env.dataDir,
}
if enableSTS {
weedArgs = append(weedArgs, "-s3.iam.config", iamConfigPath, "-s3.iam.readOnly=false")
}
cmd := exec.CommandContext(ctx, env.weedBinary, weedArgs...)
cmd.Dir = env.dataDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatalf("start weed mini: %v", err)
}
env.weedProcess = cmd
if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("S3 API at 127.0.0.1:%d did not become ready", env.s3Port)
}
}
// startUnityCatalog launches the Unity Catalog OSS server in Docker against
// the running SeaweedFS instance. It mirrors the upstream playground's
// server.properties layout and bind-mounts only that single file (matching
// docker-compose.yaml from the playground).
func (env *testEnv) startUnityCatalog(t *testing.T, ctx context.Context, opts ucServerOpts) {
t.Helper()
s3EndpointForContainer := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port)
props := strings.Join([]string{
"server.env=test",
"server.authorization=disable",
"server.cookie-timeout=PT1H",
"server.managed-table.enabled=false",
fmt.Sprintf("aws.masterRoleArn=%s", opts.MasterRoleArn),
fmt.Sprintf("aws.accessKey=%s", env.accessKey),
fmt.Sprintf("aws.secretKey=%s", env.secretKey),
"aws.region=us-east-1",
fmt.Sprintf("aws.endpoint=%s", s3EndpointForContainer),
// UC keys perBucketS3Configs by NormalizedURL.from(bucketPath) and
// looks up using the storageBase, which is "s3://<bucket>" (scheme +
// authority only). The playground's "s3://lakehouse/warehouse" never
// matches because of that asymmetry; the bucket-only form works.
fmt.Sprintf("s3.bucketPath.0=s3://%s", ucWarehouse),
"s3.region.0=us-east-1",
fmt.Sprintf("s3.awsRoleArn.0=%s", opts.MasterRoleArn),
fmt.Sprintf("s3.accessKey.0=%s", env.accessKey),
fmt.Sprintf("s3.secretKey.0=%s", env.secretKey),
fmt.Sprintf("s3.endpoint.0=%s", s3EndpointForContainer),
"",
}, "\n")
confDir := filepath.Join(env.dataDir, "uc-conf")
if err := os.MkdirAll(confDir, 0755); err != nil {
t.Fatalf("mkdir uc-conf: %v", err)
}
propsPath := filepath.Join(confDir, "server.properties")
if err := os.WriteFile(propsPath, []byte(props), 0644); err != nil {
t.Fatalf("write server.properties: %v", err)
}
containerName := fmt.Sprintf("seaweed-unity-catalog-%d", time.Now().UnixNano())
args := []string{
"run", "-d", "--rm",
"--name", containerName,
"--add-host", "host.docker.internal:host-gateway",
"-p", fmt.Sprintf("%d:8080", env.ucHostPort),
"-v", fmt.Sprintf("%s:%s:ro", propsPath, ucContainerCfg),
"-e", "JAVA_OPTS=-Xmx1g",
}
for k, v := range opts.ExtraEnv {
args = append(args, "-e", fmt.Sprintf("%s=%s", k, v))
}
args = append(args, env.ucImage)
out, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput()
if err != nil {
t.Fatalf("docker run unity-catalog: %v\n%s", err, out)
}
// docker run -d prefixes pull progress / warnings before the final container
// ID line when the image isn't cached locally. Trim to the last whitespace-
// separated token so the ID survives a fresh CI runner.
fields := strings.Fields(string(out))
if len(fields) == 0 {
t.Fatalf("docker run produced no output; combined: %q", string(out))
}
env.ucContainerID = fields[len(fields)-1]
t.Logf("unity-catalog container id: %s", env.ucContainerID)
probe := fmt.Sprintf("http://127.0.0.1:%d%s/catalogs", env.ucHostPort, ucAPIBase)
deadline := time.Now().Add(ucStartupGrace)
var lastErr error
for time.Now().Before(deadline) {
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, probe, nil)
resp, err := (&http.Client{Timeout: 3 * time.Second}).Do(req)
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return
}
lastErr = fmt.Errorf("status %d", resp.StatusCode)
} else {
lastErr = err
}
select {
case <-ctx.Done():
t.Fatalf("ctx done while waiting for unity catalog: %v", ctx.Err())
case <-time.After(time.Second):
}
}
logs, _ := exec.Command("docker", "logs", "--tail", "200", env.ucContainerID).CombinedOutput()
t.Fatalf("unity catalog did not become ready: %v\nrecent logs:\n%s", lastErr, logs)
}
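
The container-ID handling in startUnityCatalog relies on `strings.Fields` to discard whatever precedes the ID in the combined output. A minimal standalone sketch of that step follows. The `containerID` helper name is hypothetical, and the sample output is an invented illustration of pull progress rather than a captured Docker log.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// containerID returns the last whitespace-separated token of the combined
// `docker run -d` output. Anything printed before the ID line (pull progress,
// warnings) is ignored.
func containerID(out string) (string, error) {
	fields := strings.Fields(out)
	if len(fields) == 0 {
		return "", errors.New("docker run produced no output")
	}
	return fields[len(fields)-1], nil
}

func main() {
	// Invented sample: a few progress lines followed by the ID on its own line.
	out := "Unable to find image 'example/image:tag' locally\n" +
		"tag: Pulling from example/image\n" +
		"Status: Downloaded newer image for example/image:tag\n" +
		"0123456789abcdef\n"
	id, err := containerID(out)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id)
}
```

Taking the last token, rather than trimming the whole output, keeps the result usable even when the trailing newline or leading progress lines vary between Docker versions.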
func (env *testEnv) cleanup(t *testing.T) {
if env.ucContainerID != "" {
if logs, err := exec.Command("docker", "logs", "--tail", "200", env.ucContainerID).CombinedOutput(); err == nil {
t.Logf("unity-catalog tail logs:\n%s", logs)
}
_ = exec.Command("docker", "rm", "-f", env.ucContainerID).Run()
}
if env.weedCancel != nil {
env.weedCancel()
}
if env.weedProcess != nil {
_ = env.weedProcess.Wait()
}
if env.dataDir != "" {
_ = os.RemoveAll(env.dataDir)
}
}
// -- S3 client helpers ---------------------------------------------------------
func (env *testEnv) newHostS3Client(t *testing.T, ctx context.Context) *s3.Client {
t.Helper()
return env.newHostS3ClientWithCreds(t, ctx, env.accessKey, env.secretKey, "")
}
func (env *testEnv) newHostS3ClientWithCreds(t *testing.T, ctx context.Context, ak, sk, token string) *s3.Client {
t.Helper()
endpoint := fmt.Sprintf("http://127.0.0.1:%d", env.s3Port)
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion("us-east-1"),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(ak, sk, token)),
)
if err != nil {
t.Fatalf("load aws config: %v", err)
}
return s3.NewFromConfig(cfg, func(o *s3.Options) {
o.BaseEndpoint = aws.String(endpoint)
o.UsePathStyle = true
})
}
// stsEnabledIAMConfig returns an iam.json that defines an admin user, the
// UnityCatalogVendedRole role with a permissive trust policy, and a FullAccess
// policy attached to it. This is the SeaweedFS-side counterpart of the
// `aws.masterRoleArn` configuration on the Unity Catalog server.
func stsEnabledIAMConfig(accessKey, secretKey string) string {
return fmt.Sprintf(`{
"identities": [
{
"name": "admin",
"credentials": [{"accessKey": %q, "secretKey": %q}],
"actions": ["Admin", "Read", "List", "Tagging", "Write"]
}
],
"sts": {
"tokenDuration": "12h",
"maxSessionLength": "24h",
"issuer": "seaweedfs-sts",
"signingKey": "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz"
},
"roles": [
{
"roleName": %q,
"roleArn": %q,
"trustPolicy": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "sts:AssumeRole"
}
]
},
"attachedPolicies": ["FullAccess"]
}
],
"policies": [
{
"name": "FullAccess",
"document": {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": "*", "Resource": "*"}
]
}
}
]
}`, accessKey, secretKey, ucVendedRoleName, ucVendedRoleArn)
}

@@ -0,0 +1,183 @@
// Package unity_catalog provides integration tests that run Unity Catalog
// OSS in Docker against a SeaweedFS S3 backend. The base test mirrors the
// configuration used by the upstream UC playground at
// https://github.com/data-engineering-helpers/mds-in-a-box/tree/main/unitycatalog-playground:
// static aws.accessKey/aws.secretKey, no master role, EXTERNAL Delta tables.
package unity_catalog
import (
"context"
"fmt"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/seaweedfs/seaweedfs/test/testutil"
)
// TestUnityCatalogDeltaIntegration brings up SeaweedFS, runs Unity Catalog OSS
// in Docker against it, and exercises catalog/schema/table CRUD for an
// EXTERNAL Delta table. The UC server is configured with static keys
// (aws.masterRoleArn empty), matching the playground's configuration when
// targeting SeaweedFS. In that configuration /temporary-table-credentials is
// expected to fail, so the corresponding subtest asserts the error rather
// than performing storage I/O with vended credentials.
func TestUnityCatalogDeltaIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in -short mode")
}
if !testutil.HasDocker() {
t.Skip("docker not available")
}
env := newTestEnv(t)
defer env.cleanup(t)
t.Log(">>> starting SeaweedFS for Unity Catalog test...")
env.startSeaweedFS(t, "")
t.Log(">>> SeaweedFS ready")
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute)
defer cancel()
s3c := env.newHostS3Client(t, ctx)
if _, err := s3c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(ucWarehouse)}); err != nil {
t.Fatalf("create warehouse bucket: %v", err)
}
t.Log(">>> starting Unity Catalog server (static keys)...")
env.startUnityCatalog(t, ctx, ucServerOpts{})
t.Log(">>> Unity Catalog ready")
uc := newUCClient(fmt.Sprintf("http://127.0.0.1:%d", env.ucHostPort))
suffix := time.Now().UnixNano()
catalogName := fmt.Sprintf("seaweed_uc_%d", suffix)
schemaName := fmt.Sprintf("ns_%d", suffix)
tableName := fmt.Sprintf("events_%d", suffix)
tableLocation := fmt.Sprintf("s3://%s/%s/%s/%s", ucWarehouse, ucWarehouseKey, schemaName, tableName)
defer func() {
_ = uc.deleteTable(context.Background(), catalogName+"."+schemaName+"."+tableName)
_ = uc.deleteSchema(context.Background(), catalogName+"."+schemaName)
_ = uc.deleteCatalog(context.Background(), catalogName)
}()
t.Run("CreateCatalog", func(t *testing.T) {
got, err := uc.createCatalog(ctx, ucCreateCatalog{
Name: catalogName,
Comment: "seaweedfs integration test",
StorageRoot: fmt.Sprintf("s3://%s/%s", ucWarehouse, ucWarehouseKey),
})
if err != nil {
t.Fatalf("create catalog: %v", err)
}
if got.Name != catalogName {
t.Fatalf("catalog name = %q, want %q", got.Name, catalogName)
}
})
t.Run("CreateSchema", func(t *testing.T) {
got, err := uc.createSchema(ctx, ucCreateSchema{
Name: schemaName,
CatalogName: catalogName,
})
if err != nil {
t.Fatalf("create schema: %v", err)
}
if got.Name != schemaName || got.CatalogName != catalogName {
t.Fatalf("schema = %+v, want name=%q catalog=%q", got, schemaName, catalogName)
}
})
var createdTable ucTableInfo
t.Run("CreateExternalDeltaTable", func(t *testing.T) {
columns := []ucColumn{
{Name: "id", TypeText: "long", TypeName: "LONG", TypeJSON: `{"name":"id","type":"long","nullable":true,"metadata":{}}`, Position: 0, Nullable: true},
{Name: "value", TypeText: "string", TypeName: "STRING", TypeJSON: `{"name":"value","type":"string","nullable":true,"metadata":{}}`, Position: 1, Nullable: true},
}
got, err := uc.createTable(ctx, ucCreateTable{
Name: tableName,
CatalogName: catalogName,
SchemaName: schemaName,
TableType: "EXTERNAL",
DataSourceFormat: "DELTA",
Columns: columns,
StorageLocation: tableLocation,
})
if err != nil {
t.Fatalf("create table: %v", err)
}
if got.TableType != "EXTERNAL" || got.DataSourceFormat != "DELTA" {
t.Fatalf("table types = %s/%s, want EXTERNAL/DELTA", got.TableType, got.DataSourceFormat)
}
if got.StorageLocation == "" {
t.Fatalf("table storage_location is empty in response: %+v", got)
}
createdTable = *got
})
t.Run("GetTable", func(t *testing.T) {
full := catalogName + "." + schemaName + "." + tableName
got, err := uc.getTable(ctx, full)
if err != nil {
t.Fatalf("get table: %v", err)
}
if got.Name != tableName {
t.Fatalf("get table name = %q, want %q", got.Name, tableName)
}
})
t.Run("ListTables", func(t *testing.T) {
tables, err := uc.listTables(ctx, catalogName, schemaName)
if err != nil {
t.Fatalf("list tables: %v", err)
}
found := false
for _, tbl := range tables {
if tbl.Name == tableName {
found = true
break
}
}
if !found {
t.Fatalf("created table %q not found in list", tableName)
}
})
t.Run("TemporaryTableCredentialsRejected", func(t *testing.T) {
// With aws.masterRoleArn empty AND no s3.sessionToken.0 set, UC OSS
// always tries to AssumeRole via its internal StsClient (see
// AwsCredentialVendor.createPerBucketCredentialGenerator). Against a
// non-AWS endpoint, that call doesn't reach a real STS, so UC returns
// "S3 bucket configuration not found." or an STS-side error. This is
// the gap users hit at <https://github.com/data-engineering-helpers/mds-in-a-box/blob/main/unitycatalog-playground/etc/conf/server.properties#L45>:
// "with simple S3 access and secret keys, Unity Catalog does not seem
// to work."
//
// The assertion is therefore inverted: we expect a non-nil error from
// /temporary-table-credentials with this configuration. A future
// variant can pin s3.sessionToken.0 (UC's StaticAwsCredentialGenerator
// path) once SeaweedFS' SigV4 path tolerates the vended session token.
if createdTable.TableID == "" {
t.Fatalf("created table has empty table_id; cannot request temporary credentials")
}
_, err := uc.generateTemporaryTableCredentials(ctx, createdTable.TableID, "READ_WRITE")
if err == nil {
t.Fatalf("expected /temporary-table-credentials to fail with the static-key playground configuration; it succeeded unexpectedly")
}
t.Logf("expected failure (UC static-key path requires AWS STS): %v", err)
})
t.Run("DeleteTableSchemaCatalog", func(t *testing.T) {
if err := uc.deleteTable(ctx, catalogName+"."+schemaName+"."+tableName); err != nil {
t.Fatalf("delete table: %v", err)
}
if err := uc.deleteSchema(ctx, catalogName+"."+schemaName); err != nil {
t.Fatalf("delete schema: %v", err)
}
if err := uc.deleteCatalog(ctx, catalogName); err != nil {
t.Fatalf("delete catalog: %v", err)
}
})
}
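
The table locations in these tests sit under `s3://lakehouse/warehouse/...`, while the server.properties helper registers the bucket-only path `s3://lakehouse`. The comment in that helper attributes this to UC looking up per-bucket configuration by scheme plus authority. The following is a rough Go approximation of that lookup, written only to illustrate the asymmetry: `storageBase` and `hasBucketConfig` are hypothetical names, and the code is not derived from UC's Java implementation.

```go
package main

import (
	"fmt"
	"net/url"
)

// storageBase reduces a storage location to scheme plus authority, e.g.
// "s3://lakehouse/warehouse/t" becomes "s3://lakehouse".
func storageBase(location string) (string, error) {
	u, err := url.Parse(location)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", fmt.Errorf("location %q has no scheme or bucket", location)
	}
	return u.Scheme + "://" + u.Host, nil
}

// hasBucketConfig reports whether a configuration keyed by bucketPath would
// be found for the given table location under the assumed lookup rule.
func hasBucketConfig(configured map[string]bool, location string) bool {
	base, err := storageBase(location)
	if err != nil {
		return false
	}
	return configured[base]
}

func main() {
	// Illustrative location in the same shape the tests build.
	loc := "s3://lakehouse/warehouse/ns_1/events_1"

	// Bucket-only key: matches the derived storage base.
	fmt.Println(hasBucketConfig(map[string]bool{"s3://lakehouse": true}, loc))

	// Key with a path component: never equals the storage base.
	fmt.Println(hasBucketConfig(map[string]bool{"s3://lakehouse/warehouse": true}, loc))
}
```

Under this assumed rule, a bucketPath that includes a key prefix can never match, which is consistent with the playground's `s3://lakehouse/warehouse` setting not being picked up.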