mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 18:21:28 +00:00
* fix(iceberg): use dot separator for namespace paths instead of unit separator The Iceberg REST Catalog handler was using \x1F (unit separator) to join multi-level namespaces when constructing S3 location and filer paths. The S3 Tables storage layer uses "." (dot) as the namespace separator, causing tables created via the Iceberg REST API to point to different paths than where S3 Tables actually stores them. Fixes #8959 * fix(iceberg): use dot separator in log messages for readable namespace output * fix(iceberg): use path.Join for S3 location path segments Use path.Join to construct the namespace/table path segments in fallback S3 locations for robustness and consistency with handleCreateTable. * test(iceberg): add multi-level namespace integration tests for Spark and Trino Add regression tests for #8959 that create a two-level namespace (e.g. "analytics.daily"), create a table under it, insert data, and query it back. This exercises the dot-separated namespace path construction and verifies that Spark/Trino can actually read the data at the S3 location returned by the Iceberg REST API. * fix(test): enable nested namespace in Trino Iceberg catalog config Trino requires `iceberg.rest-catalog.nested-namespace-enabled=true` to support multi-level namespaces. Without this, CREATE SCHEMA with a dotted name fails with "Nested namespace is not enabled for this catalog". * fix(test): parse Trino COUNT(*) output as integer instead of substring match Avoids false matches from strings.Contains(output, "3") by parsing the actual numeric result with strconv.Atoi and asserting equality. * fix(test): use separate Trino config for nested namespace test The nested-namespace-enabled=true setting in Trino changes how SHOW SCHEMAS works, causing "Internal error" for all tests sharing that catalog config. Move the flag to a dedicated config used only by TestTrinoMultiLevelNamespace. * fix(iceberg): support parent query parameter in ListNamespaces for nested namespaces Add handling for the Iceberg REST spec's `parent` query parameter in handleListNamespaces. When Trino has nested-namespace-enabled=true, it sends `GET /v1/namespaces?parent=<ns>` to list child namespaces. The parent value is decoded from the Iceberg unit separator format and converted to a dot-separated prefix for the S3 Tables layer. Also simplify TestTrinoMultiLevelNamespace to focus on namespace operations (create, list, show tables) rather than data operations, since Trino's REST catalog has a non-empty location check that conflicts with server-side metadata creation. * fix(test): expand Trino multi-level namespace test and merge config helpers - Expand TestTrinoMultiLevelNamespace to create a table with explicit location, insert rows, query them back, and verify the S3 file path contains the dot-separated namespace (not \x1F). This ensures the original #8959 bug would be caught by the Trino integration test. - Merge writeTrinoConfig and writeTrinoNestedNamespaceConfig into a single parameterized function using functional options.
333 lines
12 KiB
Go
333 lines
12 KiB
Go
package iceberg
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand/v2"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/apache/iceberg-go/table"
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/mux"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
|
)
|
|
|
|
// handleUpdateTable commits updates to a table.
|
|
// Implements the Iceberg REST Catalog commit protocol.
|
|
func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
namespace := parseNamespace(vars["namespace"])
|
|
tableName := vars["table"]
|
|
|
|
if len(namespace) == 0 || tableName == "" {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Namespace and table name are required")
|
|
return
|
|
}
|
|
|
|
bucketName := getBucketFromPrefix(r)
|
|
bucketARN := buildTableBucketARN(bucketName)
|
|
|
|
// Extract identity from context
|
|
identityName := s3_constants.GetIdentityNameFromContext(r)
|
|
|
|
// Parse commit request and keep statistics updates separate because iceberg-go v0.4.0
|
|
// does not decode set/remove-statistics update actions yet.
|
|
var raw struct {
|
|
Identifier *TableIdentifier `json:"identifier,omitempty"`
|
|
Requirements json.RawMessage `json:"requirements"`
|
|
Updates []json.RawMessage `json:"updates"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body: "+err.Error())
|
|
return
|
|
}
|
|
|
|
var req CommitTableRequest
|
|
req.Identifier = raw.Identifier
|
|
var statisticsUpdates []statisticsUpdate
|
|
if len(raw.Requirements) > 0 {
|
|
if err := json.Unmarshal(raw.Requirements, &req.Requirements); err != nil {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid requirements: "+err.Error())
|
|
return
|
|
}
|
|
}
|
|
if len(raw.Updates) > 0 {
|
|
var err error
|
|
req.Updates, statisticsUpdates, err = parseCommitUpdates(raw.Updates)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error())
|
|
return
|
|
}
|
|
}
|
|
maxCommitAttempts := 3
|
|
generatedLegacyUUID := uuid.New()
|
|
stageCreateEnabled := isStageCreateEnabled()
|
|
for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
|
|
getReq := &s3tables.GetTableRequest{
|
|
TableBucketARN: bucketARN,
|
|
Namespace: namespace,
|
|
Name: tableName,
|
|
}
|
|
var getResp s3tables.GetTableResponse
|
|
|
|
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
mgrClient := s3tables.NewManagerClient(client)
|
|
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
|
|
})
|
|
if err != nil {
|
|
if isS3TablesNotFound(err) {
|
|
location := fmt.Sprintf("s3://%s/%s", bucketName, path.Join(flattenNamespacePath(namespace), tableName))
|
|
tableUUID := generatedLegacyUUID
|
|
baseMetadataVersion := 0
|
|
baseMetadataLocation := ""
|
|
var baseMetadata table.Metadata
|
|
|
|
var latestMarker *stageCreateMarker
|
|
if stageCreateEnabled {
|
|
var markerErr error
|
|
latestMarker, markerErr = s.loadLatestStageCreateMarker(r.Context(), bucketName, namespace, tableName)
|
|
if markerErr != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load stage-create marker: "+markerErr.Error())
|
|
return
|
|
}
|
|
}
|
|
if latestMarker != nil {
|
|
if latestMarker.Location != "" {
|
|
location = strings.TrimSuffix(latestMarker.Location, "/")
|
|
}
|
|
if latestMarker.TableUUID != "" {
|
|
if parsedUUID, parseErr := uuid.Parse(latestMarker.TableUUID); parseErr == nil {
|
|
tableUUID = parsedUUID
|
|
}
|
|
}
|
|
|
|
stagedMetadataLocation := latestMarker.StagedMetadataLocation
|
|
if stagedMetadataLocation == "" {
|
|
stagedMetadataLocation = fmt.Sprintf("%s/metadata/v1.metadata.json", strings.TrimSuffix(location, "/"))
|
|
}
|
|
stagedLocation := tableLocationFromMetadataLocation(stagedMetadataLocation)
|
|
stagedFileName := path.Base(stagedMetadataLocation)
|
|
stagedBucket, stagedPath, parseLocationErr := parseS3Location(stagedLocation)
|
|
if parseLocationErr != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid staged metadata location: "+parseLocationErr.Error())
|
|
return
|
|
}
|
|
stagedMetadataBytes, loadErr := s.loadMetadataFile(r.Context(), stagedBucket, stagedPath, stagedFileName)
|
|
if loadErr != nil {
|
|
if !errors.Is(loadErr, filer_pb.ErrNotFound) {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load staged metadata: "+loadErr.Error())
|
|
return
|
|
}
|
|
} else if len(stagedMetadataBytes) > 0 {
|
|
stagedMetadata, parseErr := table.ParseMetadataBytes(stagedMetadataBytes)
|
|
if parseErr != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse staged metadata: "+parseErr.Error())
|
|
return
|
|
}
|
|
// Staged metadata is only a template for table creation; commit starts from version 1.
|
|
baseMetadata = stagedMetadata
|
|
baseMetadataLocation = ""
|
|
baseMetadataVersion = 0
|
|
if stagedMetadata.TableUUID() != uuid.Nil {
|
|
tableUUID = stagedMetadata.TableUUID()
|
|
}
|
|
}
|
|
}
|
|
|
|
hasAssertCreate := hasAssertCreateRequirement(req.Requirements)
|
|
hasStagedTemplate := baseMetadata != nil
|
|
if !(stageCreateEnabled && (hasAssertCreate || hasStagedTemplate)) {
|
|
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
|
|
return
|
|
}
|
|
|
|
for _, requirement := range req.Requirements {
|
|
validateAgainst := table.Metadata(nil)
|
|
if hasStagedTemplate && requirement.GetType() != requirementAssertCreate {
|
|
validateAgainst = baseMetadata
|
|
}
|
|
if requirementErr := requirement.Validate(validateAgainst); requirementErr != nil {
|
|
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+requirementErr.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
if baseMetadata == nil {
|
|
baseMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
|
|
if baseMetadata == nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
|
|
return
|
|
}
|
|
}
|
|
|
|
result, reqErr := s.finalizeCreateOnCommit(r.Context(), createOnCommitInput{
|
|
bucketARN: bucketARN,
|
|
markerBucket: bucketName,
|
|
namespace: namespace,
|
|
tableName: tableName,
|
|
identityName: identityName,
|
|
location: location,
|
|
tableUUID: tableUUID,
|
|
baseMetadata: baseMetadata,
|
|
baseMetadataLoc: baseMetadataLocation,
|
|
baseMetadataVer: baseMetadataVersion,
|
|
updates: req.Updates,
|
|
statisticsUpdates: statisticsUpdates,
|
|
})
|
|
if reqErr != nil {
|
|
writeError(w, reqErr.status, reqErr.errType, reqErr.message)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, result)
|
|
return
|
|
}
|
|
glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err)
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
|
|
return
|
|
}
|
|
|
|
location := tableLocationFromMetadataLocation(getResp.MetadataLocation)
|
|
if location == "" {
|
|
location = fmt.Sprintf("s3://%s/%s", bucketName, path.Join(flattenNamespacePath(namespace), tableName))
|
|
}
|
|
tableUUID := uuid.Nil
|
|
if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" {
|
|
if parsed, parseErr := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); parseErr == nil {
|
|
tableUUID = parsed
|
|
}
|
|
}
|
|
if tableUUID == uuid.Nil {
|
|
tableUUID = generatedLegacyUUID
|
|
}
|
|
|
|
var currentMetadata table.Metadata
|
|
if getResp.Metadata != nil && len(getResp.Metadata.FullMetadata) > 0 {
|
|
currentMetadata, err = table.ParseMetadataBytes(getResp.Metadata.FullMetadata)
|
|
if err != nil {
|
|
glog.Errorf("Iceberg: Failed to parse current metadata for %s: %v", tableName, err)
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse current metadata")
|
|
return
|
|
}
|
|
} else {
|
|
currentMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
|
|
}
|
|
if currentMetadata == nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
|
|
return
|
|
}
|
|
|
|
for _, requirement := range req.Requirements {
|
|
if err := requirement.Validate(currentMetadata); err != nil {
|
|
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
builder, err := table.MetadataBuilderFromBase(currentMetadata, getResp.MetadataLocation)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+err.Error())
|
|
return
|
|
}
|
|
for _, update := range req.Updates {
|
|
if err := update.Apply(builder); err != nil {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
newMetadata, err := builder.Build()
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+err.Error())
|
|
return
|
|
}
|
|
|
|
metadataVersion := getResp.MetadataVersion + 1
|
|
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
|
|
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
|
|
|
|
metadataBytes, err := json.Marshal(newMetadata)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error())
|
|
return
|
|
}
|
|
// iceberg-go does not currently support set/remove-statistics updates in MetadataBuilder.
|
|
// Patch the encoded metadata JSON and parse it back to keep the response object consistent.
|
|
metadataBytes, err = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+err.Error())
|
|
return
|
|
}
|
|
newMetadata, err = table.ParseMetadataBytes(metadataBytes)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+err.Error())
|
|
return
|
|
}
|
|
|
|
metadataBucket, metadataPath, err := parseS3Location(location)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
|
|
return
|
|
}
|
|
if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
|
|
return
|
|
}
|
|
|
|
updateReq := &s3tables.UpdateTableRequest{
|
|
TableBucketARN: bucketARN,
|
|
Namespace: namespace,
|
|
Name: tableName,
|
|
VersionToken: getResp.VersionToken,
|
|
Metadata: &s3tables.TableMetadata{
|
|
Iceberg: &s3tables.IcebergMetadata{
|
|
TableUUID: tableUUID.String(),
|
|
},
|
|
FullMetadata: metadataBytes,
|
|
},
|
|
MetadataVersion: metadataVersion,
|
|
MetadataLocation: newMetadataLocation,
|
|
}
|
|
|
|
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
mgrClient := s3tables.NewManagerClient(client)
|
|
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName)
|
|
})
|
|
if err == nil {
|
|
result := CommitTableResponse{
|
|
MetadataLocation: newMetadataLocation,
|
|
Metadata: newMetadata,
|
|
}
|
|
writeJSON(w, http.StatusOK, result)
|
|
return
|
|
}
|
|
|
|
if isS3TablesConflict(err) {
|
|
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
|
|
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s on conflict: %v", newMetadataLocation, cleanupErr)
|
|
}
|
|
if attempt < maxCommitAttempts {
|
|
glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts)
|
|
jitter := time.Duration(rand.Int64N(int64(25 * time.Millisecond)))
|
|
time.Sleep(time.Duration(50*attempt)*time.Millisecond + jitter)
|
|
continue
|
|
}
|
|
writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch")
|
|
return
|
|
}
|
|
|
|
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
|
|
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after update failure: %v", newMetadataLocation, cleanupErr)
|
|
}
|
|
glog.Errorf("Iceberg: CommitTable UpdateTable error: %v", err)
|
|
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table update: "+err.Error())
|
|
return
|
|
}
|
|
}
|