Files
seaweedfs/weed/s3api/iceberg/handlers_commit.go
Chris Lu 761ec7da00 fix(iceberg): use dot separator for namespace paths instead of unit separator (#8960)
* fix(iceberg): use dot separator for namespace paths instead of unit separator

The Iceberg REST Catalog handler was using \x1F (unit separator) to join
multi-level namespaces when constructing S3 location and filer paths. The
S3 Tables storage layer uses "." (dot) as the namespace separator, causing
tables created via the Iceberg REST API to point to different paths than
where S3 Tables actually stores them.

Fixes #8959

* fix(iceberg): use dot separator in log messages for readable namespace output

* fix(iceberg): use path.Join for S3 location path segments

Use path.Join to construct the namespace/table path segments in fallback
S3 locations for robustness and consistency with handleCreateTable.

* test(iceberg): add multi-level namespace integration tests for Spark and Trino

Add regression tests for #8959 that create a two-level namespace (e.g.
"analytics.daily"), create a table under it, insert data, and query it
back. This exercises the dot-separated namespace path construction and
verifies that Spark/Trino can actually read the data at the S3 location
returned by the Iceberg REST API.

* fix(test): enable nested namespace in Trino Iceberg catalog config

Trino requires `iceberg.rest-catalog.nested-namespace-enabled=true` to
support multi-level namespaces. Without this, CREATE SCHEMA with a
dotted name fails with "Nested namespace is not enabled for this catalog".

* fix(test): parse Trino COUNT(*) output as integer instead of substring match

Avoids false matches from strings.Contains(output, "3") by parsing the
actual numeric result with strconv.Atoi and asserting equality.

* fix(test): use separate Trino config for nested namespace test

The nested-namespace-enabled=true setting in Trino changes how SHOW
SCHEMAS works, causing "Internal error" for all tests sharing that
catalog config. Move the flag to a dedicated config used only by
TestTrinoMultiLevelNamespace.

* fix(iceberg): support parent query parameter in ListNamespaces for nested namespaces

Add handling for the Iceberg REST spec's `parent` query parameter in
handleListNamespaces. When Trino has nested-namespace-enabled=true, it
sends `GET /v1/namespaces?parent=<ns>` to list child namespaces. The
parent value is decoded from the Iceberg unit separator format and
converted to a dot-separated prefix for the S3 Tables layer.

Also simplify TestTrinoMultiLevelNamespace to focus on namespace
operations (create, list, show tables) rather than data operations,
since Trino's REST catalog has a non-empty location check that conflicts
with server-side metadata creation.

* fix(test): expand Trino multi-level namespace test and merge config helpers

- Expand TestTrinoMultiLevelNamespace to create a table with explicit
  location, insert rows, query them back, and verify the S3 file path
  contains the dot-separated namespace (not \x1F). This ensures the
  original #8959 bug would be caught by the Trino integration test.
- Merge writeTrinoConfig and writeTrinoNestedNamespaceConfig into a
  single parameterized function using functional options.
2026-04-07 12:21:22 -07:00

333 lines
12 KiB
Go

