Merge pull request #1390 from carlisia/c-check-for-backup-III

Fail backup if it already exists in object storage
This commit is contained in:
Nolan Brubaker
2019-04-24 17:31:48 -04:00
committed by GitHub
22 changed files with 884 additions and 144 deletions

View File

@@ -0,0 +1 @@
Fail backup if it already exists in object storage.

View File

@@ -23,7 +23,9 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/request"
v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -42,10 +44,18 @@ const (
signatureVersionKey = "signatureVersion"
)
type s3Interface interface {
HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error)
GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
ListObjectsV2Pages(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error
DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)
GetObjectRequest(input *s3.GetObjectInput) (req *request.Request, output *s3.GetObjectOutput)
}
type ObjectStore struct {
log logrus.FieldLogger
s3 *s3.S3
preSignS3 *s3.S3
s3 s3Interface
preSignS3 s3Interface
s3Uploader *s3manager.Uploader
kmsKeyID string
signatureVersion string
@@ -191,6 +201,48 @@ func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
return errors.Wrapf(err, "error putting object %s", key)
}
const notFoundCode = "NotFound"
// ObjectExists checks if there is an object with the given key in the object storage bucket.
func (o *ObjectStore) ObjectExists(bucket, key string) (bool, error) {
log := o.log.WithFields(
logrus.Fields{
"bucket": bucket,
"key": key,
},
)
req := &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}
log.Debug("Checking if object exists")
if _, err := o.s3.HeadObject(req); err != nil {
log.Debug("Checking for AWS specific error information")
if aerr, ok := err.(awserr.Error); ok {
log.WithFields(
logrus.Fields{
"code": aerr.Code(),
"message": aerr.Message(),
},
).Debugf("awserr.Error contents (origErr=%v)", aerr.OrigErr())
// The code will be NotFound if the key doesn't exist.
// See https://github.com/aws/aws-sdk-go/issues/1208 and https://github.com/aws/aws-sdk-go/pull/1213.
log.Debugf("Checking for code=%s", notFoundCode)
if aerr.Code() == notFoundCode {
log.Debug("Object doesn't exist - got not found")
return false, nil
}
}
return false, errors.WithStack(err)
}
log.Debug("Object exists")
return true, nil
}
func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
req := &s3.GetObjectInput{
Bucket: &bucket,

View File

@@ -1,5 +1,5 @@
/*
Copyright 2018 the Velero contributors.
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,7 +19,16 @@ package aws
import (
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/heptio/velero/pkg/util/test"
)
func TestIsValidSignatureVersion(t *testing.T) {
@@ -27,3 +36,90 @@ func TestIsValidSignatureVersion(t *testing.T) {
assert.True(t, isValidSignatureVersion("4"))
assert.False(t, isValidSignatureVersion("3"))
}
type mockS3 struct {
mock.Mock
}
func (m *mockS3) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
args := m.Called(input)
return args.Get(0).(*s3.HeadObjectOutput), args.Error(1)
}
func (m *mockS3) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
args := m.Called(input)
return args.Get(0).(*s3.GetObjectOutput), args.Error(1)
}
func (m *mockS3) ListObjectsV2Pages(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
args := m.Called(input, fn)
return args.Error(0)
}
func (m *mockS3) DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
args := m.Called(input)
return args.Get(0).(*s3.DeleteObjectOutput), args.Error(1)
}
func (m *mockS3) GetObjectRequest(input *s3.GetObjectInput) (req *request.Request, output *s3.GetObjectOutput) {
args := m.Called(input)
return args.Get(0).(*request.Request), args.Get(1).(*s3.GetObjectOutput)
}
func TestObjectExists(t *testing.T) {
tests := []struct {
name string
errorResponse error
expectedExists bool
expectedError string
}{
{
name: "exists",
errorResponse: nil,
expectedExists: true,
},
{
name: "doesn't exist",
errorResponse: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
expectedExists: false,
expectedError: "NoSuchKey: no such key",
},
{
name: "error checking for existence",
errorResponse: errors.Errorf("bad"),
expectedExists: false,
expectedError: "bad",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := new(mockS3)
defer s.AssertExpectations(t)
o := &ObjectStore{
log: test.NewLogger(),
s3: s,
}
bucket := "b"
key := "k"
req := &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}
s.On("HeadObject", req).Return(&s3.HeadObjectOutput{}, tc.errorResponse)
exists, err := o.ObjectExists(bucket, key)
if tc.expectedError != "" {
assert.EqualError(t, err, tc.expectedError)
return
}
require.NoError(t, err)
assert.Equal(t, tc.expectedExists, exists)
})
}
}

View File

@@ -37,9 +37,97 @@ const (
storageAccountConfigKey = "storageAccount"
)
type containerGetter interface {
getContainer(bucket string) (container, error)
}
type azureContainerGetter struct {
blobService *storage.BlobStorageClient
}
func (cg *azureContainerGetter) getContainer(bucket string) (container, error) {
container := cg.blobService.GetContainerReference(bucket)
if container == nil {
return nil, errors.Errorf("unable to get container reference for bucket %v", bucket)
}
return &azureContainer{
container: container,
}, nil
}
type container interface {
ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
}
type azureContainer struct {
container *storage.Container
}
func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
return c.container.ListBlobs(params)
}
type blobGetter interface {
getBlob(bucket, key string) (blob, error)
}
type azureBlobGetter struct {
blobService *storage.BlobStorageClient
}
func (bg *azureBlobGetter) getBlob(bucket, key string) (blob, error) {
container := bg.blobService.GetContainerReference(bucket)
if container == nil {
return nil, errors.Errorf("unable to get container reference for bucket %v", bucket)
}
blob := container.GetBlobReference(key)
if blob == nil {
return nil, errors.Errorf("unable to get blob reference for key %v", key)
}
return &azureBlob{
blob: blob,
}, nil
}
type blob interface {
CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error
Exists() (bool, error)
Get(options *storage.GetBlobOptions) (io.ReadCloser, error)
Delete(options *storage.DeleteBlobOptions) error
GetSASURI(options *storage.BlobSASOptions) (string, error)
}
type azureBlob struct {
blob *storage.Blob
}
func (b *azureBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
return b.blob.CreateBlockBlobFromReader(blob, options)
}
func (b *azureBlob) Exists() (bool, error) {
return b.blob.Exists()
}
func (b *azureBlob) Get(options *storage.GetBlobOptions) (io.ReadCloser, error) {
return b.blob.Get(options)
}
func (b *azureBlob) Delete(options *storage.DeleteBlobOptions) error {
return b.blob.Delete(options)
}
func (b *azureBlob) GetSASURI(options *storage.BlobSASOptions) (string, error) {
return b.blob.GetSASURI(*options)
}
type ObjectStore struct {
blobClient *storage.BlobStorageClient
log logrus.FieldLogger
containerGetter containerGetter
blobGetter blobGetter
log logrus.FieldLogger
}
func NewObjectStore(logger logrus.FieldLogger) *ObjectStore {
@@ -122,18 +210,18 @@ func (o *ObjectStore) Init(config map[string]string) error {
}
blobClient := storageClient.GetBlobService()
o.blobClient = &blobClient
o.containerGetter = &azureContainerGetter{
blobService: &blobClient,
}
o.blobGetter = &azureBlobGetter{
blobService: &blobClient,
}
return nil
}
func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return err
}
@@ -141,13 +229,22 @@ func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil))
}
func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
container, err := getContainerReference(o.blobClient, bucket)
func (o *ObjectStore) ObjectExists(bucket, key string) (bool, error) {
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return nil, err
return false, err
}
blob, err := getBlobReference(container, key)
exists, err := blob.Exists()
if err != nil {
return false, errors.WithStack(err)
}
return exists, nil
}
func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return nil, err
}
@@ -161,7 +258,7 @@ func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
}
func (o *ObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
container, err := getContainerReference(o.blobClient, bucket)
container, err := o.containerGetter.getContainer(bucket)
if err != nil {
return nil, err
}
@@ -180,7 +277,7 @@ func (o *ObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]st
}
func (o *ObjectStore) ListObjects(bucket, prefix string) ([]string, error) {
container, err := getContainerReference(o.blobClient, bucket)
container, err := o.containerGetter.getContainer(bucket)
if err != nil {
return nil, err
}
@@ -203,12 +300,7 @@ func (o *ObjectStore) ListObjects(bucket, prefix string) ([]string, error) {
}
func (o *ObjectStore) DeleteObject(bucket string, key string) error {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return err
}
@@ -217,12 +309,7 @@ func (o *ObjectStore) DeleteObject(bucket string, key string) error {
}
func (o *ObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return "", err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return "", err
}
@@ -236,23 +323,5 @@ func (o *ObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (st
},
}
return blob.GetSASURI(opts)
}
func getContainerReference(blobClient *storage.BlobStorageClient, bucket string) (*storage.Container, error) {
container := blobClient.GetContainerReference(bucket)
if container == nil {
return nil, errors.Errorf("unable to get container reference for bucket %v", bucket)
}
return container, nil
}
func getBlobReference(container *storage.Container, key string) (*storage.Blob, error) {
blob := container.GetBlobReference(key)
if blob == nil {
return nil, errors.Errorf("unable to get blob reference for key %v", key)
}
return blob, nil
return blob.GetSASURI(&opts)
}

View File

@@ -0,0 +1,152 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"io"
"testing"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestObjectExists(t *testing.T) {
tests := []struct {
name string
getBlobError error
exists bool
errorResponse error
expectedExists bool
expectedError string
}{
{
name: "getBlob error",
exists: false,
errorResponse: errors.New("getBlob"),
expectedExists: false,
expectedError: "getBlob",
},
{
name: "exists",
exists: true,
errorResponse: nil,
expectedExists: true,
},
{
name: "doesn't exist",
exists: false,
errorResponse: nil,
expectedExists: false,
},
{
name: "error checking for existence",
exists: false,
errorResponse: errors.New("bad"),
expectedExists: false,
expectedError: "bad",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
blobGetter := new(mockBlobGetter)
defer blobGetter.AssertExpectations(t)
o := &ObjectStore{
blobGetter: blobGetter,
}
bucket := "b"
key := "k"
blob := new(mockBlob)
defer blob.AssertExpectations(t)
blobGetter.On("getBlob", bucket, key).Return(blob, tc.getBlobError)
blob.On("Exists").Return(tc.exists, tc.errorResponse)
exists, err := o.ObjectExists(bucket, key)
if tc.expectedError != "" {
assert.EqualError(t, err, tc.expectedError)
return
}
require.NoError(t, err)
assert.Equal(t, tc.expectedExists, exists)
})
}
}
type mockBlobGetter struct {
mock.Mock
}
func (m *mockBlobGetter) getBlob(bucket string, key string) (blob, error) {
args := m.Called(bucket, key)
return args.Get(0).(blob), args.Error(1)
}
type mockBlob struct {
mock.Mock
}
func (m *mockBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
args := m.Called(blob, options)
return args.Error(0)
}
func (m *mockBlob) Exists() (bool, error) {
args := m.Called()
return args.Bool(0), args.Error(1)
}
func (m *mockBlob) Get(options *storage.GetBlobOptions) (io.ReadCloser, error) {
args := m.Called(options)
return args.Get(0).(io.ReadCloser), args.Error(1)
}
func (m *mockBlob) Delete(options *storage.DeleteBlobOptions) error {
args := m.Called(options)
return args.Error(0)
}
func (m *mockBlob) GetSASURI(options *storage.BlobSASOptions) (string, error) {
args := m.Called(options)
return args.String(0), args.Error(1)
}
type mockContainerGetter struct {
mock.Mock
}
func (m *mockContainerGetter) getContainer(bucket string) (container, error) {
args := m.Called(bucket)
return args.Get(0).(container), args.Error(1)
}
type mockContainer struct {
mock.Mock
}
func (m *mockContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
args := m.Called(params)
return args.Get(0).(storage.BlobListResponse), args.Error(1)
}

View File

@@ -39,6 +39,7 @@ const credentialsEnvVar = "GOOGLE_APPLICATION_CREDENTIALS"
type bucketWriter interface {
// getWriteCloser returns an io.WriteCloser that can be used to upload data to the specified bucket for the specified key.
getWriteCloser(bucket, key string) io.WriteCloser
getAttrs(bucket, key string) (*storage.ObjectAttrs, error)
}
type writer struct {
@@ -49,6 +50,10 @@ func (w *writer) getWriteCloser(bucket, key string) io.WriteCloser {
return w.client.Bucket(bucket).Object(key).NewWriter(context.Background())
}
func (w *writer) getAttrs(bucket, key string) (*storage.ObjectAttrs, error) {
return w.client.Bucket(bucket).Object(key).Attrs(context.Background())
}
type ObjectStore struct {
log logrus.FieldLogger
client *storage.Client
@@ -117,6 +122,17 @@ func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
return closeErr
}
func (o *ObjectStore) ObjectExists(bucket, key string) (bool, error) {
if _, err := o.bucketWriter.getAttrs(bucket, key); err != nil {
if err == storage.ErrObjectNotExist {
return false, nil
}
return false, errors.WithStack(err)
}
return true, nil
}
func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
r, err := o.client.Bucket(bucket).Object(key).NewReader(context.Background())
if err != nil {

View File

@@ -21,7 +21,9 @@ import (
"strings"
"testing"
"cloud.google.com/go/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
velerotest "github.com/heptio/velero/pkg/util/test"
)
@@ -45,6 +47,8 @@ func newMockWriteCloser(writeErr, closeErr error) *mockWriteCloser {
type fakeWriter struct {
wc *mockWriteCloser
attrsErr error
}
func newFakeWriter(wc *mockWriteCloser) *fakeWriter {
@@ -55,6 +59,10 @@ func (fw *fakeWriter) getWriteCloser(bucket, name string) io.WriteCloser {
return fw.wc
}
func (fw *fakeWriter) getAttrs(bucket, key string) (*storage.ObjectAttrs, error) {
return new(storage.ObjectAttrs), fw.attrsErr
}
func TestPutObject(t *testing.T) {
tests := []struct {
name string
@@ -93,8 +101,54 @@ func TestPutObject(t *testing.T) {
o.bucketWriter = newFakeWriter(wc)
err := o.PutObject("bucket", "key", strings.NewReader("contents"))
assert.Equal(t, test.expectedErr, err)
})
}
}
func TestObjectExists(t *testing.T) {
tests := []struct {
name string
errorResponse error
expectedExists bool
expectedError string
}{
{
name: "exists",
errorResponse: nil,
expectedExists: true,
},
{
name: "doesn't exist",
errorResponse: storage.ErrObjectNotExist,
expectedExists: false,
},
{
name: "error checking for existence",
errorResponse: errors.New("bad"),
expectedExists: false,
expectedError: "bad",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
o := NewObjectStore(velerotest.NewLogger())
w := newFakeWriter(nil)
o.bucketWriter = w
w.attrsErr = tc.errorResponse
bucket := "b"
key := "k"
exists, err := o.ObjectExists(bucket, key)
if tc.expectedError != "" {
assert.EqualError(t, err, tc.expectedError)
return
}
require.NoError(t, err)
assert.Equal(t, tc.expectedExists, exists)
})
}
}

View File

@@ -70,6 +70,19 @@ func (o *InMemoryObjectStore) PutObject(bucket, key string, body io.Reader) erro
return nil
}
func (o *InMemoryObjectStore) ObjectExists(bucket, key string) (bool, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return false, errors.New("bucket not found")
}
if _, ok = bucketData[key]; !ok {
return false, errors.New("key not found")
}
return true, nil
}
func (o *InMemoryObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
bucketData, ok := o.Data[bucket]
if !ok {

View File

@@ -144,6 +144,27 @@ func (_m *ObjectStore) ListObjects(bucket string, prefix string) ([]string, erro
return r0, r1
}
// ObjectExists provides a mock function with given fields: bucket, key
func (_m *ObjectStore) ObjectExists(bucket string, key string) (bool, error) {
ret := _m.Called(bucket, key)
var r0 bool
if rf, ok := ret.Get(0).(func(string, string) bool); ok {
r0 = rf(bucket, key)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string) error); ok {
r1 = rf(bucket, key)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PutObject provides a mock function with given fields: bucket, key, body
func (_m *ObjectStore) PutObject(bucket string, key string, body io.Reader) error {
ret := _m.Called(bucket, key, body)

View File

@@ -158,11 +158,16 @@ func (c *backupController) processBackup(key string) error {
log.Debug("Running processBackup")
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return errors.Wrap(err, "error splitting queue key")
log.WithError(err).Errorf("error splitting key")
return nil
}
log.Debug("Getting backup")
original, err := c.lister.Backups(ns).Get(name)
if apierrors.IsNotFound(err) {
log.Debugf("backup %s not found", name)
return nil
}
if err != nil {
return errors.Wrap(err, "error getting backup")
}
@@ -280,11 +285,12 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup) *pkg
}
request.Labels[velerov1api.StorageLocationLabel] = request.Spec.StorageLocation
// validate the included/excluded resources and namespaces
// validate the included/excluded resources
for _, err := range collections.ValidateIncludesExcludes(request.Spec.IncludedResources, request.Spec.ExcludedResources) {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
}
// validate the included/excluded namespaces
for _, err := range collections.ValidateIncludesExcludes(request.Spec.IncludedNamespaces, request.Spec.ExcludedNamespaces) {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
}
@@ -446,9 +452,18 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
return err
}
var errs []error
exists, err := backupStore.BackupExists(backup.StorageLocation.Spec.StorageType.ObjectStorage.Bucket, backup.Name)
if exists || err != nil {
backup.Status.Phase = velerov1api.BackupPhaseFailed
backup.Status.CompletionTimestamp.Time = c.clock.Now()
if err != nil {
return errors.Wrapf(err, "error checking if backup already exists in object storage")
}
return errors.Errorf("backup already exists in object storage")
}
// Do the actual backup
var errs []error
if err := c.backupper.Backup(log, backup, backupFile, actions, pluginManager); err != nil {
errs = append(errs, err)
backup.Status.Phase = velerov1api.BackupPhaseFailed

View File

@@ -58,20 +58,17 @@ func (b *fakeBackupper) Backup(logger logrus.FieldLogger, backup *pkgbackup.Requ
func TestProcessBackupNonProcessedItems(t *testing.T) {
tests := []struct {
name string
key string
backup *v1.Backup
expectedErr string
name string
key string
backup *v1.Backup
}{
{
name: "bad key returns error",
key: "bad/key/here",
expectedErr: "error splitting queue key: unexpected key format: \"bad/key/here\"",
name: "bad key does not return error",
key: "bad/key/here",
},
{
name: "backup not found in lister returns error",
key: "nonexistent/backup",
expectedErr: "error getting backup: backup.velero.io \"backup\" not found",
name: "backup not found in lister does not return error",
key: "nonexistent/backup",
},
{
name: "FailedValidation backup is not processed",
@@ -112,12 +109,7 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
}
err := c.processBackup(test.key)
if test.expectedErr != "" {
require.Error(t, err)
assert.Equal(t, test.expectedErr, err.Error())
} else {
assert.Nil(t, err)
}
assert.Nil(t, err)
// Any backup that would actually proceed to validation will cause a segfault because this
// test hasn't set up the necessary controller dependencies for validation/etc. So the lack
@@ -255,18 +247,21 @@ func TestDefaultBackupTTL(t *testing.T) {
}
func TestProcessBackupCompletions(t *testing.T) {
defaultBackupLocation := velerotest.NewTestBackupStorageLocation().WithName("loc-1").BackupStorageLocation
defaultBackupLocation := velerotest.NewTestBackupStorageLocation().WithName("loc-1").WithObjectStorage("store-1").BackupStorageLocation
now, err := time.Parse(time.RFC1123Z, time.RFC1123Z)
require.NoError(t, err)
now = now.Local()
tests := []struct {
name string
backup *v1.Backup
backupLocation *v1.BackupStorageLocation
expectedResult *v1.Backup
name string
backup *v1.Backup
backupLocation *v1.BackupStorageLocation
expectedResult *v1.Backup
backupExists bool
existenceCheckError error
}{
// Completed
{
name: "backup with no backup location gets the default",
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
@@ -294,7 +289,7 @@ func TestProcessBackupCompletions(t *testing.T) {
{
name: "backup with a specific backup location keeps it",
backup: velerotest.NewTestBackup().WithName("backup-1").WithStorageLocation("alt-loc").Backup,
backupLocation: velerotest.NewTestBackupStorageLocation().WithName("alt-loc").BackupStorageLocation,
backupLocation: velerotest.NewTestBackupStorageLocation().WithName("alt-loc").WithObjectStorage("store-1").BackupStorageLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
@@ -340,6 +335,83 @@ func TestProcessBackupCompletions(t *testing.T) {
},
},
},
{
name: "backup without an existing backup will succeed",
backupExists: false,
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseCompleted,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
Expiration: metav1.NewTime(now),
},
},
},
// Failed
{
name: "backup with existing backup will fail",
backupExists: true,
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseFailed,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
Expiration: metav1.NewTime(now),
},
},
},
{
name: "error when checking if backup exists will cause backup to fail",
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
existenceCheckError: errors.New("Backup already exists in object storage"),
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseFailed,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
Expiration: metav1.NewTime(now),
},
},
},
}
for _, test := range tests {
@@ -380,6 +452,7 @@ func TestProcessBackupCompletions(t *testing.T) {
completionTimestampIsPresent := func(buf *bytes.Buffer) bool {
return strings.Contains(buf.String(), `"completionTimestamp": "2006-01-02T22:04:05Z"`)
}
backupStore.On("BackupExists", test.backupLocation.Spec.StorageType.ObjectStorage.Bucket, test.backup.Name).Return(test.backupExists, test.existenceCheckError)
backupStore.On("PutBackup", test.backup.Name, mock.MatchedBy(completionTimestampIsPresent), mock.Anything, mock.Anything, mock.Anything).Return(nil)
// add the test's backup to the informer/lister store

View File

@@ -63,6 +63,27 @@ func (_m *BackupStore) GetBackupContents(name string) (io.ReadCloser, error) {
return r0, r1
}
// BackupExists provides a mock function with given fields: bucket, backupName
func (_m *BackupStore) BackupExists(bucket string, backupName string) (bool, error) {
ret := _m.Called(bucket, backupName)
var r0 bool
if rf, ok := ret.Get(0).(func(string, string) bool); ok {
r0 = rf(bucket, backupName)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string) error); ok {
r1 = rf(bucket, backupName)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetBackupMetadata provides a mock function with given fields: name
func (_m *BackupStore) GetBackupMetadata(name string) (*v1.Backup, error) {
ret := _m.Called(name)

View File

@@ -47,6 +47,10 @@ type BackupStore interface {
GetBackupMetadata(name string) (*velerov1api.Backup, error)
GetBackupVolumeSnapshots(name string) ([]*volume.Snapshot, error)
GetBackupContents(name string) (io.ReadCloser, error)
// BackupExists checks if the backup metadata file exists in object storage.
BackupExists(bucket, backupName string) (bool, error)
DeleteBackup(name string) error
PutRestoreLog(backup, restore string, log io.Reader) error
@@ -288,6 +292,10 @@ func (s *objectBackupStore) GetBackupContents(name string) (io.ReadCloser, error
return s.objectStore.GetObject(s.bucket, s.layout.getBackupContentsKey(name))
}
func (s *objectBackupStore) BackupExists(bucket, backupName string) (bool, error) {
return s.objectStore.ObjectExists(bucket, s.layout.getBackupMetadataKey(backupName))
}
func (s *objectBackupStore) DeleteBackup(name string) error {
objects, err := s.objectStore.ListObjects(s.bucket, s.layout.getBackupDir(name))
if err != nil {

View File

@@ -120,6 +120,15 @@ func (r *restartableObjectStore) PutObject(bucket string, key string, body io.Re
return delegate.PutObject(bucket, key, body)
}
// ObjectExists restarts the plugin's process if needed, then delegates the call.
func (r *restartableObjectStore) ObjectExists(bucket, key string) (bool, error) {
delegate, err := r.getDelegate()
if err != nil {
return false, err
}
return delegate.ObjectExists(bucket, key)
}
// GetObject restarts the plugin's process if needed, then delegates the call.
func (r *restartableObjectStore) GetObject(bucket string, key string) (io.ReadCloser, error) {
delegate, err := r.getDelegate()

View File

@@ -96,6 +96,22 @@ func (c *ObjectStoreGRPCClient) PutObject(bucket, key string, body io.Reader) er
}
}
// ObjectExists checks if there is an object with the given key in the object storage bucket.
func (c *ObjectStoreGRPCClient) ObjectExists(bucket, key string) (bool, error) {
req := &proto.ObjectExistsRequest{
Plugin: c.plugin,
Bucket: bucket,
Key: key,
}
res, err := c.grpcClient.ObjectExists(context.Background(), req)
if err != nil {
return false, err
}
return res.Exists, nil
}
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) {

View File

@@ -128,6 +128,27 @@ func (s *ObjectStoreGRPCServer) PutObject(stream proto.ObjectStore_PutObjectServ
return nil
}
// ObjectExists checks if there is an object with the given key in the object storage bucket.
func (s *ObjectStoreGRPCServer) ObjectExists(ctx context.Context, req *proto.ObjectExistsRequest) (response *proto.ObjectExistsResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, newGRPCError(err)
}
exists, err := impl.ObjectExists(req.Bucket, req.Key)
if err != nil {
return nil, newGRPCError(err)
}
return &proto.ObjectExistsResponse{Exists: exists}, nil
}
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
func (s *ObjectStoreGRPCServer) GetObject(req *proto.GetObjectRequest, stream proto.ObjectStore_GetObjectServer) (err error) {

View File

@@ -18,6 +18,8 @@ It has these top-level messages:
BackupItemActionAppliesToRequest
BackupItemActionAppliesToResponse
PutObjectRequest
ObjectExistsRequest
ObjectExistsResponse
GetObjectRequest
Bytes
ListCommonPrefixesRequest

View File

@@ -57,6 +57,54 @@ func (m *PutObjectRequest) GetBody() []byte {
return nil
}
type ObjectExistsRequest struct {
Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"`
Bucket string `protobuf:"bytes,2,opt,name=bucket" json:"bucket,omitempty"`
Key string `protobuf:"bytes,3,opt,name=key" json:"key,omitempty"`
}
func (m *ObjectExistsRequest) Reset() { *m = ObjectExistsRequest{} }
func (m *ObjectExistsRequest) String() string { return proto.CompactTextString(m) }
func (*ObjectExistsRequest) ProtoMessage() {}
func (*ObjectExistsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} }
func (m *ObjectExistsRequest) GetPlugin() string {
if m != nil {
return m.Plugin
}
return ""
}
func (m *ObjectExistsRequest) GetBucket() string {
if m != nil {
return m.Bucket
}
return ""
}
func (m *ObjectExistsRequest) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
type ObjectExistsResponse struct {
Exists bool `protobuf:"varint,1,opt,name=exists" json:"exists,omitempty"`
}
func (m *ObjectExistsResponse) Reset() { *m = ObjectExistsResponse{} }
func (m *ObjectExistsResponse) String() string { return proto.CompactTextString(m) }
func (*ObjectExistsResponse) ProtoMessage() {}
func (*ObjectExistsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} }
func (m *ObjectExistsResponse) GetExists() bool {
if m != nil {
return m.Exists
}
return false
}
type GetObjectRequest struct {
Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"`
Bucket string `protobuf:"bytes,2,opt,name=bucket" json:"bucket,omitempty"`
@@ -66,7 +114,7 @@ type GetObjectRequest struct {
func (m *GetObjectRequest) Reset() { *m = GetObjectRequest{} }
func (m *GetObjectRequest) String() string { return proto.CompactTextString(m) }
func (*GetObjectRequest) ProtoMessage() {}
func (*GetObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} }
func (*GetObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} }
func (m *GetObjectRequest) GetPlugin() string {
if m != nil {
@@ -96,7 +144,7 @@ type Bytes struct {
func (m *Bytes) Reset() { *m = Bytes{} }
func (m *Bytes) String() string { return proto.CompactTextString(m) }
func (*Bytes) ProtoMessage() {}
func (*Bytes) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} }
func (*Bytes) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} }
func (m *Bytes) GetData() []byte {
if m != nil {
@@ -115,7 +163,7 @@ type ListCommonPrefixesRequest struct {
func (m *ListCommonPrefixesRequest) Reset() { *m = ListCommonPrefixesRequest{} }
func (m *ListCommonPrefixesRequest) String() string { return proto.CompactTextString(m) }
func (*ListCommonPrefixesRequest) ProtoMessage() {}
func (*ListCommonPrefixesRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} }
func (*ListCommonPrefixesRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
func (m *ListCommonPrefixesRequest) GetPlugin() string {
if m != nil {
@@ -152,7 +200,7 @@ type ListCommonPrefixesResponse struct {
func (m *ListCommonPrefixesResponse) Reset() { *m = ListCommonPrefixesResponse{} }
func (m *ListCommonPrefixesResponse) String() string { return proto.CompactTextString(m) }
func (*ListCommonPrefixesResponse) ProtoMessage() {}
func (*ListCommonPrefixesResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} }
func (*ListCommonPrefixesResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} }
func (m *ListCommonPrefixesResponse) GetPrefixes() []string {
if m != nil {
@@ -170,7 +218,7 @@ type ListObjectsRequest struct {
func (m *ListObjectsRequest) Reset() { *m = ListObjectsRequest{} }
func (m *ListObjectsRequest) String() string { return proto.CompactTextString(m) }
func (*ListObjectsRequest) ProtoMessage() {}
func (*ListObjectsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
func (*ListObjectsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{7} }
func (m *ListObjectsRequest) GetPlugin() string {
if m != nil {
@@ -200,7 +248,7 @@ type ListObjectsResponse struct {
func (m *ListObjectsResponse) Reset() { *m = ListObjectsResponse{} }
func (m *ListObjectsResponse) String() string { return proto.CompactTextString(m) }
func (*ListObjectsResponse) ProtoMessage() {}
func (*ListObjectsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} }
func (*ListObjectsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{8} }
func (m *ListObjectsResponse) GetKeys() []string {
if m != nil {
@@ -218,7 +266,7 @@ type DeleteObjectRequest struct {
func (m *DeleteObjectRequest) Reset() { *m = DeleteObjectRequest{} }
func (m *DeleteObjectRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteObjectRequest) ProtoMessage() {}
func (*DeleteObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{7} }
func (*DeleteObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{9} }
func (m *DeleteObjectRequest) GetPlugin() string {
if m != nil {
@@ -251,7 +299,7 @@ type CreateSignedURLRequest struct {
func (m *CreateSignedURLRequest) Reset() { *m = CreateSignedURLRequest{} }
func (m *CreateSignedURLRequest) String() string { return proto.CompactTextString(m) }
func (*CreateSignedURLRequest) ProtoMessage() {}
func (*CreateSignedURLRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{8} }
func (*CreateSignedURLRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{10} }
func (m *CreateSignedURLRequest) GetPlugin() string {
if m != nil {
@@ -288,7 +336,7 @@ type CreateSignedURLResponse struct {
func (m *CreateSignedURLResponse) Reset() { *m = CreateSignedURLResponse{} }
func (m *CreateSignedURLResponse) String() string { return proto.CompactTextString(m) }
func (*CreateSignedURLResponse) ProtoMessage() {}
func (*CreateSignedURLResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{9} }
func (*CreateSignedURLResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{11} }
func (m *CreateSignedURLResponse) GetUrl() string {
if m != nil {
@@ -305,7 +353,7 @@ type ObjectStoreInitRequest struct {
func (m *ObjectStoreInitRequest) Reset() { *m = ObjectStoreInitRequest{} }
func (m *ObjectStoreInitRequest) String() string { return proto.CompactTextString(m) }
func (*ObjectStoreInitRequest) ProtoMessage() {}
func (*ObjectStoreInitRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{10} }
func (*ObjectStoreInitRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{12} }
func (m *ObjectStoreInitRequest) GetPlugin() string {
if m != nil {
@@ -323,6 +371,8 @@ func (m *ObjectStoreInitRequest) GetConfig() map[string]string {
func init() {
proto.RegisterType((*PutObjectRequest)(nil), "generated.PutObjectRequest")
proto.RegisterType((*ObjectExistsRequest)(nil), "generated.ObjectExistsRequest")
proto.RegisterType((*ObjectExistsResponse)(nil), "generated.ObjectExistsResponse")
proto.RegisterType((*GetObjectRequest)(nil), "generated.GetObjectRequest")
proto.RegisterType((*Bytes)(nil), "generated.Bytes")
proto.RegisterType((*ListCommonPrefixesRequest)(nil), "generated.ListCommonPrefixesRequest")
@@ -348,6 +398,7 @@ const _ = grpc.SupportPackageIsVersion4
type ObjectStoreClient interface {
Init(ctx context.Context, in *ObjectStoreInitRequest, opts ...grpc.CallOption) (*Empty, error)
PutObject(ctx context.Context, opts ...grpc.CallOption) (ObjectStore_PutObjectClient, error)
ObjectExists(ctx context.Context, in *ObjectExistsRequest, opts ...grpc.CallOption) (*ObjectExistsResponse, error)
GetObject(ctx context.Context, in *GetObjectRequest, opts ...grpc.CallOption) (ObjectStore_GetObjectClient, error)
ListCommonPrefixes(ctx context.Context, in *ListCommonPrefixesRequest, opts ...grpc.CallOption) (*ListCommonPrefixesResponse, error)
ListObjects(ctx context.Context, in *ListObjectsRequest, opts ...grpc.CallOption) (*ListObjectsResponse, error)
@@ -406,6 +457,15 @@ func (x *objectStorePutObjectClient) CloseAndRecv() (*Empty, error) {
return m, nil
}
func (c *objectStoreClient) ObjectExists(ctx context.Context, in *ObjectExistsRequest, opts ...grpc.CallOption) (*ObjectExistsResponse, error) {
out := new(ObjectExistsResponse)
err := grpc.Invoke(ctx, "/generated.ObjectStore/ObjectExists", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *objectStoreClient) GetObject(ctx context.Context, in *GetObjectRequest, opts ...grpc.CallOption) (ObjectStore_GetObjectClient, error) {
stream, err := grpc.NewClientStream(ctx, &_ObjectStore_serviceDesc.Streams[1], c.cc, "/generated.ObjectStore/GetObject", opts...)
if err != nil {
@@ -479,6 +539,7 @@ func (c *objectStoreClient) CreateSignedURL(ctx context.Context, in *CreateSigne
type ObjectStoreServer interface {
Init(context.Context, *ObjectStoreInitRequest) (*Empty, error)
PutObject(ObjectStore_PutObjectServer) error
ObjectExists(context.Context, *ObjectExistsRequest) (*ObjectExistsResponse, error)
GetObject(*GetObjectRequest, ObjectStore_GetObjectServer) error
ListCommonPrefixes(context.Context, *ListCommonPrefixesRequest) (*ListCommonPrefixesResponse, error)
ListObjects(context.Context, *ListObjectsRequest) (*ListObjectsResponse, error)
@@ -534,6 +595,24 @@ func (x *objectStorePutObjectServer) Recv() (*PutObjectRequest, error) {
return m, nil
}
func _ObjectStore_ObjectExists_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ObjectExistsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ObjectStoreServer).ObjectExists(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/generated.ObjectStore/ObjectExists",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ObjectStoreServer).ObjectExists(ctx, req.(*ObjectExistsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ObjectStore_GetObject_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(GetObjectRequest)
if err := stream.RecvMsg(m); err != nil {
@@ -635,6 +714,10 @@ var _ObjectStore_serviceDesc = grpc.ServiceDesc{
MethodName: "Init",
Handler: _ObjectStore_Init_Handler,
},
{
MethodName: "ObjectExists",
Handler: _ObjectStore_ObjectExists_Handler,
},
{
MethodName: "ListCommonPrefixes",
Handler: _ObjectStore_ListCommonPrefixes_Handler,
@@ -670,39 +753,42 @@ var _ObjectStore_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor1) }
var fileDescriptor1 = []byte{
// 531 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x5f, 0x8b, 0xd3, 0x40,
0x10, 0x67, 0x9b, 0x5c, 0x31, 0xd3, 0x82, 0x71, 0x4f, 0x6a, 0xcc, 0xa9, 0xd4, 0x45, 0x21, 0x22,
0x16, 0x39, 0x5f, 0xce, 0x3f, 0x0f, 0x62, 0x2d, 0x22, 0x14, 0x3c, 0x52, 0x45, 0x1f, 0x7c, 0x49,
0x9b, 0xb9, 0x5e, 0x6c, 0x9a, 0xc4, 0x64, 0x23, 0xe6, 0xd1, 0xcf, 0xe0, 0x37, 0xf1, 0x13, 0x4a,
0x36, 0x7b, 0xed, 0xa6, 0x97, 0xbb, 0x42, 0xe9, 0xdb, 0xcc, 0xec, 0xcc, 0xfc, 0x7e, 0xb3, 0xbb,
0xbf, 0x81, 0x5b, 0x9f, 0xa6, 0x3f, 0x70, 0xc6, 0x27, 0x3c, 0x4e, 0x71, 0x90, 0xa4, 0x31, 0x8f,
0xa9, 0x31, 0xc7, 0x08, 0x53, 0x8f, 0xa3, 0x6f, 0x77, 0x27, 0xe7, 0x5e, 0x8a, 0x7e, 0x75, 0xc0,
0xce, 0xc1, 0x3c, 0xcd, 0x79, 0x55, 0xe0, 0xe2, 0xcf, 0x1c, 0x33, 0x4e, 0x7b, 0xd0, 0x4e, 0xc2,
0x7c, 0x1e, 0x44, 0x16, 0xe9, 0x13, 0xc7, 0x70, 0xa5, 0x57, 0xc6, 0xa7, 0xf9, 0x6c, 0x81, 0xdc,
0x6a, 0x55, 0xf1, 0xca, 0xa3, 0x26, 0x68, 0x0b, 0x2c, 0x2c, 0x4d, 0x04, 0x4b, 0x93, 0x52, 0xd0,
0xa7, 0xb1, 0x5f, 0x58, 0x7a, 0x9f, 0x38, 0x5d, 0x57, 0xd8, 0xec, 0x33, 0x98, 0x1f, 0x70, 0xdf,
0x48, 0xec, 0x08, 0x0e, 0xde, 0x15, 0x1c, 0xb3, 0x12, 0xd2, 0xf7, 0xb8, 0x27, 0x1a, 0x75, 0x5d,
0x61, 0xb3, 0x3f, 0x04, 0xee, 0x8e, 0x83, 0x8c, 0x0f, 0xe3, 0xe5, 0x32, 0x8e, 0x4e, 0x53, 0x3c,
0x0b, 0x7e, 0x63, 0xb6, 0x2b, 0xf8, 0x3d, 0x30, 0x7c, 0x0c, 0x83, 0x65, 0xc0, 0x31, 0x95, 0x14,
0xd6, 0x01, 0xd1, 0x4d, 0x00, 0x88, 0xa1, 0xcb, 0x6e, 0xc2, 0x63, 0x27, 0x60, 0x37, 0x51, 0xc8,
0x92, 0x38, 0xca, 0x90, 0xda, 0x70, 0x23, 0x91, 0x31, 0x8b, 0xf4, 0x35, 0xc7, 0x70, 0x57, 0x3e,
0xfb, 0x0e, 0xb4, 0xac, 0xac, 0x6e, 0x6c, 0x67, 0xd6, 0x6b, 0x5e, 0x5a, 0x8d, 0xd7, 0x13, 0x38,
0xac, 0x75, 0x97, 0x84, 0x28, 0xe8, 0x0b, 0x2c, 0x2e, 0xc8, 0x08, 0x9b, 0x7d, 0x85, 0xc3, 0xf7,
0x18, 0x22, 0xc7, 0x7d, 0x3f, 0x5e, 0x08, 0xbd, 0x61, 0x8a, 0x1e, 0xc7, 0x49, 0x30, 0x8f, 0xd0,
0xff, 0xe2, 0x8e, 0xf7, 0xf7, 0x05, 0x4d, 0xd0, 0x38, 0x0f, 0xc5, 0x63, 0x68, 0x6e, 0x69, 0xb2,
0xa7, 0x70, 0xe7, 0x12, 0x9a, 0x9c, 0xda, 0x04, 0x2d, 0x4f, 0x43, 0x89, 0x55, 0x9a, 0xec, 0x1f,
0x81, 0x9e, 0x22, 0xa3, 0x8f, 0x51, 0xb0, 0x75, 0xee, 0x11, 0xb4, 0x67, 0x71, 0x74, 0x16, 0xcc,
0xad, 0x56, 0x5f, 0x73, 0x3a, 0xc7, 0xcf, 0x06, 0x2b, 0xd1, 0x0d, 0x9a, 0x5b, 0x0d, 0x86, 0x22,
0x7f, 0x14, 0xf1, 0xb4, 0x70, 0x65, 0xb1, 0xfd, 0x12, 0x3a, 0x4a, 0xf8, 0x62, 0x32, 0xb2, 0x9e,
0xec, 0x36, 0x1c, 0xfc, 0xf2, 0xc2, 0x1c, 0xe5, 0x15, 0x54, 0xce, 0xab, 0xd6, 0x09, 0x39, 0xfe,
0xab, 0x43, 0x47, 0x41, 0xa2, 0xaf, 0x41, 0x2f, 0xd1, 0xe8, 0xc3, 0xad, 0x4c, 0x6c, 0x53, 0x49,
0x19, 0x2d, 0x13, 0x5e, 0xd0, 0x37, 0x60, 0xac, 0x36, 0x03, 0x3d, 0x52, 0x8e, 0x37, 0xf7, 0xc5,
0xe5, 0x5a, 0x87, 0x94, 0xd5, 0x2b, 0xb5, 0xd7, 0xaa, 0x37, 0x77, 0x40, 0xad, 0x5a, 0x48, 0xf9,
0x39, 0xa1, 0x5e, 0xf5, 0xf5, 0xeb, 0xa2, 0xa1, 0x8f, 0x94, 0xcc, 0x2b, 0x65, 0x6d, 0x3f, 0xde,
0x92, 0x25, 0x9f, 0x7c, 0x0c, 0x1d, 0xe5, 0xff, 0xd3, 0xfb, 0x1b, 0x55, 0x75, 0xd5, 0xd9, 0x0f,
0xae, 0x3a, 0x96, 0xdd, 0xde, 0x42, 0x57, 0x95, 0x08, 0x55, 0xf3, 0x1b, 0xb4, 0xd3, 0x70, 0xdd,
0xdf, 0xe0, 0xe6, 0xc6, 0xef, 0xac, 0x3d, 0x5b, 0xb3, 0x4e, 0x6c, 0x76, 0x5d, 0x4a, 0xc5, 0x6d,
0xda, 0x16, 0x9b, 0xfe, 0xc5, 0xff, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe6, 0x73, 0xe9, 0xf8, 0x17,
0x06, 0x00, 0x00,
// 577 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xd6, 0xc6, 0x69, 0x54, 0x4f, 0x22, 0x61, 0xb6, 0x55, 0x30, 0x2e, 0x94, 0xb0, 0x02, 0x29,
0x08, 0x11, 0xa1, 0x72, 0x29, 0xd0, 0x03, 0x22, 0x44, 0x08, 0x29, 0x52, 0x2b, 0x07, 0x04, 0x07,
0x2e, 0x4e, 0x3c, 0x4d, 0x4d, 0x1c, 0x3b, 0xd8, 0x6b, 0x54, 0x1f, 0x79, 0x25, 0x9e, 0x89, 0x07,
0x41, 0xbb, 0xde, 0x26, 0xde, 0xfc, 0x10, 0xa9, 0xca, 0x6d, 0x66, 0x76, 0xe7, 0x9b, 0x6f, 0x66,
0xfd, 0x8d, 0xe1, 0xee, 0xf9, 0xf0, 0x07, 0x8e, 0xf8, 0x80, 0xc7, 0x09, 0x76, 0x66, 0x49, 0xcc,
0x63, 0x6a, 0x8e, 0x31, 0xc2, 0xc4, 0xe3, 0xe8, 0x3b, 0x8d, 0xc1, 0x95, 0x97, 0xa0, 0x5f, 0x1c,
0xb0, 0x2b, 0xb0, 0x2e, 0x32, 0x5e, 0x24, 0xb8, 0xf8, 0x33, 0xc3, 0x94, 0xd3, 0x26, 0xd4, 0x66,
0x61, 0x36, 0x0e, 0x22, 0x9b, 0xb4, 0x48, 0xdb, 0x74, 0x95, 0x27, 0xe2, 0xc3, 0x6c, 0x34, 0x41,
0x6e, 0x57, 0x8a, 0x78, 0xe1, 0x51, 0x0b, 0x8c, 0x09, 0xe6, 0xb6, 0x21, 0x83, 0xc2, 0xa4, 0x14,
0xaa, 0xc3, 0xd8, 0xcf, 0xed, 0x6a, 0x8b, 0xb4, 0x1b, 0xae, 0xb4, 0xd9, 0x57, 0x38, 0x28, 0xca,
0xf4, 0xae, 0x83, 0x94, 0xa7, 0x3b, 0x2b, 0xc6, 0x3a, 0x70, 0xa8, 0x03, 0xa7, 0xb3, 0x38, 0x4a,
0x51, 0x20, 0xa0, 0x8c, 0x48, 0xe4, 0x7d, 0x57, 0x79, 0xec, 0x33, 0x58, 0x1f, 0x71, 0xd7, 0x2d,
0xb3, 0x23, 0xd8, 0x7b, 0x9f, 0x73, 0x4c, 0x45, 0xef, 0xbe, 0xc7, 0x3d, 0x09, 0xd4, 0x70, 0xa5,
0xcd, 0x7e, 0x13, 0xb8, 0xdf, 0x0f, 0x52, 0xde, 0x8d, 0xa7, 0xd3, 0x38, 0xba, 0x48, 0xf0, 0x32,
0xb8, 0xc6, 0x5b, 0x8f, 0xe0, 0x01, 0x98, 0x3e, 0x86, 0xc1, 0x34, 0xe0, 0x98, 0x28, 0x0a, 0x8b,
0x80, 0x44, 0x93, 0x05, 0xe4, 0xf4, 0x05, 0x9a, 0xf4, 0xd8, 0x29, 0x38, 0xeb, 0x28, 0xa8, 0x61,
0x39, 0xb0, 0x3f, 0x53, 0x31, 0x9b, 0xb4, 0x8c, 0xb6, 0xe9, 0xce, 0x7d, 0xf6, 0x1d, 0xa8, 0xc8,
0x2c, 0x26, 0x76, 0x6b, 0xd6, 0x0b, 0x5e, 0x86, 0xc6, 0xeb, 0x19, 0x1c, 0x68, 0xe8, 0x8a, 0x10,
0x85, 0xea, 0x04, 0xf3, 0x1b, 0x32, 0xd2, 0x16, 0x9f, 0xd0, 0x07, 0x0c, 0x91, 0xe3, 0xae, 0x1f,
0x2f, 0x84, 0x66, 0x37, 0x41, 0x8f, 0xe3, 0x20, 0x18, 0x47, 0xe8, 0x7f, 0x71, 0xfb, 0xbb, 0xd3,
0x82, 0x05, 0x06, 0xe7, 0xa1, 0x7c, 0x0c, 0xc3, 0x15, 0x26, 0x7b, 0x0e, 0xf7, 0x56, 0xaa, 0xa9,
0xae, 0x2d, 0x30, 0xb2, 0x24, 0x54, 0xb5, 0x84, 0xc9, 0xfe, 0x10, 0x68, 0x96, 0xf4, 0xfc, 0x29,
0x0a, 0xb6, 0xf6, 0xdd, 0x83, 0xda, 0x28, 0x8e, 0x2e, 0x83, 0xb1, 0x5d, 0x69, 0x19, 0xed, 0xfa,
0xc9, 0x8b, 0xce, 0x5c, 0xfd, 0x9d, 0xf5, 0x50, 0x9d, 0xae, 0xbc, 0xdf, 0x8b, 0x78, 0x92, 0xbb,
0x2a, 0xd9, 0x79, 0x0d, 0xf5, 0x52, 0xf8, 0xa6, 0x33, 0xb2, 0xe8, 0xec, 0x10, 0xf6, 0x7e, 0x79,
0x61, 0x86, 0x6a, 0x04, 0x85, 0xf3, 0xa6, 0x72, 0x4a, 0x4e, 0xfe, 0x56, 0xa1, 0x5e, 0xaa, 0x44,
0xdf, 0x42, 0x55, 0x54, 0xa3, 0x8f, 0xb7, 0x32, 0x71, 0xac, 0xd2, 0x95, 0xde, 0x74, 0xc6, 0x73,
0x7a, 0x06, 0xe6, 0x7c, 0x45, 0xd1, 0xa3, 0xd2, 0xf1, 0xf2, 0xe2, 0x5a, 0xcd, 0x6d, 0x13, 0x7a,
0x0e, 0x8d, 0xf2, 0x76, 0xa0, 0xc7, 0x2b, 0x14, 0xb4, 0x7d, 0xe4, 0x3c, 0xda, 0x78, 0xae, 0x9e,
0xe8, 0x0c, 0xcc, 0xf9, 0xfa, 0xd0, 0xe8, 0x2c, 0x2f, 0x15, 0x8d, 0x8e, 0xdc, 0x0d, 0x2f, 0x09,
0xf5, 0x0a, 0x2d, 0xe9, 0x2a, 0xa4, 0x4f, 0x4a, 0x37, 0x37, 0xee, 0x09, 0xe7, 0xe9, 0x96, 0x5b,
0x8a, 0x60, 0x1f, 0xea, 0x25, 0x41, 0xd1, 0x87, 0x4b, 0x59, 0xba, 0x8c, 0x9d, 0xe3, 0x4d, 0xc7,
0x0a, 0xed, 0x1d, 0x34, 0xca, 0x9a, 0xd3, 0xe6, 0xb7, 0x46, 0x8c, 0x6b, 0xde, 0xef, 0x1b, 0xdc,
0x59, 0xfa, 0xdc, 0xb5, 0xef, 0x60, 0xbd, 0xf0, 0x1c, 0xf6, 0xbf, 0x2b, 0x05, 0xb7, 0x61, 0x4d,
0xfe, 0xc3, 0x5e, 0xfd, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x0b, 0x4e, 0x76, 0xa3, 0xf1, 0x06, 0x00,
0x00,
}

View File

@@ -242,25 +242,26 @@ var _RestoreItemAction_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("RestoreItemAction.proto", fileDescriptor3) }
var fileDescriptor3 = []byte{
// 319 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xcd, 0x4e, 0xf3, 0x30,
0x10, 0x94, 0xdb, 0xaa, 0x55, 0xb7, 0xd5, 0xf7, 0x81, 0x0f, 0x60, 0x15, 0x21, 0x42, 0x0e, 0xa8,
0xe2, 0xa7, 0x87, 0x72, 0xe4, 0x54, 0x24, 0xa8, 0x7a, 0x75, 0x79, 0x81, 0x34, 0x59, 0x5a, 0x8b,
0x24, 0x36, 0xb6, 0x23, 0x71, 0xe3, 0x11, 0x78, 0x46, 0xde, 0x04, 0xe5, 0x57, 0xb4, 0x81, 0xd0,
0x9b, 0x77, 0x3d, 0xb3, 0x33, 0x63, 0x2f, 0x1c, 0x73, 0x34, 0x56, 0x6a, 0x5c, 0x58, 0x8c, 0x66,
0xbe, 0x15, 0x32, 0x9e, 0x28, 0x2d, 0xad, 0xa4, 0xfd, 0x35, 0xc6, 0xa8, 0x3d, 0x8b, 0xc1, 0x68,
0xb8, 0xdc, 0x78, 0x1a, 0x83, 0xfc, 0xc2, 0xfd, 0x20, 0x70, 0x56, 0x23, 0x3d, 0xbc, 0xa1, 0x9f,
0x58, 0xe4, 0xf8, 0x9a, 0xa0, 0xb1, 0xf4, 0x08, 0xba, 0x2a, 0x4c, 0xd6, 0x22, 0x66, 0xc4, 0x21,
0xe3, 0x3e, 0x2f, 0x2a, 0x4a, 0xa1, 0x23, 0x2c, 0x46, 0xac, 0xe5, 0x90, 0xf1, 0x90, 0x67, 0x67,
0xca, 0xa0, 0xa7, 0xf3, 0x71, 0xac, 0x9d, 0xb5, 0xcb, 0x92, 0x5e, 0xc0, 0xbf, 0x14, 0xf1, 0xa8,
0x65, 0x74, 0xef, 0xf9, 0x2f, 0x89, 0x62, 0x9d, 0x0c, 0xb0, 0xd3, 0x75, 0xdf, 0xc1, 0xf9, 0xdd,
0x90, 0x51, 0x32, 0x36, 0x58, 0x29, 0x93, 0x6f, 0xca, 0x73, 0xf8, 0xef, 0x05, 0x81, 0x48, 0xe1,
0x5e, 0x98, 0x52, 0x0d, 0x6b, 0x39, 0xed, 0xf1, 0x60, 0x7a, 0x3a, 0xa9, 0xc2, 0x4f, 0x38, 0x1a,
0x99, 0x68, 0x1f, 0x17, 0x01, 0xc6, 0x56, 0x3c, 0x0b, 0xd4, 0x7c, 0x97, 0xe5, 0xde, 0xc1, 0x79,
0xcd, 0xc0, 0x4c, 0xa9, 0x50, 0xa0, 0x79, 0x92, 0x7f, 0xbc, 0x89, 0x1b, 0x81, 0xdb, 0x44, 0x2e,
0xfc, 0xcf, 0xe1, 0xa0, 0x74, 0xb2, 0xc4, 0x10, 0x7d, 0x2b, 0x75, 0x36, 0x67, 0x30, 0x3d, 0xf9,
0xc1, 0x6c, 0x09, 0xe1, 0x35, 0xd2, 0xf4, 0x93, 0xc0, 0x61, 0x4d, 0x8f, 0x6e, 0xa0, 0x5f, 0x69,
0xd2, 0xeb, 0xed, 0x89, 0xcd, 0xb9, 0x46, 0x37, 0x7b, 0xa2, 0x8b, 0x20, 0x2b, 0xe8, 0x15, 0x7f,
0x43, 0x2f, 0x9b, 0x98, 0xdb, 0x1b, 0x35, 0xba, 0xda, 0x0b, 0x9b, 0x6b, 0xac, 0xba, 0xd9, 0xa6,
0xde, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0x96, 0x3a, 0x66, 0xfa, 0xdd, 0x02, 0x00, 0x00,
// 332 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xdd, 0x4e, 0xc2, 0x30,
0x14, 0x4e, 0x81, 0x80, 0x1c, 0x88, 0x3f, 0xbd, 0xd0, 0x06, 0x63, 0x9c, 0xbb, 0x30, 0xc4, 0x1f,
0x2e, 0xf0, 0xd2, 0x2b, 0x4c, 0x94, 0x70, 0x5b, 0x7c, 0x81, 0xb1, 0x1d, 0xa1, 0x61, 0x5b, 0x6b,
0xdb, 0x25, 0xbe, 0x85, 0xcf, 0xe0, 0xa3, 0xf9, 0x26, 0x86, 0x31, 0x96, 0xc1, 0x74, 0x72, 0xd7,
0x73, 0xfa, 0x7d, 0xe7, 0xfb, 0xbe, 0xf6, 0xc0, 0x19, 0x47, 0x63, 0xa5, 0xc6, 0x89, 0xc5, 0x68,
0xe4, 0x5b, 0x21, 0xe3, 0x81, 0xd2, 0xd2, 0x4a, 0xda, 0x9e, 0x63, 0x8c, 0xda, 0xb3, 0x18, 0xf4,
0xba, 0xd3, 0x85, 0xa7, 0x31, 0x58, 0x5f, 0xb8, 0x9f, 0x04, 0x2e, 0x4b, 0xa4, 0xe7, 0x0f, 0xf4,
0x13, 0x8b, 0x1c, 0xdf, 0x13, 0x34, 0x96, 0x9e, 0x42, 0x53, 0x85, 0xc9, 0x5c, 0xc4, 0x8c, 0x38,
0xa4, 0xdf, 0xe6, 0x59, 0x45, 0x29, 0x34, 0x84, 0xc5, 0x88, 0xd5, 0x1c, 0xd2, 0xef, 0xf2, 0xf4,
0x4c, 0x19, 0xb4, 0xf4, 0x7a, 0x1c, 0xab, 0xa7, 0xed, 0x4d, 0x49, 0xaf, 0xe1, 0x70, 0x85, 0x78,
0xd1, 0x32, 0x7a, 0xf2, 0xfc, 0x65, 0xa2, 0x58, 0x23, 0x05, 0xec, 0x74, 0xdd, 0x2f, 0x02, 0xce,
0xdf, 0x8e, 0x8c, 0x92, 0xb1, 0xc1, 0x5c, 0x9a, 0x14, 0xa4, 0xc7, 0x70, 0xe4, 0x05, 0x81, 0x58,
0xc1, 0xbd, 0x70, 0x45, 0x35, 0xac, 0xe6, 0xd4, 0xfb, 0x9d, 0xe1, 0xc5, 0x20, 0x4f, 0x3f, 0xe0,
0x68, 0x64, 0xa2, 0x7d, 0x9c, 0x04, 0x18, 0x5b, 0xf1, 0x26, 0x50, 0xf3, 0x5d, 0x16, 0x75, 0xa0,
0x63, 0x96, 0x42, 0xf1, 0x42, 0x8e, 0x03, 0x5e, 0x6c, 0xb9, 0x8f, 0x70, 0x55, 0xb2, 0x38, 0x52,
0x2a, 0x14, 0x68, 0x5e, 0xe5, 0x3f, 0xcf, 0xe6, 0x46, 0xe0, 0x56, 0x91, 0xb3, 0x84, 0x63, 0x38,
0xde, 0x78, 0x9d, 0x62, 0x88, 0xbe, 0x95, 0x3a, 0x9d, 0xd3, 0x19, 0x9e, 0xff, 0x12, 0x67, 0x03,
0xe1, 0x25, 0xd2, 0xf0, 0x9b, 0xc0, 0x49, 0x49, 0x8f, 0x2e, 0xa0, 0x9d, 0x6b, 0xd2, 0xbb, 0xed,
0x89, 0xd5, 0xb9, 0x7a, 0xf7, 0x7b, 0xa2, 0xb3, 0x20, 0x33, 0x68, 0x65, 0xbf, 0x47, 0x6f, 0xaa,
0x98, 0xdb, 0x4b, 0xd7, 0xbb, 0xdd, 0x0b, 0xbb, 0xd6, 0x98, 0x35, 0xd3, 0x65, 0x7e, 0xf8, 0x09,
0x00, 0x00, 0xff, 0xff, 0x1b, 0x4c, 0xdc, 0xb7, 0x00, 0x03, 0x00, 0x00,
}

View File

@@ -10,6 +10,16 @@ message PutObjectRequest {
bytes body = 4;
}
message ObjectExistsRequest {
string plugin = 1;
string bucket = 2;
string key = 3;
}
message ObjectExistsResponse {
bool exists = 1;
}
message GetObjectRequest {
string plugin = 1;
string bucket = 2;
@@ -67,6 +77,7 @@ message ObjectStoreInitRequest {
service ObjectStore {
rpc Init(ObjectStoreInitRequest) returns (Empty);
rpc PutObject(stream PutObjectRequest) returns (Empty);
rpc ObjectExists(ObjectExistsRequest) returns (ObjectExistsResponse);
rpc GetObject(GetObjectRequest) returns (stream Bytes);
rpc ListCommonPrefixes(ListCommonPrefixesRequest) returns (ListCommonPrefixesResponse);
rpc ListObjects(ListObjectsRequest) returns (ListObjectsResponse);

View File

@@ -33,6 +33,9 @@ type ObjectStore interface {
// object storage bucket with the given key.
PutObject(bucket, key string, body io.Reader) error
// ObjectExists checks if there is an object with the given key in the object storage bucket.
ObjectExists(bucket, key string) (bool, error)
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
GetObject(bucket, key string) (io.ReadCloser, error)

View File

@@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/diff"
core "k8s.io/client-go/testing"
)
@@ -101,9 +102,8 @@ func AssertDeepEqual(t *testing.T, expected, actual interface{}) bool {
}
if !equality.Semantic.DeepEqual(expected, actual) {
fmt.Printf("expected = %+v\n", expected)
fmt.Printf("actual = %+v\n", actual)
return assert.Fail(t, fmt.Sprintf("Objects not equal"))
s := diff.ObjectDiff(expected, actual)
return assert.Fail(t, fmt.Sprintf("Objects not equal:\n\n%s", s))
}
return true