From 7dfe58d37fccac86deab4ec72c7ec7ce2fdebec6 Mon Sep 17 00:00:00 2001 From: Carlisia Date: Thu, 14 Mar 2019 18:25:52 -0700 Subject: [PATCH] Split plugin framework into its own package Signed-off-by: Carlisia --- pkg/cmd/server/plugin/plugin.go | 2 +- pkg/plugin/client_builder.go | 14 +- pkg/plugin/client_builder_test.go | 13 +- pkg/plugin/framework/backup_item_action.go | 43 ++++ .../framework/backup_item_action_client.go | 112 ++++++++ .../backup_item_action_server.go} | 116 +-------- .../backup_item_action_test.go | 2 +- pkg/plugin/framework/block_store.go | 43 ++++ pkg/plugin/framework/block_store_client.go | 168 ++++++++++++ .../block_store_server.go} | 172 +------------ .../{ => framework}/client_dispenser.go | 11 +- .../{ => framework}/client_dispenser_test.go | 8 +- pkg/plugin/framework/doc.go | 3 + pkg/plugin/framework/examples_test.go | 93 +++++++ pkg/plugin/{ => framework}/handle_panic.go | 2 +- pkg/plugin/{ => framework}/interface.go | 4 +- pkg/plugin/{ => framework}/logger.go | 2 +- pkg/plugin/{ => framework}/logger_test.go | 2 +- pkg/plugin/framework/object_store.go | 44 ++++ pkg/plugin/framework/object_store_client.go | 163 ++++++++++++ .../object_store_server.go} | 168 +----------- pkg/plugin/{ => framework}/plugin_base.go | 16 +- .../{ => framework}/plugin_base_test.go | 6 +- pkg/plugin/{ => framework}/plugin_kinds.go | 5 +- .../{ => framework}/plugin_kinds_test.go | 4 +- pkg/plugin/{ => framework}/plugin_lister.go | 7 +- pkg/plugin/framework/restore_item_action.go | 43 ++++ .../framework/restore_item_action_client.go | 111 ++++++++ .../framework/restore_item_action_server.go | 130 ++++++++++ pkg/plugin/{ => framework}/server.go | 2 +- pkg/plugin/{ => framework}/server_mux.go | 5 +- pkg/plugin/{ => framework}/stream_reader.go | 4 +- .../{ => framework}/stream_reader_test.go | 4 +- pkg/plugin/manager.go | 17 +- pkg/plugin/manager_test.go | 47 ++-- pkg/plugin/process.go | 6 +- pkg/plugin/process_test.go | 14 +- pkg/plugin/registry.go | 39 +-- pkg/plugin/restartable_backup_item_action.go | 3 +- .../restartable_backup_item_action_test.go | 7 +- pkg/plugin/restartable_block_store.go | 3 +- pkg/plugin/restartable_block_store_test.go | 11 +- pkg/plugin/restartable_delegate_test.go | 4 +- pkg/plugin/restartable_object_store.go | 3 +- pkg/plugin/restartable_object_store_test.go | 11 +- pkg/plugin/restartable_restore_item_action.go | 3 +- .../restartable_restore_item_action_test.go | 7 +- pkg/plugin/restore_item_action.go | 241 ------------------ 48 files changed, 1114 insertions(+), 824 deletions(-) create mode 100644 pkg/plugin/framework/backup_item_action.go create mode 100644 pkg/plugin/framework/backup_item_action_client.go rename pkg/plugin/{backup_item_action.go => framework/backup_item_action_server.go} (50%) rename pkg/plugin/{ => framework}/backup_item_action_test.go (99%) create mode 100644 pkg/plugin/framework/block_store.go create mode 100644 pkg/plugin/framework/block_store_client.go rename pkg/plugin/{block_store.go => framework/block_store_server.go} (54%) rename pkg/plugin/{ => framework}/client_dispenser.go (90%) rename pkg/plugin/{ => framework}/client_dispenser_test.go (93%) create mode 100644 pkg/plugin/framework/doc.go create mode 100644 pkg/plugin/framework/examples_test.go rename pkg/plugin/{ => framework}/handle_panic.go (97%) rename pkg/plugin/{ => framework}/interface.go (92%) rename pkg/plugin/{ => framework}/logger.go (99%) rename pkg/plugin/{ => framework}/logger_test.go (98%) create mode 100644 pkg/plugin/framework/object_store.go create mode 100644 pkg/plugin/framework/object_store_client.go rename pkg/plugin/{object_store.go => framework/object_store_server.go} (54%) rename pkg/plugin/{ => framework}/plugin_base.go (72%) rename pkg/plugin/{ => framework}/plugin_base_test.go (91%) rename pkg/plugin/{ => framework}/plugin_kinds.go (96%) rename pkg/plugin/{ => framework}/plugin_kinds_test.go (93%) rename pkg/plugin/{ => framework}/plugin_lister.go (97%) create mode 100644 pkg/plugin/framework/restore_item_action.go create mode 100644 pkg/plugin/framework/restore_item_action_client.go create mode 100644 pkg/plugin/framework/restore_item_action_server.go rename pkg/plugin/{ => framework}/server.go (99%) rename pkg/plugin/{ => framework}/server_mux.go (97%) rename pkg/plugin/{ => framework}/stream_reader.go (96%) rename pkg/plugin/{ => framework}/stream_reader_test.go (95%) delete mode 100644 pkg/plugin/restore_item_action.go diff --git a/pkg/cmd/server/plugin/plugin.go b/pkg/cmd/server/plugin/plugin.go index 0bf2e0c1b..87c999517 100644 --- a/pkg/cmd/server/plugin/plugin.go +++ b/pkg/cmd/server/plugin/plugin.go @@ -26,7 +26,7 @@ import ( "github.com/heptio/velero/pkg/cloudprovider/azure" "github.com/heptio/velero/pkg/cloudprovider/gcp" velerodiscovery "github.com/heptio/velero/pkg/discovery" - veleroplugin "github.com/heptio/velero/pkg/plugin" + veleroplugin "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/restore" ) diff --git a/pkg/plugin/client_builder.go b/pkg/plugin/client_builder.go index 9a9b87f58..0c64dda0b 100644 --- a/pkg/plugin/client_builder.go +++ b/pkg/plugin/client_builder.go @@ -22,6 +22,8 @@ import ( "github.com/hashicorp/go-hclog" hcplugin "github.com/hashicorp/go-plugin" "github.com/sirupsen/logrus" + + "github.com/heptio/velero/pkg/plugin/framework" ) // clientBuilder builds go-plugin Clients. @@ -56,14 +58,14 @@ func newLogrusAdapter(pluginLogger logrus.FieldLogger, logLevel logrus.Level) *l func (b *clientBuilder) clientConfig() *hcplugin.ClientConfig { return &hcplugin.ClientConfig{ - HandshakeConfig: Handshake, + HandshakeConfig: framework.Handshake, AllowedProtocols: []hcplugin.Protocol{hcplugin.ProtocolGRPC}, Plugins: map[string]hcplugin.Plugin{ - string(PluginKindBackupItemAction): NewBackupItemActionPlugin(clientLogger(b.clientLogger)), - string(PluginKindBlockStore): NewBlockStorePlugin(clientLogger(b.clientLogger)), - string(PluginKindObjectStore): NewObjectStorePlugin(clientLogger(b.clientLogger)), - string(PluginKindPluginLister): &PluginListerPlugin{}, - string(PluginKindRestoreItemAction): NewRestoreItemActionPlugin(clientLogger(b.clientLogger)), + string(framework.PluginKindBackupItemAction): framework.NewBackupItemActionPlugin(framework.ClientLogger(b.clientLogger)), + string(framework.PluginKindBlockStore): framework.NewBlockStorePlugin(framework.ClientLogger(b.clientLogger)), + string(framework.PluginKindObjectStore): framework.NewObjectStorePlugin(framework.ClientLogger(b.clientLogger)), + string(framework.PluginKindPluginLister): &framework.PluginListerPlugin{}, + string(framework.PluginKindRestoreItemAction): framework.NewRestoreItemActionPlugin(framework.ClientLogger(b.clientLogger)), }, Logger: b.pluginLogger, Cmd: exec.Command(b.commandName, b.commandArgs...), diff --git a/pkg/plugin/client_builder_test.go b/pkg/plugin/client_builder_test.go index 8929297b2..c9c4cf91a 100644 --- a/pkg/plugin/client_builder_test.go +++ b/pkg/plugin/client_builder_test.go @@ -24,6 +24,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/util/test" ) @@ -47,14 +48,14 @@ func TestClientConfig(t *testing.T) { cb := newClientBuilder("velero", logger, logLevel) expected := &hcplugin.ClientConfig{ - HandshakeConfig: Handshake, + HandshakeConfig: framework.Handshake, AllowedProtocols: []hcplugin.Protocol{hcplugin.ProtocolGRPC}, Plugins: map[string]hcplugin.Plugin{ - string(PluginKindBackupItemAction): NewBackupItemActionPlugin(clientLogger(logger)), - string(PluginKindBlockStore): NewBlockStorePlugin(clientLogger(logger)), - string(PluginKindObjectStore): NewObjectStorePlugin(clientLogger(logger)), - string(PluginKindPluginLister): &PluginListerPlugin{}, - string(PluginKindRestoreItemAction): NewRestoreItemActionPlugin(clientLogger(logger)), + string(framework.PluginKindBackupItemAction): framework.NewBackupItemActionPlugin(framework.ClientLogger(logger)), + string(framework.PluginKindBlockStore): framework.NewBlockStorePlugin(framework.ClientLogger(logger)), + string(framework.PluginKindObjectStore): framework.NewObjectStorePlugin(framework.ClientLogger(logger)), + string(framework.PluginKindPluginLister): &framework.PluginListerPlugin{}, + string(framework.PluginKindRestoreItemAction): framework.NewRestoreItemActionPlugin(framework.ClientLogger(logger)), }, Logger: cb.pluginLogger, Cmd: exec.Command(cb.commandName, cb.commandArgs...), diff --git a/pkg/plugin/framework/backup_item_action.go b/pkg/plugin/framework/backup_item_action.go new file mode 100644 index 000000000..e1e6e0766 --- /dev/null +++ b/pkg/plugin/framework/backup_item_action.go @@ -0,0 +1,43 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + + proto "github.com/heptio/velero/pkg/plugin/generated" +) + +// BackupItemActionPlugin is an implementation of go-plugin's Plugin +// interface with support for gRPC for the backup/ItemAction +// interface. +type BackupItemActionPlugin struct { + plugin.NetRPCUnsupportedPlugin + *pluginBase +} + +// GRPCClient returns a clientDispenser for BackupItemAction gRPC clients. +func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { + return newClientDispenser(p.clientLogger, c, newBackupItemActionGRPCClient), nil +} + +// GRPCServer registers a BackupItemAction gRPC server. +func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{mux: p.serverMux}) + return nil +} diff --git a/pkg/plugin/framework/backup_item_action_client.go b/pkg/plugin/framework/backup_item_action_client.go new file mode 100644 index 000000000..d01629cb7 --- /dev/null +++ b/pkg/plugin/framework/backup_item_action_client.go @@ -0,0 +1,112 @@ +/* +Copyright 2017, 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "encoding/json" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + api "github.com/heptio/velero/pkg/apis/velero/v1" + proto "github.com/heptio/velero/pkg/plugin/generated" + "github.com/heptio/velero/pkg/plugin/velero" +) + +// NewBackupItemActionPlugin constructs a BackupItemActionPlugin. +func NewBackupItemActionPlugin(options ...PluginOption) *BackupItemActionPlugin { + return &BackupItemActionPlugin{ + pluginBase: newPluginBase(options...), + } +} + +// BackupItemActionGRPCClient implements the backup/ItemAction interface and uses a +// gRPC client to make calls to the plugin server. +type BackupItemActionGRPCClient struct { + *clientBase + grpcClient proto.BackupItemActionClient +} + +func newBackupItemActionGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { + return &BackupItemActionGRPCClient{ + clientBase: base, + grpcClient: proto.NewBackupItemActionClient(clientConn), + } +} + +func (c *BackupItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error) { + res, err := c.grpcClient.AppliesTo(context.Background(), &proto.AppliesToRequest{Plugin: c.plugin}) + if err != nil { + return velero.ResourceSelector{}, err + } + + return velero.ResourceSelector{ + IncludedNamespaces: res.IncludedNamespaces, + ExcludedNamespaces: res.ExcludedNamespaces, + IncludedResources: res.IncludedResources, + ExcludedResources: res.ExcludedResources, + LabelSelector: res.Selector, + }, nil +} + +func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) { + itemJSON, err := json.Marshal(item.UnstructuredContent()) + if err != nil { + return nil, nil, err + } + + backupJSON, err := json.Marshal(backup) + if err != nil { + return nil, nil, err + } + + req := &proto.ExecuteRequest{ + Plugin: c.plugin, + Item: itemJSON, + Backup: backupJSON, + } + + res, err := c.grpcClient.Execute(context.Background(), req) + if err != nil { + return nil, nil, err + } + + var updatedItem unstructured.Unstructured + if err := json.Unmarshal(res.Item, &updatedItem); err != nil { + return nil, nil, err + } + + var additionalItems []velero.ResourceIdentifier + + for _, itm := range res.AdditionalItems { + newItem := velero.ResourceIdentifier{ + GroupResource: schema.GroupResource{ + Group: itm.Group, + Resource: itm.Resource, + }, + Namespace: itm.Namespace, + Name: itm.Name, + } + + additionalItems = append(additionalItems, newItem) + } + + return &updatedItem, additionalItems, nil +} diff --git a/pkg/plugin/backup_item_action.go b/pkg/plugin/framework/backup_item_action_server.go similarity index 50% rename from pkg/plugin/backup_item_action.go rename to pkg/plugin/framework/backup_item_action_server.go index 47b53b6af..a21769335 100644 --- a/pkg/plugin/backup_item_action.go +++ b/pkg/plugin/framework/backup_item_action_server.go @@ -14,133 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "encoding/json" - plugin "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "golang.org/x/net/context" - "google.golang.org/grpc" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" api "github.com/heptio/velero/pkg/apis/velero/v1" proto "github.com/heptio/velero/pkg/plugin/generated" "github.com/heptio/velero/pkg/plugin/velero" ) -// BackupItemActionPlugin is an implementation of go-plugin's Plugin -// interface with support for gRPC for the backup/ItemAction -// interface. -type BackupItemActionPlugin struct { - plugin.NetRPCUnsupportedPlugin - *pluginBase -} - -// NewBackupItemActionPlugin constructs a BackupItemActionPlugin. -func NewBackupItemActionPlugin(options ...pluginOption) *BackupItemActionPlugin { - return &BackupItemActionPlugin{ - pluginBase: newPluginBase(options...), - } -} - -////////////////////////////////////////////////////////////////////////////// -// client code -////////////////////////////////////////////////////////////////////////////// - -// GRPCClient returns a clientDispenser for BackupItemAction gRPC clients. -func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return newClientDispenser(p.clientLogger, c, newBackupItemActionGRPCClient), nil -} - -// BackupItemActionGRPCClient implements the backup/ItemAction interface and uses a -// gRPC client to make calls to the plugin server. -type BackupItemActionGRPCClient struct { - *clientBase - grpcClient proto.BackupItemActionClient -} - -func newBackupItemActionGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { - return &BackupItemActionGRPCClient{ - clientBase: base, - grpcClient: proto.NewBackupItemActionClient(clientConn), - } -} - -func (c *BackupItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error) { - res, err := c.grpcClient.AppliesTo(context.Background(), &proto.AppliesToRequest{Plugin: c.plugin}) - if err != nil { - return velero.ResourceSelector{}, err - } - - return velero.ResourceSelector{ - IncludedNamespaces: res.IncludedNamespaces, - ExcludedNamespaces: res.ExcludedNamespaces, - IncludedResources: res.IncludedResources, - ExcludedResources: res.ExcludedResources, - LabelSelector: res.Selector, - }, nil -} - -func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) { - itemJSON, err := json.Marshal(item.UnstructuredContent()) - if err != nil { - return nil, nil, err - } - - backupJSON, err := json.Marshal(backup) - if err != nil { - return nil, nil, err - } - - req := &proto.ExecuteRequest{ - Plugin: c.plugin, - Item: itemJSON, - Backup: backupJSON, - } - - res, err := c.grpcClient.Execute(context.Background(), req) - if err != nil { - return nil, nil, err - } - - var updatedItem unstructured.Unstructured - if err := json.Unmarshal(res.Item, &updatedItem); err != nil { - return nil, nil, err - } - - var additionalItems []velero.ResourceIdentifier - - for _, itm := range res.AdditionalItems { - newItem := velero.ResourceIdentifier{ - GroupResource: schema.GroupResource{ - Group: itm.Group, - Resource: itm.Resource, - }, - Namespace: itm.Namespace, - Name: itm.Name, - } - - additionalItems = append(additionalItems, newItem) - } - - return &updatedItem, additionalItems, nil -} - -////////////////////////////////////////////////////////////////////////////// -// server code -////////////////////////////////////////////////////////////////////////////// - -// GRPCServer registers a BackupItemAction gRPC server. -func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{mux: p.serverMux}) - return nil -} - -// BackupItemActionGRPCServer implements the proto-generated BackupItemActionServer interface, and accepts +// BackupItemActionGRPCServer implements the proto-generated BackupItemAction interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type BackupItemActionGRPCServer struct { mux *serverMux diff --git a/pkg/plugin/backup_item_action_test.go b/pkg/plugin/framework/backup_item_action_test.go similarity index 99% rename from pkg/plugin/backup_item_action_test.go rename to pkg/plugin/framework/backup_item_action_test.go index 1e842dccd..4bdd92e20 100644 --- a/pkg/plugin/backup_item_action_test.go +++ b/pkg/plugin/framework/backup_item_action_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "encoding/json" diff --git a/pkg/plugin/framework/block_store.go b/pkg/plugin/framework/block_store.go new file mode 100644 index 000000000..1e5387734 --- /dev/null +++ b/pkg/plugin/framework/block_store.go @@ -0,0 +1,43 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + + proto "github.com/heptio/velero/pkg/plugin/generated" +) + +// BlockStorePlugin is an implementation of go-plugin's Plugin +// interface with support for gRPC for the cloudprovider/BlockStore +// interface. +type BlockStorePlugin struct { + plugin.NetRPCUnsupportedPlugin + *pluginBase +} + +// GRPCClient returns a BlockStore gRPC client. +func (p *BlockStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { + return newClientDispenser(p.clientLogger, c, newBlockStoreGRPCClient), nil +} + +// GRPCServer registers a BlockStore gRPC server. +func (p *BlockStorePlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterBlockStoreServer(s, &BlockStoreGRPCServer{mux: p.serverMux}) + return nil +} diff --git a/pkg/plugin/framework/block_store_client.go b/pkg/plugin/framework/block_store_client.go new file mode 100644 index 000000000..eeda639b3 --- /dev/null +++ b/pkg/plugin/framework/block_store_client.go @@ -0,0 +1,168 @@ +/* +Copyright 2017, 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "encoding/json" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + + proto "github.com/heptio/velero/pkg/plugin/generated" +) + +// NewBlockStorePlugin constructs a BlockStorePlugin. +func NewBlockStorePlugin(options ...PluginOption) *BlockStorePlugin { + return &BlockStorePlugin{ + pluginBase: newPluginBase(options...), + } +} + +// BlockStoreGRPCClient implements the cloudprovider.BlockStore interface and uses a +// gRPC client to make calls to the plugin server. +type BlockStoreGRPCClient struct { + *clientBase + grpcClient proto.BlockStoreClient +} + +func newBlockStoreGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { + return &BlockStoreGRPCClient{ + clientBase: base, + grpcClient: proto.NewBlockStoreClient(clientConn), + } +} + +// Init prepares the BlockStore for usage using the provided map of +// configuration key-value pairs. It returns an error if the BlockStore +// cannot be initialized from the provided config. +func (c *BlockStoreGRPCClient) Init(config map[string]string) error { + _, err := c.grpcClient.Init(context.Background(), &proto.InitRequest{Plugin: c.plugin, Config: config}) + + return err +} + +// CreateVolumeFromSnapshot creates a new block volume, initialized from the provided snapshot, +// and with the specified type and IOPS (if using provisioned IOPS). +func (c *BlockStoreGRPCClient) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) { + req := &proto.CreateVolumeRequest{ + Plugin: c.plugin, + SnapshotID: snapshotID, + VolumeType: volumeType, + VolumeAZ: volumeAZ, + } + + if iops == nil { + req.Iops = 0 + } else { + req.Iops = *iops + } + + res, err := c.grpcClient.CreateVolumeFromSnapshot(context.Background(), req) + if err != nil { + return "", err + } + + return res.VolumeID, nil +} + +// GetVolumeInfo returns the type and IOPS (if using provisioned IOPS) for a specified block +// volume. +func (c *BlockStoreGRPCClient) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) { + res, err := c.grpcClient.GetVolumeInfo(context.Background(), &proto.GetVolumeInfoRequest{Plugin: c.plugin, VolumeID: volumeID, VolumeAZ: volumeAZ}) + if err != nil { + return "", nil, err + } + + var iops *int64 + if res.Iops != 0 { + iops = &res.Iops + } + + return res.VolumeType, iops, nil +} + +// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided +// set of tags to the snapshot. +func (c *BlockStoreGRPCClient) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) { + req := &proto.CreateSnapshotRequest{ + Plugin: c.plugin, + VolumeID: volumeID, + VolumeAZ: volumeAZ, + Tags: tags, + } + + res, err := c.grpcClient.CreateSnapshot(context.Background(), req) + if err != nil { + return "", err + } + + return res.SnapshotID, nil +} + +// DeleteSnapshot deletes the specified volume snapshot. +func (c *BlockStoreGRPCClient) DeleteSnapshot(snapshotID string) error { + _, err := c.grpcClient.DeleteSnapshot(context.Background(), &proto.DeleteSnapshotRequest{Plugin: c.plugin, SnapshotID: snapshotID}) + + return err +} + +func (c *BlockStoreGRPCClient) GetVolumeID(pv runtime.Unstructured) (string, error) { + encodedPV, err := json.Marshal(pv.UnstructuredContent()) + if err != nil { + return "", err + } + + req := &proto.GetVolumeIDRequest{ + Plugin: c.plugin, + PersistentVolume: encodedPV, + } + + resp, err := c.grpcClient.GetVolumeID(context.Background(), req) + if err != nil { + return "", err + } + + return resp.VolumeID, nil +} + +func (c *BlockStoreGRPCClient) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) { + encodedPV, err := json.Marshal(pv.UnstructuredContent()) + if err != nil { + return nil, err + } + + req := &proto.SetVolumeIDRequest{ + Plugin: c.plugin, + PersistentVolume: encodedPV, + VolumeID: volumeID, + } + + resp, err := c.grpcClient.SetVolumeID(context.Background(), req) + if err != nil { + return nil, err + } + + var updatedPV unstructured.Unstructured + if err := json.Unmarshal(resp.PersistentVolume, &updatedPV); err != nil { + return nil, err + + } + + return &updatedPV, nil +} diff --git a/pkg/plugin/block_store.go b/pkg/plugin/framework/block_store_server.go similarity index 54% rename from pkg/plugin/block_store.go rename to pkg/plugin/framework/block_store_server.go index 3dac19f71..917fbbf97 100644 --- a/pkg/plugin/block_store.go +++ b/pkg/plugin/framework/block_store_server.go @@ -14,189 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "encoding/json" - "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "golang.org/x/net/context" - "google.golang.org/grpc" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" proto "github.com/heptio/velero/pkg/plugin/generated" "github.com/heptio/velero/pkg/plugin/velero" ) -// BlockStorePlugin is an implementation of go-plugin's Plugin -// interface with support for gRPC for the cloudprovider/BlockStore -// interface. -type BlockStorePlugin struct { - plugin.NetRPCUnsupportedPlugin - *pluginBase -} - -// NewBlockStorePlugin constructs a BlockStorePlugin. -func NewBlockStorePlugin(options ...pluginOption) *BlockStorePlugin { - return &BlockStorePlugin{ - pluginBase: newPluginBase(options...), - } -} - -////////////////////////////////////////////////////////////////////////////// -// client code -////////////////////////////////////////////////////////////////////////////// - -// GRPCClient returns a BlockStore gRPC client. -func (p *BlockStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return newClientDispenser(p.clientLogger, c, newBlockStoreGRPCClient), nil -} - -// BlockStoreGRPCClient implements the cloudprovider.BlockStore interface and uses a -// gRPC client to make calls to the plugin server. -type BlockStoreGRPCClient struct { - *clientBase - grpcClient proto.BlockStoreClient -} - -func newBlockStoreGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { - return &BlockStoreGRPCClient{ - clientBase: base, - grpcClient: proto.NewBlockStoreClient(clientConn), - } -} - -// Init prepares the BlockStore for usage using the provided map of -// configuration key-value pairs. It returns an error if the BlockStore -// cannot be initialized from the provided config. -func (c *BlockStoreGRPCClient) Init(config map[string]string) error { - _, err := c.grpcClient.Init(context.Background(), &proto.InitRequest{Plugin: c.plugin, Config: config}) - - return err -} - -// CreateVolumeFromSnapshot creates a new block volume, initialized from the provided snapshot, -// and with the specified type and IOPS (if using provisioned IOPS). -func (c *BlockStoreGRPCClient) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) { - req := &proto.CreateVolumeRequest{ - Plugin: c.plugin, - SnapshotID: snapshotID, - VolumeType: volumeType, - VolumeAZ: volumeAZ, - } - - if iops == nil { - req.Iops = 0 - } else { - req.Iops = *iops - } - - res, err := c.grpcClient.CreateVolumeFromSnapshot(context.Background(), req) - if err != nil { - return "", err - } - - return res.VolumeID, nil -} - -// GetVolumeInfo returns the type and IOPS (if using provisioned IOPS) for a specified block -// volume. -func (c *BlockStoreGRPCClient) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) { - res, err := c.grpcClient.GetVolumeInfo(context.Background(), &proto.GetVolumeInfoRequest{Plugin: c.plugin, VolumeID: volumeID, VolumeAZ: volumeAZ}) - if err != nil { - return "", nil, err - } - - var iops *int64 - if res.Iops != 0 { - iops = &res.Iops - } - - return res.VolumeType, iops, nil -} - -// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided -// set of tags to the snapshot. -func (c *BlockStoreGRPCClient) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) { - req := &proto.CreateSnapshotRequest{ - Plugin: c.plugin, - VolumeID: volumeID, - VolumeAZ: volumeAZ, - Tags: tags, - } - - res, err := c.grpcClient.CreateSnapshot(context.Background(), req) - if err != nil { - return "", err - } - - return res.SnapshotID, nil -} - -// DeleteSnapshot deletes the specified volume snapshot. -func (c *BlockStoreGRPCClient) DeleteSnapshot(snapshotID string) error { - _, err := c.grpcClient.DeleteSnapshot(context.Background(), &proto.DeleteSnapshotRequest{Plugin: c.plugin, SnapshotID: snapshotID}) - - return err -} - -func (c *BlockStoreGRPCClient) GetVolumeID(pv runtime.Unstructured) (string, error) { - encodedPV, err := json.Marshal(pv.UnstructuredContent()) - if err != nil { - return "", err - } - - req := &proto.GetVolumeIDRequest{ - Plugin: c.plugin, - PersistentVolume: encodedPV, - } - - resp, err := c.grpcClient.GetVolumeID(context.Background(), req) - if err != nil { - return "", err - } - - return resp.VolumeID, nil -} - -func (c *BlockStoreGRPCClient) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) { - encodedPV, err := json.Marshal(pv.UnstructuredContent()) - if err != nil { - return nil, err - } - - req := &proto.SetVolumeIDRequest{ - Plugin: c.plugin, - PersistentVolume: encodedPV, - VolumeID: volumeID, - } - - resp, err := c.grpcClient.SetVolumeID(context.Background(), req) - if err != nil { - return nil, err - } - - var updatedPV unstructured.Unstructured - if err := json.Unmarshal(resp.PersistentVolume, &updatedPV); err != nil { - return nil, err - - } - - return &updatedPV, nil -} - -////////////////////////////////////////////////////////////////////////////// -// server code -////////////////////////////////////////////////////////////////////////////// - -// GRPCServer registers a BlockStore gRPC server. -func (p *BlockStorePlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterBlockStoreServer(s, &BlockStoreGRPCServer{mux: p.serverMux}) - return nil -} - // BlockStoreGRPCServer implements the proto-generated BlockStoreServer interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type BlockStoreGRPCServer struct { diff --git a/pkg/plugin/client_dispenser.go b/pkg/plugin/framework/client_dispenser.go similarity index 90% rename from pkg/plugin/client_dispenser.go rename to pkg/plugin/framework/client_dispenser.go index b3937f092..83e934d72 100644 --- a/pkg/plugin/client_dispenser.go +++ b/pkg/plugin/framework/client_dispenser.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin + +package framework import ( "github.com/sirupsen/logrus" @@ -27,7 +28,7 @@ type clientBase struct { } type ClientDispenser interface { - clientFor(name string) interface{} + ClientFor(name string) interface{} } // clientDispenser supports the initialization and retrieval of multiple implementations for a single plugin kind, such as @@ -55,9 +56,9 @@ func newClientDispenser(logger logrus.FieldLogger, clientConn *grpc.ClientConn, } } -// clientFor returns a gRPC client stub for the implementation of a plugin named name. If the client stub does not +// ClientFor returns a gRPC client stub for the implementation of a plugin named name. If the client stub does not // currently exist, clientFor creates it. -func (cd *clientDispenser) clientFor(name string) interface{} { +func (cd *clientDispenser) ClientFor(name string) interface{} { if client, found := cd.clients[name]; found { return client } diff --git a/pkg/plugin/client_dispenser_test.go b/pkg/plugin/framework/client_dispenser_test.go similarity index 93% rename from pkg/plugin/client_dispenser_test.go rename to pkg/plugin/framework/client_dispenser_test.go index c83009dcb..c68de3285 100644 --- a/pkg/plugin/client_dispenser_test.go +++ b/pkg/plugin/framework/client_dispenser_test.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "testing" @@ -61,7 +61,7 @@ func TestClientFor(t *testing.T) { cd := newClientDispenser(logger, clientConn, initFunc) - actual := cd.clientFor("pod") + actual := cd.ClientFor("pod") require.IsType(t, &fakeClient{}, actual) typed := actual.(*fakeClient) assert.Equal(t, 1, count) @@ -74,7 +74,7 @@ func TestClientFor(t *testing.T) { assert.Equal(t, clientConn, typed.clientConn) // Make sure we reuse a previous client - actual = cd.clientFor("pod") + actual = cd.ClientFor("pod") require.IsType(t, &fakeClient{}, actual) typed = actual.(*fakeClient) assert.Equal(t, 1, count) diff --git a/pkg/plugin/framework/doc.go b/pkg/plugin/framework/doc.go new file mode 100644 index 000000000..76eca6b51 --- /dev/null +++ b/pkg/plugin/framework/doc.go @@ -0,0 +1,3 @@ +// Package framework is the common package that any plugin client +// will need to import, for example, both plugin authors and Velero core. +package framework diff --git a/pkg/plugin/framework/examples_test.go b/pkg/plugin/framework/examples_test.go new file mode 100644 index 000000000..cb4938842 --- /dev/null +++ b/pkg/plugin/framework/examples_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" +) + +func ExampleNewServer_blockStore() { + NewServer(). // call the server + RegisterBlockStore("example-blockstore", newBlockStore). // register the plugin + Serve() // serve the plugin +} + +func newBlockStore(logger logrus.FieldLogger) (interface{}, error) { + return &BlockStore{FieldLogger: logger}, nil +} + +type BlockStore struct { + FieldLogger logrus.FieldLogger +} + +// Implement all methods for the BlockStore interface... +func (b *BlockStore) Init(config map[string]string) error { + b.FieldLogger.Infof("BlockStore.Init called") + + // ... + + return nil +} + +func (b *BlockStore) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (volumeID string, err error) { + b.FieldLogger.Infof("CreateVolumeFromSnapshot called") + + // ... + + return "volumeID", nil +} + +func (b *BlockStore) GetVolumeID(pv runtime.Unstructured) (string, error) { + b.FieldLogger.Infof("GetVolumeID called") + + // ... + + return "volumeID", nil +} + +func (b *BlockStore) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) { + b.FieldLogger.Infof("SetVolumeID called") + + // ... + + return nil, nil +} + +func (b *BlockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) { + b.FieldLogger.Infof("GetVolumeInfo called") + + // ... + + return "volumeFilesystemType", nil, nil +} + +func (b *BlockStore) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (snapshotID string, err error) { + b.FieldLogger.Infof("CreateSnapshot called") + + // ... + + return "snapshotID", nil +} + +func (b *BlockStore) DeleteSnapshot(snapshotID string) error { + b.FieldLogger.Infof("DeleteSnapshot called") + + // ... + + return nil +} diff --git a/pkg/plugin/handle_panic.go b/pkg/plugin/framework/handle_panic.go similarity index 97% rename from pkg/plugin/handle_panic.go rename to pkg/plugin/framework/handle_panic.go index 1dba79fb8..ae4ec3daa 100644 --- a/pkg/plugin/handle_panic.go +++ b/pkg/plugin/framework/handle_panic.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "google.golang.org/grpc/codes" diff --git a/pkg/plugin/interface.go b/pkg/plugin/framework/interface.go similarity index 92% rename from pkg/plugin/interface.go rename to pkg/plugin/framework/interface.go index c14064422..1394e8e82 100644 --- a/pkg/plugin/interface.go +++ b/pkg/plugin/framework/interface.go @@ -1,5 +1,5 @@ /* -Copyright 2017 the Heptio Ark contributors. +Copyright 2017, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import plugin "github.com/hashicorp/go-plugin" diff --git a/pkg/plugin/logger.go b/pkg/plugin/framework/logger.go similarity index 99% rename from pkg/plugin/logger.go rename to pkg/plugin/framework/logger.go index a54bae652..85d24dd4b 100644 --- a/pkg/plugin/logger.go +++ b/pkg/plugin/framework/logger.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "github.com/sirupsen/logrus" diff --git a/pkg/plugin/logger_test.go b/pkg/plugin/framework/logger_test.go similarity index 98% rename from pkg/plugin/logger_test.go rename to pkg/plugin/framework/logger_test.go index 2d3b5b9c5..bfbdcf3b8 100644 --- a/pkg/plugin/logger_test.go +++ b/pkg/plugin/framework/logger_test.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "testing" diff --git a/pkg/plugin/framework/object_store.go b/pkg/plugin/framework/object_store.go new file mode 100644 index 000000000..16ab3dcf2 --- /dev/null +++ b/pkg/plugin/framework/object_store.go @@ -0,0 +1,44 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + + proto "github.com/heptio/velero/pkg/plugin/generated" +) + +// ObjectStorePlugin is an implementation of go-plugin's Plugin +// interface with support for gRPC for the cloudprovider/ObjectStore +// interface. +type ObjectStorePlugin struct { + plugin.NetRPCUnsupportedPlugin + *pluginBase +} + +// GRPCClient returns an ObjectStore gRPC client. +func (p *ObjectStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { + return newClientDispenser(p.clientLogger, c, newObjectStoreGRPCClient), nil + +} + +// GRPCServer registers an ObjectStore gRPC server. +func (p *ObjectStorePlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterObjectStoreServer(s, &ObjectStoreGRPCServer{mux: p.serverMux}) + return nil +} diff --git a/pkg/plugin/framework/object_store_client.go b/pkg/plugin/framework/object_store_client.go new file mode 100644 index 000000000..534efce34 --- /dev/null +++ b/pkg/plugin/framework/object_store_client.go @@ -0,0 +1,163 @@ +/* +Copyright 2017, 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "io" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + proto "github.com/heptio/velero/pkg/plugin/generated" +) + +const byteChunkSize = 16384 + +// NewObjectStorePlugin construct an ObjectStorePlugin. +func NewObjectStorePlugin(options ...PluginOption) *ObjectStorePlugin { + return &ObjectStorePlugin{ + pluginBase: newPluginBase(options...), + } +} + +// ObjectStoreGRPCClient implements the ObjectStore interface and uses a +// gRPC client to make calls to the plugin server. +type ObjectStoreGRPCClient struct { + *clientBase + grpcClient proto.ObjectStoreClient +} + +func newObjectStoreGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { + return &ObjectStoreGRPCClient{ + clientBase: base, + grpcClient: proto.NewObjectStoreClient(clientConn), + } +} + +// Init prepares the ObjectStore for usage using the provided map of +// configuration key-value pairs. It returns an error if the ObjectStore +// cannot be initialized from the provided config. +func (c *ObjectStoreGRPCClient) Init(config map[string]string) error { + _, err := c.grpcClient.Init(context.Background(), &proto.InitRequest{Plugin: c.plugin, Config: config}) + + return err +} + +// PutObject creates a new object using the data in body within the specified +// object storage bucket with the given key. +func (c *ObjectStoreGRPCClient) PutObject(bucket, key string, body io.Reader) error { + stream, err := c.grpcClient.PutObject(context.Background()) + if err != nil { + return err + } + + // read from the provider io.Reader into chunks, and send each one over + // the gRPC stream + chunk := make([]byte, byteChunkSize) + for { + n, err := body.Read(chunk) + if err == io.EOF { + _, resErr := stream.CloseAndRecv() + return resErr + } + if err != nil { + stream.CloseSend() + return err + } + + if err := stream.Send(&proto.PutObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key, Body: chunk[0:n]}); err != nil { + return err + } + } +} + +// GetObject retrieves the object with the given key from the specified +// bucket in object storage. +func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) { + stream, err := c.grpcClient.GetObject(context.Background(), &proto.GetObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key}) + if err != nil { + return nil, err + } + + receive := func() ([]byte, error) { + data, err := stream.Recv() + if err != nil { + return nil, err + } + + return data.Data, nil + } + + close := func() error { + return stream.CloseSend() + } + + return &StreamReadCloser{receive: receive, close: close}, nil +} + +// ListCommonPrefixes gets a list of all object key prefixes that come +// after the provided prefix and before the provided delimiter (this is +// often used to simulate a directory hierarchy in object storage). +func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { + req := &proto.ListCommonPrefixesRequest{ + Plugin: c.plugin, + Bucket: bucket, + Prefix: prefix, + Delimiter: delimiter, + } + + res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req) + if err != nil { + return nil, err + } + + return res.Prefixes, nil +} + +// ListObjects gets a list of all objects in bucket that have the same prefix. +func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) { + res, err := c.grpcClient.ListObjects(context.Background(), &proto.ListObjectsRequest{Plugin: c.plugin, Bucket: bucket, Prefix: prefix}) + if err != nil { + return nil, err + } + + return res.Keys, nil +} + +// DeleteObject removes object with the specified key from the given +// bucket. +func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error { + _, err := c.grpcClient.DeleteObject(context.Background(), &proto.DeleteObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key}) + + return err +} + +// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. +func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + res, err := c.grpcClient.CreateSignedURL(context.Background(), &proto.CreateSignedURLRequest{ + Plugin: c.plugin, + Bucket: bucket, + Key: key, + Ttl: int64(ttl), + }) + if err != nil { + return "", nil + } + + return res.Url, nil +} diff --git a/pkg/plugin/object_store.go b/pkg/plugin/framework/object_store_server.go similarity index 54% rename from pkg/plugin/object_store.go rename to pkg/plugin/framework/object_store_server.go index ad8710158..f54385043 100644 --- a/pkg/plugin/object_store.go +++ b/pkg/plugin/framework/object_store_server.go @@ -14,185 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "io" "time" - "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "golang.org/x/net/context" - "google.golang.org/grpc" proto "github.com/heptio/velero/pkg/plugin/generated" "github.com/heptio/velero/pkg/plugin/velero" ) -const byteChunkSize = 16384 - -// ObjectStorePlugin is an implementation of go-plugin's Plugin -// interface with support for gRPC for the cloudprovider/ObjectStore -// interface. -type ObjectStorePlugin struct { - plugin.NetRPCUnsupportedPlugin - *pluginBase -} - -// NewObjectStorePlugin construct an ObjectStorePlugin. -func NewObjectStorePlugin(options ...pluginOption) *ObjectStorePlugin { - return &ObjectStorePlugin{ - pluginBase: newPluginBase(options...), - } -} - -////////////////////////////////////////////////////////////////////////////// -// client code -////////////////////////////////////////////////////////////////////////////// - -// GRPCClient returns an ObjectStore gRPC client. -func (p *ObjectStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return newClientDispenser(p.clientLogger, c, newObjectStoreGRPCClient), nil - -} - -// ObjectStoreGRPCClient implements the ObjectStore interface and uses a -// gRPC client to make calls to the plugin server. -type ObjectStoreGRPCClient struct { - *clientBase - grpcClient proto.ObjectStoreClient -} - -func newObjectStoreGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { - return &ObjectStoreGRPCClient{ - clientBase: base, - grpcClient: proto.NewObjectStoreClient(clientConn), - } -} - -// Init prepares the ObjectStore for usage using the provided map of -// configuration key-value pairs. It returns an error if the ObjectStore -// cannot be initialized from the provided config. -func (c *ObjectStoreGRPCClient) Init(config map[string]string) error { - _, err := c.grpcClient.Init(context.Background(), &proto.InitRequest{Plugin: c.plugin, Config: config}) - - return err -} - -// PutObject creates a new object using the data in body within the specified -// object storage bucket with the given key. -func (c *ObjectStoreGRPCClient) PutObject(bucket, key string, body io.Reader) error { - stream, err := c.grpcClient.PutObject(context.Background()) - if err != nil { - return err - } - - // read from the provider io.Reader into chunks, and send each one over - // the gRPC stream - chunk := make([]byte, byteChunkSize) - for { - n, err := body.Read(chunk) - if err == io.EOF { - _, resErr := stream.CloseAndRecv() - return resErr - } - if err != nil { - stream.CloseSend() - return err - } - - if err := stream.Send(&proto.PutObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key, Body: chunk[0:n]}); err != nil { - return err - } - } -} - -// GetObject retrieves the object with the given key from the specified -// bucket in object storage. -func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) { - stream, err := c.grpcClient.GetObject(context.Background(), &proto.GetObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key}) - if err != nil { - return nil, err - } - - receive := func() ([]byte, error) { - data, err := stream.Recv() - if err != nil { - return nil, err - } - - return data.Data, nil - } - - close := func() error { - return stream.CloseSend() - } - - return &StreamReadCloser{receive: receive, close: close}, nil -} - -// ListCommonPrefixes gets a list of all object key prefixes that come -// after the provided prefix and before the provided delimiter (this is -// often used to simulate a directory hierarchy in object storage). -func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { - req := &proto.ListCommonPrefixesRequest{ - Plugin: c.plugin, - Bucket: bucket, - Prefix: prefix, - Delimiter: delimiter, - } - - res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req) - if err != nil { - return nil, err - } - - return res.Prefixes, nil -} - -// ListObjects gets a list of all objects in bucket that have the same prefix. -func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) { - res, err := c.grpcClient.ListObjects(context.Background(), &proto.ListObjectsRequest{Plugin: c.plugin, Bucket: bucket, Prefix: prefix}) - if err != nil { - return nil, err - } - - return res.Keys, nil -} - -// DeleteObject removes object with the specified key from the given -// bucket. -func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error { - _, err := c.grpcClient.DeleteObject(context.Background(), &proto.DeleteObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key}) - - return err -} - -// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. -func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { - res, err := c.grpcClient.CreateSignedURL(context.Background(), &proto.CreateSignedURLRequest{ - Plugin: c.plugin, - Bucket: bucket, - Key: key, - Ttl: int64(ttl), - }) - if err != nil { - return "", nil - } - - return res.Url, nil -} - -////////////////////////////////////////////////////////////////////////////// -// server code -////////////////////////////////////////////////////////////////////////////// - -// GRPCServer registers an ObjectStore gRPC server. -func (p *ObjectStorePlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterObjectStoreServer(s, &ObjectStoreGRPCServer{mux: p.serverMux}) - return nil -} - // ObjectStoreGRPCServer implements the proto-generated ObjectStoreServer interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type ObjectStoreGRPCServer struct { diff --git a/pkg/plugin/plugin_base.go b/pkg/plugin/framework/plugin_base.go similarity index 72% rename from pkg/plugin/plugin_base.go rename to pkg/plugin/framework/plugin_base.go index 51acd3e95..4a97fa47b 100644 --- a/pkg/plugin/plugin_base.go +++ b/pkg/plugin/framework/plugin_base.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,16 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework -import "github.com/sirupsen/logrus" +import ( + "github.com/sirupsen/logrus" +) type pluginBase struct { clientLogger logrus.FieldLogger *serverMux } -func newPluginBase(options ...pluginOption) *pluginBase { +func newPluginBase(options ...PluginOption) *pluginBase { base := new(pluginBase) for _, option := range options { option(base) @@ -31,15 +33,15 @@ func newPluginBase(options ...pluginOption) *pluginBase { return base } -type pluginOption func(base *pluginBase) +type PluginOption func(base *pluginBase) -func clientLogger(logger logrus.FieldLogger) pluginOption { +func ClientLogger(logger logrus.FieldLogger) PluginOption { return func(base *pluginBase) { base.clientLogger = logger } } -func serverLogger(logger logrus.FieldLogger) pluginOption { +func serverLogger(logger logrus.FieldLogger) PluginOption { return func(base *pluginBase) { base.serverMux = newServerMux(logger) } diff --git a/pkg/plugin/plugin_base_test.go b/pkg/plugin/framework/plugin_base_test.go similarity index 91% rename from pkg/plugin/plugin_base_test.go rename to pkg/plugin/framework/plugin_base_test.go index 557240b55..016e04f15 100644 --- a/pkg/plugin/plugin_base_test.go +++ b/pkg/plugin/framework/plugin_base_test.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "testing" @@ -26,7 +26,7 @@ import ( func TestClientLogger(t *testing.T) { base := &pluginBase{} logger := test.NewLogger() - f := clientLogger(logger) + f := ClientLogger(logger) f(base) assert.Equal(t, logger, base.clientLogger) } diff --git a/pkg/plugin/plugin_kinds.go b/pkg/plugin/framework/plugin_kinds.go similarity index 96% rename from pkg/plugin/plugin_kinds.go rename to pkg/plugin/framework/plugin_kinds.go index 70d17de7c..5e59d08af 100644 --- a/pkg/plugin/plugin_kinds.go +++ b/pkg/plugin/framework/plugin_kinds.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin + +package framework import ( "k8s.io/apimachinery/pkg/util/sets" diff --git a/pkg/plugin/plugin_kinds_test.go b/pkg/plugin/framework/plugin_kinds_test.go similarity index 93% rename from pkg/plugin/plugin_kinds_test.go rename to pkg/plugin/framework/plugin_kinds_test.go index ef614a8a2..caf87d73d 100644 --- a/pkg/plugin/plugin_kinds_test.go +++ b/pkg/plugin/framework/plugin_kinds_test.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "testing" diff --git a/pkg/plugin/plugin_lister.go b/pkg/plugin/framework/plugin_lister.go similarity index 97% rename from pkg/plugin/plugin_lister.go rename to pkg/plugin/framework/plugin_lister.go index 3f43c174a..6298fa022 100644 --- a/pkg/plugin/plugin_lister.go +++ b/pkg/plugin/framework/plugin_lister.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin + +package framework import ( plugin "github.com/hashicorp/go-plugin" @@ -24,7 +25,7 @@ import ( proto "github.com/heptio/velero/pkg/plugin/generated" ) -// PluginIdenitifer uniquely identifies a plugin by command, kind, and name. +// PluginIdentifier uniquely identifies a plugin by command, kind, and name. type PluginIdentifier struct { Command string Kind PluginKind diff --git a/pkg/plugin/framework/restore_item_action.go b/pkg/plugin/framework/restore_item_action.go new file mode 100644 index 000000000..6dce8cb55 --- /dev/null +++ b/pkg/plugin/framework/restore_item_action.go @@ -0,0 +1,43 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + + proto "github.com/heptio/velero/pkg/plugin/generated" +) + +// RestoreItemActionPlugin is an implementation of go-plugin's Plugin +// interface with support for gRPC for the restore/ItemAction +// interface. +type RestoreItemActionPlugin struct { + plugin.NetRPCUnsupportedPlugin + *pluginBase +} + +// GRPCClient returns a RestoreItemAction gRPC client. +func (p *RestoreItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { + return newClientDispenser(p.clientLogger, c, newRestoreItemActionGRPCClient), nil +} + +// GRPCServer registers a RestoreItemAction gRPC server. +func (p *RestoreItemActionPlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterRestoreItemActionServer(s, &RestoreItemActionGRPCServer{mux: p.serverMux}) + return nil +} diff --git a/pkg/plugin/framework/restore_item_action_client.go b/pkg/plugin/framework/restore_item_action_client.go new file mode 100644 index 000000000..0727bddd8 --- /dev/null +++ b/pkg/plugin/framework/restore_item_action_client.go @@ -0,0 +1,111 @@ +/* +Copyright 2017, 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "encoding/json" + + "github.com/pkg/errors" + "golang.org/x/net/context" + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + proto "github.com/heptio/velero/pkg/plugin/generated" + "github.com/heptio/velero/pkg/plugin/velero" +) + +var _ velero.RestoreItemAction = &RestoreItemActionGRPCClient{} + +// NewRestoreItemActionPlugin constructs a RestoreItemActionPlugin. +func NewRestoreItemActionPlugin(options ...PluginOption) *RestoreItemActionPlugin { + return &RestoreItemActionPlugin{ + pluginBase: newPluginBase(options...), + } +} + +// RestoreItemActionGRPCClient implements the backup/ItemAction interface and uses a +// gRPC client to make calls to the plugin server. +type RestoreItemActionGRPCClient struct { + *clientBase + grpcClient proto.RestoreItemActionClient +} + +func newRestoreItemActionGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { + return &RestoreItemActionGRPCClient{ + clientBase: base, + grpcClient: proto.NewRestoreItemActionClient(clientConn), + } +} + +func (c *RestoreItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error) { + res, err := c.grpcClient.AppliesTo(context.Background(), &proto.AppliesToRequest{Plugin: c.plugin}) + if err != nil { + return velero.ResourceSelector{}, err + } + + return velero.ResourceSelector{ + IncludedNamespaces: res.IncludedNamespaces, + ExcludedNamespaces: res.ExcludedNamespaces, + IncludedResources: res.IncludedResources, + ExcludedResources: res.ExcludedResources, + LabelSelector: res.Selector, + }, nil +} + +func (c *RestoreItemActionGRPCClient) Execute(input *velero.RestoreItemActionExecuteInput) (*velero.RestoreItemActionExecuteOutput, error) { + itemJSON, err := json.Marshal(input.Item.UnstructuredContent()) + if err != nil { + return nil, err + } + + itemFromBackupJSON, err := json.Marshal(input.ItemFromBackup.UnstructuredContent()) + if err != nil { + return nil, err + } + + restoreJSON, err := json.Marshal(input.Restore) + if err != nil { + return nil, err + } + + req := &proto.RestoreExecuteRequest{ + Plugin: c.plugin, + Item: itemJSON, + ItemFromBackup: itemFromBackupJSON, + Restore: restoreJSON, + } + + res, err := c.grpcClient.Execute(context.Background(), req) + if err != nil { + return nil, err + } + + var updatedItem unstructured.Unstructured + if err := json.Unmarshal(res.Item, &updatedItem); err != nil { + return nil, err + } + + var warning error + if res.Warning != "" { + warning = errors.New(res.Warning) + } + + return &velero.RestoreItemActionExecuteOutput{ + UpdatedItem: &updatedItem, + Warning: warning, + }, nil +} diff --git a/pkg/plugin/framework/restore_item_action_server.go b/pkg/plugin/framework/restore_item_action_server.go new file mode 100644 index 000000000..d2af22e4d --- /dev/null +++ b/pkg/plugin/framework/restore_item_action_server.go @@ -0,0 +1,130 @@ +/* +Copyright 2017, 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "encoding/json" + + "github.com/pkg/errors" + "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + api "github.com/heptio/velero/pkg/apis/velero/v1" + proto "github.com/heptio/velero/pkg/plugin/generated" + "github.com/heptio/velero/pkg/plugin/velero" +) + +// RestoreItemActionGRPCServer implements the proto-generated RestoreItemActionServer interface, and accepts +// gRPC calls and forwards them to an implementation of the pluggable interface. +type RestoreItemActionGRPCServer struct { + mux *serverMux +} + +func (s *RestoreItemActionGRPCServer) getImpl(name string) (velero.RestoreItemAction, error) { + impl, err := s.mux.getHandler(name) + if err != nil { + return nil, err + } + + itemAction, ok := impl.(velero.RestoreItemAction) + if !ok { + return nil, errors.Errorf("%T is not a restore item action", impl) + } + + return itemAction, nil +} + +func (s *RestoreItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.AppliesToRequest) (response *proto.AppliesToResponse, err error) { + defer func() { + if recoveredErr := handlePanic(recover()); recoveredErr != nil { + err = recoveredErr + } + }() + + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + + appliesTo, err := impl.AppliesTo() + if err != nil { + return nil, err + } + + return &proto.AppliesToResponse{ + IncludedNamespaces: appliesTo.IncludedNamespaces, + ExcludedNamespaces: appliesTo.ExcludedNamespaces, + IncludedResources: appliesTo.IncludedResources, + ExcludedResources: appliesTo.ExcludedResources, + Selector: appliesTo.LabelSelector, + }, nil +} + +func (s *RestoreItemActionGRPCServer) Execute(ctx context.Context, req *proto.RestoreExecuteRequest) (response *proto.RestoreExecuteResponse, err error) { + defer func() { + if recoveredErr := handlePanic(recover()); recoveredErr != nil { + err = recoveredErr + } + }() + + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + + var ( + item unstructured.Unstructured + itemFromBackup unstructured.Unstructured + restoreObj api.Restore + ) + + if err := json.Unmarshal(req.Item, &item); err != nil { + return nil, err + } + + if err := json.Unmarshal(req.ItemFromBackup, &itemFromBackup); err != nil { + return nil, err + } + + if err := json.Unmarshal(req.Restore, &restoreObj); err != nil { + return nil, err + } + + executeOutput, err := impl.Execute(&velero.RestoreItemActionExecuteInput{ + Item: &item, + ItemFromBackup: &itemFromBackup, + Restore: &restoreObj, + }) + if err != nil { + return nil, err + } + + updatedItem, err := json.Marshal(executeOutput.UpdatedItem) + if err != nil { + return nil, err + } + + var warnMessage string + if executeOutput.Warning != nil { + warnMessage = executeOutput.Warning.Error() + } + + return &proto.RestoreExecuteResponse{ + Item: updatedItem, + Warning: warnMessage, + }, nil +} diff --git a/pkg/plugin/server.go b/pkg/plugin/framework/server.go similarity index 99% rename from pkg/plugin/server.go rename to pkg/plugin/framework/server.go index 7af0980a8..3d9d7e480 100644 --- a/pkg/plugin/server.go +++ b/pkg/plugin/framework/server.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "fmt" diff --git a/pkg/plugin/server_mux.go b/pkg/plugin/framework/server_mux.go similarity index 97% rename from pkg/plugin/server_mux.go rename to pkg/plugin/framework/server_mux.go index e1a1b1820..ca1ad0d7c 100644 --- a/pkg/plugin/server_mux.go +++ b/pkg/plugin/framework/server_mux.go @@ -1,5 +1,5 @@ /* -Copyright 2018 the Heptio Ark contributors. +Copyright 2018, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package plugin + +package framework import ( "github.com/pkg/errors" diff --git a/pkg/plugin/stream_reader.go b/pkg/plugin/framework/stream_reader.go similarity index 96% rename from pkg/plugin/stream_reader.go rename to pkg/plugin/framework/stream_reader.go index 5d6cfe405..4af85fce8 100644 --- a/pkg/plugin/stream_reader.go +++ b/pkg/plugin/framework/stream_reader.go @@ -1,5 +1,5 @@ /* -Copyright 2017 the Heptio Ark contributors. +Copyright 2017, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "bytes" diff --git a/pkg/plugin/stream_reader_test.go b/pkg/plugin/framework/stream_reader_test.go similarity index 95% rename from pkg/plugin/stream_reader_test.go rename to pkg/plugin/framework/stream_reader_test.go index 73cc83e58..c1fa80c2a 100644 --- a/pkg/plugin/stream_reader_test.go +++ b/pkg/plugin/framework/stream_reader_test.go @@ -1,5 +1,5 @@ /* -Copyright 2017 the Heptio Ark contributors. +Copyright 2017, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package plugin +package framework import ( "bytes" diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index f1001b1ea..78225e04f 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -21,6 +21,7 @@ import ( "github.com/sirupsen/logrus" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" ) @@ -86,12 +87,12 @@ func (m *manager) CleanupClients() { // getRestartableProcess returns a restartableProcess for a plugin identified by kind and name, creating a // restartableProcess if it is the first time it has been requested. -func (m *manager) getRestartableProcess(kind PluginKind, name string) (RestartableProcess, error) { +func (m *manager) getRestartableProcess(kind framework.PluginKind, name string) (RestartableProcess, error) { m.lock.Lock() defer m.lock.Unlock() logger := m.logger.WithFields(logrus.Fields{ - "kind": PluginKindObjectStore.String(), + "kind": framework.PluginKindObjectStore.String(), "name": name, }) logger.Debug("looking for plugin in registry") @@ -123,7 +124,7 @@ func (m *manager) getRestartableProcess(kind PluginKind, name string) (Restartab // GetObjectStore returns a restartableObjectStore for name. func (m *manager) GetObjectStore(name string) (velero.ObjectStore, error) { - restartableProcess, err := m.getRestartableProcess(PluginKindObjectStore, name) + restartableProcess, err := m.getRestartableProcess(framework.PluginKindObjectStore, name) if err != nil { return nil, err } @@ -135,7 +136,7 @@ func (m *manager) GetObjectStore(name string) (velero.ObjectStore, error) { // GetBlockStore returns a restartableBlockStore for name. func (m *manager) GetBlockStore(name string) (velero.BlockStore, error) { - restartableProcess, err := m.getRestartableProcess(PluginKindBlockStore, name) + restartableProcess, err := m.getRestartableProcess(framework.PluginKindBlockStore, name) if err != nil { return nil, err } @@ -147,7 +148,7 @@ func (m *manager) GetBlockStore(name string) (velero.BlockStore, error) { // GetBackupItemActions returns all backup item actions as restartableBackupItemActions. func (m *manager) GetBackupItemActions() ([]velero.BackupItemAction, error) { - list := m.registry.List(PluginKindBackupItemAction) + list := m.registry.List(framework.PluginKindBackupItemAction) actions := make([]velero.BackupItemAction, 0, len(list)) @@ -167,7 +168,7 @@ func (m *manager) GetBackupItemActions() ([]velero.BackupItemAction, error) { // GetBackupItemAction returns a restartableBackupItemAction for name. func (m *manager) GetBackupItemAction(name string) (velero.BackupItemAction, error) { - restartableProcess, err := m.getRestartableProcess(PluginKindBackupItemAction, name) + restartableProcess, err := m.getRestartableProcess(framework.PluginKindBackupItemAction, name) if err != nil { return nil, err } @@ -178,7 +179,7 @@ func (m *manager) GetBackupItemAction(name string) (velero.BackupItemAction, err // GetRestoreItemActions returns all restore item actions as restartableRestoreItemActions. func (m *manager) GetRestoreItemActions() ([]velero.RestoreItemAction, error) { - list := m.registry.List(PluginKindRestoreItemAction) + list := m.registry.List(framework.PluginKindRestoreItemAction) actions := make([]velero.RestoreItemAction, 0, len(list)) @@ -198,7 +199,7 @@ func (m *manager) GetRestoreItemActions() ([]velero.RestoreItemAction, error) { // GetRestoreItemAction returns a restartableRestoreItemAction for name. func (m *manager) GetRestoreItemAction(name string) (velero.RestoreItemAction, error) { - restartableProcess, err := m.getRestartableProcess(PluginKindRestoreItemAction, name) + restartableProcess, err := m.getRestartableProcess(framework.PluginKindRestoreItemAction, name) if err != nil { return nil, err } diff --git a/pkg/plugin/manager_test.go b/pkg/plugin/manager_test.go index 4f54cc283..8cb64c3f3 100644 --- a/pkg/plugin/manager_test.go +++ b/pkg/plugin/manager_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/util/test" ) @@ -38,16 +39,16 @@ func (r *mockRegistry) DiscoverPlugins() error { return args.Error(0) } -func (r *mockRegistry) List(kind PluginKind) []PluginIdentifier { +func (r *mockRegistry) List(kind framework.PluginKind) []framework.PluginIdentifier { args := r.Called(kind) - return args.Get(0).([]PluginIdentifier) + return args.Get(0).([]framework.PluginIdentifier) } -func (r *mockRegistry) Get(kind PluginKind, name string) (PluginIdentifier, error) { +func (r *mockRegistry) Get(kind framework.PluginKind, name string) (framework.PluginIdentifier, error) { args := r.Called(kind, name) - var id PluginIdentifier + var id framework.PluginIdentifier if args.Get(0) != nil { - id = args.Get(0).(PluginIdentifier) + id = args.Get(0).(framework.PluginIdentifier) } return id, args.Error(1) } @@ -120,7 +121,7 @@ func TestGetRestartableProcess(t *testing.T) { m.restartableProcessFactory = factory // Test 1: registry error - pluginKind := PluginKindBackupItemAction + pluginKind := framework.PluginKindBackupItemAction pluginName := "pod" registry.On("Get", pluginKind, pluginName).Return(nil, errors.Errorf("registry")).Once() rp, err := m.getRestartableProcess(pluginKind, pluginName) @@ -128,7 +129,7 @@ func TestGetRestartableProcess(t *testing.T) { assert.EqualError(t, err, "registry") // Test 2: registry ok, factory error - podID := PluginIdentifier{ + podID := framework.PluginIdentifier{ Command: "/command", Kind: pluginKind, Name: pluginName, @@ -174,14 +175,14 @@ func TestCleanupClients(t *testing.T) { func TestGetObjectStore(t *testing.T) { getPluginTest(t, - PluginKindObjectStore, + framework.PluginKindObjectStore, "aws", func(m Manager, name string) (interface{}, error) { return m.GetObjectStore(name) }, func(name string, sharedPluginProcess RestartableProcess) interface{} { return &restartableObjectStore{ - key: kindAndName{kind: PluginKindObjectStore, name: name}, + key: kindAndName{kind: framework.PluginKindObjectStore, name: name}, sharedPluginProcess: sharedPluginProcess, } }, @@ -191,14 +192,14 @@ func TestGetObjectStore(t *testing.T) { func TestGetBlockStore(t *testing.T) { getPluginTest(t, - PluginKindBlockStore, + framework.PluginKindBlockStore, "aws", func(m Manager, name string) (interface{}, error) { return m.GetBlockStore(name) }, func(name string, sharedPluginProcess RestartableProcess) interface{} { return &restartableBlockStore{ - key: kindAndName{kind: PluginKindBlockStore, name: name}, + key: kindAndName{kind: framework.PluginKindBlockStore, name: name}, sharedPluginProcess: sharedPluginProcess, } }, @@ -208,14 +209,14 @@ func TestGetBlockStore(t *testing.T) { func TestGetBackupItemAction(t *testing.T) { getPluginTest(t, - PluginKindBackupItemAction, + framework.PluginKindBackupItemAction, "pod", func(m Manager, name string) (interface{}, error) { return m.GetBackupItemAction(name) }, func(name string, sharedPluginProcess RestartableProcess) interface{} { return &restartableBackupItemAction{ - key: kindAndName{kind: PluginKindBackupItemAction, name: name}, + key: kindAndName{kind: framework.PluginKindBackupItemAction, name: name}, sharedPluginProcess: sharedPluginProcess, } }, @@ -225,14 +226,14 @@ func TestGetBackupItemAction(t *testing.T) { func TestGetRestoreItemAction(t *testing.T) { getPluginTest(t, - PluginKindRestoreItemAction, + framework.PluginKindRestoreItemAction, "pod", func(m Manager, name string) (interface{}, error) { return m.GetRestoreItemAction(name) }, func(name string, sharedPluginProcess RestartableProcess) interface{} { return &restartableRestoreItemAction{ - key: kindAndName{kind: PluginKindRestoreItemAction, name: name}, + key: kindAndName{kind: framework.PluginKindRestoreItemAction, name: name}, sharedPluginProcess: sharedPluginProcess, } }, @@ -242,7 +243,7 @@ func TestGetRestoreItemAction(t *testing.T) { func getPluginTest( t *testing.T, - kind PluginKind, + kind framework.PluginKind, name string, getPluginFunc func(m Manager, name string) (interface{}, error), expectedResultFunc func(name string, sharedPluginProcess RestartableProcess) interface{}, @@ -261,7 +262,7 @@ func getPluginTest( pluginKind := kind pluginName := name - pluginID := PluginIdentifier{ + pluginID := framework.PluginIdentifier{ Command: "/command", Kind: pluginKind, Name: pluginName, @@ -326,10 +327,10 @@ func TestGetBackupItemActions(t *testing.T) { defer factory.AssertExpectations(t) m.restartableProcessFactory = factory - pluginKind := PluginKindBackupItemAction - var pluginIDs []PluginIdentifier + pluginKind := framework.PluginKindBackupItemAction + var pluginIDs []framework.PluginIdentifier for i := range tc.names { - pluginID := PluginIdentifier{ + pluginID := framework.PluginIdentifier{ Command: "/command", Kind: pluginKind, Name: tc.names[i], @@ -418,10 +419,10 @@ func TestGetRestoreItemActions(t *testing.T) { defer factory.AssertExpectations(t) m.restartableProcessFactory = factory - pluginKind := PluginKindRestoreItemAction - var pluginIDs []PluginIdentifier + pluginKind := framework.PluginKindRestoreItemAction + var pluginIDs []framework.PluginIdentifier for i := range tc.names { - pluginID := PluginIdentifier{ + pluginID := framework.PluginIdentifier{ Command: "/command", Kind: pluginKind, Name: tc.names[i], diff --git a/pkg/plugin/process.go b/pkg/plugin/process.go index fa2562829..27d600b7d 100644 --- a/pkg/plugin/process.go +++ b/pkg/plugin/process.go @@ -19,6 +19,8 @@ import ( plugin "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "github.com/sirupsen/logrus" + + "github.com/heptio/velero/pkg/plugin/framework" ) type ProcessFactory interface { @@ -75,13 +77,13 @@ func (r *process) dispense(key kindAndName) (interface{}, error) { } // Currently all plugins except for PluginLister dispense clientDispenser instances. - if clientDispenser, ok := dispensed.(ClientDispenser); ok { + if clientDispenser, ok := dispensed.(framework.ClientDispenser); ok { if key.name == "" { return nil, errors.Errorf("%s plugin requested but name is missing", key.kind.String()) } // Get the instance that implements our plugin interface (e.g. ObjectStore) that is a gRPC-based // client - dispensed = clientDispenser.clientFor(key.name) + dispensed = clientDispenser.ClientFor(key.name) } return dispensed, nil diff --git a/pkg/plugin/process_test.go b/pkg/plugin/process_test.go index 4785da5c8..d67c04966 100644 --- a/pkg/plugin/process_test.go +++ b/pkg/plugin/process_test.go @@ -22,6 +22,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/heptio/velero/pkg/plugin/framework" ) type mockClientProtocol struct { @@ -47,7 +49,7 @@ type mockClientDispenser struct { mock.Mock } -func (cd *mockClientDispenser) clientFor(name string) interface{} { +func (cd *mockClientDispenser) ClientFor(name string) interface{} { args := cd.Called(name) return args.Get(0) } @@ -94,17 +96,17 @@ func TestDispense(t *testing.T) { key := kindAndName{} if tc.clientDispenser { - key.kind = PluginKindObjectStore + key.kind = framework.PluginKindObjectStore protocolClient.On("Dispense", key.kind.String()).Return(clientDispenser, tc.dispenseError) if !tc.missingKeyName { key.name = "aws" - client = &BackupItemActionGRPCClient{} - clientDispenser.On("clientFor", key.name).Return(client) + client = &framework.BackupItemActionGRPCClient{} + clientDispenser.On("ClientFor", key.name).Return(client) } } else { - key.kind = PluginKindPluginLister - client = &PluginListerGRPCClient{} + key.kind = framework.PluginKindPluginLister + client = &framework.PluginListerGRPCClient{} protocolClient.On("Dispense", key.kind.String()).Return(client, tc.dispenseError) } diff --git a/pkg/plugin/registry.go b/pkg/plugin/registry.go index 4c79053c2..5b34a0b39 100644 --- a/pkg/plugin/registry.go +++ b/pkg/plugin/registry.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/util/filesystem" ) @@ -31,14 +32,14 @@ type Registry interface { // DiscoverPlugins discovers all available plugins. DiscoverPlugins() error // List returns all PluginIdentifiers for kind. - List(kind PluginKind) []PluginIdentifier + List(kind framework.PluginKind) []framework.PluginIdentifier // Get returns the PluginIdentifier for kind and name. - Get(kind PluginKind, name string) (PluginIdentifier, error) + Get(kind framework.PluginKind, name string) (framework.PluginIdentifier, error) } // kindAndName is a convenience struct that combines a PluginKind and a name. type kindAndName struct { - kind PluginKind + kind framework.PluginKind name string } @@ -51,8 +52,8 @@ type registry struct { processFactory ProcessFactory fs filesystem.Interface - pluginsByID map[kindAndName]PluginIdentifier - pluginsByKind map[PluginKind][]PluginIdentifier + pluginsByID map[kindAndName]framework.PluginIdentifier + pluginsByKind map[framework.PluginKind][]framework.PluginIdentifier } // NewRegistry returns a new registry. @@ -64,8 +65,8 @@ func NewRegistry(dir string, logger logrus.FieldLogger, logLevel logrus.Level) R processFactory: newProcessFactory(), fs: filesystem.NewFileSystem(), - pluginsByID: make(map[kindAndName]PluginIdentifier), - pluginsByKind: make(map[PluginKind][]PluginIdentifier), + pluginsByID: make(map[kindAndName]framework.PluginIdentifier), + pluginsByKind: make(map[framework.PluginKind][]framework.PluginIdentifier), } } @@ -108,16 +109,16 @@ func (r *registry) discoverPlugins(commands []string) error { // List returns info about all plugin binaries that implement the given // PluginKind. -func (r *registry) List(kind PluginKind) []PluginIdentifier { +func (r *registry) List(kind framework.PluginKind) []framework.PluginIdentifier { return r.pluginsByKind[kind] } // Get returns info about a plugin with the given name and kind, or an // error if one cannot be found. -func (r *registry) Get(kind PluginKind, name string) (PluginIdentifier, error) { +func (r *registry) Get(kind framework.PluginKind, name string) (framework.PluginIdentifier, error) { p, found := r.pluginsByID[kindAndName{kind: kind, name: name}] if !found { - return PluginIdentifier{}, newPluginNotFoundError(kind, name) + return framework.PluginIdentifier{}, newPluginNotFoundError(kind, name) } return p, nil } @@ -173,19 +174,19 @@ func executable(info os.FileInfo) bool { } // listPlugins executes command, queries it for registered plugins, and returns the list of PluginIdentifiers. -func (r *registry) listPlugins(command string) ([]PluginIdentifier, error) { +func (r *registry) listPlugins(command string) ([]framework.PluginIdentifier, error) { process, err := r.processFactory.newProcess(command, r.logger, r.logLevel) if err != nil { return nil, err } defer process.kill() - plugin, err := process.dispense(kindAndName{kind: PluginKindPluginLister}) + plugin, err := process.dispense(kindAndName{kind: framework.PluginKindPluginLister}) if err != nil { return nil, err } - lister, ok := plugin.(PluginLister) + lister, ok := plugin.(framework.PluginLister) if !ok { return nil, errors.Errorf("%T is not a PluginLister", plugin) } @@ -194,7 +195,7 @@ func (r *registry) listPlugins(command string) ([]PluginIdentifier, error) { } // register registers a PluginIdentifier with the registry. -func (r *registry) register(id PluginIdentifier) error { +func (r *registry) register(id framework.PluginIdentifier) error { key := kindAndName{kind: id.Kind, name: id.Name} if existing, found := r.pluginsByID[key]; found { return newDuplicatePluginRegistrationError(existing, id) @@ -208,12 +209,12 @@ func (r *registry) register(id PluginIdentifier) error { // pluginNotFoundError indicates a plugin could not be located for kind and name. type pluginNotFoundError struct { - kind PluginKind + kind framework.PluginKind name string } // newPluginNotFoundError returns a new pluginNotFoundError for kind and name. -func newPluginNotFoundError(kind PluginKind, name string) *pluginNotFoundError { +func newPluginNotFoundError(kind framework.PluginKind, name string) *pluginNotFoundError { return &pluginNotFoundError{ kind: kind, name: name, @@ -225,11 +226,11 @@ func (e *pluginNotFoundError) Error() string { } type duplicatePluginRegistrationError struct { - existing PluginIdentifier - duplicate PluginIdentifier + existing framework.PluginIdentifier + duplicate framework.PluginIdentifier } -func newDuplicatePluginRegistrationError(existing, duplicate PluginIdentifier) *duplicatePluginRegistrationError { +func newDuplicatePluginRegistrationError(existing, duplicate framework.PluginIdentifier) *duplicatePluginRegistrationError { return &duplicatePluginRegistrationError{ existing: existing, duplicate: duplicate, diff --git a/pkg/plugin/restartable_backup_item_action.go b/pkg/plugin/restartable_backup_item_action.go index 5a96443d8..3254bb60d 100644 --- a/pkg/plugin/restartable_backup_item_action.go +++ b/pkg/plugin/restartable_backup_item_action.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" api "github.com/heptio/velero/pkg/apis/velero/v1" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" ) @@ -35,7 +36,7 @@ type restartableBackupItemAction struct { // newRestartableBackupItemAction returns a new restartableBackupItemAction. func newRestartableBackupItemAction(name string, sharedPluginProcess RestartableProcess) *restartableBackupItemAction { r := &restartableBackupItemAction{ - key: kindAndName{kind: PluginKindBackupItemAction, name: name}, + key: kindAndName{kind: framework.PluginKindBackupItemAction, name: name}, sharedPluginProcess: sharedPluginProcess, } return r diff --git a/pkg/plugin/restartable_backup_item_action_test.go b/pkg/plugin/restartable_backup_item_action_test.go index fc8066933..dc5c14632 100644 --- a/pkg/plugin/restartable_backup_item_action_test.go +++ b/pkg/plugin/restartable_backup_item_action_test.go @@ -27,6 +27,7 @@ import ( v1 "github.com/heptio/velero/pkg/apis/velero/v1" "github.com/heptio/velero/pkg/backup/mocks" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" ) @@ -59,7 +60,7 @@ func TestRestartableGetBackupItemAction(t *testing.T) { defer p.AssertExpectations(t) name := "pod" - key := kindAndName{kind: PluginKindBackupItemAction, name: name} + key := kindAndName{kind: framework.PluginKindBackupItemAction, name: name} p.On("getByKindAndName", key).Return(tc.plugin, tc.getError) r := newRestartableBackupItemAction(name, p) @@ -90,7 +91,7 @@ func TestRestartableBackupItemActionGetDelegate(t *testing.T) { // Happy path p.On("resetIfNeeded").Return(nil) expected := new(mocks.ItemAction) - key := kindAndName{kind: PluginKindBackupItemAction, name: name} + key := kindAndName{kind: framework.PluginKindBackupItemAction, name: name} p.On("getByKindAndName", key).Return(expected, nil) a, err = r.getDelegate() @@ -121,7 +122,7 @@ func TestRestartableBackupItemActionDelegatedFunctions(t *testing.T) { runRestartableDelegateTests( t, - PluginKindBackupItemAction, + framework.PluginKindBackupItemAction, func(key kindAndName, p RestartableProcess) interface{} { return &restartableBackupItemAction{ key: key, diff --git a/pkg/plugin/restartable_block_store.go b/pkg/plugin/restartable_block_store.go index 9f7c38b36..8c2122c19 100644 --- a/pkg/plugin/restartable_block_store.go +++ b/pkg/plugin/restartable_block_store.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "k8s.io/apimachinery/pkg/runtime" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" ) @@ -34,7 +35,7 @@ type restartableBlockStore struct { // newRestartableBlockStore returns a new restartableBlockStore. func newRestartableBlockStore(name string, sharedPluginProcess RestartableProcess) *restartableBlockStore { - key := kindAndName{kind: PluginKindBlockStore, name: name} + key := kindAndName{kind: framework.PluginKindBlockStore, name: name} r := &restartableBlockStore{ key: key, sharedPluginProcess: sharedPluginProcess, diff --git a/pkg/plugin/restartable_block_store_test.go b/pkg/plugin/restartable_block_store_test.go index ba5e7fc11..c79c96c2c 100644 --- a/pkg/plugin/restartable_block_store_test.go +++ b/pkg/plugin/restartable_block_store_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/heptio/velero/pkg/cloudprovider/mocks" + "github.com/heptio/velero/pkg/plugin/framework" ) func TestRestartableGetBlockStore(t *testing.T) { @@ -58,7 +59,7 @@ func TestRestartableGetBlockStore(t *testing.T) { defer p.AssertExpectations(t) name := "aws" - key := kindAndName{kind: PluginKindBlockStore, name: name} + key := kindAndName{kind: framework.PluginKindBlockStore, name: name} p.On("getByKindAndName", key).Return(tc.plugin, tc.getError) r := &restartableBlockStore{ @@ -83,7 +84,7 @@ func TestRestartableBlockStoreReinitialize(t *testing.T) { defer p.AssertExpectations(t) name := "aws" - key := kindAndName{kind: PluginKindBlockStore, name: name} + key := kindAndName{kind: framework.PluginKindBlockStore, name: name} r := &restartableBlockStore{ key: key, sharedPluginProcess: p, @@ -116,7 +117,7 @@ func TestRestartableBlockStoreGetDelegate(t *testing.T) { // Reset error p.On("resetIfNeeded").Return(errors.Errorf("reset error")).Once() name := "aws" - key := kindAndName{kind: PluginKindBlockStore, name: name} + key := kindAndName{kind: framework.PluginKindBlockStore, name: name} r := &restartableBlockStore{ key: key, sharedPluginProcess: p, @@ -144,7 +145,7 @@ func TestRestartableBlockStoreInit(t *testing.T) { // getBlockStore error name := "aws" - key := kindAndName{kind: PluginKindBlockStore, name: name} + key := kindAndName{kind: framework.PluginKindBlockStore, name: name} r := &restartableBlockStore{ key: key, sharedPluginProcess: p, @@ -196,7 +197,7 @@ func TestRestartableBlockStoreDelegatedFunctions(t *testing.T) { runRestartableDelegateTests( t, - PluginKindBlockStore, + framework.PluginKindBlockStore, func(key kindAndName, p RestartableProcess) interface{} { return &restartableBlockStore{ key: key, diff --git a/pkg/plugin/restartable_delegate_test.go b/pkg/plugin/restartable_delegate_test.go index 451d877b0..7e45a9af6 100644 --- a/pkg/plugin/restartable_delegate_test.go +++ b/pkg/plugin/restartable_delegate_test.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/heptio/velero/pkg/plugin/framework" ) type restartableDelegateTest struct { @@ -40,7 +42,7 @@ type mockable interface { func runRestartableDelegateTests( t *testing.T, - kind PluginKind, + kind framework.PluginKind, newRestartable func(key kindAndName, p RestartableProcess) interface{}, newMock func() mockable, tests ...restartableDelegateTest, diff --git a/pkg/plugin/restartable_object_store.go b/pkg/plugin/restartable_object_store.go index 687a99ddd..6b656f3a0 100644 --- a/pkg/plugin/restartable_object_store.go +++ b/pkg/plugin/restartable_object_store.go @@ -21,6 +21,7 @@ import ( "github.com/pkg/errors" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" ) @@ -38,7 +39,7 @@ type restartableObjectStore struct { // newRestartableObjectStore returns a new restartableObjectStore. func newRestartableObjectStore(name string, sharedPluginProcess RestartableProcess) *restartableObjectStore { - key := kindAndName{kind: PluginKindObjectStore, name: name} + key := kindAndName{kind: framework.PluginKindObjectStore, name: name} r := &restartableObjectStore{ key: key, sharedPluginProcess: sharedPluginProcess, diff --git a/pkg/plugin/restartable_object_store_test.go b/pkg/plugin/restartable_object_store_test.go index 3b17945fb..855d3ef82 100644 --- a/pkg/plugin/restartable_object_store_test.go +++ b/pkg/plugin/restartable_object_store_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" cloudprovidermocks "github.com/heptio/velero/pkg/cloudprovider/mocks" + "github.com/heptio/velero/pkg/plugin/framework" ) func TestRestartableGetObjectStore(t *testing.T) { @@ -59,7 +60,7 @@ func TestRestartableGetObjectStore(t *testing.T) { defer p.AssertExpectations(t) name := "aws" - key := kindAndName{kind: PluginKindObjectStore, name: name} + key := kindAndName{kind: framework.PluginKindObjectStore, name: name} p.On("getByKindAndName", key).Return(tc.plugin, tc.getError) r := &restartableObjectStore{ @@ -84,7 +85,7 @@ func TestRestartableObjectStoreReinitialize(t *testing.T) { defer p.AssertExpectations(t) name := "aws" - key := kindAndName{kind: PluginKindObjectStore, name: name} + key := kindAndName{kind: framework.PluginKindObjectStore, name: name} r := &restartableObjectStore{ key: key, sharedPluginProcess: p, @@ -117,7 +118,7 @@ func TestRestartableObjectStoreGetDelegate(t *testing.T) { // Reset error p.On("resetIfNeeded").Return(errors.Errorf("reset error")).Once() name := "aws" - key := kindAndName{kind: PluginKindObjectStore, name: name} + key := kindAndName{kind: framework.PluginKindObjectStore, name: name} r := &restartableObjectStore{ key: key, sharedPluginProcess: p, @@ -145,7 +146,7 @@ func TestRestartableObjectStoreInit(t *testing.T) { // getObjectStore error name := "aws" - key := kindAndName{kind: PluginKindObjectStore, name: name} + key := kindAndName{kind: framework.PluginKindObjectStore, name: name} r := &restartableObjectStore{ key: key, sharedPluginProcess: p, @@ -185,7 +186,7 @@ func TestRestartableObjectStoreInit(t *testing.T) { func TestRestartableObjectStoreDelegatedFunctions(t *testing.T) { runRestartableDelegateTests( t, - PluginKindObjectStore, + framework.PluginKindObjectStore, func(key kindAndName, p RestartableProcess) interface{} { return &restartableObjectStore{ key: key, diff --git a/pkg/plugin/restartable_restore_item_action.go b/pkg/plugin/restartable_restore_item_action.go index 1edbd7994..6624b5475 100644 --- a/pkg/plugin/restartable_restore_item_action.go +++ b/pkg/plugin/restartable_restore_item_action.go @@ -18,6 +18,7 @@ package plugin import ( "github.com/pkg/errors" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" ) @@ -34,7 +35,7 @@ type restartableRestoreItemAction struct { // newRestartableRestoreItemAction returns a new restartableRestoreItemAction. func newRestartableRestoreItemAction(name string, sharedPluginProcess RestartableProcess) *restartableRestoreItemAction { r := &restartableRestoreItemAction{ - key: kindAndName{kind: PluginKindRestoreItemAction, name: name}, + key: kindAndName{kind: framework.PluginKindRestoreItemAction, name: name}, sharedPluginProcess: sharedPluginProcess, } return r diff --git a/pkg/plugin/restartable_restore_item_action_test.go b/pkg/plugin/restartable_restore_item_action_test.go index d1382f2df..53d63a610 100644 --- a/pkg/plugin/restartable_restore_item_action_test.go +++ b/pkg/plugin/restartable_restore_item_action_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" v1 "github.com/heptio/velero/pkg/apis/velero/v1" + "github.com/heptio/velero/pkg/plugin/framework" "github.com/heptio/velero/pkg/plugin/velero" "github.com/heptio/velero/pkg/restore/mocks" ) @@ -58,7 +59,7 @@ func TestRestartableGetRestoreItemAction(t *testing.T) { defer p.AssertExpectations(t) name := "pod" - key := kindAndName{kind: PluginKindRestoreItemAction, name: name} + key := kindAndName{kind: framework.PluginKindRestoreItemAction, name: name} p.On("getByKindAndName", key).Return(tc.plugin, tc.getError) r := newRestartableRestoreItemAction(name, p) @@ -89,7 +90,7 @@ func TestRestartableRestoreItemActionGetDelegate(t *testing.T) { // Happy path p.On("resetIfNeeded").Return(nil) expected := new(mocks.ItemAction) - key := kindAndName{kind: PluginKindRestoreItemAction, name: name} + key := kindAndName{kind: framework.PluginKindRestoreItemAction, name: name} p.On("getByKindAndName", key).Return(expected, nil) a, err = r.getDelegate() @@ -121,7 +122,7 @@ func TestRestartableRestoreItemActionDelegatedFunctions(t *testing.T) { runRestartableDelegateTests( t, - PluginKindRestoreItemAction, + framework.PluginKindRestoreItemAction, func(key kindAndName, p RestartableProcess) interface{} { return &restartableRestoreItemAction{ key: key, diff --git a/pkg/plugin/restore_item_action.go b/pkg/plugin/restore_item_action.go deleted file mode 100644 index 424fa4e0e..000000000 --- a/pkg/plugin/restore_item_action.go +++ /dev/null @@ -1,241 +0,0 @@ -/* -Copyright 2017, 2019 the Velero contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package plugin - -import ( - "encoding/json" - - plugin "github.com/hashicorp/go-plugin" - "github.com/pkg/errors" - "golang.org/x/net/context" - "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - api "github.com/heptio/velero/pkg/apis/velero/v1" - proto "github.com/heptio/velero/pkg/plugin/generated" - "github.com/heptio/velero/pkg/plugin/velero" -) - -// RestoreItemActionPlugin is an implementation of go-plugin's Plugin -// interface with support for gRPC for the restore/ItemAction -// interface. -type RestoreItemActionPlugin struct { - plugin.NetRPCUnsupportedPlugin - *pluginBase -} - -var _ velero.RestoreItemAction = &RestoreItemActionGRPCClient{} - -// NewRestoreItemActionPlugin constructs a RestoreItemActionPlugin. -func NewRestoreItemActionPlugin(options ...pluginOption) *RestoreItemActionPlugin { - return &RestoreItemActionPlugin{ - pluginBase: newPluginBase(options...), - } -} - -////////////////////////////////////////////////////////////////////////////// -// client code -////////////////////////////////////////////////////////////////////////////// - -// GRPCClient returns a RestoreItemAction gRPC client. -func (p *RestoreItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return newClientDispenser(p.clientLogger, c, newRestoreItemActionGRPCClient), nil -} - -// RestoreItemActionGRPCClient implements the backup/ItemAction interface and uses a -// gRPC client to make calls to the plugin server. -type RestoreItemActionGRPCClient struct { - *clientBase - grpcClient proto.RestoreItemActionClient -} - -func newRestoreItemActionGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} { - return &RestoreItemActionGRPCClient{ - clientBase: base, - grpcClient: proto.NewRestoreItemActionClient(clientConn), - } -} - -func (c *RestoreItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error) { - res, err := c.grpcClient.AppliesTo(context.Background(), &proto.AppliesToRequest{Plugin: c.plugin}) - if err != nil { - return velero.ResourceSelector{}, err - } - - return velero.ResourceSelector{ - IncludedNamespaces: res.IncludedNamespaces, - ExcludedNamespaces: res.ExcludedNamespaces, - IncludedResources: res.IncludedResources, - ExcludedResources: res.ExcludedResources, - LabelSelector: res.Selector, - }, nil -} - -func (c *RestoreItemActionGRPCClient) Execute(input *velero.RestoreItemActionExecuteInput) (*velero.RestoreItemActionExecuteOutput, error) { - itemJSON, err := json.Marshal(input.Item.UnstructuredContent()) - if err != nil { - return nil, err - } - - itemFromBackupJSON, err := json.Marshal(input.ItemFromBackup.UnstructuredContent()) - if err != nil { - return nil, err - } - - restoreJSON, err := json.Marshal(input.Restore) - if err != nil { - return nil, err - } - - req := &proto.RestoreExecuteRequest{ - Plugin: c.plugin, - Item: itemJSON, - ItemFromBackup: itemFromBackupJSON, - Restore: restoreJSON, - } - - res, err := c.grpcClient.Execute(context.Background(), req) - if err != nil { - return nil, err - } - - var updatedItem unstructured.Unstructured - if err := json.Unmarshal(res.Item, &updatedItem); err != nil { - return nil, err - } - - var warning error - if res.Warning != "" { - warning = errors.New(res.Warning) - } - - return &velero.RestoreItemActionExecuteOutput{ - UpdatedItem: &updatedItem, - Warning: warning, - }, nil -} - -////////////////////////////////////////////////////////////////////////////// -// server code -////////////////////////////////////////////////////////////////////////////// - -// GRPCServer registers a RestoreItemAction gRPC server. -func (p *RestoreItemActionPlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterRestoreItemActionServer(s, &RestoreItemActionGRPCServer{mux: p.serverMux}) - return nil -} - -// RestoreItemActionGRPCServer implements the proto-generated RestoreItemActionServer interface, and accepts -// gRPC calls and forwards them to an implementation of the pluggable interface. -type RestoreItemActionGRPCServer struct { - mux *serverMux -} - -func (s *RestoreItemActionGRPCServer) getImpl(name string) (velero.RestoreItemAction, error) { - impl, err := s.mux.getHandler(name) - if err != nil { - return nil, err - } - - itemAction, ok := impl.(velero.RestoreItemAction) - if !ok { - return nil, errors.Errorf("%T is not a restore item action", impl) - } - - return itemAction, nil -} - -func (s *RestoreItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.AppliesToRequest) (response *proto.AppliesToResponse, err error) { - defer func() { - if recoveredErr := handlePanic(recover()); recoveredErr != nil { - err = recoveredErr - } - }() - - impl, err := s.getImpl(req.Plugin) - if err != nil { - return nil, err - } - - appliesTo, err := impl.AppliesTo() - if err != nil { - return nil, err - } - - return &proto.AppliesToResponse{ - IncludedNamespaces: appliesTo.IncludedNamespaces, - ExcludedNamespaces: appliesTo.ExcludedNamespaces, - IncludedResources: appliesTo.IncludedResources, - ExcludedResources: appliesTo.ExcludedResources, - Selector: appliesTo.LabelSelector, - }, nil -} - -func (s *RestoreItemActionGRPCServer) Execute(ctx context.Context, req *proto.RestoreExecuteRequest) (response *proto.RestoreExecuteResponse, err error) { - defer func() { - if recoveredErr := handlePanic(recover()); recoveredErr != nil { - err = recoveredErr - } - }() - - impl, err := s.getImpl(req.Plugin) - if err != nil { - return nil, err - } - - var ( - item unstructured.Unstructured - itemFromBackup unstructured.Unstructured - restoreObj api.Restore - ) - - if err := json.Unmarshal(req.Item, &item); err != nil { - return nil, err - } - - if err := json.Unmarshal(req.ItemFromBackup, &itemFromBackup); err != nil { - return nil, err - } - - if err := json.Unmarshal(req.Restore, &restoreObj); err != nil { - return nil, err - } - - executeOutput, err := impl.Execute(&velero.RestoreItemActionExecuteInput{ - Item: &item, - ItemFromBackup: &itemFromBackup, - Restore: &restoreObj, - }) - if err != nil { - return nil, err - } - - updatedItem, err := json.Marshal(executeOutput.UpdatedItem) - if err != nil { - return nil, err - } - - var warnMessage string - if executeOutput.Warning != nil { - warnMessage = executeOutput.Warning.Error() - } - - return &proto.RestoreExecuteResponse{ - Item: updatedItem, - Warning: warnMessage, - }, nil -}