Files
versitygw/s3api/server.go
Yevgeniy Miretskiy d1fba07fe6 feat: add custom route and middleware options
Add options for embedders to register Fiber routes and middleware before the S3 route table is initialized.

WithRoute registers a top-level route with explicit method and path matching. WithMiddleware registers prefix middleware that can handle a request or call ctx.Next() to continue into the S3 stack.

Add coverage for route registration order when a top-level route and catch-all middleware are both configured.
2026-05-04 08:41:16 -07:00

423 lines
12 KiB
Go

// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3api
import (
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/fiber/v2/middleware/recover"
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/debuglogger"
"github.com/versity/versitygw/metrics"
"github.com/versity/versitygw/s3api/controllers"
"github.com/versity/versitygw/s3api/middlewares"
"github.com/versity/versitygw/s3api/utils"
"github.com/versity/versitygw/s3err"
"github.com/versity/versitygw/s3event"
"github.com/versity/versitygw/s3log"
"github.com/versity/versitygw/webui"
)
const (
shutDownDuration = time.Second * 10
)
type S3ApiServer struct {
Router *S3ApiRouter
app *fiber.App
backend backend.Backend
CertStorage *utils.CertStorage
quiet bool
keepAlive bool
health string
maxConnections int
maxRequests int
webuiMountPrefix string
webuiSrvCfg *webui.ServerConfig
routes []routeMount
middlewares []middlewareMount
socketPerm os.FileMode
}
type routeMount struct {
method string
path string
handlers []fiber.Handler
}
type middlewareMount struct {
prefix string
handler fiber.Handler
}
func New(
be backend.Backend,
root middlewares.RootUserConfig,
region string,
iam auth.IAMService,
l s3log.AuditLogger,
adminLogger s3log.AuditLogger,
evs s3event.S3EventSender,
mm metrics.Manager,
opts ...Option,
) (*S3ApiServer, error) {
server := &S3ApiServer{
backend: be,
Router: &S3ApiRouter{
be: be,
iam: iam,
logger: l,
aLogger: adminLogger,
evs: evs,
mm: mm,
root: root,
region: region,
},
}
for _, opt := range opts {
opt(server)
}
app := fiber.New(fiber.Config{
AppName: "versitygw",
ServerHeader: "VERSITYGW",
StreamRequestBody: true,
DisableKeepalive: !server.keepAlive,
Network: fiber.NetworkTCP,
DisableStartupMessage: true,
ErrorHandler: globalErrorHandler,
Concurrency: server.maxConnections,
// Sets buffer limit to read/parse incoming requests
// if the limit is reached, fiber/fasthttp will throw an error
// in the global error handler
ReadBufferSize: 8 * 1024, // 8 KB
})
server.app = app
server.Router.app = app
// initialize the panic recovery middleware
app.Use(recover.New(
recover.Config{
EnableStackTrace: true,
StackTraceHandler: stackTraceHandler,
}))
// Logging middlewares
if !server.quiet {
app.Use(logger.New(logger.Config{
Format: "${time} | vgw | ${status} | ${latency} | ${ip} | ${method} | ${path} | ${error} | ${queryParams}\n",
}))
}
// Set up health endpoint if specified
if server.health != "" {
app.Get(server.health, func(ctx *fiber.Ctx) error {
return ctx.SendStatus(http.StatusOK)
})
}
// Set up WebUI on the S3 port if configured
if server.webuiSrvCfg != nil {
webui.MountOn(app, server.webuiMountPrefix, server.webuiSrvCfg)
}
// initialize total requests cap limiter middleware
app.Use(middlewares.RateLimiter(server.maxRequests, mm, l))
for _, route := range server.routes {
method, err := validateRouteMount(route)
if err != nil {
return nil, err
}
app.Add(method, route.path, route.handlers...)
}
for _, mount := range server.middlewares {
if err := validateMiddlewareMount(mount); err != nil {
return nil, err
}
app.Use(mount.prefix, mount.handler)
}
// initilaze the default value setter middleware
app.Use(middlewares.SetDefaultValues(root, region))
// initialize the 'DecodeURL' middleware which
// path unescapes the url
app.Use(controllers.WrapMiddleware(middlewares.DecodeURL, l, mm))
// initialize the debug logger in debug mode
if debuglogger.IsDebugEnabled() {
app.Use(middlewares.DebugLogger())
}
server.Router.Init()
return server, nil
}
func validateRouteMount(route routeMount) (string, error) {
if route.method == "" {
return "", fmt.Errorf("invalid route for path %q: empty method", route.path)
}
method := strings.ToUpper(route.method)
if !isStandardHTTPMethod(method) {
return "", fmt.Errorf("invalid HTTP method %q for route path %q: must be one of %s",
route.method, route.path, strings.Join(fiber.DefaultMethods, ", "))
}
if route.path == "" || route.path[0] != '/' {
return "", fmt.Errorf("invalid route path %q: must start with /", route.path)
}
if len(route.handlers) == 0 {
return "", fmt.Errorf("invalid route for %s %s: no handlers", method, route.path)
}
for i, handler := range route.handlers {
if handler == nil {
return "", fmt.Errorf("invalid route for %s %s: nil handler at index %d", method, route.path, i)
}
}
return method, nil
}
func isStandardHTTPMethod(method string) bool {
for _, valid := range fiber.DefaultMethods {
if method == valid {
return true
}
}
return false
}
func validateMiddlewareMount(mount middlewareMount) error {
if mount.prefix == "" || mount.prefix[0] != '/' {
return fmt.Errorf("invalid middleware prefix %q: must start with /", mount.prefix)
}
if mount.handler == nil {
return fmt.Errorf("invalid middleware for prefix %q: nil handler", mount.prefix)
}
return nil
}
// Option sets various options for New()
type Option func(*S3ApiServer)
// WithTLS sets TLS Credentials
func WithTLS(cs *utils.CertStorage) Option {
return func(s *S3ApiServer) { s.CertStorage = cs }
}
// WithAdminServer runs admin endpoints with the gateway in the same network
func WithAdminServer() Option {
return func(s *S3ApiServer) { s.Router.WithAdmSrv = true }
}
// WithQuiet silences default logging output
func WithQuiet() Option {
return func(s *S3ApiServer) { s.quiet = true }
}
// WithHealth sets up a GET health endpoint
func WithHealth(health string) Option {
return func(s *S3ApiServer) { s.health = health }
}
func WithReadOnly() Option {
return func(s *S3ApiServer) { s.Router.readonly = true }
}
// WithMpMaxParts sets the maximum number of parts allowed in a multipart upload.
func WithMpMaxParts(n int) Option {
return func(s *S3ApiServer) { s.Router.mpMaxParts = n }
}
// WithHostStyle enabled host-style bucket addressing on the server
func WithHostStyle(virtualDomain string) Option {
return func(s *S3ApiServer) {
s.Router.virtualDomain = virtualDomain
}
}
// WithKeepAlive enables the server keep alive
func WithKeepAlive() Option {
return func(s *S3ApiServer) { s.keepAlive = true }
}
// WithCORSAllowOrigin sets the default CORS Access-Control-Allow-Origin value.
// This is applied when no bucket CORS configuration exists, and for admin APIs.
func WithCORSAllowOrigin(origin string) Option {
return func(s *S3ApiServer) { s.Router.corsAllowOrigin = origin }
}
// WithWebUI mounts the WebUI on the S3 server's Fiber app at the given path prefix,
// before S3 routes are registered. The prefix must start with "/" and must not be
// empty or just "/".
func WithWebUI(prefix string, cfg *webui.ServerConfig) Option {
return func(s *S3ApiServer) {
s.webuiMountPrefix = prefix
s.webuiSrvCfg = cfg
}
}
// WithRoute registers a top-level Fiber route after the gateway rate limiter
// and before S3 routes are registered. Handlers are terminal unless they
// explicitly call ctx.Next().
func WithRoute(method, path string, handlers ...fiber.Handler) Option {
return func(s *S3ApiServer) {
copied := append([]fiber.Handler(nil), handlers...)
s.routes = append(s.routes, routeMount{
method: method,
path: path,
handlers: copied,
})
}
}
// WithMiddleware mounts a Fiber middleware after the gateway rate limiter and
// before the S3 route table is registered. The middleware must call ctx.Next()
// for requests it does not fully handle.
func WithMiddleware(prefix string, handler fiber.Handler) Option {
return func(s *S3ApiServer) {
s.middlewares = append(s.middlewares, middlewareMount{
prefix: prefix,
handler: handler,
})
}
}
// WithConcurrencyLimiter sets the server's maximum connection limit
// and the hard limit for in-flight requests.
func WithConcurrencyLimiter(maxConnections, maxRequests int) Option {
return func(s *S3ApiServer) {
s.maxConnections = maxConnections
s.maxRequests = maxRequests
}
}
// WithSocketPerm sets the file-mode permissions applied to file-backed UNIX
// domain sockets after binding. It has no effect on TCP/IP or abstract
// namespace sockets.
func WithSocketPerm(perm os.FileMode) Option {
return func(s *S3ApiServer) { s.socketPerm = perm }
}
// WithDisableACL disables the s3 api server ACLs, by ignoring all
// bucket/object ACL headers
func WithDisableACL() Option {
return func(s *S3ApiServer) { s.Router.disableACL = true }
}
// ServeMultiPort creates listeners for multiple port specifications and serves
// on all of them simultaneously. This supports listening on multiple ports and/or
// addresses (e.g., [":7070", "localhost:8080", "0.0.0.0:9090"]).
func (sa *S3ApiServer) ServeMultiPort(ports []string) error {
if len(ports) == 0 {
return fmt.Errorf("no ports specified")
}
// Multiple ports - create listeners for each
var listeners []net.Listener
for _, portSpec := range ports {
var ln net.Listener
var err error
if sa.CertStorage != nil {
ln, err = utils.NewMultiAddrTLSListener(sa.app.Config().Network, portSpec, sa.CertStorage.GetCertificate, utils.ListenerOptions{SocketPerm: sa.socketPerm})
} else {
ln, err = utils.NewMultiAddrListener(sa.app.Config().Network, portSpec, utils.ListenerOptions{SocketPerm: sa.socketPerm})
}
if err != nil {
return fmt.Errorf("failed to bind s3 listener %s: %w", portSpec, err)
}
listeners = append(listeners, ln)
}
if len(listeners) == 0 {
return fmt.Errorf("failed to create any s3 listeners")
}
// Combine all listeners
finalListener := utils.NewMultiListener(listeners...)
return sa.app.Listener(finalListener)
}
// ShutDown gracefully shuts down the server with a context timeout
func (sa *S3ApiServer) ShutDown() error {
return sa.app.ShutdownWithTimeout(shutDownDuration)
}
// stackTraceHandler stores the system panics
// in the context locals
func stackTraceHandler(ctx *fiber.Ctx, e any) {
utils.ContextKeyStack.Set(ctx, e)
}
// globalErrorHandler catches the errors before reaching to
// the handlers and any system panics
func globalErrorHandler(ctx *fiber.Ctx, er error) error {
// set content type to application/xml
ctx.Response().Header.SetContentType(fiber.MIMEApplicationXML)
if utils.ContextKeyStack.IsSet(ctx) {
// if stack is set, it means the stack trace
// has caught a panic
// log it as a panic log
debuglogger.Panic(er)
} else {
// handle the fiber specific errors
var fiberErr *fiber.Error
if errors.As(er, &fiberErr) {
if errors.Is(fiberErr, fiber.ErrRequestHeaderFieldsTooLarge) {
debuglogger.Logf("total request headers size exceeds the allowed 8KB")
ctx.Status(http.StatusBadRequest)
return nil
}
if strings.Contains(fiberErr.Message, "cannot parse Content-Length") {
debuglogger.Logf("failed to parse Content-Length")
ctx.Status(http.StatusBadRequest)
return nil
}
if strings.Contains(fiberErr.Message, "error when reading request headers") {
// This error means fiber failed to parse the incoming request
// which is a malfoedmed one. Return a BadRequest in this case
debuglogger.Logf("failed to parse the http request")
err := s3err.GetAPIError(s3err.ErrCannotParseHTTPRequest)
ctx.Status(err.HTTPStatusCode)
return ctx.Send(s3err.GetAPIErrorResponse(err, "", "", ""))
}
}
// additionally log the internal error
debuglogger.InternalError(er)
}
ctx.Status(http.StatusInternalServerError)
return ctx.Send(s3err.GetAPIErrorResponse(
s3err.GetAPIError(s3err.ErrInternalError), "", "", ""))
}