mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-29 21:20:21 +00:00
* s3,iceberg: reject `..`/NUL in URL path vars Both gateway routers use mux.NewRouter().SkipClean(true), so a request like `GET /bucket-A/../evil-bucket/key` survives routing as bucket=bucket-A, object=../evil-bucket/key. The captured key is then joined into a filer path; util.JoinPath / path.Join collapse the `..` server-side and the read lands in evil-bucket. With auth on, IAM still authorizes against bucket-A (the mux var), so policy is evaluated against the wrong target. Add a middleware on the S3 bucket subrouter and the Iceberg REST router that rejects any `.`, `..`, NUL, or — for single-segment slots — embedded slash in the captured path vars before any handler runs. NormalizeObjectKey already folds `\` to `/` and decoding happens in mux, so `%2e%2e` and `..\` are caught. * s3,iceberg: reject empty captured vars and empty namespace parts Comma-ok the var lookup so we only check captured slots, then treat an empty captured value as a rejection on its own — downstream path.Join would otherwise collapse it and let the next segment pick the bucket. For iceberg, also reject empty parts after splitting the namespace on \x1F so leading/trailing/consecutive unit separators (which parseNamespace silently folds out) don't let distinct route values collapse to the same parsed namespace. Register loggingMiddleware before validateRequestPath on the iceberg router so rejected requests still produce an audit-log line.
215 lines
8.9 KiB
Go
215 lines
8.9 KiB
Go
package iceberg
|
|
|
|
import (
|
|
"net/http"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
|
)
|
|
|
|
// FilerClient provides access to the filer for storage operations.
|
|
type FilerClient interface {
|
|
WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error
|
|
}
|
|
|
|
type S3Authenticator interface {
|
|
AuthenticateRequest(r *http.Request) (string, interface{}, s3err.ErrorCode)
|
|
DefaultAllow() bool
|
|
}
|
|
|
|
// CredentialValidator checks S3 access key / secret key pairs and exposes
// credential lookup by access key, which OAuth token verification relies on.
type CredentialValidator interface {
	// ValidateS3Credential verifies that accessKey and secretKey form a valid
	// pair. On success it yields the name of the matching identity together
	// with the identity object itself.
	ValidateS3Credential(accessKey, secretKey string) (identityName string, identity interface{}, err error)

	// GetCredentialByAccessKey resolves accessKey to its identity name,
	// identity object, and secret key. It exists so that Bearer tokens signed
	// with a particular credential can be verified.
	GetCredentialByAccessKey(accessKey string) (identityName string, identity interface{}, secretKey string, err error)
}
|
|
|
|
// Server implements the Iceberg REST Catalog API.
|
|
type Server struct {
|
|
filerClient FilerClient
|
|
tablesManager *s3tables.Manager
|
|
prefix string // optional prefix for routes
|
|
authenticator S3Authenticator
|
|
credentialValidator CredentialValidator
|
|
s3Endpoint string // http(s):// URL advertised in LoadTable FileIO config
|
|
}
|
|
|
|
// NewServer creates a new Iceberg REST Catalog server.
|
|
func NewServer(filerClient FilerClient, authenticator S3Authenticator) *Server {
|
|
manager := s3tables.NewManager()
|
|
return &Server{
|
|
filerClient: filerClient,
|
|
tablesManager: manager,
|
|
prefix: "",
|
|
authenticator: authenticator,
|
|
}
|
|
}
|
|
|
|
// SetCredentialValidator sets the credential validator for OAuth token support.
|
|
func (s *Server) SetCredentialValidator(cv CredentialValidator) {
|
|
s.credentialValidator = cv
|
|
}
|
|
|
|
// SetS3Endpoint configures the S3 endpoint URL to vend to clients as part of
|
|
// the LoadTable FileIO config, so they can read table data files directly
|
|
// without separately discovering the S3 API address. See issue #9103.
|
|
func (s *Server) SetS3Endpoint(endpoint string) {
|
|
s.s3Endpoint = endpoint
|
|
}
|
|
|
|
// RegisterRoutes registers Iceberg REST API routes on the provided router.
|
|
func (s *Server) RegisterRoutes(router *mux.Router) {
|
|
// Add middleware to log all requests/responses
|
|
router.Use(loggingMiddleware)
|
|
|
|
// Reject `..`/`.`/NUL in {prefix}/{namespace}/{table} vars before any
|
|
// handler runs. The router uses SkipClean(true), so traversal segments
|
|
// would otherwise reach path.Join in stage-marker / location builders.
|
|
// Registered after loggingMiddleware so rejected requests still get
|
|
// audit-logged.
|
|
router.Use(validateRequestPath)
|
|
|
|
// Configuration endpoint - no auth needed for config
|
|
router.HandleFunc("/v1/config", s.handleConfig).Methods(http.MethodGet)
|
|
|
|
// OAuth2 token endpoint - no auth needed (this IS the auth endpoint)
|
|
router.HandleFunc("/v1/oauth/tokens", s.handleOAuthTokens).Methods(http.MethodPost)
|
|
|
|
// Namespace endpoints - wrapped with Auth middleware
|
|
router.HandleFunc("/v1/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost)
|
|
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead)
|
|
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete)
|
|
|
|
// Table endpoints - wrapped with Auth middleware
|
|
router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost)
|
|
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleTableExists)).Methods(http.MethodHead)
|
|
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete)
|
|
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost)
|
|
|
|
// With prefix support - wrapped with Auth middleware
|
|
router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleTableExists)).Methods(http.MethodHead)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete)
|
|
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost)
|
|
|
|
// Catch-all for debugging
|
|
router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
glog.V(2).Infof("Catch-all route hit: %s %s", r.Method, r.RequestURI)
|
|
writeError(w, http.StatusNotFound, "NotFound", "Path not found")
|
|
})
|
|
|
|
glog.V(2).Infof("Registered Iceberg REST Catalog routes")
|
|
}
|
|
|
|
func loggingMiddleware(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
glog.V(2).Infof("Iceberg REST request: %s %s from %s", r.Method, r.RequestURI, r.RemoteAddr)
|
|
|
|
// Log all headers for debugging
|
|
glog.V(2).Infof("Iceberg REST headers:")
|
|
for name, values := range r.Header {
|
|
for _, value := range values {
|
|
// Redact sensitive headers
|
|
if name == "Authorization" && len(value) > 20 {
|
|
glog.V(2).Infof(" %s: %s...%s", name, value[:20], value[len(value)-10:])
|
|
} else {
|
|
glog.V(2).Infof(" %s: %s", name, value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create a response writer that captures the status code
|
|
wrapped := &responseWriter{ResponseWriter: w}
|
|
next.ServeHTTP(wrapped, r)
|
|
|
|
glog.V(2).Infof("Iceberg REST response: %s %s -> %d", r.Method, r.RequestURI, wrapped.statusCode)
|
|
})
|
|
}
|
|
|
|
// responseWriter decorates an http.ResponseWriter so the logging middleware
// can report which status code a handler sent.
type responseWriter struct {
	// ResponseWriter is the wrapped writer; every method not overridden on
	// responseWriter is promoted from it.
	http.ResponseWriter
	// statusCode holds the code most recently passed to WriteHeader.
	statusCode int
}
|
|
|
|
func (w *responseWriter) WriteHeader(code int) {
|
|
w.statusCode = code
|
|
w.ResponseWriter.WriteHeader(code)
|
|
}
|
|
|
|
func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
// Try Bearer token authentication first (from OAuth2 flow)
|
|
if identityName, identity, ok := s.authenticateBearer(r); ok {
|
|
ctx := r.Context()
|
|
ctx = s3_constants.SetIdentityNameInContext(ctx, identityName)
|
|
if identity != nil {
|
|
ctx = s3_constants.SetIdentityInContext(ctx, identity)
|
|
}
|
|
r = r.WithContext(ctx)
|
|
handler(w, r)
|
|
return
|
|
}
|
|
|
|
if s.authenticator == nil {
|
|
writeError(w, http.StatusUnauthorized, "NotAuthorizedException", "Authentication required")
|
|
return
|
|
}
|
|
|
|
identityName, identity, errCode := s.authenticator.AuthenticateRequest(r)
|
|
if errCode != s3err.ErrNone {
|
|
// If authentication failed but DefaultAllow is enabled, proceed without identity
|
|
if s.authenticator.DefaultAllow() {
|
|
glog.V(2).Infof("Iceberg: AuthenticateRequest failed (%v), but DefaultAllow is true, proceeding", errCode)
|
|
} else {
|
|
apiErr := s3err.GetAPIError(errCode)
|
|
errorType := "RESTException"
|
|
switch apiErr.HTTPStatusCode {
|
|
case http.StatusForbidden:
|
|
errorType = "ForbiddenException"
|
|
case http.StatusUnauthorized:
|
|
errorType = "NotAuthorizedException"
|
|
case http.StatusBadRequest:
|
|
errorType = "BadRequestException"
|
|
case http.StatusInternalServerError:
|
|
errorType = "InternalServerError"
|
|
}
|
|
writeError(w, apiErr.HTTPStatusCode, errorType, apiErr.Description)
|
|
return
|
|
}
|
|
}
|
|
|
|
if identityName != "" || identity != nil {
|
|
ctx := r.Context()
|
|
if identityName != "" {
|
|
ctx = s3_constants.SetIdentityNameInContext(ctx, identityName)
|
|
}
|
|
if identity != nil {
|
|
ctx = s3_constants.SetIdentityInContext(ctx, identity)
|
|
}
|
|
r = r.WithContext(ctx)
|
|
}
|
|
|
|
handler(w, r)
|
|
}
|
|
}
|