mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-21 09:11:29 +00:00
* Enforce IAM for s3tables bucket creation * Prefer IAM path when policies exist * Ensure IAM enforcement honors default allow * address comments * Reused the precomputed principal when setting tableBucketMetadata.OwnerAccountID, avoiding the redundant getAccountID call. * get identity * fix * dedup * fix * comments * fix tests * update iam config * go fmt * fix ports * fix flags * mini clean shutdown * Revert "update iam config" This reverts commitca48fdbb0a. Revert "mini clean shutdown" This reverts commit9e17f6baff. Revert "fix flags" This reverts commite9e7b29d2f. Revert "go fmt" This reverts commitbd3241960b. * test/s3tables: share single weed mini per test package via TestMain Previously each top-level test function in the catalog and s3tables package started and stopped its own weed mini instance. This caused failures when a prior instance wasn't cleanly stopped before the next one started (port conflicts, leaked global state). Changes: - catalog/iceberg_catalog_test.go: introduce TestMain that starts one shared TestEnvironment (external weed binary) before all tests and tears it down after. All individual test functions now use sharedEnv. Added randomSuffix() for unique resource names across tests. - catalog/pyiceberg_test.go: updated to use sharedEnv instead of per-test environments. - catalog/pyiceberg_test_helpers.go -> pyiceberg_test_helpers_test.go: renamed to a _test.go file so it can access TestEnvironment which is defined in a test file. - table-buckets/setup.go: add package-level sharedCluster variable. - table-buckets/s3tables_integration_test.go: introduce TestMain that starts one shared TestCluster before all tests. TestS3TablesIntegration now uses sharedCluster. Extract startMiniClusterInDir (no *testing.T) for TestMain use. TestS3TablesCreateBucketIAMPolicy keeps its own cluster (different IAM config). Remove miniClusterMutex (no longer needed). Fix Stop() to not panic when t is nil." * delete * parse * default allow should work with anonymous * fix port * iceberg route The failures are from Iceberg REST using the default bucket warehouse when no prefix is provided. Your tests create random buckets, so /v1/namespaces was looking in warehouse and failing. I updated the tests to use the prefixed Iceberg routes (/v1/{bucket}/...) via a small helper. * test(s3tables): fix port conflicts and IAM ARN matching in integration tests - Pass -master.dir explicitly to prevent filer store directory collision between shared cluster and per-test clusters running in the same process - Pass -volume.port.public and -volume.publicUrl to prevent the global publicPort flag (mutated from 0 → concrete port by first cluster) from being reused by a second cluster, causing 'address already in use' - Remove the flag-reset loop in Stop() that reset global flag values while other goroutines were reading them (race → panic) - Fix IAM policy Resource ARN in TestS3TablesCreateBucketIAMPolicy to use wildcards (arn:aws:s3tables:*:*:bucket/<name>) because the handler generates ARNs with its own DefaultRegion (us-east-1) and principal name ('admin'), not the test constants testRegion/testAccountID
177 lines
5.6 KiB
Go
177 lines
5.6 KiB
Go
package s3tables
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
)
|
|
|
|
// handleCreateTableBucket creates a new table bucket
|
|
func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
var req CreateTableBucketRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Validate bucket name
|
|
if err := validateBucketName(req.Name); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
principal := h.getAccountID(r)
|
|
identityActions := getIdentityActions(r)
|
|
identityPolicyNames := getIdentityPolicyNames(r)
|
|
useIAM := h.shouldUseIAM(r, identityActions, identityPolicyNames)
|
|
useLegacy := !useIAM
|
|
if useIAM {
|
|
allowed, err := h.authorizeIAMAction(r, identityPolicyNames, "CreateTableBucket", h.generateTableBucketARN(principal, req.Name), fmt.Sprintf("arn:aws:s3:::%s", req.Name))
|
|
if err != nil {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
|
|
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
|
|
}
|
|
if !allowed {
|
|
if h.defaultAllow && len(identityActions) == 0 && len(identityPolicyNames) == 0 {
|
|
useLegacy = true
|
|
} else {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
|
|
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
|
|
}
|
|
}
|
|
}
|
|
if useLegacy {
|
|
owner := h.accountID
|
|
if owner == "" {
|
|
owner = DefaultAccountID
|
|
}
|
|
if !CheckPermissionWithContext("CreateTableBucket", principal, owner, "", "", &PolicyContext{
|
|
IdentityActions: identityActions,
|
|
DefaultAllow: h.defaultAllow,
|
|
}) {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
|
|
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
|
|
}
|
|
}
|
|
|
|
bucketPath := GetTableBucketPath(req.Name)
|
|
|
|
// Check if bucket already exists and ensure no conflict with object store buckets
|
|
tableBucketExists := false
|
|
s3BucketExists := false
|
|
bucketsPath := s3_constants.DefaultBucketsPath
|
|
err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, err := client.GetFilerConfiguration(r.Context(), &filer_pb.GetFilerConfigurationRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.DirBuckets != "" {
|
|
bucketsPath = resp.DirBuckets
|
|
}
|
|
entryResp, err := filer_pb.LookupEntry(r.Context(), client, &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: bucketsPath,
|
|
Name: req.Name,
|
|
})
|
|
if err != nil {
|
|
if !errors.Is(err, filer_pb.ErrNotFound) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
if entryResp != nil && entryResp.Entry != nil {
|
|
if IsTableBucketEntry(entryResp.Entry) {
|
|
tableBucketExists = true
|
|
} else {
|
|
s3BucketExists = true
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
glog.Errorf("S3Tables: failed to check bucket existence: %v", err)
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to check bucket existence")
|
|
return err
|
|
}
|
|
|
|
if tableBucketExists {
|
|
h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("table bucket %s already exists", req.Name))
|
|
return fmt.Errorf("bucket already exists")
|
|
}
|
|
if s3BucketExists {
|
|
h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("bucket %s already exists", req.Name))
|
|
return fmt.Errorf("bucket already exists")
|
|
}
|
|
|
|
// Create the bucket directory and set metadata as extended attributes
|
|
now := time.Now()
|
|
metadata := &tableBucketMetadata{
|
|
Name: req.Name,
|
|
CreatedAt: now,
|
|
OwnerAccountID: principal,
|
|
}
|
|
|
|
metadataBytes, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
glog.Errorf("S3Tables: failed to marshal metadata: %v", err)
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal metadata")
|
|
return err
|
|
}
|
|
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// Ensure root tables directory exists
|
|
if !h.entryExists(r.Context(), client, TablesPath) {
|
|
if err := h.createDirectory(r.Context(), client, TablesPath); err != nil {
|
|
return fmt.Errorf("failed to create root tables directory: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create bucket directory
|
|
if err := h.createDirectory(r.Context(), client, bucketPath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Mark as a table bucket
|
|
if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyTableBucket, []byte("true")); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set metadata as extended attribute
|
|
if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set tags if provided
|
|
if len(req.Tags) > 0 {
|
|
tagsBytes, err := json.Marshal(req.Tags)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal tags: %w", err)
|
|
}
|
|
if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyTags, tagsBytes); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
glog.Errorf("S3Tables: failed to create table bucket %s: %v", req.Name, err)
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table bucket")
|
|
return err
|
|
}
|
|
|
|
resp := &CreateTableBucketResponse{
|
|
ARN: h.generateTableBucketARN(metadata.OwnerAccountID, req.Name),
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|