package iceberg
import (
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"net/http"
"path"
"strings"
"time"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
// handleUpdateTable commits updates to a table.
// Implements the Iceberg REST Catalog commit protocol.
func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
namespace := parseNamespace(vars["namespace"])
tableName := vars["table"]
if len(namespace) == 0 || tableName == "" {
writeError(w, http.StatusBadRequest, "BadRequestException", "Namespace and table name are required")
return
}
bucketName := getBucketFromPrefix(r)
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Parse commit request and keep statistics updates separate because iceberg-go v0.4.0
// does not decode set/remove-statistics update actions yet.
var raw struct {
Identifier *TableIdentifier `json:"identifier,omitempty"`
Requirements json.RawMessage `json:"requirements"`
Updates []json.RawMessage `json:"updates"`
}
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body: "+err.Error())
return
}
var req CommitTableRequest
req.Identifier = raw.Identifier
var statisticsUpdates []statisticsUpdate
if len(raw.Requirements) > 0 {
if err := json.Unmarshal(raw.Requirements, &req.Requirements); err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid requirements: "+err.Error())
return
}
}
if len(raw.Updates) > 0 {
var err error
req.Updates, statisticsUpdates, err = parseCommitUpdates(raw.Updates)
if err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error())
return
}
}
maxCommitAttempts := 3
generatedLegacyUUID := uuid.New()
stageCreateEnabled := isStageCreateEnabled()
for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
Name: tableName,
}
var getResp s3tables.GetTableResponse
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
if isS3TablesNotFound(err) {
location := fmt.Sprintf("s3://%s/%s", bucketName, path.Join(flattenNamespacePath(namespace), tableName))
tableUUID := generatedLegacyUUID
baseMetadataVersion := 0
baseMetadataLocation := ""
var baseMetadata table.Metadata
var latestMarker *stageCreateMarker
if stageCreateEnabled {
var markerErr error
latestMarker, markerErr = s.loadLatestStageCreateMarker(r.Context(), bucketName, namespace, tableName)
if markerErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load stage-create marker: "+markerErr.Error())
return
}
}
if latestMarker != nil {
if latestMarker.Location != "" {
location = strings.TrimSuffix(latestMarker.Location, "/")
}
if latestMarker.TableUUID != "" {
if parsedUUID, parseErr := uuid.Parse(latestMarker.TableUUID); parseErr == nil {
tableUUID = parsedUUID
}
}
stagedMetadataLocation := latestMarker.StagedMetadataLocation
if stagedMetadataLocation == "" {
stagedMetadataLocation = fmt.Sprintf("%s/metadata/v1.metadata.json", strings.TrimSuffix(location, "/"))
}
stagedLocation := tableLocationFromMetadataLocation(stagedMetadataLocation)
stagedFileName := path.Base(stagedMetadataLocation)
stagedBucket, stagedPath, parseLocationErr := parseS3Location(stagedLocation)
if parseLocationErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid staged metadata location: "+parseLocationErr.Error())
return
}
stagedMetadataBytes, loadErr := s.loadMetadataFile(r.Context(), stagedBucket, stagedPath, stagedFileName)
if loadErr != nil {
if !errors.Is(loadErr, filer_pb.ErrNotFound) {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load staged metadata: "+loadErr.Error())
return
}
} else if len(stagedMetadataBytes) > 0 {
stagedMetadata, parseErr := table.ParseMetadataBytes(stagedMetadataBytes)
if parseErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse staged metadata: "+parseErr.Error())
return
}
// Staged metadata is only a template for table creation; commit starts from version 1.
baseMetadata = stagedMetadata
baseMetadataLocation = ""
baseMetadataVersion = 0
if stagedMetadata.TableUUID() != uuid.Nil {
tableUUID = stagedMetadata.TableUUID()
}
}
}
hasAssertCreate := hasAssertCreateRequirement(req.Requirements)
hasStagedTemplate := baseMetadata != nil
if !(stageCreateEnabled && (hasAssertCreate || hasStagedTemplate)) {
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
return
}
for _, requirement := range req.Requirements {
validateAgainst := table.Metadata(nil)
if hasStagedTemplate && requirement.GetType() != requirementAssertCreate {
validateAgainst = baseMetadata
}
if requirementErr := requirement.Validate(validateAgainst); requirementErr != nil {
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+requirementErr.Error())
return
}
}
if baseMetadata == nil {
baseMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
if baseMetadata == nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
return
}
}
result, reqErr := s.finalizeCreateOnCommit(r.Context(), createOnCommitInput{
bucketARN: bucketARN,
markerBucket: bucketName,
namespace: namespace,
tableName: tableName,
identityName: identityName,
location: location,
tableUUID: tableUUID,
baseMetadata: baseMetadata,
baseMetadataLoc: baseMetadataLocation,
baseMetadataVer: baseMetadataVersion,
updates: req.Updates,
statisticsUpdates: statisticsUpdates,
})
if reqErr != nil {
writeError(w, reqErr.status, reqErr.errType, reqErr.message)
return
}
writeJSON(w, http.StatusOK, result)
return
}
glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err)
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
return
}
location := tableLocationFromMetadataLocation(getResp.MetadataLocation)
if location == "" {
location = fmt.Sprintf("s3://%s/%s", bucketName, path.Join(flattenNamespacePath(namespace), tableName))
}
tableUUID := uuid.Nil
if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" {
if parsed, parseErr := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); parseErr == nil {
tableUUID = parsed
}
}
if tableUUID == uuid.Nil {
tableUUID = generatedLegacyUUID
}
var currentMetadata table.Metadata
if getResp.Metadata != nil && len(getResp.Metadata.FullMetadata) > 0 {
currentMetadata, err = table.ParseMetadataBytes(getResp.Metadata.FullMetadata)
if err != nil {
glog.Errorf("Iceberg: Failed to parse current metadata for %s: %v", tableName, err)
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse current metadata")
return
}
} else {
currentMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
}
if currentMetadata == nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
return
}
for _, requirement := range req.Requirements {
if err := requirement.Validate(currentMetadata); err != nil {
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+err.Error())
return
}
}
builder, err := table.MetadataBuilderFromBase(currentMetadata, getResp.MetadataLocation)
if err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+err.Error())
return
}
for _, update := range req.Updates {
if err := update.Apply(builder); err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+err.Error())
return
}
}
newMetadata, err := builder.Build()
if err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+err.Error())
return
}
metadataVersion := getResp.MetadataVersion + 1
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
metadataBytes, err := json.Marshal(newMetadata)
if err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error())
return
}
// iceberg-go does not currently support set/remove-statistics updates in MetadataBuilder.
// Patch the encoded metadata JSON and parse it back to keep the response object consistent.
metadataBytes, err = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
if err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+err.Error())
return
}
newMetadata, err = table.ParseMetadataBytes(metadataBytes)
if err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+err.Error())
return
}
metadataBucket, metadataPath, err := parseS3Location(location)
if err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
return
}
if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
return
}
updateReq := &s3tables.UpdateTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
Name: tableName,
VersionToken: getResp.VersionToken,
Metadata: &s3tables.TableMetadata{
Iceberg: &s3tables.IcebergMetadata{
TableUUID: tableUUID.String(),
},
FullMetadata: metadataBytes,
},
MetadataVersion: metadataVersion,
MetadataLocation: newMetadataLocation,
}
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName)
})
if err == nil {
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,
Metadata: newMetadata,
}
writeJSON(w, http.StatusOK, result)
return
}
if isS3TablesConflict(err) {
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s on conflict: %v", newMetadataLocation, cleanupErr)
}
if attempt < maxCommitAttempts {
glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts)
jitter := time.Duration(rand.Int64N(int64(25 * time.Millisecond)))
time.Sleep(time.Duration(50*attempt)*time.Millisecond + jitter)
continue
}
writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch")
return
}
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after update failure: %v", newMetadataLocation, cleanupErr)
}
glog.Errorf("Iceberg: CommitTable UpdateTable error: %v", err)
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table update: "+err.Error())
return
}
}