Simplify implementation

This commit is contained in:
Hoang, Phuong
2021-07-23 17:29:46 -07:00
committed by Bridget McErlean
parent 79d1616ecb
commit 3cbd7976bd
8 changed files with 70 additions and 233 deletions

View File

@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright 2021 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,5 +1,5 @@
/*
Copyright 2018, 2021 the Velero contributors.
Copyright 2021 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -44,8 +44,8 @@ func newAdaptedV1BackupItemAction(
return r
}
// getBackupItemAction returns the backup item action for this restartableAdaptedV1BackupItemAction. It does *not* restart the
// plugin process.
// getBackupItemAction returns the backup item action for this restartableAdaptedV1BackupItemAction.
// It does *not* restart the plugin process.
func (r *restartableAdaptedV1BackupItemAction) getBackupItemAction() (backupitemactionv1.BackupItemAction, error) {
plugin, err := r.sharedPluginProcess.getByKindAndName(r.key)
if err != nil {
@@ -60,7 +60,8 @@ func (r *restartableAdaptedV1BackupItemAction) getBackupItemAction() (backupitem
return backupItemAction, nil
}
// getDelegate restarts the plugin process (if needed) and returns the backup item action for this restartableAdaptedV1BackupItemAction.
// getDelegate restarts the plugin process (if needed) and returns the backup item
// action for this restartableAdaptedV1BackupItemAction.
func (r *restartableAdaptedV1BackupItemAction) getDelegate() (backupitemactionv1.BackupItemAction, error) {
if err := r.sharedPluginProcess.resetIfNeeded(); err != nil {
return nil, err
@@ -80,7 +81,8 @@ func (r *restartableAdaptedV1BackupItemAction) AppliesTo() (velero.ResourceSelec
}
// Execute restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1BackupItemAction) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
func (r *restartableAdaptedV1BackupItemAction) Execute(
item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
delegate, err := r.getDelegate()
if err != nil {
return nil, nil, err
@@ -91,7 +93,10 @@ func (r *restartableAdaptedV1BackupItemAction) Execute(item runtime.Unstructured
// Version 2: simply discard ctx and call version 1 function.
// ExecuteV2 restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1BackupItemAction) ExecuteV2(ctx context.Context, item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
func (r *restartableAdaptedV1BackupItemAction) ExecuteV2(
ctx context.Context, item runtime.Unstructured, backup *api.Backup) (
runtime.Unstructured, []velero.ResourceIdentifier, error) {
delegate, err := r.getDelegate()
if err != nil {
return nil, nil, err

View File

@@ -58,8 +58,8 @@ func (r *restartableAdaptedV1ObjectStore) reinitialize(dispensed interface{}) er
return r.init(objectStore, r.config)
}
// getObjectStore returns the object store for this restartableObjectStore. It does *not* restart the
// plugin process.
// getObjectStore returns the object store for this restartableObjectStore.
// It does *not* restart the plugin process.
func (r *restartableAdaptedV1ObjectStore) getObjectStore() (objectstorev1.ObjectStore, error) {
plugin, err := r.sharedPluginProcess.getByKindAndName(r.key)
if err != nil {
@@ -139,7 +139,8 @@ func (r *restartableAdaptedV1ObjectStore) GetObject(bucket string, key string) (
}
// ListCommonPrefixes restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1ObjectStore) ListCommonPrefixes(bucket string, prefix string, delimiter string) ([]string, error) {
func (r *restartableAdaptedV1ObjectStore) ListCommonPrefixes(
bucket string, prefix string, delimiter string) ([]string, error) {
delegate, err := r.getDelegate()
if err != nil {
return nil, err
@@ -166,7 +167,8 @@ func (r *restartableAdaptedV1ObjectStore) DeleteObject(bucket string, key string
}
// CreateSignedURL restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1ObjectStore) CreateSignedURL(bucket string, key string, ttl time.Duration) (string, error) {
func (r *restartableAdaptedV1ObjectStore) CreateSignedURL(
bucket string, key string, ttl time.Duration) (string, error) {
delegate, err := r.getDelegate()
if err != nil {
return "", err
@@ -176,7 +178,8 @@ func (r *restartableAdaptedV1ObjectStore) CreateSignedURL(bucket string, key str
// Version 2. Simply discard ctx.
// PutObjectV2 restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1ObjectStore) PutObjectV2(ctx context.Context, bucket string, key string, body io.Reader) error {
func (r *restartableAdaptedV1ObjectStore) PutObjectV2(
ctx context.Context, bucket string, key string, body io.Reader) error {
delegate, err := r.getDelegate()
if err != nil {
return err
@@ -194,7 +197,8 @@ func (r *restartableAdaptedV1ObjectStore) ObjectExistsV2(ctx context.Context, bu
}
// GetObjectV2 restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1ObjectStore) GetObjectV2(ctx context.Context, bucket string, key string) (io.ReadCloser, error) {
func (r *restartableAdaptedV1ObjectStore) GetObjectV2(
ctx context.Context, bucket string, key string) (io.ReadCloser, error) {
delegate, err := r.getDelegate()
if err != nil {
return nil, err
@@ -213,7 +217,8 @@ func (r *restartableAdaptedV1ObjectStore) ListCommonPrefixesV2(
}
// ListObjectsV2 restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1ObjectStore) ListObjectsV2(ctx context.Context, bucket string, prefix string) ([]string, error) {
func (r *restartableAdaptedV1ObjectStore) ListObjectsV2(
ctx context.Context, bucket string, prefix string) ([]string, error) {
delegate, err := r.getDelegate()
if err != nil {
return nil, err
@@ -231,7 +236,8 @@ func (r *restartableAdaptedV1ObjectStore) DeleteObjectV2(ctx context.Context, bu
}
// CreateSignedURLV2 restarts the plugin's process if needed, then delegates the call.
func (r *restartableAdaptedV1ObjectStore) CreateSignedURLV2(ctx context.Context, bucket string, key string, ttl time.Duration) (string, error) {
func (r *restartableAdaptedV1ObjectStore) CreateSignedURLV2(
ctx context.Context, bucket string, key string, ttl time.Duration) (string, error) {
delegate, err := r.getDelegate()
if err != nil {
return "", err

View File

@@ -43,8 +43,8 @@ func newAdaptedV1RestoreItemAction(
return r
}
// getRestoreItemAction returns the restore item action for this restartableRestoreItemAction. It does *not* restart the
// plugin process.
// getRestoreItemAction returns the restore item action for this restartableRestoreItemAction.
// It does *not* restart the plugin process.
func (r *restartableAdaptedV1RestoreItemAction) getRestoreItemAction() (restoreitemactionv1.RestoreItemAction, error) {
plugin, err := r.sharedPluginProcess.getByKindAndName(r.key)
if err != nil {

View File

@@ -76,48 +76,7 @@ func (c *BackupItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error
}
func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
itemJSON, err := json.Marshal(item.UnstructuredContent())
if err != nil {
return nil, nil, errors.WithStack(err)
}
backupJSON, err := json.Marshal(backup)
if err != nil {
return nil, nil, errors.WithStack(err)
}
req := &proto.ExecuteRequest{
Plugin: c.plugin,
Item: itemJSON,
Backup: backupJSON,
}
res, err := c.grpcClient.Execute(context.Background(), req)
if err != nil {
return nil, nil, fromGRPCError(err)
}
var updatedItem unstructured.Unstructured
if err := json.Unmarshal(res.Item, &updatedItem); err != nil {
return nil, nil, errors.WithStack(err)
}
var additionalItems []velero.ResourceIdentifier
for _, itm := range res.AdditionalItems {
newItem := velero.ResourceIdentifier{
GroupResource: schema.GroupResource{
Group: itm.Group,
Resource: itm.Resource,
},
Namespace: itm.Namespace,
Name: itm.Name,
}
additionalItems = append(additionalItems, newItem)
}
return &updatedItem, additionalItems, nil
return c.ExecuteV2(context.Background(), item, backup)
}
func (c *BackupItemActionGRPCClient) ExecuteV2(

View File

@@ -71,28 +71,7 @@ func (c *DeleteItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error
}
func (c *DeleteItemActionGRPCClient) Execute(input *velero.DeleteItemActionExecuteInput) error {
itemJSON, err := json.Marshal(input.Item.UnstructuredContent())
if err != nil {
return errors.WithStack(err)
}
backupJSON, err := json.Marshal(input.Backup)
if err != nil {
return errors.WithStack(err)
}
req := &proto.DeleteItemActionExecuteRequest{
Plugin: c.plugin,
Item: itemJSON,
Backup: backupJSON,
}
// First return item is just an empty struct no matter what.
if _, err = c.grpcClient.Execute(context.Background(), req); err != nil {
return fromGRPCError(err)
}
return nil
return c.ExecuteV2(context.Background(), input)
}
func (c *DeleteItemActionGRPCClient) ExecuteV2(ctx context.Context, input *velero.DeleteItemActionExecuteInput) error {

View File

@@ -69,157 +69,9 @@ func (c *ObjectStoreGRPCClient) Init(config map[string]string) error {
// PutObject creates a new object using the data in body within the specified
// object storage bucket with the given key.
func (c *ObjectStoreGRPCClient) PutObject(bucket, key string, body io.Reader) error {
stream, err := c.grpcClient.PutObject(context.Background())
if err != nil {
return fromGRPCError(err)
}
// read from the provider io.Reader into chunks, and send each one over
// the gRPC stream
chunk := make([]byte, byteChunkSize)
for {
n, err := body.Read(chunk)
if err == io.EOF {
if _, resErr := stream.CloseAndRecv(); resErr != nil {
return fromGRPCError(resErr)
}
return nil
}
if err != nil {
stream.CloseSend()
return errors.WithStack(err)
}
if err := stream.Send(&proto.PutObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key, Body: chunk[0:n]}); err != nil {
return fromGRPCError(err)
}
}
return c.PutObjectV2(context.Background(), bucket, key, body)
}
// ObjectExists checks if there is an object with the given key in the object storage bucket.
func (c *ObjectStoreGRPCClient) ObjectExists(bucket, key string) (bool, error) {
req := &proto.ObjectExistsRequest{
Plugin: c.plugin,
Bucket: bucket,
Key: key,
}
res, err := c.grpcClient.ObjectExists(context.Background(), req)
if err != nil {
return false, err
}
return res.Exists, nil
}
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) {
req := &proto.GetObjectRequest{
Plugin: c.plugin,
Bucket: bucket,
Key: key,
}
stream, err := c.grpcClient.GetObject(context.Background(), req)
if err != nil {
return nil, fromGRPCError(err)
}
receive := func() ([]byte, error) {
data, err := stream.Recv()
if err == io.EOF {
// we need to return io.EOF errors unwrapped so that
// calling code sees them as io.EOF and knows to stop
// reading.
return nil, err
}
if err != nil {
return nil, fromGRPCError(err)
}
return data.Data, nil
}
close := func() error {
if err := stream.CloseSend(); err != nil {
return fromGRPCError(err)
}
return nil
}
return &StreamReadCloser{receive: receive, close: close}, nil
}
// ListCommonPrefixes gets a list of all object key prefixes that come
// after the provided prefix and before the provided delimiter (this is
// often used to simulate a directory hierarchy in object storage).
func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
req := &proto.ListCommonPrefixesRequest{
Plugin: c.plugin,
Bucket: bucket,
Prefix: prefix,
Delimiter: delimiter,
}
res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req)
if err != nil {
return nil, fromGRPCError(err)
}
return res.Prefixes, nil
}
// ListObjects gets a list of all objects in bucket that have the same prefix.
func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) {
req := &proto.ListObjectsRequest{
Plugin: c.plugin,
Bucket: bucket,
Prefix: prefix,
}
res, err := c.grpcClient.ListObjects(context.Background(), req)
if err != nil {
return nil, fromGRPCError(err)
}
return res.Keys, nil
}
// DeleteObject removes object with the specified key from the given
// bucket.
func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error {
req := &proto.DeleteObjectRequest{
Plugin: c.plugin,
Bucket: bucket,
Key: key,
}
if _, err := c.grpcClient.DeleteObject(context.Background(), req); err != nil {
return fromGRPCError(err)
}
return nil
}
// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl.
func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
req := &proto.CreateSignedURLRequest{
Plugin: c.plugin,
Bucket: bucket,
Key: key,
Ttl: int64(ttl),
}
res, err := c.grpcClient.CreateSignedURL(context.Background(), req)
if err != nil {
return "", fromGRPCError(err)
}
return res.Url, nil
}
// Version 2
// PutObjectV2 creates a new object using the data in body within the specified
// object storage bucket with the given key.
func (c *ObjectStoreGRPCClient) PutObjectV2(ctx context.Context, bucket, key string, body io.Reader) error {
@@ -250,6 +102,11 @@ func (c *ObjectStoreGRPCClient) PutObjectV2(ctx context.Context, bucket, key str
}
}
// ObjectExists checks if there is an object with the given key in the object storage bucket.
func (c *ObjectStoreGRPCClient) ObjectExists(bucket, key string) (bool, error) {
return c.ObjectExistsV2(context.Background(), bucket, key)
}
// ObjectExistsV2 checks if there is an object with the given key in the object storage bucket.
func (c *ObjectStoreGRPCClient) ObjectExistsV2(ctx context.Context, bucket, key string) (bool, error) {
req := &proto.ObjectExistsRequest{
@@ -266,6 +123,12 @@ func (c *ObjectStoreGRPCClient) ObjectExistsV2(ctx context.Context, bucket, key
return res.Exists, nil
}
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) {
return c.GetObjectV2(context.Background(), bucket, key)
}
// GetObjectV2 retrieves the object with the given key from the specified
// bucket in object storage.
func (c *ObjectStoreGRPCClient) GetObjectV2(ctx context.Context, bucket, key string) (io.ReadCloser, error) {
@@ -305,6 +168,13 @@ func (c *ObjectStoreGRPCClient) GetObjectV2(ctx context.Context, bucket, key str
return &StreamReadCloser{receive: receive, close: close}, nil
}
// ListCommonPrefixes gets a list of all object key prefixes that come
// after the provided prefix and before the provided delimiter (this is
// often used to simulate a directory hierarchy in object storage).
func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
return c.ListCommonPrefixesV2(context.Background(), bucket, prefix, delimiter)
}
// ListCommonPrefixesV2 gets a list of all object key prefixes that come
// after the provided prefix and before the provided delimiter (this is
// often used to simulate a directory hierarchy in object storage).
@@ -325,6 +195,11 @@ func (c *ObjectStoreGRPCClient) ListCommonPrefixesV2(
return res.Prefixes, nil
}
// ListObjects gets a list of all objects in bucket that have the same prefix.
func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) {
return c.ListObjectsV2(context.Background(), bucket, prefix)
}
// ListObjectsV2 gets a list of all objects in bucket that have the same prefix.
func (c *ObjectStoreGRPCClient) ListObjectsV2(
ctx context.Context, bucket, prefix string) ([]string, error) {
@@ -342,8 +217,13 @@ func (c *ObjectStoreGRPCClient) ListObjectsV2(
return res.Keys, nil
}
// DeleteObjectV2 removes object with the specified key from the given
// DeleteObject removes object with the specified key from the given
// bucket.
func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error {
return c.DeleteObjectV2(context.Background(), bucket, key)
}
// DeleteObjectV2 removes object with the specified key from the given bucket.
func (c *ObjectStoreGRPCClient) DeleteObjectV2(
ctx context.Context, bucket, key string) error {
req := &proto.DeleteObjectRequest{
@@ -359,6 +239,11 @@ func (c *ObjectStoreGRPCClient) DeleteObjectV2(
return nil
}
// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl.
func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
return c.CreateSignedURLV2(context.Background(), bucket, key, ttl)
}
// CreateSignedURLV2 creates a pre-signed URL for the given bucket and key that expires after ttl.
func (c *ObjectStoreGRPCClient) CreateSignedURLV2(
ctx context.Context, bucket, key string, ttl time.Duration) (string, error) {

View File

@@ -149,6 +149,11 @@ func NewServer() Server {
objectStore: NewObjectStorePlugin(serverLogger(log)),
restoreItemAction: NewRestoreItemActionPlugin(serverLogger(log)),
deleteItemAction: NewDeleteItemActionPlugin(serverLogger(log)),
backupItemActionV2: NewBackupItemActionPlugin(serverLogger(log)),
volumeSnapshotterV2: NewVolumeSnapshotterPlugin(serverLogger(log)),
objectStoreV2: NewObjectStorePlugin(serverLogger(log)),
restoreItemActionV2: NewRestoreItemActionPlugin(serverLogger(log)),
deleteItemActionV2: NewDeleteItemActionPlugin(serverLogger(log)),
}
}
@@ -330,8 +335,6 @@ func (s *server) Serve() {
string(PluginKindRestoreItemAction): s.restoreItemAction,
string(PluginKindDeleteItemAction): s.deleteItemAction,
// Version 2
// TODO: check to see if need pluginLister for V2
// string(PluginKindPluginLister): NewPluginListerPlugin(pluginLister),
string(PluginKindBackupItemActionV2): s.backupItemActionV2,
string(PluginKindVolumeSnapshotterV2): s.volumeSnapshotterV2,
string(PluginKindObjectStoreV2): s.objectStoreV2,