mirror of
https://github.com/versity/versitygw.git
synced 2026-05-13 15:31:28 +00:00
Add options for embedders to register Fiber routes and middleware before the S3 route table is initialized. WithRoute registers a top-level route with explicit method and path matching. WithMiddleware registers prefix middleware that can handle a request or call ctx.Next() to continue into the S3 stack. Add coverage for route registration order when a top-level route and catch-all middleware are both configured.
423 lines
12 KiB
Go
423 lines
12 KiB
Go
// Copyright 2023 Versity Software
|
|
// This file is licensed under the Apache License, Version 2.0
|
|
// (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
package s3api
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2"
|
|
"github.com/gofiber/fiber/v2/middleware/logger"
|
|
"github.com/gofiber/fiber/v2/middleware/recover"
|
|
"github.com/versity/versitygw/auth"
|
|
"github.com/versity/versitygw/backend"
|
|
"github.com/versity/versitygw/debuglogger"
|
|
"github.com/versity/versitygw/metrics"
|
|
"github.com/versity/versitygw/s3api/controllers"
|
|
"github.com/versity/versitygw/s3api/middlewares"
|
|
"github.com/versity/versitygw/s3api/utils"
|
|
"github.com/versity/versitygw/s3err"
|
|
"github.com/versity/versitygw/s3event"
|
|
"github.com/versity/versitygw/s3log"
|
|
"github.com/versity/versitygw/webui"
|
|
)
|
|
|
|
// shutDownDuration is the timeout handed to the Fiber app when the
// server is shut down through ShutDown.
const shutDownDuration = 10 * time.Second
|
|
|
|
type S3ApiServer struct {
|
|
Router *S3ApiRouter
|
|
app *fiber.App
|
|
backend backend.Backend
|
|
CertStorage *utils.CertStorage
|
|
quiet bool
|
|
keepAlive bool
|
|
health string
|
|
maxConnections int
|
|
maxRequests int
|
|
webuiMountPrefix string
|
|
webuiSrvCfg *webui.ServerConfig
|
|
routes []routeMount
|
|
middlewares []middlewareMount
|
|
socketPerm os.FileMode
|
|
}
|
|
|
|
type routeMount struct {
|
|
method string
|
|
path string
|
|
handlers []fiber.Handler
|
|
}
|
|
|
|
type middlewareMount struct {
|
|
prefix string
|
|
handler fiber.Handler
|
|
}
|
|
|
|
func New(
|
|
be backend.Backend,
|
|
root middlewares.RootUserConfig,
|
|
region string,
|
|
iam auth.IAMService,
|
|
l s3log.AuditLogger,
|
|
adminLogger s3log.AuditLogger,
|
|
evs s3event.S3EventSender,
|
|
mm metrics.Manager,
|
|
opts ...Option,
|
|
) (*S3ApiServer, error) {
|
|
server := &S3ApiServer{
|
|
backend: be,
|
|
Router: &S3ApiRouter{
|
|
be: be,
|
|
iam: iam,
|
|
logger: l,
|
|
aLogger: adminLogger,
|
|
evs: evs,
|
|
mm: mm,
|
|
root: root,
|
|
region: region,
|
|
},
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(server)
|
|
}
|
|
|
|
app := fiber.New(fiber.Config{
|
|
AppName: "versitygw",
|
|
ServerHeader: "VERSITYGW",
|
|
StreamRequestBody: true,
|
|
DisableKeepalive: !server.keepAlive,
|
|
Network: fiber.NetworkTCP,
|
|
DisableStartupMessage: true,
|
|
ErrorHandler: globalErrorHandler,
|
|
Concurrency: server.maxConnections,
|
|
// Sets buffer limit to read/parse incoming requests
|
|
// if the limit is reached, fiber/fasthttp will throw an error
|
|
// in the global error handler
|
|
ReadBufferSize: 8 * 1024, // 8 KB
|
|
})
|
|
|
|
server.app = app
|
|
server.Router.app = app
|
|
|
|
// initialize the panic recovery middleware
|
|
app.Use(recover.New(
|
|
recover.Config{
|
|
EnableStackTrace: true,
|
|
StackTraceHandler: stackTraceHandler,
|
|
}))
|
|
|
|
// Logging middlewares
|
|
if !server.quiet {
|
|
app.Use(logger.New(logger.Config{
|
|
Format: "${time} | vgw | ${status} | ${latency} | ${ip} | ${method} | ${path} | ${error} | ${queryParams}\n",
|
|
}))
|
|
}
|
|
// Set up health endpoint if specified
|
|
if server.health != "" {
|
|
app.Get(server.health, func(ctx *fiber.Ctx) error {
|
|
return ctx.SendStatus(http.StatusOK)
|
|
})
|
|
}
|
|
|
|
// Set up WebUI on the S3 port if configured
|
|
if server.webuiSrvCfg != nil {
|
|
webui.MountOn(app, server.webuiMountPrefix, server.webuiSrvCfg)
|
|
}
|
|
|
|
// initialize total requests cap limiter middleware
|
|
app.Use(middlewares.RateLimiter(server.maxRequests, mm, l))
|
|
|
|
for _, route := range server.routes {
|
|
method, err := validateRouteMount(route)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
app.Add(method, route.path, route.handlers...)
|
|
}
|
|
|
|
for _, mount := range server.middlewares {
|
|
if err := validateMiddlewareMount(mount); err != nil {
|
|
return nil, err
|
|
}
|
|
app.Use(mount.prefix, mount.handler)
|
|
}
|
|
|
|
// initilaze the default value setter middleware
|
|
app.Use(middlewares.SetDefaultValues(root, region))
|
|
|
|
// initialize the 'DecodeURL' middleware which
|
|
// path unescapes the url
|
|
app.Use(controllers.WrapMiddleware(middlewares.DecodeURL, l, mm))
|
|
|
|
// initialize the debug logger in debug mode
|
|
if debuglogger.IsDebugEnabled() {
|
|
app.Use(middlewares.DebugLogger())
|
|
}
|
|
|
|
server.Router.Init()
|
|
|
|
return server, nil
|
|
}
|
|
|
|
func validateRouteMount(route routeMount) (string, error) {
|
|
if route.method == "" {
|
|
return "", fmt.Errorf("invalid route for path %q: empty method", route.path)
|
|
}
|
|
method := strings.ToUpper(route.method)
|
|
if !isStandardHTTPMethod(method) {
|
|
return "", fmt.Errorf("invalid HTTP method %q for route path %q: must be one of %s",
|
|
route.method, route.path, strings.Join(fiber.DefaultMethods, ", "))
|
|
}
|
|
if route.path == "" || route.path[0] != '/' {
|
|
return "", fmt.Errorf("invalid route path %q: must start with /", route.path)
|
|
}
|
|
if len(route.handlers) == 0 {
|
|
return "", fmt.Errorf("invalid route for %s %s: no handlers", method, route.path)
|
|
}
|
|
for i, handler := range route.handlers {
|
|
if handler == nil {
|
|
return "", fmt.Errorf("invalid route for %s %s: nil handler at index %d", method, route.path, i)
|
|
}
|
|
}
|
|
return method, nil
|
|
}
|
|
|
|
func isStandardHTTPMethod(method string) bool {
|
|
for _, valid := range fiber.DefaultMethods {
|
|
if method == valid {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func validateMiddlewareMount(mount middlewareMount) error {
|
|
if mount.prefix == "" || mount.prefix[0] != '/' {
|
|
return fmt.Errorf("invalid middleware prefix %q: must start with /", mount.prefix)
|
|
}
|
|
if mount.handler == nil {
|
|
return fmt.Errorf("invalid middleware for prefix %q: nil handler", mount.prefix)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Option sets various options for New()
|
|
type Option func(*S3ApiServer)
|
|
|
|
// WithTLS sets TLS Credentials
|
|
func WithTLS(cs *utils.CertStorage) Option {
|
|
return func(s *S3ApiServer) { s.CertStorage = cs }
|
|
}
|
|
|
|
// WithAdminServer runs admin endpoints with the gateway in the same network
|
|
func WithAdminServer() Option {
|
|
return func(s *S3ApiServer) { s.Router.WithAdmSrv = true }
|
|
}
|
|
|
|
// WithQuiet silences default logging output
|
|
func WithQuiet() Option {
|
|
return func(s *S3ApiServer) { s.quiet = true }
|
|
}
|
|
|
|
// WithHealth sets up a GET health endpoint
|
|
func WithHealth(health string) Option {
|
|
return func(s *S3ApiServer) { s.health = health }
|
|
}
|
|
|
|
func WithReadOnly() Option {
|
|
return func(s *S3ApiServer) { s.Router.readonly = true }
|
|
}
|
|
|
|
// WithMpMaxParts sets the maximum number of parts allowed in a multipart upload.
|
|
func WithMpMaxParts(n int) Option {
|
|
return func(s *S3ApiServer) { s.Router.mpMaxParts = n }
|
|
}
|
|
|
|
// WithHostStyle enabled host-style bucket addressing on the server
|
|
func WithHostStyle(virtualDomain string) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.Router.virtualDomain = virtualDomain
|
|
}
|
|
}
|
|
|
|
// WithKeepAlive enables the server keep alive
|
|
func WithKeepAlive() Option {
|
|
return func(s *S3ApiServer) { s.keepAlive = true }
|
|
}
|
|
|
|
// WithCORSAllowOrigin sets the default CORS Access-Control-Allow-Origin value.
|
|
// This is applied when no bucket CORS configuration exists, and for admin APIs.
|
|
func WithCORSAllowOrigin(origin string) Option {
|
|
return func(s *S3ApiServer) { s.Router.corsAllowOrigin = origin }
|
|
}
|
|
|
|
// WithWebUI mounts the WebUI on the S3 server's Fiber app at the given path prefix,
|
|
// before S3 routes are registered. The prefix must start with "/" and must not be
|
|
// empty or just "/".
|
|
func WithWebUI(prefix string, cfg *webui.ServerConfig) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.webuiMountPrefix = prefix
|
|
s.webuiSrvCfg = cfg
|
|
}
|
|
}
|
|
|
|
// WithRoute registers a top-level Fiber route after the gateway rate limiter
|
|
// and before S3 routes are registered. Handlers are terminal unless they
|
|
// explicitly call ctx.Next().
|
|
func WithRoute(method, path string, handlers ...fiber.Handler) Option {
|
|
return func(s *S3ApiServer) {
|
|
copied := append([]fiber.Handler(nil), handlers...)
|
|
s.routes = append(s.routes, routeMount{
|
|
method: method,
|
|
path: path,
|
|
handlers: copied,
|
|
})
|
|
}
|
|
}
|
|
|
|
// WithMiddleware mounts a Fiber middleware after the gateway rate limiter and
|
|
// before the S3 route table is registered. The middleware must call ctx.Next()
|
|
// for requests it does not fully handle.
|
|
func WithMiddleware(prefix string, handler fiber.Handler) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.middlewares = append(s.middlewares, middlewareMount{
|
|
prefix: prefix,
|
|
handler: handler,
|
|
})
|
|
}
|
|
}
|
|
|
|
// WithConcurrencyLimiter sets the server's maximum connection limit
|
|
// and the hard limit for in-flight requests.
|
|
func WithConcurrencyLimiter(maxConnections, maxRequests int) Option {
|
|
return func(s *S3ApiServer) {
|
|
s.maxConnections = maxConnections
|
|
s.maxRequests = maxRequests
|
|
}
|
|
}
|
|
|
|
// WithSocketPerm sets the file-mode permissions applied to file-backed UNIX
|
|
// domain sockets after binding. It has no effect on TCP/IP or abstract
|
|
// namespace sockets.
|
|
func WithSocketPerm(perm os.FileMode) Option {
|
|
return func(s *S3ApiServer) { s.socketPerm = perm }
|
|
}
|
|
|
|
// WithDisableACL disables the s3 api server ACLs, by ignoring all
|
|
// bucket/object ACL headers
|
|
func WithDisableACL() Option {
|
|
return func(s *S3ApiServer) { s.Router.disableACL = true }
|
|
}
|
|
|
|
// ServeMultiPort creates listeners for multiple port specifications and serves
|
|
// on all of them simultaneously. This supports listening on multiple ports and/or
|
|
// addresses (e.g., [":7070", "localhost:8080", "0.0.0.0:9090"]).
|
|
func (sa *S3ApiServer) ServeMultiPort(ports []string) error {
|
|
if len(ports) == 0 {
|
|
return fmt.Errorf("no ports specified")
|
|
}
|
|
|
|
// Multiple ports - create listeners for each
|
|
var listeners []net.Listener
|
|
|
|
for _, portSpec := range ports {
|
|
var ln net.Listener
|
|
var err error
|
|
|
|
if sa.CertStorage != nil {
|
|
ln, err = utils.NewMultiAddrTLSListener(sa.app.Config().Network, portSpec, sa.CertStorage.GetCertificate, utils.ListenerOptions{SocketPerm: sa.socketPerm})
|
|
} else {
|
|
ln, err = utils.NewMultiAddrListener(sa.app.Config().Network, portSpec, utils.ListenerOptions{SocketPerm: sa.socketPerm})
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to bind s3 listener %s: %w", portSpec, err)
|
|
}
|
|
|
|
listeners = append(listeners, ln)
|
|
}
|
|
|
|
if len(listeners) == 0 {
|
|
return fmt.Errorf("failed to create any s3 listeners")
|
|
}
|
|
|
|
// Combine all listeners
|
|
finalListener := utils.NewMultiListener(listeners...)
|
|
|
|
return sa.app.Listener(finalListener)
|
|
}
|
|
|
|
// ShutDown gracefully shuts down the server with a context timeout
|
|
func (sa *S3ApiServer) ShutDown() error {
|
|
return sa.app.ShutdownWithTimeout(shutDownDuration)
|
|
}
|
|
|
|
// stackTraceHandler stores the system panics
|
|
// in the context locals
|
|
func stackTraceHandler(ctx *fiber.Ctx, e any) {
|
|
utils.ContextKeyStack.Set(ctx, e)
|
|
}
|
|
|
|
// globalErrorHandler catches the errors before reaching to
|
|
// the handlers and any system panics
|
|
func globalErrorHandler(ctx *fiber.Ctx, er error) error {
|
|
// set content type to application/xml
|
|
ctx.Response().Header.SetContentType(fiber.MIMEApplicationXML)
|
|
|
|
if utils.ContextKeyStack.IsSet(ctx) {
|
|
// if stack is set, it means the stack trace
|
|
// has caught a panic
|
|
// log it as a panic log
|
|
debuglogger.Panic(er)
|
|
} else {
|
|
// handle the fiber specific errors
|
|
var fiberErr *fiber.Error
|
|
if errors.As(er, &fiberErr) {
|
|
if errors.Is(fiberErr, fiber.ErrRequestHeaderFieldsTooLarge) {
|
|
debuglogger.Logf("total request headers size exceeds the allowed 8KB")
|
|
ctx.Status(http.StatusBadRequest)
|
|
return nil
|
|
}
|
|
if strings.Contains(fiberErr.Message, "cannot parse Content-Length") {
|
|
debuglogger.Logf("failed to parse Content-Length")
|
|
ctx.Status(http.StatusBadRequest)
|
|
return nil
|
|
}
|
|
if strings.Contains(fiberErr.Message, "error when reading request headers") {
|
|
// This error means fiber failed to parse the incoming request
|
|
// which is a malfoedmed one. Return a BadRequest in this case
|
|
debuglogger.Logf("failed to parse the http request")
|
|
err := s3err.GetAPIError(s3err.ErrCannotParseHTTPRequest)
|
|
ctx.Status(err.HTTPStatusCode)
|
|
return ctx.Send(s3err.GetAPIErrorResponse(err, "", "", ""))
|
|
}
|
|
}
|
|
|
|
// additionally log the internal error
|
|
debuglogger.InternalError(er)
|
|
}
|
|
|
|
ctx.Status(http.StatusInternalServerError)
|
|
|
|
return ctx.Send(s3err.GetAPIErrorResponse(
|
|
s3err.GetAPIError(s3err.ErrInternalError), "", "", ""))
|
|
}
|