Limit concurrent downloads & uploads (#2313)

This commit is contained in:
Alex
2022-09-23 12:35:55 -05:00
committed by GitHub
parent c4c6d48abf
commit d1511c5eb0
20 changed files with 412 additions and 20 deletions

View File

@@ -0,0 +1,70 @@
// Code generated by go-swagger; DO NOT EDIT.
// This file is part of MinIO Console Server
// Copyright (c) 2022 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
"context"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
)
// EnvironmentConstants environment constants
//
// swagger:model environmentConstants
type EnvironmentConstants struct {
// max concurrent downloads
MaxConcurrentDownloads int64 `json:"maxConcurrentDownloads,omitempty"`
// max concurrent uploads
MaxConcurrentUploads int64 `json:"maxConcurrentUploads,omitempty"`
}
// Validate validates this environment constants
func (m *EnvironmentConstants) Validate(formats strfmt.Registry) error {
return nil
}
// ContextValidate validates this environment constants based on context it is used
func (m *EnvironmentConstants) ContextValidate(ctx context.Context, formats strfmt.Registry) error {
return nil
}
// MarshalBinary interface implementation
func (m *EnvironmentConstants) MarshalBinary() ([]byte, error) {
if m == nil {
return nil, nil
}
return swag.WriteJSON(m)
}
// UnmarshalBinary interface implementation
func (m *EnvironmentConstants) UnmarshalBinary(b []byte) error {
var res EnvironmentConstants
if err := swag.ReadJSON(b, &res); err != nil {
return err
}
*m = res
return nil
}

View File

