diff --git a/weed/s3api/iceberg/commit_helpers.go b/weed/s3api/iceberg/commit_helpers.go index 1e4c912ba..d5740ac06 100644 --- a/weed/s3api/iceberg/commit_helpers.go +++ b/weed/s3api/iceberg/commit_helpers.go @@ -133,6 +133,10 @@ func (s *Server) finalizeCreateOnCommit(ctx context.Context, input createOnCommi message: "Failed to apply statistics updates: " + err.Error(), } } + // Same spec-compliance fixup we apply on create-table; ensures + // v{N}.metadata.json files written through this create-on-commit path are + // also readable by strict Iceberg clients reading directly from S3. + metadataBytes = ensureMetadataSpecCompliance(metadataBytes) newMetadata, err = table.ParseMetadataBytes(metadataBytes) if err != nil { return nil, &icebergRequestError{ diff --git a/weed/s3api/iceberg/handlers_commit.go b/weed/s3api/iceberg/handlers_commit.go index 571ad43dd..587137a11 100644 --- a/weed/s3api/iceberg/handlers_commit.go +++ b/weed/s3api/iceberg/handlers_commit.go @@ -264,6 +264,11 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+err.Error()) return } + // Same spec-compliance fixup we apply on create-table; ensures + // v{N}.metadata.json files written during commit are also readable by + // strict Iceberg clients reading directly from S3, and that the + // FullMetadata persisted in S3Tables stays consistent. + metadataBytes = ensureMetadataSpecCompliance(metadataBytes) newMetadata, err = table.ParseMetadataBytes(metadataBytes) if err != nil { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+err.Error()) diff --git a/weed/s3api/iceberg/handlers_table.go b/weed/s3api/iceberg/handlers_table.go index 8042b3130..88fca0b16 100644 --- a/weed/s3api/iceberg/handlers_table.go +++ b/weed/s3api/iceberg/handlers_table.go @@ -161,6 +161,10 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error()) return } + // Backfill Iceberg spec-required fields that iceberg-go v0.5.0 omits when + // empty (e.g. current-snapshot-id), so the persisted v*.metadata.json is + // readable by strict clients (Java/Spark/Trino) that go directly to S3. + metadataBytes = ensureMetadataSpecCompliance(metadataBytes) tableName := req.Name metadataFileName := "v1.metadata.json" // Initial version is always 1 diff --git a/weed/s3api/iceberg/metadata_compliance.go b/weed/s3api/iceberg/metadata_compliance.go new file mode 100644 index 000000000..1d4fdaf0a --- /dev/null +++ b/weed/s3api/iceberg/metadata_compliance.go @@ -0,0 +1,132 @@ +package iceberg + +import ( + "bytes" + "encoding/json" +) + +// specRequiredEmptyOrder is iterated in this fixed order so the byte-level +// append in ensureMetadataSpecCompliance produces deterministic output. +var specRequiredEmptyOrder = []string{ + "current-snapshot-id", + "snapshots", + "snapshot-log", + "metadata-log", + "refs", +} + +// specRequiredEmptyDefaults holds the sentinel values Iceberg requires when +// the corresponding state is empty. +var specRequiredEmptyDefaults = map[string]json.RawMessage{ + "current-snapshot-id": json.RawMessage("-1"), + "snapshots": json.RawMessage("[]"), + "snapshot-log": json.RawMessage("[]"), + "metadata-log": json.RawMessage("[]"), + "refs": json.RawMessage("{}"), +} + +// isJSONNull reports whether raw encodes the JSON null literal, ignoring +// insignificant whitespace allowed by RFC 8259. +func isJSONNull(raw json.RawMessage) bool { + return bytes.Equal(bytes.TrimSpace(raw), []byte("null")) +} + +// ensureMetadataSpecCompliance backfills Iceberg spec-required fields that +// iceberg-go v0.5.0 drops via `omitempty` on optional pointer/slice fields. +// +// For tables with no snapshots (e.g. freshly created tables), iceberg-go +// represents an empty state with nil/empty Go values, then strips those keys +// entirely from the JSON output. The Iceberg REST spec, however, requires +// these keys to be present even when empty — Java/Spark/Trino clients +// fail to parse responses missing them (most visibly with +// "Cannot parse missing long current-snapshot-id"). This helper rehydrates +// the missing keys without overwriting any present real values. +// +// Missing keys are spliced in at the byte level just before the closing +// brace so iceberg-go's struct-declared key order is preserved (a naive +// map remarshal would alphabetize every key in the document). The slower +// remarshal path is only used when an explicit JSON null needs replacing, +// which iceberg-go itself never emits. +// +// Returns the original bytes unchanged when parsing fails or no key needs +// to be added, to avoid corrupting an otherwise-valid payload. +func ensureMetadataSpecCompliance(raw []byte) []byte { + if len(raw) == 0 { + return raw + } + var obj map[string]json.RawMessage + if err := json.Unmarshal(raw, &obj); err != nil { + return raw + } + // A top-level JSON null leaves obj == nil with no error; writes would + // panic on the slow path. It is not a valid TableMetadata document, so + // pass it through unchanged. + if obj == nil { + return raw + } + + // A field is "missing" for spec purposes if it's absent OR encoded as + // JSON null. Some writers emit `"current-snapshot-id": null` for empty + // state instead of omitting; strict clients reject both the same way. + var toAppend []string + hasExplicitNull := false + for _, key := range specRequiredEmptyOrder { + v, present := obj[key] + switch { + case !present: + toAppend = append(toAppend, key) + case isJSONNull(v): + hasExplicitNull = true + } + } + if len(toAppend) == 0 && !hasExplicitNull { + return raw + } + + if !hasExplicitNull { + return appendMissingObjectKeys(raw, toAppend, len(obj) > 0) + } + + // Slow path: replace explicit nulls. Rare in practice, so the + // alphabetical key reordering inherent to map remarshal is acceptable. + for key, v := range obj { + if _, required := specRequiredEmptyDefaults[key]; required && isJSONNull(v) { + obj[key] = specRequiredEmptyDefaults[key] + } + } + for _, key := range toAppend { + obj[key] = specRequiredEmptyDefaults[key] + } + fixed, err := json.Marshal(obj) + if err != nil { + return raw + } + return fixed +} + +// appendMissingObjectKeys splices the given keys into raw just before the +// closing brace of the top-level JSON object. raw is assumed to be a valid +// JSON object (the caller has already unmarshalled it). hasMembers tells us +// whether the object had any existing members, so we know whether to emit a +// leading comma for the first appended key. +func appendMissingObjectKeys(raw []byte, keys []string, hasMembers bool) []byte { + closeIdx := bytes.LastIndexByte(raw, '}') + if closeIdx < 0 { + return raw + } + var buf bytes.Buffer + buf.Grow(len(raw) + len(keys)*48) + buf.Write(raw[:closeIdx]) + for _, key := range keys { + if hasMembers { + buf.WriteByte(',') + } + buf.WriteByte('"') + buf.WriteString(key) + buf.WriteString(`":`) + buf.Write(specRequiredEmptyDefaults[key]) + hasMembers = true + } + buf.Write(raw[closeIdx:]) + return buf.Bytes() +} diff --git a/weed/s3api/iceberg/metadata_compliance_test.go b/weed/s3api/iceberg/metadata_compliance_test.go new file mode 100644 index 000000000..ac34f6359 --- /dev/null +++ b/weed/s3api/iceberg/metadata_compliance_test.go @@ -0,0 +1,162 @@ +package iceberg + +import ( + "encoding/json" + "testing" +) + +func TestEnsureMetadataSpecCompliance_BackfillsMissingFields(t *testing.T) { + // Mirrors the real iceberg-go v0.5.0 output for a freshly created table: + // current-snapshot-id, snapshots, snapshot-log, metadata-log, refs all absent. + input := []byte(`{ + "format-version": 2, + "table-uuid": "82e3eec4-3aee-414f-a444-94c03c641d20", + "location": "s3://s3table/default/t1", + "last-sequence-number": 0, + "last-updated-ms": 1779866785466, + "last-column-id": 2, + "current-schema-id": 0, + "default-spec-id": 0, + "last-partition-id": 999, + "default-sort-order-id": 0 + }`) + + out := ensureMetadataSpecCompliance(input) + + var got map[string]json.RawMessage + if err := json.Unmarshal(out, &got); err != nil { + t.Fatalf("output is not valid JSON: %v", err) + } + + if v, ok := got["current-snapshot-id"]; !ok || string(v) != "-1" { + t.Errorf("current-snapshot-id missing or wrong: present=%v value=%s", ok, string(v)) + } + for _, key := range []string{"snapshots", "snapshot-log", "metadata-log"} { + v, ok := got[key] + if !ok || string(v) != "[]" { + t.Errorf("%s missing or wrong: present=%v value=%s", key, ok, string(v)) + } + } + if v, ok := got["refs"]; !ok || string(v) != "{}" { + t.Errorf("refs missing or wrong: present=%v value=%s", ok, string(v)) + } +} + +func TestEnsureMetadataSpecCompliance_PreservesExistingFields(t *testing.T) { + // Real snapshot id and refs must not be overwritten by sentinels. + input := []byte(`{ + "format-version": 2, + "current-snapshot-id": 9876543210, + "snapshots": [{"snapshot-id": 9876543210}], + "snapshot-log": [{"snapshot-id": 9876543210, "timestamp-ms": 1}], + "metadata-log": [{"metadata-file": "v1.metadata.json", "timestamp-ms": 1}], + "refs": {"main": {"snapshot-id": 9876543210, "type": "branch"}} + }`) + + out := ensureMetadataSpecCompliance(input) + + var got map[string]json.RawMessage + if err := json.Unmarshal(out, &got); err != nil { + t.Fatalf("output is not valid JSON: %v", err) + } + if string(got["current-snapshot-id"]) != "9876543210" { + t.Errorf("real snapshot id was overwritten: %s", string(got["current-snapshot-id"])) + } + if string(got["snapshots"]) == "[]" { + t.Errorf("non-empty snapshots was overwritten with empty array") + } +} + +func TestEnsureMetadataSpecCompliance_ReplacesExplicitNullsWithSentinels(t *testing.T) { + // Some writers emit explicit JSON null for unset values instead of omitting + // the key. Strict Iceberg clients reject these the same way as missing keys. + input := []byte(`{ + "format-version": 2, + "current-snapshot-id": null, + "snapshots": null, + "snapshot-log": null, + "metadata-log": null, + "refs": null + }`) + + out := ensureMetadataSpecCompliance(input) + + var got map[string]json.RawMessage + if err := json.Unmarshal(out, &got); err != nil { + t.Fatalf("output is not valid JSON: %v", err) + } + if string(got["current-snapshot-id"]) != "-1" { + t.Errorf("current-snapshot-id should be replaced with -1, got %s", string(got["current-snapshot-id"])) + } + for _, key := range []string{"snapshots", "snapshot-log", "metadata-log"} { + if string(got[key]) != "[]" { + t.Errorf("%s should be replaced with [], got %s", key, string(got[key])) + } + } + if string(got["refs"]) != "{}" { + t.Errorf("refs should be replaced with {}, got %s", string(got["refs"])) + } +} + +func TestEnsureMetadataSpecCompliance_InvalidJSONReturnedUnchanged(t *testing.T) { + input := []byte(`{not valid json`) + out := ensureMetadataSpecCompliance(input) + if string(out) != string(input) { + t.Errorf("invalid JSON should be returned unchanged; got %s", string(out)) + } +} + +func TestEnsureMetadataSpecCompliance_EmptyInputReturnedUnchanged(t *testing.T) { + if out := ensureMetadataSpecCompliance(nil); out != nil { + t.Errorf("nil input should be returned unchanged, got %v", out) + } + if out := ensureMetadataSpecCompliance([]byte{}); len(out) != 0 { + t.Errorf("empty input should be returned unchanged, got %v", out) + } + // A top-level JSON null literal must not panic on the slow path. + if out := ensureMetadataSpecCompliance([]byte("null")); string(out) != "null" { + t.Errorf("top-level null should be returned unchanged, got %s", string(out)) + } +} + +// Original iceberg-go key ordering must survive the backfill: appended +// sentinels go at the end without disturbing prior fields. A map-based +// remarshal would have sorted everything alphabetically. +func TestEnsureMetadataSpecCompliance_PreservesOriginalKeyOrder(t *testing.T) { + // Compact JSON, keys in struct-declared order from iceberg-go. + input := []byte(`{"format-version":2,"table-uuid":"82e3eec4-3aee-414f-a444-94c03c641d20","location":"s3://x/t","last-sequence-number":0,"last-updated-ms":1,"last-column-id":2,"current-schema-id":0,"default-spec-id":0,"last-partition-id":999,"default-sort-order-id":0}`) + + out := ensureMetadataSpecCompliance(input) + + // Prior keys keep their order, sentinels appended at the end. + want := `{"format-version":2,"table-uuid":"82e3eec4-3aee-414f-a444-94c03c641d20","location":"s3://x/t","last-sequence-number":0,"last-updated-ms":1,"last-column-id":2,"current-schema-id":0,"default-spec-id":0,"last-partition-id":999,"default-sort-order-id":0,"current-snapshot-id":-1,"snapshots":[],"snapshot-log":[],"metadata-log":[],"refs":{}}` + if string(out) != want { + t.Errorf("unexpected output\n got: %s\nwant: %s", string(out), want) + } + + // Sanity: still valid JSON. + var parsed map[string]json.RawMessage + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatalf("output is not valid JSON: %v", err) + } +} + +// An empty object {} must round-trip to a valid JSON object containing +// only the spec sentinels (no leading comma). +func TestEnsureMetadataSpecCompliance_EmptyObjectBackfilled(t *testing.T) { + out := ensureMetadataSpecCompliance([]byte(`{}`)) + want := `{"current-snapshot-id":-1,"snapshots":[],"snapshot-log":[],"metadata-log":[],"refs":{}}` + if string(out) != want { + t.Errorf("unexpected output\n got: %s\nwant: %s", string(out), want) + } +} + +// When all fields are already present, the original bytes must be returned +// untouched (no whitespace normalization, no key reordering). +func TestEnsureMetadataSpecCompliance_AllPresentReturnsSameBytes(t *testing.T) { + input := []byte("{\n \"current-snapshot-id\": 1,\n \"snapshots\": [],\n \"snapshot-log\": [],\n \"metadata-log\": [],\n \"refs\": {}\n}") + out := ensureMetadataSpecCompliance(input) + if string(out) != string(input) { + t.Errorf("expected original bytes returned unchanged\n got: %q\nwant: %q", string(out), string(input)) + } +} diff --git a/weed/s3api/iceberg/types.go b/weed/s3api/iceberg/types.go index d1083f0ea..2aa399174 100644 --- a/weed/s3api/iceberg/types.go +++ b/weed/s3api/iceberg/types.go @@ -93,6 +93,29 @@ type loadTableResultAlias struct { Config iceberg.Properties `json:"config,omitempty"` } +// MarshalJSON serializes LoadTableResult while backfilling spec-required +// metadata fields that iceberg-go v0.5.0 drops via `omitempty`. Without this, +// strict Iceberg REST clients (Java/Spark/Trino) reject otherwise valid +// responses for empty tables with errors like +// "Cannot parse missing long current-snapshot-id". +func (r LoadTableResult) MarshalJSON() ([]byte, error) { + metaBytes, err := json.Marshal(r.Metadata) + if err != nil { + return nil, err + } + metaBytes = ensureMetadataSpecCompliance(metaBytes) + + return json.Marshal(struct { + MetadataLocation string `json:"metadata-location,omitempty"` + Metadata json.RawMessage `json:"metadata"` + Config iceberg.Properties `json:"config"` + }{ + MetadataLocation: r.MetadataLocation, + Metadata: metaBytes, + Config: r.Config, + }) +} + // UnmarshalJSON implements custom unmarshaling for LoadTableResult // to properly parse table.Metadata using iceberg-go's parser. func (r *LoadTableResult) UnmarshalJSON(data []byte) error { @@ -134,6 +157,25 @@ type commitTableResponseAlias struct { RawMetadata json.RawMessage `json:"metadata"` } +// MarshalJSON mirrors LoadTableResult.MarshalJSON: it backfills spec-required +// fields that iceberg-go v0.5.0 omits, so commit responses also parse cleanly +// on strict Iceberg clients. +func (r CommitTableResponse) MarshalJSON() ([]byte, error) { + metaBytes, err := json.Marshal(r.Metadata) + if err != nil { + return nil, err + } + metaBytes = ensureMetadataSpecCompliance(metaBytes) + + return json.Marshal(struct { + MetadataLocation string `json:"metadata-location"` + Metadata json.RawMessage `json:"metadata"` + }{ + MetadataLocation: r.MetadataLocation, + Metadata: metaBytes, + }) +} + // UnmarshalJSON implements custom unmarshaling for CommitTableResponse. func (r *CommitTableResponse) UnmarshalJSON(data []byte) error { var alias commitTableResponseAlias diff --git a/weed/s3api/s3tables/iceberg_layout.go b/weed/s3api/s3tables/iceberg_layout.go index a754b5d06..c9083c20a 100644 --- a/weed/s3api/s3tables/iceberg_layout.go +++ b/weed/s3api/s3tables/iceberg_layout.go @@ -26,7 +26,19 @@ var ( "data": true, } - // Patterns for valid metadata files + // Patterns for valid metadata files. + // + // Note: Iceberg engines (Flink/Spark/Trino, plus different Iceberg versions) + // use a variety of manifest/snapshot naming schemes that the strict patterns + // below don't cover - e.g. Flink emits manifests like + // "{flink-job-id}-{checkpoint}-{operator-id}-{n}.avro". The Iceberg spec + // itself doesn't mandate a specific filename; engines just need a stable + // unique name. So in addition to the strict patterns we keep below for + // documentation, the catch-all entries at the bottom accept any + // *.avro and *.metadata.json filename composed of safe characters + // ([A-Za-z0-9._-]). The catch-all deliberately does NOT cover arbitrary + // *.json — only the *.metadata.json suffix — to avoid letting random + // JSON drop into the metadata/ directory. metadataFilePatterns = []*regexp.Regexp{ regexp.MustCompile(`^v\d+\.metadata\.json$`), // Table metadata: v1.metadata.json, v2.metadata.json regexp.MustCompile(`^snap-\d+-\d+-` + uuidPattern + `\.avro$`), // Snapshot manifests: snap-123-1-uuid.avro @@ -35,6 +47,10 @@ var ( regexp.MustCompile(`^version-hint\.text$`), // Version hint file regexp.MustCompile(`^` + uuidPattern + `\.metadata\.json$`), // UUID-named metadata regexp.MustCompile(`^[^/]+\.stats$`), // Trino/Iceberg stats files + // Catch-all for Iceberg writer-generated manifest / metadata files + // whose naming we can't anticipate across engines and versions. + regexp.MustCompile(`^[A-Za-z0-9._-]+\.avro$`), + regexp.MustCompile(`^[A-Za-z0-9._-]+\.metadata\.json$`), } // Patterns for valid data files diff --git a/weed/s3api/s3tables/iceberg_layout_test.go b/weed/s3api/s3tables/iceberg_layout_test.go new file mode 100644 index 000000000..62e32c518 --- /dev/null +++ b/weed/s3api/s3tables/iceberg_layout_test.go @@ -0,0 +1,102 @@ +package s3tables + +import "testing" + +// TestIcebergLayoutValidator_AcceptsRealWorldManifestNames pins down the +// filename patterns we must accept across Iceberg engines. The original strict +// regex only covered Iceberg-internal naming (`{uuid}-m{n}.avro`, +// `snap-{n}-{n}-{uuid}.avro`) and rejected real manifests written by Flink and +// other writers, causing 403s during INSERT commits. The catch-all entries +// added alongside this test must keep these names valid. +func TestIcebergLayoutValidator_AcceptsRealWorldManifestNames(t *testing.T) { + cases := []struct { + name string + path string + }{ + { + "flink-style manifest (job-id + checkpoint + operator-id + counter)", + "metadata/02678a59b3d6b460ba392851d77155fc-1-cbc357ccb763df2852fee8c4fc7d55f2-00001.avro", + }, + { + "spark-style manifest with two uuids and -m suffix", + "metadata/00000000-0000-0000-0000-000000000000-m0.avro", + }, + { + "snapshot manifest list with iceberg-internal naming", + "metadata/snap-7234891234567890123-1-82e3eec4-3aee-414f-a444-94c03c641d20.avro", + }, + { + "versioned table metadata", + "metadata/v1.metadata.json", + }, + { + "uuid-named metadata json (newer iceberg)", + "metadata/82e3eec4-3aee-414f-a444-94c03c641d20.metadata.json", + }, + { + "flink-style metadata json with dashes and digits", + "metadata/02678a59-1-cbc357cc.metadata.json", + }, + { + "version hint", + "metadata/version-hint.text", + }, + { + "trino/iceberg stats file", + "metadata/table.stats", + }, + } + + v := NewIcebergLayoutValidator() + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if err := v.ValidateFilePath(tc.path); err != nil { + t.Errorf("expected %q to be accepted, got error: %v", tc.path, err) + } + }) + } +} + +// TestIcebergLayoutValidator_RejectsClearlyBadMetadataNames guards the +// catch-all from being too permissive — paths that look like attempts to +// escape the metadata layout or have forbidden file types must still fail. +func TestIcebergLayoutValidator_RejectsClearlyBadMetadataNames(t *testing.T) { + cases := []struct { + name string + path string + }{ + {"random extension", "metadata/random-file.txt"}, + {"executable masquerading as avro path", "metadata/evil.sh"}, + {"subdirectory under metadata is not allowed", "metadata/sub/file.avro"}, + {"top-level dir other than metadata or data", "garbage/file.avro"}, + {"catch-all anchors must reject trailing extension", "metadata/file.avro.txt"}, + } + + v := NewIcebergLayoutValidator() + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if err := v.ValidateFilePath(tc.path); err == nil { + t.Errorf("expected %q to be rejected, but it passed validation", tc.path) + } + }) + } +} + +// TestIcebergLayoutValidator_AcceptsRealWorldDataFiles is a sanity check that +// the patterns most engines actually emit for data files still pass. +func TestIcebergLayoutValidator_AcceptsRealWorldDataFiles(t *testing.T) { + cases := []string{ + "data/00000-0-ede83b82-08e1-40cd-af8a-6d83680a5194-00001.parquet", + "data/part-00000.parquet", + "data/some-file.orc", + "data/year=2026/month=05/00000-0-uuid.parquet", + } + v := NewIcebergLayoutValidator() + for _, p := range cases { + t.Run(p, func(t *testing.T) { + if err := v.ValidateFilePath(p); err != nil { + t.Errorf("expected %q to be accepted, got error: %v", p, err) + } + }) + } +}