mirror of
https://github.com/versity/versitygw.git
synced 2026-07-02 16:54:25 +00:00
4d391cabc8
Fixes #2180 Fixes #2181 Migrate the gateway from Fiber v2 to Fiber v3.3.0 and update the affected server, middleware, handler, controller, and test code for the new APIs. Replace the deprecated Fiber filesystem middleware used by the WebUI with the Fiber v3 static middleware, serving the embedded WebUI assets from an fs.Sub filesystem. Fix the request header limit handling regression by adding a temporary handler for Fiber v3/fasthttp small-buffer errors so oversized request headers return the expected regulated S3 error response. Fix the debuglogger panic by reworking the boxed key/value formatter used for debug request and response dumps. The formatter now handles long header keys and values without producing invalid wrap widths, negative padding, or out-of-range string slices.
485 lines
14 KiB
Go
485 lines
14 KiB
Go
// Copyright 2023 Versity Software
|
|
// This file is licensed under the Apache License, Version 2.0
|
|
// (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
package s3api
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v3"
|
|
"github.com/gofiber/fiber/v3/middleware/logger"
|
|
"github.com/gofiber/fiber/v3/middleware/recover"
|
|
"github.com/valyala/fasthttp"
|
|
"github.com/versity/versitygw/auth"
|
|
"github.com/versity/versitygw/backend"
|
|
"github.com/versity/versitygw/debuglogger"
|
|
"github.com/versity/versitygw/metrics"
|
|
"github.com/versity/versitygw/s3api/controllers"
|
|
"github.com/versity/versitygw/s3api/middlewares"
|
|
"github.com/versity/versitygw/s3api/utils"
|
|
"github.com/versity/versitygw/s3err"
|
|
"github.com/versity/versitygw/s3event"
|
|
"github.com/versity/versitygw/s3log"
|
|
"github.com/versity/versitygw/webui"
|
|
)
|
|
|
|
const (
|
|
shutDownDuration = time.Second * 10
|
|
requestHeaderMaxSize = 8 * 1024
|
|
)
|
|
|
|
type S3ApiServer struct {
|
|
Router *S3ApiRouter
|
|
app *fiber.App
|
|
backend backend.Backend
|
|
CertStorage *utils.CertStorage
|
|
quiet bool
|
|
keepAlive bool
|
|
health string
|
|
maxConnections int
|
|
maxRequests int
|
|
webuiMountPrefix string
|
|
webuiSrvCfg *webui.ServerConfig
|
|
routes []routeMount
|
|
middlewares []middlewareMount
|
|
socketPerm os.FileMode
|
|
onListen func()
|
|
}
|
|
|
|
type routeMount struct {
|
|
method string
|
|
path string
|
|
handlers []fiber.Handler
|
|
}
|
|
|
|
type middlewareMount struct {
|
|
prefix string
|
|
handler fiber.Handler
|
|
}
|
|
|
|
func New(
|
|
be backend.Backend,
|
|
root middlewares.RootUserConfig,
|
|
region string,
|
|
iam auth.IAMService,
|
|
l s3log.AuditLogger,
|
|
adminLogger s3log.AuditLogger,
|
|
evs s3event.S3EventSender,
|
|
mm metrics.Manager,
|
|
opts ...Option,
|
|
) (*S3ApiServer, error) {
|
|
server := &S3ApiServer{
|
|
backend: be,
|
|
Router: &S3ApiRouter{
|
|
be: be,
|
|
iam: iam,
|
|
logger: l,
|
|
aLogger: adminLogger,
|
|
evs: evs,
|
|
mm: mm,
|
|
root: root,
|
|
region: region,
|
|
},
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(server)
|
|
}
|
|
|
|
app := fiber.New(fiber.Config{
|
|
AppName: "versitygw",
|
|
ServerHeader: "VERSITYGW",
|
|
StreamRequestBody: true,
|
|
DisableKeepalive: !server.keepAlive,
|
|
ErrorHandler: globalErrorHandler,
|
|
Concurrency: server.maxConnections,
|
|
// Sets buffer limit to read/parse incoming requests
|
|
// if the limit is reached, fiber/fasthttp will throw an error
|
|
// in the global error handler
|
|
ReadBufferSize: requestHeaderMaxSize,
|
|
})
|
|
installRequestHeaderLimitErrorHandler(app)
|
|
|
|
server.app = app
|
|
server.Router.app = app
|
|
|
|
// initialize the panic recovery middleware
|
|
app.Use("*", recover.New(
|
|
recover.Config{
|
|
EnableStackTrace: true,
|
|
StackTraceHandler: stackTraceHandler,
|
|
}))
|
|
|
|
// Logging middlewares
|
|
if !server.quiet {
|
|
app.Use("*", logger.New(logger.Config{
|
|
Format: "${time} | vgw | ${status} | ${latency} | ${ip} | ${method} | ${path} | ${error} | ${queryParams}\n",
|
|
}))
|
|
}
|
|
|
|
// initialize requestId middleware
|
|
app.Use("*", middlewares.RequestIDs())
|
|
|
|
// Set up health endpoint if specified
|
|
if server.health != "" {
|
|
app.Get(server.health, func(ctx fiber.Ctx) error {
|
|
return ctx.SendStatus(http.StatusOK)
|
|
})
|
|
}
|
|
|
|
// Set up WebUI on the S3 port if configured
|
|
if server.webuiSrvCfg != nil {
|
|
if err := webui.MountOn(app, server.webuiMountPrefix, server.webuiSrvCfg); err != nil {
|
|
return nil, fmt.Errorf("mount webui: %w", err)
|
|
}
|
|
}
|
|
|
|
// initialize total requests cap limiter middleware
|
|
app.Use("*", middlewares.RateLimiter(server.maxRequests, mm, l))
|
|
|
|
for _, route := range server.routes {
|
|
method, err := validateRouteMount(route)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
handlers := make([]any, len(route.handlers)-1)
|
|
for i := range handlers {
|
|
handlers[i] = route.handlers[i+1]
|
|
}
|
|
app.Add([]string{method}, route.path, route.handlers[0], handlers...)
|
|
}
|
|
|
|
for _, mount := range server.middlewares {
|
|
if err := validateMiddlewareMount(mount); err != nil {
|
|
return nil, err
|
|
}
|
|
app.Use(mount.prefix, mount.handler)
|
|
}
|
|
|
|
// initilaze the default value setter middleware
|
|
app.Use("*", middlewares.SetDefaultValues(root, region))
|
|
|
|
// initialize the 'DecodeURL' middleware which
|
|
// path unescapes the url
|
|
app.Use("*", controllers.WrapMiddleware(middlewares.DecodeURL, l, mm))
|
|
|
|
// initialize the debug logger in debug mode
|
|
if debuglogger.IsDebugEnabled() {
|
|
app.Use("*", middlewares.DebugLogger())
|
|
}
|
|
|
|
server.Router.Init()
|
|
|
|
return server, nil
|
|
}
|
|
|
|
func validateRouteMount(route routeMount) (string, error) {
|
|
if route.method == "" {
|
|
return "", fmt.Errorf("invalid route for path %q: empty method", route.path)
|
|
}
|
|
method := strings.ToUpper(route.method)
|
|
if !isStandardHTTPMethod(method) {
|
|
return "", fmt.Errorf("invalid HTTP method %q for route path %q: must be one of %s",
|
|
route.method, route.path, strings.Join(fiber.DefaultMethods, ", "))
|
|
}
|
|
if route.path == "" || route.path[0] != '/' {
|
|
return "", fmt.Errorf("invalid route path %q: must start with /", route.path)
|
|
}
|
|
if len(route.handlers) == 0 {
|
|
return "", fmt.Errorf("invalid route for %s %s: no handlers", method, route.path)
|
|
}
|
|
for i, handler := range route.handlers {
|
|
if handler == nil {
|
|
return "", fmt.Errorf("invalid route for %s %s: nil handler at index %d", method, route.path, i)
|
|
}
|
|
}
|
|
return method, nil
|
|
}
|
|
|
|
func isStandardHTTPMethod(method string) bool {
|
|
for _, valid := range fiber.DefaultMethods {
|
|
if method == valid {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func validateMiddlewareMount(mount middlewareMount) error {
|
|
if mount.prefix == "" || mount.prefix[0] != '/' {
|
|
return fmt.Errorf("invalid middleware prefix %q: must start with /", mount.prefix)
|
|
}
|
|
if mount.handler == nil {
|
|
return fmt.Errorf("invalid middleware for prefix %q: nil handler", mount.prefix)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Option sets various options for New()
|
|
type Option func(*S3ApiServer)
|
|
|
|
// WithTLS sets TLS Credentials
|
|
func WithTLS(cs *utils.CertStorage) Option {
|
|
return func(s *S3ApiServer) { s.CertStorage = cs }
|
|
}
|
|
|
|
// WithAdminServer runs admin endpoints with the gateway in the same network
|
|
func WithAdminServer() Option {
|
|
return func(s *S3ApiServer) { s.Router.WithAdmSrv = true }
|
|
}
|
|
|
|
// WithQuiet silences default logging output
|
|
func WithQuiet() Option {
|
|
return func(s *S3ApiServer) { s.quiet = true }
|
|
}
|
|
|
|
// WithHealth sets up a GET health endpoint
|
|
func WithHealth(health string) Option {
|
|
return func(s *S3ApiServer) { s.health = health }
|
|
}
|
|
|
|
func WithReadOnly() Option {
|
|
return func(s *S3ApiServer) { s.Router.readonly = true }
|
|
}
|
|
|
|
// WithMpMaxParts sets the maximum number of parts allowed in a multipart upload.
|
|
func WithMpMaxParts(n int) Option {
|
|
return func(s *S3ApiServer) { s.Router.mpMaxParts = n }
|
|
}
|
|
|
|
// WithHostStyle enabled host-style bucket addressing on the server
|
|
func WithHostStyle(virtualDomain string) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.Router.virtualDomain = virtualDomain
|
|
}
|
|
}
|
|
|
|
// WithKeepAlive enables the server keep alive
|
|
func WithKeepAlive() Option {
|
|
return func(s *S3ApiServer) { s.keepAlive = true }
|
|
}
|
|
|
|
// WithCORSAllowOrigin sets the default CORS Access-Control-Allow-Origin value.
|
|
// This is applied when no bucket CORS configuration exists, and for admin APIs.
|
|
func WithCORSAllowOrigin(origin string) Option {
|
|
return func(s *S3ApiServer) { s.Router.corsAllowOrigin = origin }
|
|
}
|
|
|
|
// WithWebUI mounts the WebUI on the S3 server's Fiber app at the given path prefix,
|
|
// before S3 routes are registered. The prefix must start with "/" and must not be
|
|
// empty or just "/".
|
|
func WithWebUI(prefix string, cfg *webui.ServerConfig) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.webuiMountPrefix = prefix
|
|
s.webuiSrvCfg = cfg
|
|
}
|
|
}
|
|
|
|
// WithRoute registers a top-level Fiber route after the gateway rate limiter
|
|
// and before S3 routes are registered. Handlers are terminal unless they
|
|
// explicitly call ctx.Next().
|
|
func WithRoute(method, path string, handlers ...fiber.Handler) Option {
|
|
return func(s *S3ApiServer) {
|
|
copied := append([]fiber.Handler(nil), handlers...)
|
|
s.routes = append(s.routes, routeMount{
|
|
method: method,
|
|
path: path,
|
|
handlers: copied,
|
|
})
|
|
}
|
|
}
|
|
|
|
// WithMiddleware mounts a Fiber middleware after the gateway rate limiter and
|
|
// before the S3 route table is registered. The middleware must call ctx.Next()
|
|
// for requests it does not fully handle.
|
|
func WithMiddleware(prefix string, handler fiber.Handler) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.middlewares = append(s.middlewares, middlewareMount{
|
|
prefix: prefix,
|
|
handler: handler,
|
|
})
|
|
}
|
|
}
|
|
|
|
// WithConcurrencyLimiter sets the server's maximum connection limit
|
|
// and the hard limit for in-flight requests.
|
|
func WithConcurrencyLimiter(maxConnections, maxRequests int) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.maxConnections = maxConnections
|
|
s.maxRequests = maxRequests
|
|
}
|
|
}
|
|
|
|
// WithSocketPerm sets the file-mode permissions applied to file-backed UNIX
|
|
// domain sockets after binding. It has no effect on TCP/IP or abstract
|
|
// namespace sockets.
|
|
func WithSocketPerm(perm os.FileMode) Option {
|
|
return func(s *S3ApiServer) { s.socketPerm = perm }
|
|
}
|
|
|
|
// WithDisableACL disables the s3 api server ACLs, by ignoring all
|
|
// bucket/object ACL headers
|
|
func WithDisableACL() Option {
|
|
return func(s *S3ApiServer) { s.Router.disableACL = true }
|
|
}
|
|
|
|
// WithOnListen registers a callback that is invoked once the server is bound
|
|
// and ready to accept requests. It is called from the fiber OnListen hook,
|
|
// which fires immediately before the server begins serving the first
|
|
// connection. Use this instead of a fixed sleep to detect readiness reliably.
|
|
func WithOnListen(fn func()) Option {
|
|
return func(s *S3ApiServer) { s.onListen = fn }
|
|
}
|
|
|
|
// ServeMultiPort creates listeners for multiple port specifications and serves
|
|
// on all of them simultaneously. This supports listening on multiple ports and/or
|
|
// addresses (e.g., [":7070", "localhost:8080", "0.0.0.0:9090"]).
|
|
func (sa *S3ApiServer) ServeMultiPort(ports []string) error {
|
|
if len(ports) == 0 {
|
|
return fmt.Errorf("no ports specified")
|
|
}
|
|
|
|
// Multiple ports - create listeners for each
|
|
var listeners []net.Listener
|
|
|
|
for _, portSpec := range ports {
|
|
var ln net.Listener
|
|
var err error
|
|
|
|
if sa.CertStorage != nil {
|
|
ln, err = utils.NewMultiAddrTLSListener(fiber.NetworkTCP, portSpec, sa.CertStorage.GetCertificate, utils.ListenerOptions{SocketPerm: sa.socketPerm})
|
|
} else {
|
|
ln, err = utils.NewMultiAddrListener(fiber.NetworkTCP, portSpec, utils.ListenerOptions{SocketPerm: sa.socketPerm})
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to bind s3 listener %s: %w", portSpec, err)
|
|
}
|
|
|
|
listeners = append(listeners, ln)
|
|
}
|
|
|
|
if len(listeners) == 0 {
|
|
return fmt.Errorf("failed to create any s3 listeners")
|
|
}
|
|
|
|
// Combine all listeners
|
|
finalListener := utils.NewMultiListener(listeners...)
|
|
|
|
if sa.onListen != nil {
|
|
fn := sa.onListen
|
|
sa.app.Hooks().OnListen(func(fiber.ListenData) error {
|
|
fn()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return sa.app.Listener(finalListener, fiber.ListenConfig{
|
|
DisableStartupMessage: true,
|
|
})
|
|
}
|
|
|
|
// ShutDown gracefully shuts down the server with a context timeout
|
|
func (sa *S3ApiServer) ShutDown() error {
|
|
return sa.app.ShutdownWithTimeout(shutDownDuration)
|
|
}
|
|
|
|
// stackTraceHandler stores the system panics
|
|
// in the context locals
|
|
func stackTraceHandler(ctx fiber.Ctx, e any) {
|
|
utils.ContextKeyStack.Set(ctx, e)
|
|
}
|
|
|
|
// installRequestHeaderLimitErrorHandler converts fasthttp small-buffer errors
|
|
// into the S3 RequestHeaderSectionTooLarge response.
|
|
//
|
|
// This is a temporary solution until Fiber handles request header limit errors
|
|
// before response writes correctly. See:
|
|
// https://github.com/gofiber/fiber/issues/4423
|
|
func installRequestHeaderLimitErrorHandler(app *fiber.App) {
|
|
server := app.Server()
|
|
fiberErrorHandler := server.ErrorHandler
|
|
server.ErrorHandler = func(ctx *fasthttp.RequestCtx, err error) {
|
|
var smallBufferErr *fasthttp.ErrSmallBuffer
|
|
if errors.As(err, &smallBufferErr) {
|
|
debuglogger.Logf("total request headers size exceeds the allowed 8KB")
|
|
|
|
requestID := utils.NewS3RequestID()
|
|
hostID := utils.NewS3HostID()
|
|
apiErr := s3err.GetRequestHeaderSectionTooLargeErr(requestHeaderMaxSize)
|
|
|
|
ctx.Response.Reset()
|
|
ctx.Response.Header.SetContentType(fiber.MIMEApplicationXML)
|
|
ctx.Response.Header.Set(utils.HeaderAmzRequestID, requestID)
|
|
ctx.Response.Header.Set(utils.HeaderAmzID2, hostID)
|
|
ctx.SetStatusCode(apiErr.StatusCode())
|
|
ctx.SetConnectionClose()
|
|
ctx.SetBody(apiErr.XMLBody(requestID, hostID))
|
|
return
|
|
}
|
|
|
|
fiberErrorHandler(ctx, err)
|
|
}
|
|
}
|
|
|
|
// globalErrorHandler catches the errors before reaching to
|
|
// the handlers and any system panics
|
|
func globalErrorHandler(ctx fiber.Ctx, er error) error {
|
|
requestID, hostID := utils.EnsureRequestIDs(ctx)
|
|
|
|
// set content type to application/xml
|
|
ctx.Response().Header.SetContentType(fiber.MIMEApplicationXML)
|
|
|
|
if utils.ContextKeyStack.IsSet(ctx) {
|
|
// if stack is set, it means the stack trace
|
|
// has caught a panic
|
|
// log it as a panic log
|
|
debuglogger.Panic(er)
|
|
} else {
|
|
// handle the fiber specific errors
|
|
var fiberErr *fiber.Error
|
|
if errors.As(er, &fiberErr) {
|
|
if errors.Is(fiberErr, fiber.ErrRequestHeaderFieldsTooLarge) {
|
|
debuglogger.Logf("total request headers size exceeds the allowed 8KB")
|
|
err := s3err.GetRequestHeaderSectionTooLargeErr(requestHeaderMaxSize)
|
|
return ctx.Status(err.StatusCode()).Send(err.XMLBody(requestID, hostID))
|
|
}
|
|
if strings.Contains(fiberErr.Message, "cannot parse Content-Length") {
|
|
debuglogger.Logf("failed to parse Content-Length")
|
|
ctx.Status(http.StatusBadRequest)
|
|
return nil
|
|
}
|
|
if strings.Contains(fiberErr.Message, "error when reading request headers") {
|
|
// This error means fiber failed to parse the incoming request
|
|
// which is a malfoedmed one. Return a BadRequest in this case
|
|
debuglogger.Logf("failed to parse the http request")
|
|
err := s3err.GetAPIError(s3err.ErrCannotParseHTTPRequest)
|
|
return ctx.Status(err.StatusCode()).Send(err.XMLBody(requestID, hostID))
|
|
}
|
|
}
|
|
|
|
// additionally log the internal error
|
|
debuglogger.InternalError(er)
|
|
}
|
|
|
|
ctx.Status(http.StatusInternalServerError)
|
|
|
|
return ctx.Send(s3err.GetAPIError(s3err.ErrInternalError).XMLBody(requestID, hostID))
|
|
}
|