Files
seaweedfs/weed/s3api/iceberg/commit_helpers.go
Chris Lu 761ec7da00 fix(iceberg): use dot separator for namespace paths instead of unit separator (#8960)
* fix(iceberg): use dot separator for namespace paths instead of unit separator

The Iceberg REST Catalog handler was using \x1F (unit separator) to join
multi-level namespaces when constructing S3 location and filer paths. The
S3 Tables storage layer uses "." (dot) as the namespace separator, causing
tables created via the Iceberg REST API to point to different paths than
where S3 Tables actually stores them.

Fixes #8959

* fix(iceberg): use dot separator in log messages for readable namespace output

* fix(iceberg): use path.Join for S3 location path segments

Use path.Join to construct the namespace/table path segments in fallback
S3 locations for robustness and consistency with handleCreateTable.

* test(iceberg): add multi-level namespace integration tests for Spark and Trino

Add regression tests for #8959 that create a two-level namespace (e.g.
"analytics.daily"), create a table under it, insert data, and query it
back. This exercises the dot-separated namespace path construction and
verifies that Spark/Trino can actually read the data at the S3 location
returned by the Iceberg REST API.

* fix(test): enable nested namespace in Trino Iceberg catalog config

Trino requires `iceberg.rest-catalog.nested-namespace-enabled=true` to
support multi-level namespaces. Without this, CREATE SCHEMA with a
dotted name fails with "Nested namespace is not enabled for this catalog".

* fix(test): parse Trino COUNT(*) output as integer instead of substring match

Avoids false matches from strings.Contains(output, "3") by parsing the
actual numeric result with strconv.Atoi and asserting equality.

* fix(test): use separate Trino config for nested namespace test

The nested-namespace-enabled=true setting in Trino changes how SHOW
SCHEMAS works, causing "Internal error" for all tests sharing that
catalog config. Move the flag to a dedicated config used only by
TestTrinoMultiLevelNamespace.

* fix(iceberg): support parent query parameter in ListNamespaces for nested namespaces

Add handling for the Iceberg REST spec's `parent` query parameter in
handleListNamespaces. When Trino has nested-namespace-enabled=true, it
sends `GET /v1/namespaces?parent=<ns>` to list child namespaces. The
parent value is decoded from the Iceberg unit separator format and
converted to a dot-separated prefix for the S3 Tables layer.

Also simplify TestTrinoMultiLevelNamespace to focus on namespace
operations (create, list, show tables) rather than data operations,
since Trino's REST catalog has a non-empty location check that conflicts
with server-side metadata creation.

* fix(test): expand Trino multi-level namespace test and merge config helpers

- Expand TestTrinoMultiLevelNamespace to create a table with explicit
  location, insert rows, query them back, and verify the S3 file path
  contains the dot-separated namespace (not \x1F). This ensures the
  original #8959 bug would be caught by the Trino integration test.
- Merge writeTrinoConfig and writeTrinoNestedNamespaceConfig into a
  single parameterized function using functional options.
2026-04-07 12:21:22 -07:00

211 lines
6.5 KiB
Go

