Files
object-browser/restapi/ws_handle.go
Cesar N 4a27ef4b2c Move heal and watch to tenant details view on operator-ui (#449)
Use insecure: true in the meantime so the wss/watch endpoint works while
we add support for custotm TLS transport in the S3 client library.
Removed "InsecureSkipVerify: true" from s3AdminClient and s3Client HTTP clients
2020-11-30 14:41:58 -06:00

417 lines
12 KiB
Go

// This file is part of MinIO Console Server
// Copyright (c) 2020 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package restapi
import (
"context"
"log"
"net"
"net/http"
"strings"
"time"
"github.com/go-openapi/errors"
"github.com/gorilla/websocket"
"github.com/minio/console/cluster"
"github.com/minio/console/models"
"github.com/minio/console/pkg/auth"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 0,
WriteBufferSize: 1024,
}
const (
// websocket base path
wsBasePath = "/ws"
)
// ConsoleWebsocketAdmin interface of a Websocket Client
type ConsoleWebsocketAdmin interface {
trace()
console()
}
type wsAdminClient struct {
// websocket connection.
conn wsConn
// MinIO admin Client
client MinioAdmin
}
// ConsoleWebsocket interface of a Websocket Client
type ConsoleWebsocket interface {
watch(options watchOptions)
heal(opts healOptions)
}
type wsS3Client struct {
// websocket connection.
conn wsConn
// mcClient
client MCClient
}
// WSConn interface with all functions to be implemented
// by mock when testing, it should include all websocket.Conn
// respective api calls that are used within this project.
type WSConn interface {
writeMessage(messageType int, data []byte) error
close() error
readMessage() (messageType int, p []byte, err error)
}
// Interface implementation
//
// Define the structure of a websocket Connection
type wsConn struct {
conn *websocket.Conn
}
func (c wsConn) writeMessage(messageType int, data []byte) error {
return c.conn.WriteMessage(messageType, data)
}
func (c wsConn) close() error {
return c.conn.Close()
}
func (c wsConn) readMessage() (messageType int, p []byte, err error) {
return c.conn.ReadMessage()
}
// serveWS validates the incoming request and
// upgrades the request to a Websocket protocol.
// Websocket communication will be done depending
// on the path.
// Request should come like ws://<host>:<port>/ws/<api>
func serveWS(w http.ResponseWriter, req *http.Request) {
// Perform authentication before upgrading to a Websocket Connection
// authenticate WS connection with Console
session, err := auth.GetClaimsFromTokenInRequest(req)
if err != nil {
log.Print("error on ws authentication: ", err)
errors.ServeError(w, req, errors.New(http.StatusUnauthorized, err.Error()))
return
}
// upgrades the HTTP server connection to the WebSocket protocol.
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Print("error on upgrade: ", err)
errors.ServeError(w, req, err)
return
}
wsPath := strings.TrimPrefix(req.URL.Path, wsBasePath)
switch {
case strings.HasPrefix(wsPath, `/trace`):
// Trace api only for operator Console
namespace, tenant, err := getTraceOptionsFromReq(req)
if err != nil {
log.Println("error getting trace options:", err)
closeWsConn(conn)
return
}
wsAdminClient, err := newWebSocketTenantAdminClient(conn, session, namespace, tenant)
if err != nil {
closeWsConn(conn)
return
}
go wsAdminClient.trace()
case strings.HasPrefix(wsPath, `/console`):
// Trace api only for operator Console
namespace, tenant, err := getConsoleLogOptionsFromReq(req)
if err != nil {
log.Println("error getting log options:", err)
closeWsConn(conn)
return
}
wsAdminClient, err := newWebSocketTenantAdminClient(conn, session, namespace, tenant)
if err != nil {
closeWsConn(conn)
return
}
go wsAdminClient.console()
case strings.HasPrefix(wsPath, `/heal`):
hOptions, err := getHealOptionsFromReq(req)
if err != nil {
log.Println("error getting heal options:", err)
closeWsConn(conn)
return
}
wsAdminClient, err := newWebSocketTenantAdminClient(conn, session, hOptions.Namespace, hOptions.Tenant)
if err != nil {
closeWsConn(conn)
return
}
go wsAdminClient.heal(hOptions)
case strings.HasPrefix(wsPath, `/watch`):
wOptions, err := getWatchOptionsFromReq(req)
if err != nil {
log.Println("error getting watch options:", err)
closeWsConn(conn)
return
}
wsS3Client, err := newWebSocketS3Client(conn, session, wOptions.Namespace, wOptions.Tenant, wOptions.BucketName)
if err != nil {
closeWsConn(conn)
return
}
go wsS3Client.watch(wOptions)
default:
// path not found
closeWsConn(conn)
}
}
// newWebSocketTenantAdminClient creates a ws Client with a k8s tenant client
// this is to be used for a kubernetes environment and for a particular tenant
// in a defined namespace
func newWebSocketTenantAdminClient(conn *websocket.Conn, session *models.Principal, namespace, tenant string) (*wsAdminClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
opClientClientSet, err := cluster.OperatorClient(session.SessionToken)
if err != nil {
return nil, err
}
clientSet, err := cluster.K8sClient(session.SessionToken)
if err != nil {
return nil, err
}
opClient := &operatorClient{
client: opClientClientSet,
}
k8sClient := &k8sClient{
client: clientSet,
}
minTenant, err := getTenant(ctx, opClient, namespace, tenant)
if err != nil {
return nil, err
}
minTenant.EnsureDefaults()
svcURL := GetTenantServiceURL(minTenant)
// getTenantAdminClient will use all certificates under ~/.console/certs/CAs to trust the TLS connections with MinIO tenants
mAdmin, err := getTenantAdminClient(
ctx,
k8sClient,
minTenant,
svcURL,
true,
)
if err != nil {
return nil, err
}
// create a websocket connection interface implementation
// defining the connection to be used
wsConnection := wsConn{conn: conn}
// create a minioClient interface implementation
// defining the client to be used
adminClient := adminClient{client: mAdmin}
// create websocket client and handle request
wsAdminClient := &wsAdminClient{conn: wsConnection, client: adminClient}
return wsAdminClient, nil
}
// newWebSocketS3Client returns a wsAdminClient authenticated as Console admin
func newWebSocketS3Client(conn *websocket.Conn, claims *models.Principal, namespace, tenant, bucketName string) (*wsS3Client, error) {
// Only start Websocket Interaction after user has been
// authenticated with MinIO
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
opClientClientSet, err := cluster.OperatorClient(claims.SessionToken)
if err != nil {
return nil, err
}
clientSet, err := cluster.K8sClient(claims.SessionToken)
if err != nil {
return nil, err
}
opClient := &operatorClient{
client: opClientClientSet,
}
k8sClient := &k8sClient{
client: clientSet,
}
minTenant, err := getTenant(ctx, opClient, namespace, tenant)
if err != nil {
return nil, err
}
minTenant.EnsureDefaults()
// Get Tenant Creds and substitute session ones
tenantCreds, err := getTenantCreds(ctx, k8sClient, minTenant)
if err != nil {
return nil, err
}
tenantClaims := &models.Principal{
AccessKeyID: tenantCreds.accessKey,
SecretAccessKey: tenantCreds.secretKey,
}
svcURL := GetTenantServiceURL(minTenant)
// TODO: change isSecure: true to minTenant.TLS() and add support to S3Client to accept custom TLS Transport
s3Client, err := newTenantS3BucketClient(tenantClaims, svcURL, bucketName, false)
if err != nil {
log.Println("error creating S3Client:", err)
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
conn.Close()
return nil, err
}
// create a websocket connection interface implementation
// defining the connection to be used
wsConnection := wsConn{conn: conn}
// create a s3Client interface implementation
// defining the client to be used
mcS3C := mcClient{client: s3Client}
// create websocket client and handle request
wsS3Client := &wsS3Client{conn: wsConnection, client: mcS3C}
return wsS3Client, nil
}
// wsReadClientCtx reads the messages that come from the client
// if the client sends a Close Message the context will be
// canceled. If the connection is closed the goroutine inside
// will return.
func wsReadClientCtx(conn WSConn) context.Context {
// a cancel context is needed to end all goroutines used
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
for {
_, _, err := conn.readMessage()
if err != nil {
// if error of type websocket.CloseError and is Unexpected
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Println("error unexpected CloseError on ReadMessage:", err)
return
}
// Not all errors are of type websocket.CloseError.
if _, ok := err.(*websocket.CloseError); !ok {
log.Println("error on ReadMessage:", err)
return
}
// else is an expected Close Error
log.Println("closed conn.ReadMessage:", err)
return
}
}
}()
return ctx
}
// closeWsConn sends Close Message and closes the websocket connection
func closeWsConn(conn *websocket.Conn) {
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
conn.Close()
}
// trace serves madmin.ServiceTraceInfo
// on a Websocket connection.
func (wsc *wsAdminClient) trace() {
defer func() {
log.Println("trace stopped")
// close connection after return
wsc.conn.close()
}()
log.Println("trace started")
ctx := wsReadClientCtx(wsc.conn)
err := startTraceInfo(ctx, wsc.conn, wsc.client)
sendWsCloseMessage(wsc.conn, err)
}
// console serves madmin.GetLogs
// on a Websocket connection.
func (wsc *wsAdminClient) console() {
defer func() {
log.Println("console logs stopped")
// close connection after return
wsc.conn.close()
}()
log.Println("console logs started")
ctx := wsReadClientCtx(wsc.conn)
err := startConsoleLog(ctx, wsc.conn, wsc.client)
sendWsCloseMessage(wsc.conn, err)
}
func (wsc *wsS3Client) watch(params *watchOptions) {
defer func() {
log.Println("watch stopped")
// close connection after return
wsc.conn.close()
}()
log.Println("watch started")
ctx := wsReadClientCtx(wsc.conn)
err := startWatch(ctx, wsc.conn, wsc.client, params)
sendWsCloseMessage(wsc.conn, err)
}
func (wsc *wsAdminClient) heal(opts *healOptions) {
defer func() {
log.Println("heal stopped")
// close connection after return
wsc.conn.close()
}()
log.Println("heal started")
ctx := wsReadClientCtx(wsc.conn)
err := startHeal(ctx, wsc.conn, wsc.client, opts)
sendWsCloseMessage(wsc.conn, err)
}
// sendWsCloseMessage sends Websocket Connection Close Message indicating the Status Code
// see https://tools.ietf.org/html/rfc6455#page-45
func sendWsCloseMessage(conn WSConn, err error) {
if err != nil {
// If connection exceeded read deadline send Close
// Message Policy Violation code since we don't want
// to let the receiver figure out the read deadline.
// This is a generic code designed if there is a
// need to hide specific details about the policy.
if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, ""))
return
}
// else, internal server error
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()))
return
}
// normal closure
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
}