feat: Add notification support for bucketCreates and removal (#10075)

This commit is contained in:
Harshavardhana
2020-07-20 12:52:49 -07:00
committed by GitHub
parent 9fd836e51f
commit 2955aae8e4
30 changed files with 624 additions and 494 deletions

View File

@@ -671,7 +671,7 @@ func signStreamingRequest(req *http.Request, accessKey, secretKey string, currTi
// Returns new HTTP request object.
func newTestStreamingRequest(method, urlStr string, dataLength, chunkSize int64, body io.ReadSeeker) (*http.Request, error) {
if method == "" {
method = "POST"
method = http.MethodPost
}
req, err := http.NewRequest(method, urlStr, nil)
@@ -1036,7 +1036,7 @@ func getMD5HashBase64(data []byte) string {
// Returns new HTTP request object.
func newTestRequest(method, urlStr string, contentLength int64, body io.ReadSeeker) (*http.Request, error) {
if method == "" {
method = "POST"
method = http.MethodPost
}
// Save for subsequent use
@@ -1164,7 +1164,7 @@ func newTestSignedRequestV4(method, urlStr string, contentLength int64, body io.
// Return new WebRPC request object.
func newWebRPCRequest(methodRPC, authorization string, body io.ReadSeeker) (*http.Request, error) {
req, err := http.NewRequest("POST", "/minio/webrpc", nil)
req, err := http.NewRequest(http.MethodPost, "/minio/webrpc", nil)
if err != nil {
return nil, err
}
@@ -1515,7 +1515,7 @@ func getCompleteMultipartUploadURL(endPoint, bucketName, objectName, uploadID st
}
// return URL for listen bucket notification.
func getListenBucketNotificationURL(endPoint, bucketName string, prefixes, suffixes, events []string) string {
func getListenNotificationURL(endPoint, bucketName string, prefixes, suffixes, events []string) string {
queryValue := url.Values{}
queryValue["prefix"] = prefixes
@@ -1693,7 +1693,7 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, obj ObjectLayer, testName, bucketN
}
// HEAD HTTTP request doesn't contain response body.
if anonReq.Method != "HEAD" {
if anonReq.Method != http.MethodHead {
// read the response body.
var actualContent []byte
actualContent, err = ioutil.ReadAll(rec.Body)
@@ -1724,7 +1724,7 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, obj ObjectLayer, testName, bucketN
// call the handler using the HTTP Request.
apiRouter.ServeHTTP(rec, anonReq)
// verify the response body for `ErrAccessDenied` message =.
if anonReq.Method != "HEAD" {
if anonReq.Method != http.MethodHead {
// read the response body.
actualContent, err := ioutil.ReadAll(rec.Body)
if err != nil {
@@ -1781,7 +1781,7 @@ func ExecObjectLayerAPINilTest(t TestErrHandler, bucketName, objectName, instanc
// HEAD HTTP Request doesn't contain body in its response,
// for other type of HTTP requests compare the response body content with the expected one.
if req.Method != "HEAD" {
if req.Method != http.MethodHead {
// read the response body.
actualContent, err := ioutil.ReadAll(rec.Body)
if err != nil {
@@ -1975,76 +1975,76 @@ func registerBucketLevelFunc(bucket *mux.Router, api objectAPIHandlers, apiFunct
switch apiFunction {
case "PostPolicy":
// Register PostPolicy handler.
bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(api.PostPolicyBucketHandler)
bucket.Methods(http.MethodPost).HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(api.PostPolicyBucketHandler)
case "HeadObject":
// Register HeadObject handler.
bucket.Methods("Head").Path("/{object:.+}").HandlerFunc(api.HeadObjectHandler)
case "GetObject":
// Register GetObject handler.
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(api.GetObjectHandler)
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(api.GetObjectHandler)
case "PutObject":
// Register PutObject handler.
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(api.PutObjectHandler)
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(api.PutObjectHandler)
case "DeleteObject":
// Register Delete Object handler.
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(api.DeleteObjectHandler)
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(api.DeleteObjectHandler)
case "CopyObject":
// Register Copy Object handler.
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectHandler)
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectHandler)
case "PutBucketPolicy":
// Register PutBucket Policy handler.
bucket.Methods("PUT").HandlerFunc(api.PutBucketPolicyHandler).Queries("policy", "")
bucket.Methods(http.MethodPut).HandlerFunc(api.PutBucketPolicyHandler).Queries("policy", "")
case "DeleteBucketPolicy":
// Register Delete bucket HTTP policy handler.
bucket.Methods("DELETE").HandlerFunc(api.DeleteBucketPolicyHandler).Queries("policy", "")
bucket.Methods(http.MethodDelete).HandlerFunc(api.DeleteBucketPolicyHandler).Queries("policy", "")
case "GetBucketPolicy":
// Register Get Bucket policy HTTP Handler.
bucket.Methods("GET").HandlerFunc(api.GetBucketPolicyHandler).Queries("policy", "")
bucket.Methods(http.MethodGet).HandlerFunc(api.GetBucketPolicyHandler).Queries("policy", "")
case "GetBucketLifecycle":
bucket.Methods("GET").HandlerFunc(api.GetBucketLifecycleHandler).Queries("lifecycle", "")
bucket.Methods(http.MethodGet).HandlerFunc(api.GetBucketLifecycleHandler).Queries("lifecycle", "")
case "PutBucketLifecycle":
bucket.Methods("PUT").HandlerFunc(api.PutBucketLifecycleHandler).Queries("lifecycle", "")
bucket.Methods(http.MethodPut).HandlerFunc(api.PutBucketLifecycleHandler).Queries("lifecycle", "")
case "DeleteBucketLifecycle":
bucket.Methods("DELETE").HandlerFunc(api.DeleteBucketLifecycleHandler).Queries("lifecycle", "")
bucket.Methods(http.MethodDelete).HandlerFunc(api.DeleteBucketLifecycleHandler).Queries("lifecycle", "")
case "GetBucketLocation":
// Register GetBucketLocation handler.
bucket.Methods("GET").HandlerFunc(api.GetBucketLocationHandler).Queries("location", "")
bucket.Methods(http.MethodGet).HandlerFunc(api.GetBucketLocationHandler).Queries("location", "")
case "HeadBucket":
// Register HeadBucket handler.
bucket.Methods("HEAD").HandlerFunc(api.HeadBucketHandler)
bucket.Methods(http.MethodHead).HandlerFunc(api.HeadBucketHandler)
case "DeleteMultipleObjects":
// Register DeleteMultipleObjects handler.
bucket.Methods("POST").HandlerFunc(api.DeleteMultipleObjectsHandler).Queries("delete", "")
bucket.Methods(http.MethodPost).HandlerFunc(api.DeleteMultipleObjectsHandler).Queries("delete", "")
case "NewMultipart":
// Register New Multipart upload handler.
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(api.NewMultipartUploadHandler).Queries("uploads", "")
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(api.NewMultipartUploadHandler).Queries("uploads", "")
case "CopyObjectPart":
// Register CopyObjectPart handler.
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
case "PutObjectPart":
// Register PutObjectPart handler.
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(api.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(api.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
case "ListObjectParts":
// Register ListObjectParts handler.
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(api.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(api.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}")
case "ListMultipartUploads":
// Register ListMultipartUploads handler.
bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
bucket.Methods(http.MethodGet).HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
case "CompleteMultipart":
// Register Complete Multipart Upload handler.
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(api.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(api.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
case "AbortMultipart":
// Register AbortMultipart Handler.
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(api.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(api.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
case "GetBucketNotification":
// Register GetBucketNotification Handler.
bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "")
bucket.Methods(http.MethodGet).HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "")
case "PutBucketNotification":
// Register PutBucketNotification Handler.
bucket.Methods("PUT").HandlerFunc(api.PutBucketNotificationHandler).Queries("notification", "")
case "ListenBucketNotification":
// Register ListenBucketNotification Handler.
bucket.Methods("GET").HandlerFunc(api.ListenBucketNotificationHandler).Queries("events", "{events:.*}")
bucket.Methods(http.MethodPut).HandlerFunc(api.PutBucketNotificationHandler).Queries("notification", "")
case "ListenNotification":
// Register ListenNotification Handler.
bucket.Methods(http.MethodGet).HandlerFunc(api.ListenNotificationHandler).Queries("events", "{events:.*}")
}
}
}
@@ -2087,7 +2087,7 @@ func registerAPIFunctions(muxRouter *mux.Router, objLayer ObjectLayer, apiFuncti
}
// Register ListBuckets handler.
apiRouter.Methods("GET").HandlerFunc(api.ListBucketsHandler)
apiRouter.Methods(http.MethodGet).HandlerFunc(api.ListBucketsHandler)
// Register all bucket level handlers.
registerBucketLevelFunc(bucketRouter, api, apiFunctions...)
}
@@ -2306,7 +2306,7 @@ func uploadTestObject(t *testing.T, apiRouter http.Handler, creds auth.Credentia
if !asMultipart {
srcData := NewDummyDataGen(partSizes[0], 0)
req, err := newTestSignedRequestV4("PUT", getPutObjectURL("", bucketName, objectName),
req, err := newTestSignedRequestV4(http.MethodPut, getPutObjectURL("", bucketName, objectName),
partSizes[0], srcData, creds.AccessKey, creds.SecretKey, metadata)
if err != nil {
t.Fatalf("Unexpected err: %#v", err)
@@ -2320,7 +2320,7 @@ func uploadTestObject(t *testing.T, apiRouter http.Handler, creds auth.Credentia
// object when reading).
// Initiate mp upload
reqI, err := newTestSignedRequestV4("POST", getNewMultipartURL("", bucketName, objectName),
reqI, err := newTestSignedRequestV4(http.MethodPost, getNewMultipartURL("", bucketName, objectName),
0, nil, creds.AccessKey, creds.SecretKey, metadata)
if err != nil {
t.Fatalf("Unexpected err: %#v", err)
@@ -2343,7 +2343,7 @@ func uploadTestObject(t *testing.T, apiRouter http.Handler, creds auth.Credentia
partID := i + 1
partSrc := NewDummyDataGen(partLen, cumulativeSum)
cumulativeSum += partLen
req, errP := newTestSignedRequestV4("PUT",
req, errP := newTestSignedRequestV4(http.MethodPut,
getPutObjectPartURL("", bucketName, objectName, upID, fmt.Sprintf("%d", partID)),
partLen, partSrc, creds.AccessKey, creds.SecretKey, metadata)
if errP != nil {
@@ -2369,7 +2369,7 @@ func uploadTestObject(t *testing.T, apiRouter http.Handler, creds auth.Credentia
if err != nil {
t.Fatalf("Unexpected err: %#v", err)
}
reqC, errP := newTestSignedRequestV4("POST",
reqC, errP := newTestSignedRequestV4(http.MethodPost,
getCompleteMultipartUploadURL("", bucketName, objectName, upID),
int64(len(compMpBody)), bytes.NewReader(compMpBody),
creds.AccessKey, creds.SecretKey, metadata)