package iceberg
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
// requirementAssertCreate is the Iceberg REST commit requirement type that
// asserts the table must not already exist (the stage-create /
// create-on-commit flow); see hasAssertCreateRequirement.
const requirementAssertCreate = "assert-create"
// icebergRequestError describes a failed Iceberg REST request: the HTTP
// status to return plus the error type and message for the response payload.
type icebergRequestError struct {
	status  int    // HTTP status code, e.g. http.StatusConflict
	errType string // Iceberg REST error type, e.g. "CommitFailedException"
	message string // human-readable detail for the client
}
// createOnCommitInput bundles everything finalizeCreateOnCommit needs to turn
// a staged (stage-create) table into a real S3 Tables entry.
type createOnCommitInput struct {
	bucketARN    string // ARN of the target S3 Tables table bucket
	markerBucket string // bucket holding stage-create markers; falls back to the metadata bucket when empty
	namespace    []string
	tableName    string
	identityName string // identity passed through to the tables manager for the CreateTable call
	location     string // table root location; metadata files are written under <location>/metadata/
	tableUUID    uuid.UUID
	baseMetadata table.Metadata // staged metadata the commit's updates are applied on top of
	// baseMetadataLoc seeds the metadata builder with the base metadata's
	// location; baseMetadataVer is incremented to name the new metadata file.
	baseMetadataLoc   string
	baseMetadataVer   int
	updates           table.Updates      // Iceberg metadata updates from the commit request
	statisticsUpdates []statisticsUpdate // statistics changes applied to the serialized metadata bytes
}
// isS3TablesConflict reports whether err represents a concurrency conflict
// from the S3 Tables layer: either a version-token mismatch or a typed
// S3TablesError carrying the conflict error code.
func isS3TablesConflict(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, s3tables.ErrVersionTokenMismatch):
		return true
	}
	var s3Err *s3tables.S3TablesError
	if !errors.As(err, &s3Err) {
		return false
	}
	return s3Err.Type == s3tables.ErrCodeConflict
}
// isS3TablesNotFound reports whether err indicates a missing table or
// namespace in the S3 Tables layer. It accepts either a case-insensitive
// "not found" substring anywhere in the error text, or a typed S3TablesError
// with a no-such-table / no-such-namespace code (or "not found" in its
// message).
func isS3TablesNotFound(err error) bool {
	if err == nil {
		return false
	}
	if strings.Contains(strings.ToLower(err.Error()), "not found") {
		return true
	}
	var s3Err *s3tables.S3TablesError
	if !errors.As(err, &s3Err) {
		return false
	}
	switch s3Err.Type {
	case s3tables.ErrCodeNoSuchTable, s3tables.ErrCodeNoSuchNamespace:
		return true
	}
	return strings.Contains(strings.ToLower(s3Err.Message), "not found")
}
// hasAssertCreateRequirement reports whether any of the commit requirements
// is an "assert-create" requirement, i.e. the client staged this commit as a
// table creation.
func hasAssertCreateRequirement(requirements table.Requirements) bool {
	for i := range requirements {
		if requirements[i].GetType() == requirementAssertCreate {
			return true
		}
	}
	return false
}
// isS3TablesAlreadyExists reports whether err indicates the table or
// namespace already exists in the S3 Tables layer. It accepts either a
// case-insensitive "already exists" substring anywhere in the error text, or
// a typed S3TablesError with an already-exists code (or "already exists" in
// its message).
func isS3TablesAlreadyExists(err error) bool {
	if err == nil {
		return false
	}
	if strings.Contains(strings.ToLower(err.Error()), "already exists") {
		return true
	}
	var s3Err *s3tables.S3TablesError
	if !errors.As(err, &s3Err) {
		return false
	}
	switch s3Err.Type {
	case s3tables.ErrCodeTableAlreadyExists, s3tables.ErrCodeNamespaceAlreadyExists:
		return true
	}
	return strings.Contains(strings.ToLower(s3Err.Message), "already exists")
}
// finalizeCreateOnCommit completes a stage-create ("create-on-commit") flow:
// it applies the commit's metadata updates on top of the staged base
// metadata, writes the resulting metadata file, registers the table via the
// S3 Tables manager, and cleans up the stage-create markers.
//
// If the CreateTable call fails, the freshly written metadata file is
// removed again (best effort). A concurrent creation is reported as a 409
// CommitFailedException; other failures map to 500/400 icebergRequestErrors.
func (s *Server) finalizeCreateOnCommit(ctx context.Context, input createOnCommitInput) (*CommitTableResponse, *icebergRequestError) {
	// Rebuild a metadata builder from the staged base so the client's
	// updates can be applied on top of it.
	builder, err := table.MetadataBuilderFromBase(input.baseMetadata, input.baseMetadataLoc)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to create metadata builder: " + err.Error(),
		}
	}
	// Apply each requested update in order; a failure here is treated as a
	// client error (bad update payload).
	for _, update := range input.updates {
		if err := update.Apply(builder); err != nil {
			return nil, &icebergRequestError{
				status:  http.StatusBadRequest,
				errType: "BadRequestException",
				message: "Failed to apply update: " + err.Error(),
			}
		}
	}
	newMetadata, err := builder.Build()
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusBadRequest,
			errType: "BadRequestException",
			message: "Failed to build new metadata: " + err.Error(),
		}
	}
	// Next metadata version; clamp to 1 so a zero/negative base version
	// still yields a valid "v1.metadata.json".
	metadataVersion := input.baseMetadataVer + 1
	if metadataVersion <= 0 {
		metadataVersion = 1
	}
	metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
	// Metadata lives under <table location>/metadata/<file>.
	newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(input.location, "/"), metadataFileName)
	metadataBytes, err := json.Marshal(newMetadata)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to serialize metadata: " + err.Error(),
		}
	}
	// Statistics updates are applied on the serialized JSON rather than
	// through the builder.
	metadataBytes, err = applyStatisticsUpdates(metadataBytes, input.statisticsUpdates)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusBadRequest,
			errType: "BadRequestException",
			message: "Failed to apply statistics updates: " + err.Error(),
		}
	}
	// Re-parse so the returned Metadata reflects the statistics changes and
	// the bytes are known to be valid Iceberg metadata.
	newMetadata, err = table.ParseMetadataBytes(metadataBytes)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to parse committed metadata: " + err.Error(),
		}
	}
	// Resolve the table location into a filer bucket + path for storage.
	metadataBucket, metadataPath, err := parseS3Location(input.location)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Invalid table location: " + err.Error(),
		}
	}
	// Persist the metadata file first; the CreateTable record below points
	// at newMetadataLocation, so the file must exist before the table does.
	if err := s.saveMetadataFile(ctx, metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to save metadata file: " + err.Error(),
		}
	}
	createReq := &s3tables.CreateTableRequest{
		TableBucketARN: input.bucketARN,
		Namespace:      input.namespace,
		Name:           input.tableName,
		Format:         "ICEBERG",
		Metadata: &s3tables.TableMetadata{
			Iceberg: &s3tables.IcebergMetadata{
				TableUUID: input.tableUUID.String(),
			},
			FullMetadata: metadataBytes,
		},
		MetadataVersion:  metadataVersion,
		MetadataLocation: newMetadataLocation,
	}
	// Register the table with the S3 Tables manager through the filer.
	createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		mgrClient := s3tables.NewManagerClient(client)
		return s.tablesManager.Execute(ctx, mgrClient, "CreateTable", createReq, nil, input.identityName)
	})
	if createErr != nil {
		// Best-effort rollback of the metadata file written above.
		if cleanupErr := s.deleteMetadataFile(ctx, metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
			glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
		}
		// A conflict or already-exists error means another writer created
		// the table first; report it as a commit conflict per the REST spec.
		if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
			return nil, &icebergRequestError{
				status:  http.StatusConflict,
				errType: "CommitFailedException",
				message: "Table was created concurrently",
			}
		}
		glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to commit table creation: " + createErr.Error(),
		}
	}
	// Stage-create markers may live in a dedicated bucket; default to the
	// metadata bucket when none was supplied.
	markerBucket := input.markerBucket
	if markerBucket == "" {
		markerBucket = metadataBucket
	}
	// Marker cleanup is best effort: the table is already committed.
	if markerErr := s.deleteStageCreateMarkers(ctx, markerBucket, input.namespace, input.tableName); markerErr != nil {
		glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", flattenNamespacePath(input.namespace), input.tableName, markerErr)
	}
	return &CommitTableResponse{
		MetadataLocation: newMetadataLocation,
		Metadata:         newMetadata,
	}, nil
}