@@ -47,6 +47,9 @@ type SessionResponse struct {
// distributed mode
DistributedMode bool `json:"distributedMode,omitempty"`
// env constants
EnvConstants *EnvironmentConstants `json:"envConstants,omitempty"`
// features
Features []string `json:"features"`
@@ -69,6 +72,10 @@ func (m *SessionResponse) Validate(formats strfmt.Registry) error {
res = append(res, err)
}
if err := m.validateEnvConstants(formats); err != nil {
res = append(res, err)
}
if err := m.validateStatus(formats); err != nil {
res = append(res, err)
}
@@ -105,6 +112,25 @@ func (m *SessionResponse) validateAllowResources(formats strfmt.Registry) error
return nil
}
func (m *SessionResponse) validateEnvConstants(formats strfmt.Registry) error {
if swag.IsZero(m.EnvConstants) { // not required
return nil
}
if m.EnvConstants != nil {
if err := m.EnvConstants.Validate(formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("envConstants")
} else if ce, ok := err.(*errors.CompositeError); ok {
return ce.ValidateName("envConstants")
}
return err
}
}
return nil
}
var sessionResponseTypeStatusPropEnum []interface{}
func init() {
@@ -152,6 +178,10 @@ func (m *SessionResponse) ContextValidate(ctx context.Context, formats strfmt.Re
res = append(res, err)
}
if err := m.contextValidateEnvConstants(ctx, formats); err != nil {
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
@@ -178,6 +208,22 @@ func (m *SessionResponse) contextValidateAllowResources(ctx context.Context, for
return nil
}
func (m *SessionResponse) contextValidateEnvConstants(ctx context.Context, formats strfmt.Registry) error {
if m.EnvConstants != nil {
if err := m.EnvConstants.ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("envConstants")
} else if ce, ok := err.(*errors.CompositeError); ok {
return ce.ValidateName("envConstants")
}
return err
}
}
return nil
}
// MarshalBinary interface implementation
func (m *SessionResponse) MarshalBinary() ([]byte, error) {
if m == nil {

View File

@@ -99,7 +99,9 @@ import {
import {
makeid,
removeTrace,
storeCallForObjectWithID,
storeFormDataWithID,
} from "../../../../ObjectBrowser/transferManager";
import {
cancelObjectInList,
@@ -777,12 +779,15 @@ const ListObjects = () => {
`${bucketName}-${object.name}-${new Date().getTime()}-${Math.random()}`
);
const ID = makeid(8);
const downloadCall = download(
bucketName,
encodeURLString(object.name),
object.version_id,
object.size,
null,
ID,
(progress) => {
dispatch(
updateProgress({
@@ -801,7 +806,6 @@ const ListObjects = () => {
dispatch(cancelObjectInList(identityDownload));
}
);
const ID = makeid(8);
storeCallForObjectWithID(ID, downloadCall);
dispatch(
setNewObject({
@@ -818,8 +822,6 @@ const ListObjects = () => {
errorMessage: "",
})
);
downloadCall.send();
};
const openPath = (idElement: string) => {
@@ -865,6 +867,7 @@ const ListObjects = () => {
const fileWebkitRelativePath = get(file, "webkitRelativePath", "");
let relativeFolderPath = folderPath;
const ID = makeid(8);
// File was uploaded via drag & drop
if (filePath !== "") {
@@ -924,6 +927,8 @@ const ListObjects = () => {
if (xhr.status >= 200 && xhr.status < 300) {
dispatch(completeObject(identity));
resolve({ status: xhr.status });
removeTrace(ID);
} else {
// reject promise if there was a server error
if (errorMessages[xhr.status]) {
@@ -936,6 +941,7 @@ const ListObjects = () => {
errorMessage = "something went wrong";
}
}
dispatch(
failObject({
instanceID: identity,
@@ -943,6 +949,8 @@ const ListObjects = () => {
})
);
reject({ status: xhr.status, message: errorMessage });
removeTrace(ID);
}
};
@@ -990,7 +998,6 @@ const ListObjects = () => {
const formData = new FormData();
if (file.size !== undefined) {
formData.append(file.size.toString(), blobFile, fileName);
const ID = makeid(8);
storeCallForObjectWithID(ID, xhr);
dispatch(
setNewObject({
@@ -1008,7 +1015,8 @@ const ListObjects = () => {
})
);
xhr.send(formData);
storeFormDataWithID(ID, formData);
storeCallForObjectWithID(ID, xhr);
}
});
};

View File

@@ -302,12 +302,15 @@ const ObjectDetailPanel = ({
return;
}
const ID = makeid(8);
const downloadCall = download(
bucketName,
internalPaths,
object.version_id,
parseInt(object.size || "0"),
null,
ID,
(progress) => {
dispatch(
updateProgress({
@@ -326,7 +329,7 @@ const ObjectDetailPanel = ({
dispatch(cancelObjectInList(identityDownload));
}
);
const ID = makeid(8);
storeCallForObjectWithID(ID, downloadCall);
dispatch(
setNewObject({
@@ -343,8 +346,6 @@ const ObjectDetailPanel = ({
errorMessage: "",
})
);
downloadCall.send();
};
const closeDeleteModal = (closeAndReload: boolean) => {

View File

@@ -256,12 +256,15 @@ const VersionsNavigator = ({
`${bucketName}-${object.name}-${new Date().getTime()}-${Math.random()}`
);
const ID = makeid(8);
const downloadCall = download(
bucketName,
internalPaths,
object.version_id,
parseInt(object.size || "0"),
null,
ID,
(progress) => {
dispatch(
updateProgress({
@@ -280,7 +283,7 @@ const VersionsNavigator = ({
dispatch(cancelObjectInList(identityDownload));
}
);
const ID = makeid(8);
storeCallForObjectWithID(ID, downloadCall);
dispatch(
setNewObject({
@@ -297,8 +300,6 @@ const VersionsNavigator = ({
errorMessage: "",
})
);
downloadCall.send();
};
const onShareItem = (item: IFileInfo) => {

View File

@@ -17,6 +17,7 @@
import { BucketObjectItem } from "./ListObjects/types";
import { IAllowResources } from "../../../types";
import { encodeURLString } from "../../../../../common/utils";
import { removeTrace } from "../../../ObjectBrowser/transferManager";
export const download = (
bucketName: string,
@@ -24,6 +25,7 @@ export const download = (
versionID: any,
fileSize: number,
overrideFileName: string | null = null,
id: string,
progressCallback: (progress: number) => void,
completeCallback: () => void,
errorCallback: (msg: string) => void,
@@ -74,6 +76,8 @@ export const download = (
completeCallback();
}
removeTrace(id);
var link = document.createElement("a");
link.href = window.URL.createObjectURL(req.response);
link.download = filename;
@@ -104,7 +108,6 @@ export const download = (
abortCallback();
}
};
//req.send();
return req;
};

View File

@@ -170,7 +170,6 @@ const ObjectHandled = ({
<button
onClick={() => {
if (!objectToDisplay.done) {
console.log("//abort");
const call = callForObjectID(objectToDisplay.ID);
if (call) {
call.abort();

View File

@@ -0,0 +1,112 @@
// This file is part of MinIO Console Server
// Copyright (c) 2021 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
import React, { Fragment, useEffect } from "react";
import { AppState, useAppDispatch } from "../../../../store";
import { useSelector } from "react-redux";
import {
callForObjectID,
formDataFromID,
} from "../../ObjectBrowser/transferManager";
import {
newDownloadInit,
newUploadInit,
} from "../../ObjectBrowser/objectBrowserSlice";
const TrafficMonitor = () => {
const dispatch = useAppDispatch();
const objects = useSelector(
(state: AppState) => state.objectBrowser.objectManager.objectsToManage
);
const limitVars = useSelector(
(state: AppState) => state.console.session.envConstants
);
const currentDIP = useSelector(
(state: AppState) => state.objectBrowser.objectManager.currentDownloads
);
const currentUIP = useSelector(
(state: AppState) => state.objectBrowser.objectManager.currentUploads
);
const limitUploads = limitVars?.maxConcurrentUploads || 10;
const limitDownloads = limitVars?.maxConcurrentDownloads || 20;
useEffect(() => {
if (objects.length > 0) {
const filterDownloads = objects.filter(
(object) =>
object.type === "download" &&
!object.done &&
!currentDIP.includes(object.ID)
);
const filterUploads = objects.filter(
(object) =>
object.type === "upload" &&
!object.done &&
!currentUIP.includes(object.ID)
);
const remainingDownloadSlots = limitDownloads - currentDIP.length;
if (
filterDownloads.length > 0 &&
(remainingDownloadSlots > 0 || limitDownloads === 0)
) {
const itemsToDownload = filterDownloads.slice(
0,
remainingDownloadSlots
);
itemsToDownload.forEach((item) => {
const objectRequest = callForObjectID(item.ID);
if (objectRequest) {
objectRequest.send();
}
dispatch(newDownloadInit(item.ID));
});
}
const remainingUploadSlots = limitUploads - currentUIP.length;
if (
filterUploads.length > 0 &&
(remainingUploadSlots > 0 || limitUploads === 0)
) {
const itemsToUpload = filterUploads.slice(0, remainingUploadSlots);
itemsToUpload.forEach((item) => {
const uploadRequest = callForObjectID(item.ID);
const formDataID = formDataFromID(item.ID);
if (uploadRequest && formDataID) {
uploadRequest.send(formDataID);
}
dispatch(newUploadInit(item.ID));
});
}
}
}, [objects, limitUploads, limitDownloads, currentDIP, currentUIP, dispatch]);
return <Fragment />;
};
export default TrafficMonitor;

View File

@@ -19,6 +19,7 @@ import Console from "./Console";
import { useSelector } from "react-redux";
import CommandBar from "./CommandBar";
import { selFeatures } from "./consoleSlice";
import TrafficMonitor from "./Common/ObjectManager/TrafficMonitor";
const ConsoleKBar = () => {
const features = useSelector(selFeatures);
@@ -33,6 +34,7 @@ const ConsoleKBar = () => {
enableHistory: true,
}}
>
<TrafficMonitor />
<CommandBar />
<Console />
</KBarProvider>

View File

@@ -84,12 +84,15 @@ const RenameLongFileName = ({
}-${new Date().getTime()}-${Math.random()}`
);
const ID = makeid(8);
const downloadCall = download(
bucketName,
internalPaths,
actualInfo.version_id,
parseInt(actualInfo.size || "0"),
newFileName,
ID,
(progress) => {
dispatch(
updateProgress({
@@ -108,7 +111,7 @@ const RenameLongFileName = ({
dispatch(cancelObjectInList(identityDownload));
}
);
const ID = makeid(8);
storeCallForObjectWithID(ID, downloadCall);
dispatch(
setNewObject({
@@ -125,8 +128,6 @@ const RenameLongFileName = ({
errorMessage: "",
})
);
downloadCall.send();
closeModal();
};

View File

@@ -36,6 +36,9 @@ const initialState: ObjectBrowserState = {
objectsToManage: [],
managerOpen: false,
newItems: false,
startedItems: [],
currentDownloads: [],
currentUploads: [],
},
searchObjects: "",
versionedFile: "",
@@ -119,6 +122,18 @@ export const objectBrowserSlice = createSlice({
state.objectManager.objectsToManage[objectToComplete].waitingForFile =
false;
state.objectManager.objectsToManage[objectToComplete].done = true;
// We cancel from in-progress lists
const type = state.objectManager.objectsToManage[objectToComplete].type;
const ID = state.objectManager.objectsToManage[objectToComplete].ID;
if (type === "download") {
state.objectManager.currentDownloads =
state.objectManager.currentDownloads.filter((item) => item !== ID);
} else if (type === "upload") {
state.objectManager.currentUploads =
state.objectManager.currentUploads.filter((item) => item !== ID);
}
},
failObject: (
state,
@@ -133,6 +148,18 @@ export const objectBrowserSlice = createSlice({
state.objectManager.objectsToManage[objectToFail].done = true;
state.objectManager.objectsToManage[objectToFail].errorMessage =
action.payload.msg;
// We cancel from in-progress lists
const type = state.objectManager.objectsToManage[objectToFail].type;
const ID = state.objectManager.objectsToManage[objectToFail].ID;
if (type === "download") {
state.objectManager.currentDownloads =
state.objectManager.currentDownloads.filter((item) => item !== ID);
} else if (type === "upload") {
state.objectManager.currentUploads =
state.objectManager.currentUploads.filter((item) => item !== ID);
}
},
cancelObjectInList: (state, action: PayloadAction<string>) => {
const objectToCancel = state.objectManager.objectsToManage.findIndex(
@@ -146,6 +173,18 @@ export const objectBrowserSlice = createSlice({
state.objectManager.objectsToManage[objectToCancel].cancelled = true;
state.objectManager.objectsToManage[objectToCancel].done = true;
state.objectManager.objectsToManage[objectToCancel].percentage = 0;
// We cancel from in-progress lists
const type = state.objectManager.objectsToManage[objectToCancel].type;
const ID = state.objectManager.objectsToManage[objectToCancel].ID;
if (type === "download") {
state.objectManager.currentDownloads =
state.objectManager.currentDownloads.filter((item) => item !== ID);
} else if (type === "upload") {
state.objectManager.currentUploads =
state.objectManager.currentUploads.filter((item) => item !== ID);
}
},
deleteFromList: (state, action: PayloadAction<string>) => {
const notObject = state.objectManager.objectsToManage.filter(
@@ -208,6 +247,18 @@ export const objectBrowserSlice = createSlice({
setSimplePathHandler: (state, action: PayloadAction<string>) => {
state.simplePath = action.payload;
},
newDownloadInit: (state, action: PayloadAction<string>) => {
state.objectManager.currentDownloads = [
...state.objectManager.currentDownloads,
action.payload,
];
},
newUploadInit: (state, action: PayloadAction<string>) => {
state.objectManager.currentUploads = [
...state.objectManager.currentUploads,
action.payload,
];
},
},
});
export const {
@@ -234,6 +285,8 @@ export const {
setObjectDetailsView,
setSelectedObjectView,
setSimplePathHandler,
newDownloadInit,
newUploadInit,
} = objectBrowserSlice.actions;
export default objectBrowserSlice.reducer;

View File

@@ -15,6 +15,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
let objectCalls: { [key: string]: XMLHttpRequest } = {};
let formDataElements: { [key: string]: FormData } = {};
export const storeCallForObjectWithID = (id: string, call: XMLHttpRequest) => {
objectCalls[id] = call;
@@ -24,6 +25,19 @@ export const callForObjectID = (id: string): XMLHttpRequest => {
return objectCalls[id];
};
export const storeFormDataWithID = (id: string, formData: FormData) => {
formDataElements[id] = formData;
};
export const formDataFromID = (id: string): FormData => {
return formDataElements[id];
};
export const removeTrace = (id: string) => {
delete objectCalls[id];
delete formDataElements[id];
};
export const makeid = (length: number) => {
var result = "";
var characters =

View File

@@ -78,14 +78,13 @@ export interface ObjectBrowserState {
simplePath: string | null;
}
export interface ObjectBrowserReducer {
objectBrowser: ObjectBrowserState;
}
export interface ObjectManager {
objectsToManage: IFileItem[];
managerOpen: boolean;
newItems: boolean;
startedItems: string[];
currentDownloads: string[];
currentUploads: string[];
}
export interface IFileItem {

View File

@@ -31,6 +31,7 @@ const initialState: ConsoleState = {
permissions: {},
allowResources: null,
customStyles: null,
envConstants: null,
},
};

View File

@@ -26,6 +26,11 @@ export interface IAllowResources {
resource: string;
}
export interface IEnvironmentContants {
maxConcurrentDownloads: number;
maxConcurrentUploads: number;
}
export interface ISessionResponse {
status: string;
features: string[];
@@ -35,6 +40,7 @@ export interface ISessionResponse {
permissions: ISessionPermissions;
allowResources: IAllowResources[] | null;
customStyles?: string | null;
envConstants?: IEnvironmentContants | null;
}
export interface ButtonProps {

View File

@@ -254,3 +254,21 @@ func getPrometheusJobID() string {
func getPrometheusExtraLabels() string {
return env.Get(PrometheusExtraLabels, "")
}
func getMaxConcurrentUploadsLimit() int64 {
cu, err := strconv.ParseInt(env.Get(ConsoleMaxConcurrentUploads, "10"), 10, 64)
if err != nil {
return 10
}
return cu
}
func getMaxConcurrentDownloadsLimit() int64 {
cu, err := strconv.ParseInt(env.Get(ConsoleMaxConcurrentDownloads, "20"), 10, 64)
if err != nil {
return 20
}
return cu
}

View File

@@ -51,6 +51,8 @@ const (
PrometheusExtraLabels = "CONSOLE_PROMETHEUS_EXTRA_LABELS"
ConsoleLogQueryURL = "CONSOLE_LOG_QUERY_URL"
ConsoleLogQueryAuthToken = "CONSOLE_LOG_QUERY_AUTH_TOKEN"
ConsoleMaxConcurrentUploads = "CONSOLE_MAX_CONCURRENT_UPLOADS"
ConsoleMaxConcurrentDownloads = "CONSOLE_MAX_CONCURRENT_DOWNLOADS"
LogSearchQueryAuthToken = "LOGSEARCH_QUERY_AUTH_TOKEN"
SlashSeparator = "/"
)

View File

@@ -5570,6 +5570,17 @@ func init() {
}
}
},
"environmentConstants": {
"type": "object",
"properties": {
"maxConcurrentDownloads": {
"type": "integer"
},
"maxConcurrentUploads": {
"type": "integer"
}
}
},
"error": {
"type": "object",
"required": [
@@ -5978,6 +5989,9 @@ func init() {
"type": "string"
}
},
"isDirectPV": {
"type": "boolean"
},
"loginStrategy": {
"type": "string",
"enum": [
@@ -6888,6 +6902,9 @@ func init() {
"distributedMode": {
"type": "boolean"
},
"envConstants": {
"$ref": "#/definitions/environmentConstants"
},
"features": {
"type": "array",
"items": {
@@ -13440,6 +13457,17 @@ func init() {
}
}
},
"environmentConstants": {
"type": "object",
"properties": {
"maxConcurrentDownloads": {
"type": "integer"
},
"maxConcurrentUploads": {
"type": "integer"
}
}
},
"error": {
"type": "object",
"required": [
@@ -13848,6 +13876,9 @@ func init() {
"type": "string"
}
},
"isDirectPV": {
"type": "boolean"
},
"loginStrategy": {
"type": "string",
"enum": [
@@ -14758,6 +14789,9 @@ func init() {
"distributedMode": {
"type": "boolean"
},
"envConstants": {
"$ref": "#/definitions/environmentConstants"
},
"features": {
"type": "array",
"items": {

View File

@@ -238,6 +238,13 @@ func getSessionResponse(ctx context.Context, session *models.Principal) (*models
if err != nil {
return nil, ErrorWithContext(ctx, err)
}
// environment constants
var envConstants models.EnvironmentConstants
envConstants.MaxConcurrentUploads = getMaxConcurrentUploadsLimit()
envConstants.MaxConcurrentDownloads = getMaxConcurrentDownloadsLimit()
sessionResp := &models.SessionResponse{
Features: getListOfEnabledFeatures(session),
Status: models.SessionResponseStatusOk,
@@ -246,6 +253,7 @@ func getSessionResponse(ctx context.Context, session *models.Principal) (*models
Permissions: resourcePermissions,
AllowResources: allowResources,
CustomStyles: customStyles,
EnvConstants: &envConstants,
}
return sessionResp, nil
}

View File

@@ -4009,6 +4009,8 @@ definitions:
type: array
items:
type: string
isDirectPV:
type: boolean
loginOauth2AuthRequest:
type: object
required:
@@ -4109,6 +4111,8 @@ definitions:
type: array
items:
$ref: "#/definitions/permissionResource"
envConstants:
$ref: "#/definitions/environmentConstants"
widgetResult:
type: object
@@ -5239,6 +5243,7 @@ definitions:
type: string
shortName:
type: string
checkVersionResponse:
type: object
properties:
@@ -5258,6 +5263,7 @@ definitions:
type: array
items:
type: string
aUserPolicyResponse:
type: object
properties:
@@ -5311,3 +5317,11 @@ definitions:
type: object
kmsListIdentitiesResponse:
type: object
environmentConstants:
type: object
properties:
maxConcurrentUploads:
type: integer
maxConcurrentDownloads:
type: integer