mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* fix(iceberg): use dot separator for namespace paths instead of unit separator The Iceberg REST Catalog handler was using \x1F (unit separator) to join multi-level namespaces when constructing S3 location and filer paths. The S3 Tables storage layer uses "." (dot) as the namespace separator, causing tables created via the Iceberg REST API to point to different paths than where S3 Tables actually stores them. Fixes #8959 * fix(iceberg): use dot separator in log messages for readable namespace output * fix(iceberg): use path.Join for S3 location path segments Use path.Join to construct the namespace/table path segments in fallback S3 locations for robustness and consistency with handleCreateTable. * test(iceberg): add multi-level namespace integration tests for Spark and Trino Add regression tests for #8959 that create a two-level namespace (e.g. "analytics.daily"), create a table under it, insert data, and query it back. This exercises the dot-separated namespace path construction and verifies that Spark/Trino can actually read the data at the S3 location returned by the Iceberg REST API. * fix(test): enable nested namespace in Trino Iceberg catalog config Trino requires `iceberg.rest-catalog.nested-namespace-enabled=true` to support multi-level namespaces. Without this, CREATE SCHEMA with a dotted name fails with "Nested namespace is not enabled for this catalog". * fix(test): parse Trino COUNT(*) output as integer instead of substring match Avoids false matches from strings.Contains(output, "3") by parsing the actual numeric result with strconv.Atoi and asserting equality. * fix(test): use separate Trino config for nested namespace test The nested-namespace-enabled=true setting in Trino changes how SHOW SCHEMAS works, causing "Internal error" for all tests sharing that catalog config. Move the flag to a dedicated config used only by TestTrinoMultiLevelNamespace. 
* fix(iceberg): support parent query parameter in ListNamespaces for nested namespaces Add handling for the Iceberg REST spec's `parent` query parameter in handleListNamespaces. When Trino has nested-namespace-enabled=true, it sends `GET /v1/namespaces?parent=<ns>` to list child namespaces. The parent value is decoded from the Iceberg unit separator format and converted to a dot-separated prefix for the S3 Tables layer. Also simplify TestTrinoMultiLevelNamespace to focus on namespace operations (create, list, show tables) rather than data operations, since Trino's REST catalog has a non-empty location check that conflicts with server-side metadata creation. * fix(test): expand Trino multi-level namespace test and merge config helpers - Expand TestTrinoMultiLevelNamespace to create a table with explicit location, insert rows, query them back, and verify the S3 file path contains the dot-separated namespace (not \x1F). This ensures the original #8959 bug would be caught by the Trino integration test. - Merge writeTrinoConfig and writeTrinoNestedNamespaceConfig into a single parameterized function using functional options.
211 lines
6.5 KiB
Go
211 lines
6.5 KiB
Go
package iceberg
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/apache/iceberg-go/table"
|
|
"github.com/google/uuid"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
|
)
|
|
|
|
// requirementAssertCreate is the Iceberg REST commit requirement type string
// matched by hasAssertCreateRequirement; its presence in a commit request
// marks the commit as a create-on-first-commit ("stage create") operation.
const requirementAssertCreate = "assert-create"
// icebergRequestError carries the pieces of an Iceberg REST error response:
// the HTTP status to return plus the error type and message placed in the
// response body.
type icebergRequestError struct {
	status  int    // HTTP status code (e.g. http.StatusConflict)
	errType string // Iceberg REST error type, e.g. "CommitFailedException"
	message string // human-readable error detail
}
// createOnCommitInput bundles everything finalizeCreateOnCommit needs to turn
// a staged table creation into a committed table.
type createOnCommitInput struct {
	bucketARN    string   // table bucket ARN passed to the S3 Tables CreateTable request
	markerBucket string   // bucket holding stage-create markers; falls back to the metadata bucket when empty
	namespace    []string // table namespace path segments
	tableName    string
	identityName string // identity executing the CreateTable operation
	location     string // table root location (S3 URI); metadata files are written under <location>/metadata/
	tableUUID    uuid.UUID
	baseMetadata table.Metadata // staged metadata the commit updates are applied on top of
	baseMetadataLoc string
	baseMetadataVer int // version of the base metadata; the committed file is written as v<base+1>
	updates         table.Updates
	statisticsUpdates []statisticsUpdate // applied to the serialized metadata after the builder runs
}
func isS3TablesConflict(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
if errors.Is(err, s3tables.ErrVersionTokenMismatch) {
|
|
return true
|
|
}
|
|
var tableErr *s3tables.S3TablesError
|
|
return errors.As(err, &tableErr) && tableErr.Type == s3tables.ErrCodeConflict
|
|
}
|
|
|
|
func isS3TablesNotFound(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
if strings.Contains(strings.ToLower(err.Error()), "not found") {
|
|
return true
|
|
}
|
|
var tableErr *s3tables.S3TablesError
|
|
return errors.As(err, &tableErr) &&
|
|
(tableErr.Type == s3tables.ErrCodeNoSuchTable || tableErr.Type == s3tables.ErrCodeNoSuchNamespace || strings.Contains(strings.ToLower(tableErr.Message), "not found"))
|
|
}
|
|
|
|
func hasAssertCreateRequirement(requirements table.Requirements) bool {
|
|
for _, requirement := range requirements {
|
|
if requirement.GetType() == requirementAssertCreate {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isS3TablesAlreadyExists(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
if strings.Contains(strings.ToLower(err.Error()), "already exists") {
|
|
return true
|
|
}
|
|
var tableErr *s3tables.S3TablesError
|
|
return errors.As(err, &tableErr) &&
|
|
(tableErr.Type == s3tables.ErrCodeTableAlreadyExists || tableErr.Type == s3tables.ErrCodeNamespaceAlreadyExists || strings.Contains(strings.ToLower(tableErr.Message), "already exists"))
|
|
}
|
|
|
|
// finalizeCreateOnCommit completes a stage-create-then-commit flow: it applies
// the commit's updates on top of the staged base metadata, writes the
// resulting metadata file to the table location, registers the table with the
// S3 Tables manager, and cleans up the stage-create markers.
//
// On failure it returns an icebergRequestError whose status/errType follow the
// Iceberg REST error conventions (bad updates -> 400, concurrent creation ->
// 409 CommitFailedException, everything else -> 500); on success it returns
// the committed metadata and its location.
func (s *Server) finalizeCreateOnCommit(ctx context.Context, input createOnCommitInput) (*CommitTableResponse, *icebergRequestError) {
	builder, err := table.MetadataBuilderFromBase(input.baseMetadata, input.baseMetadataLoc)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to create metadata builder: " + err.Error(),
		}
	}
	// Apply every requested metadata update; a failing update is a client error.
	for _, update := range input.updates {
		if err := update.Apply(builder); err != nil {
			return nil, &icebergRequestError{
				status:  http.StatusBadRequest,
				errType: "BadRequestException",
				message: "Failed to apply update: " + err.Error(),
			}
		}
	}

	newMetadata, err := builder.Build()
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusBadRequest,
			errType: "BadRequestException",
			message: "Failed to build new metadata: " + err.Error(),
		}
	}

	// The committed metadata file is one version past the staged base;
	// guard against non-positive base versions so the first commit is v1.
	metadataVersion := input.baseMetadataVer + 1
	if metadataVersion <= 0 {
		metadataVersion = 1
	}
	metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
	newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(input.location, "/"), metadataFileName)

	metadataBytes, err := json.Marshal(newMetadata)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to serialize metadata: " + err.Error(),
		}
	}
	// Statistics updates are patched into the serialized JSON rather than
	// through the metadata builder.
	metadataBytes, err = applyStatisticsUpdates(metadataBytes, input.statisticsUpdates)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusBadRequest,
			errType: "BadRequestException",
			message: "Failed to apply statistics updates: " + err.Error(),
		}
	}
	// Re-parse so the returned Metadata reflects the statistics patch above.
	newMetadata, err = table.ParseMetadataBytes(metadataBytes)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to parse committed metadata: " + err.Error(),
		}
	}

	metadataBucket, metadataPath, err := parseS3Location(input.location)
	if err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Invalid table location: " + err.Error(),
		}
	}
	// Write the metadata file before registering the table so the location
	// the CreateTable request records already exists.
	if err := s.saveMetadataFile(ctx, metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to save metadata file: " + err.Error(),
		}
	}

	createReq := &s3tables.CreateTableRequest{
		TableBucketARN: input.bucketARN,
		Namespace:      input.namespace,
		Name:           input.tableName,
		Format:         "ICEBERG",
		Metadata: &s3tables.TableMetadata{
			Iceberg: &s3tables.IcebergMetadata{
				TableUUID: input.tableUUID.String(),
			},
			FullMetadata: metadataBytes,
		},
		MetadataVersion:  metadataVersion,
		MetadataLocation: newMetadataLocation,
	}
	createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		mgrClient := s3tables.NewManagerClient(client)
		return s.tablesManager.Execute(ctx, mgrClient, "CreateTable", createReq, nil, input.identityName)
	})
	if createErr != nil {
		// Best-effort rollback of the metadata file written above; a cleanup
		// failure is only logged, the original error still wins.
		if cleanupErr := s.deleteMetadataFile(ctx, metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
			glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
		}
		// Another writer created the table first -> Iceberg commit conflict.
		if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
			return nil, &icebergRequestError{
				status:  http.StatusConflict,
				errType: "CommitFailedException",
				message: "Table was created concurrently",
			}
		}
		glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
		return nil, &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: "Failed to commit table creation: " + createErr.Error(),
		}
	}

	// Stage-create markers may live in a dedicated bucket; default to the
	// table's metadata bucket when none was supplied.
	markerBucket := input.markerBucket
	if markerBucket == "" {
		markerBucket = metadataBucket
	}
	// Marker cleanup is best-effort: the table is already committed.
	if markerErr := s.deleteStageCreateMarkers(ctx, markerBucket, input.namespace, input.tableName); markerErr != nil {
		glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", flattenNamespacePath(input.namespace), input.tableName, markerErr)
	}

	return &CommitTableResponse{
		MetadataLocation: newMetadataLocation,
		Metadata:         newMetadata,
	}, nil
}