migrate backup actions to plugins

Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
Steve Kriss
2017-11-14 18:35:02 -08:00
parent 2ce15de2f8
commit 0f2d1ab82b
27 changed files with 1817 additions and 678 deletions

View File

@@ -0,0 +1,190 @@
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"encoding/json"
"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
api "github.com/heptio/ark/pkg/apis/ark/v1"
arkbackup "github.com/heptio/ark/pkg/backup"
proto "github.com/heptio/ark/pkg/plugin/generated"
)
// BackupItemActionPlugin is an implementation of go-plugin's Plugin
// interface with support for gRPC for the backup/ItemAction
// interface.
type BackupItemActionPlugin struct {
plugin.NetRPCUnsupportedPlugin
impl arkbackup.ItemAction
log *logrusAdapter
}
// NewBackupItemActionPlugin constructs a BackupItemActionPlugin.
func NewBackupItemActionPlugin(itemAction arkbackup.ItemAction) *BackupItemActionPlugin {
return &BackupItemActionPlugin{
impl: itemAction,
}
}
// GRPCServer registers a BackupItemAction gRPC server.
func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error {
proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{impl: p.impl})
return nil
}
// GRPCClient returns a BackupItemAction gRPC client.
func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
return &BackupItemActionGRPCClient{grpcClient: proto.NewBackupItemActionClient(c), log: p.log}, nil
}
// BackupItemActionGRPCClient implements the backup/ItemAction interface and uses a
// gRPC client to make calls to the plugin server.
type BackupItemActionGRPCClient struct {
grpcClient proto.BackupItemActionClient
log *logrusAdapter
}
func (c *BackupItemActionGRPCClient) AppliesTo() (arkbackup.ResourceSelector, error) {
res, err := c.grpcClient.AppliesTo(context.Background(), &proto.Empty{})
if err != nil {
return arkbackup.ResourceSelector{}, err
}
return arkbackup.ResourceSelector{
IncludedNamespaces: res.IncludedNamespaces,
ExcludedNamespaces: res.ExcludedNamespaces,
IncludedResources: res.IncludedResources,
ExcludedResources: res.ExcludedResources,
LabelSelector: res.Selector,
}, nil
}
func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []arkbackup.ResourceIdentifier, error) {
itemJSON, err := json.Marshal(item.UnstructuredContent())
if err != nil {
return nil, nil, err
}
backupJSON, err := json.Marshal(backup)
if err != nil {
return nil, nil, err
}
req := &proto.ExecuteRequest{
Item: itemJSON,
Backup: backupJSON,
}
res, err := c.grpcClient.Execute(context.Background(), req)
if err != nil {
return nil, nil, err
}
var updatedItem unstructured.Unstructured
if err := json.Unmarshal(res.Item, &updatedItem); err != nil {
return nil, nil, err
}
var additionalItems []arkbackup.ResourceIdentifier
for _, itm := range res.AdditionalItems {
newItem := arkbackup.ResourceIdentifier{
GroupResource: schema.GroupResource{
Group: itm.Group,
Resource: itm.Resource,
},
Namespace: itm.Namespace,
Name: itm.Name,
}
additionalItems = append(additionalItems, newItem)
}
return &updatedItem, additionalItems, nil
}
func (c *BackupItemActionGRPCClient) SetLog(log logrus.FieldLogger) {
c.log.impl = log
}
// BackupItemActionGRPCServer implements the proto-generated BackupItemActionServer interface, and accepts
// gRPC calls and forwards them to an implementation of the pluggable interface.
type BackupItemActionGRPCServer struct {
impl arkbackup.ItemAction
}
func (s *BackupItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.Empty) (*proto.AppliesToResponse, error) {
resourceSelector, err := s.impl.AppliesTo()
if err != nil {
return nil, err
}
return &proto.AppliesToResponse{
IncludedNamespaces: resourceSelector.IncludedNamespaces,
ExcludedNamespaces: resourceSelector.ExcludedNamespaces,
IncludedResources: resourceSelector.IncludedResources,
ExcludedResources: resourceSelector.ExcludedResources,
Selector: resourceSelector.LabelSelector,
}, nil
}
func (s *BackupItemActionGRPCServer) Execute(ctx context.Context, req *proto.ExecuteRequest) (*proto.ExecuteResponse, error) {
var item unstructured.Unstructured
var backup api.Backup
if err := json.Unmarshal(req.Item, &item); err != nil {
return nil, err
}
if err := json.Unmarshal(req.Backup, &backup); err != nil {
return nil, err
}
updatedItem, additionalItems, err := s.impl.Execute(&item, &backup)
if err != nil {
return nil, err
}
updatedItemJSON, err := json.Marshal(updatedItem.UnstructuredContent())
if err != nil {
return nil, err
}
res := &proto.ExecuteResponse{
Item: updatedItemJSON,
}
for _, itm := range additionalItems {
val := proto.ResourceIdentifier{
Group: itm.Group,
Resource: itm.Resource,
Namespace: itm.Namespace,
Name: itm.Name,
}
res.AdditionalItems = append(res.AdditionalItems, &val)
}
return res, nil
}

View File

@@ -0,0 +1,43 @@
package plugin
import (
"os/exec"
"github.com/hashicorp/go-hclog"
hcplugin "github.com/hashicorp/go-plugin"
)
type clientBuilder struct {
config *hcplugin.ClientConfig
}
func newClientBuilder(baseConfig *hcplugin.ClientConfig) *clientBuilder {
return &clientBuilder{
config: baseConfig,
}
}
func (b *clientBuilder) withPlugin(kind PluginKind, plugin hcplugin.Plugin) *clientBuilder {
if b.config.Plugins == nil {
b.config.Plugins = make(map[string]hcplugin.Plugin)
}
b.config.Plugins[string(kind)] = plugin
return b
}
func (b *clientBuilder) withLogger(logger hclog.Logger) *clientBuilder {
b.config.Logger = logger
return b
}
func (b *clientBuilder) withCommand(name string, args ...string) *clientBuilder {
b.config.Cmd = exec.Command(name, args...)
return b
}
func (b *clientBuilder) client() *hcplugin.Client {
return hcplugin.NewClient(b.config)
}

105
pkg/plugin/client_store.go Normal file
View File

@@ -0,0 +1,105 @@
package plugin
import (
"sync"
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
)
// clientKey is a unique ID for a plugin client.
type clientKey struct {
kind PluginKind
// scope is an additional identifier that allows multiple clients
// for the same kind/name to be differentiated. It will typically
// be the name of the applicable backup/restore for ItemAction
// clients, and blank for Object/BlockStore clients.
scope string
}
func newClientStore() *clientStore {
return &clientStore{
clients: make(map[clientKey]map[string]*plugin.Client),
lock: &sync.RWMutex{},
}
}
// clientStore is a repository of active plugin clients.
type clientStore struct {
// clients is a nested map, keyed first by clientKey (a
// combo of kind and "scope"), and second by plugin name.
// This enables easy listing of all clients for a given
// kind and scope (e.g. all BackupItemActions for a given
// backup), and efficient lookup by kind+name+scope (e.g.
// the AWS ObjectStore.)
clients map[clientKey]map[string]*plugin.Client
lock *sync.RWMutex
}
// get returns a plugin client for the given kind/name/scope, or an error if none
// is found.
func (s *clientStore) get(kind PluginKind, name, scope string) (*plugin.Client, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if forScope, found := s.clients[clientKey{kind, scope}]; found {
if client, found := forScope[name]; found {
return client, nil
}
}
return nil, errors.New("client not found")
}
// list returns all plugin clients for the given kind/scope, or an
// error if none are found.
func (s *clientStore) list(kind PluginKind, scope string) ([]*plugin.Client, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if forScope, found := s.clients[clientKey{kind, scope}]; found {
var clients []*plugin.Client
for _, client := range forScope {
clients = append(clients, client)
}
return clients, nil
}
return nil, errors.New("clients not found")
}
// add stores a plugin client for the given kind/name/scope.
func (s *clientStore) add(client *plugin.Client, kind PluginKind, name, scope string) {
s.lock.Lock()
defer s.lock.Unlock()
key := clientKey{kind, scope}
if _, found := s.clients[key]; !found {
s.clients[key] = make(map[string]*plugin.Client)
}
s.clients[key][name] = client
}
// delete removes the client with the given kind/name/scope from the store.
func (s *clientStore) delete(kind PluginKind, name, scope string) {
s.lock.Lock()
defer s.lock.Unlock()
if forScope, found := s.clients[clientKey{kind, scope}]; found {
delete(forScope, name)
}
}
// deleteAll removes all clients with the given kind/scope from
// the store.
func (s *clientStore) deleteAll(kind PluginKind, scope string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.clients, clientKey{kind, scope})
}

View File

@@ -0,0 +1,339 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: BackupItemAction.proto
/*
Package generated is a generated protocol buffer package.
It is generated from these files:
BackupItemAction.proto
BlockStore.proto
ObjectStore.proto
Shared.proto
It has these top-level messages:
AppliesToResponse
ExecuteRequest
ExecuteResponse
ResourceIdentifier
CreateVolumeRequest
CreateVolumeResponse
GetVolumeInfoRequest
GetVolumeInfoResponse
IsVolumeReadyRequest
IsVolumeReadyResponse
ListSnapshotsRequest
ListSnapshotsResponse
CreateSnapshotRequest
CreateSnapshotResponse
DeleteSnapshotRequest
PutObjectRequest
GetObjectRequest
Bytes
ListCommonPrefixesRequest
ListCommonPrefixesResponse
ListObjectsRequest
ListObjectsResponse
DeleteObjectRequest
CreateSignedURLRequest
CreateSignedURLResponse
Empty
InitRequest
*/
package generated
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type AppliesToResponse struct {
IncludedNamespaces []string `protobuf:"bytes,1,rep,name=includedNamespaces" json:"includedNamespaces,omitempty"`
ExcludedNamespaces []string `protobuf:"bytes,2,rep,name=excludedNamespaces" json:"excludedNamespaces,omitempty"`
IncludedResources []string `protobuf:"bytes,3,rep,name=includedResources" json:"includedResources,omitempty"`
ExcludedResources []string `protobuf:"bytes,4,rep,name=excludedResources" json:"excludedResources,omitempty"`
Selector string `protobuf:"bytes,5,opt,name=selector" json:"selector,omitempty"`
}
func (m *AppliesToResponse) Reset() { *m = AppliesToResponse{} }
func (m *AppliesToResponse) String() string { return proto.CompactTextString(m) }
func (*AppliesToResponse) ProtoMessage() {}
func (*AppliesToResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *AppliesToResponse) GetIncludedNamespaces() []string {
if m != nil {
return m.IncludedNamespaces
}
return nil
}
func (m *AppliesToResponse) GetExcludedNamespaces() []string {
if m != nil {
return m.ExcludedNamespaces
}
return nil
}
func (m *AppliesToResponse) GetIncludedResources() []string {
if m != nil {
return m.IncludedResources
}
return nil
}
func (m *AppliesToResponse) GetExcludedResources() []string {
if m != nil {
return m.ExcludedResources
}
return nil
}
func (m *AppliesToResponse) GetSelector() string {
if m != nil {
return m.Selector
}
return ""
}
type ExecuteRequest struct {
Item []byte `protobuf:"bytes,1,opt,name=item,proto3" json:"item,omitempty"`
Backup []byte `protobuf:"bytes,2,opt,name=backup,proto3" json:"backup,omitempty"`
}
func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} }
func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) }
func (*ExecuteRequest) ProtoMessage() {}
func (*ExecuteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *ExecuteRequest) GetItem() []byte {
if m != nil {
return m.Item
}
return nil
}
func (m *ExecuteRequest) GetBackup() []byte {
if m != nil {
return m.Backup
}
return nil
}
type ExecuteResponse struct {
Item []byte `protobuf:"bytes,1,opt,name=item,proto3" json:"item,omitempty"`
AdditionalItems []*ResourceIdentifier `protobuf:"bytes,2,rep,name=additionalItems" json:"additionalItems,omitempty"`
}
func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} }
func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) }
func (*ExecuteResponse) ProtoMessage() {}
func (*ExecuteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *ExecuteResponse) GetItem() []byte {
if m != nil {
return m.Item
}
return nil
}
func (m *ExecuteResponse) GetAdditionalItems() []*ResourceIdentifier {
if m != nil {
return m.AdditionalItems
}
return nil
}
type ResourceIdentifier struct {
Group string `protobuf:"bytes,1,opt,name=group" json:"group,omitempty"`
Resource string `protobuf:"bytes,2,opt,name=resource" json:"resource,omitempty"`
Namespace string `protobuf:"bytes,3,opt,name=namespace" json:"namespace,omitempty"`
Name string `protobuf:"bytes,4,opt,name=name" json:"name,omitempty"`
}
func (m *ResourceIdentifier) Reset() { *m = ResourceIdentifier{} }
func (m *ResourceIdentifier) String() string { return proto.CompactTextString(m) }
func (*ResourceIdentifier) ProtoMessage() {}
func (*ResourceIdentifier) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *ResourceIdentifier) GetGroup() string {
if m != nil {
return m.Group
}
return ""
}
func (m *ResourceIdentifier) GetResource() string {
if m != nil {
return m.Resource
}
return ""
}
func (m *ResourceIdentifier) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *ResourceIdentifier) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func init() {
proto.RegisterType((*AppliesToResponse)(nil), "generated.AppliesToResponse")
proto.RegisterType((*ExecuteRequest)(nil), "generated.ExecuteRequest")
proto.RegisterType((*ExecuteResponse)(nil), "generated.ExecuteResponse")
proto.RegisterType((*ResourceIdentifier)(nil), "generated.ResourceIdentifier")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for BackupItemAction service
type BackupItemActionClient interface {
AppliesTo(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*AppliesToResponse, error)
Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error)
}
type backupItemActionClient struct {
cc *grpc.ClientConn
}
func NewBackupItemActionClient(cc *grpc.ClientConn) BackupItemActionClient {
return &backupItemActionClient{cc}
}
func (c *backupItemActionClient) AppliesTo(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*AppliesToResponse, error) {
out := new(AppliesToResponse)
err := grpc.Invoke(ctx, "/generated.BackupItemAction/AppliesTo", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *backupItemActionClient) Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) {
out := new(ExecuteResponse)
err := grpc.Invoke(ctx, "/generated.BackupItemAction/Execute", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for BackupItemAction service
type BackupItemActionServer interface {
AppliesTo(context.Context, *Empty) (*AppliesToResponse, error)
Execute(context.Context, *ExecuteRequest) (*ExecuteResponse, error)
}
func RegisterBackupItemActionServer(s *grpc.Server, srv BackupItemActionServer) {
s.RegisterService(&_BackupItemAction_serviceDesc, srv)
}
func _BackupItemAction_AppliesTo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BackupItemActionServer).AppliesTo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/generated.BackupItemAction/AppliesTo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BackupItemActionServer).AppliesTo(ctx, req.(*Empty))
}
return interceptor(ctx, in, info, handler)
}
func _BackupItemAction_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ExecuteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BackupItemActionServer).Execute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/generated.BackupItemAction/Execute",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BackupItemActionServer).Execute(ctx, req.(*ExecuteRequest))
}
return interceptor(ctx, in, info, handler)
}
var _BackupItemAction_serviceDesc = grpc.ServiceDesc{
ServiceName: "generated.BackupItemAction",
HandlerType: (*BackupItemActionServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "AppliesTo",
Handler: _BackupItemAction_AppliesTo_Handler,
},
{
MethodName: "Execute",
Handler: _BackupItemAction_Execute_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "BackupItemAction.proto",
}
func init() { proto.RegisterFile("BackupItemAction.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 366 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xcb, 0x4e, 0xeb, 0x30,
0x10, 0x86, 0x95, 0xde, 0xce, 0xc9, 0x9c, 0xea, 0xb4, 0xb5, 0x50, 0x15, 0xa2, 0x22, 0x55, 0x59,
0x75, 0x81, 0xb2, 0x28, 0x4b, 0x58, 0x50, 0xa4, 0x0a, 0x75, 0xc3, 0xc2, 0xf0, 0x02, 0x69, 0x32,
0x94, 0x88, 0xc4, 0x36, 0xb6, 0x23, 0x95, 0xc7, 0xe0, 0x39, 0x79, 0x09, 0x64, 0xe7, 0xd2, 0xd2,
0x74, 0x97, 0x99, 0xff, 0x9b, 0x89, 0xe7, 0x9f, 0x81, 0xe9, 0x43, 0x14, 0xbf, 0x17, 0x62, 0xa3,
0x31, 0x5f, 0xc5, 0x3a, 0xe5, 0x2c, 0x14, 0x92, 0x6b, 0x4e, 0xdc, 0x1d, 0x32, 0x94, 0x91, 0xc6,
0xc4, 0x1f, 0x3e, 0xbf, 0x45, 0x12, 0x93, 0x52, 0x08, 0xbe, 0x1d, 0x98, 0xac, 0x84, 0xc8, 0x52,
0x54, 0x2f, 0x9c, 0xa2, 0x12, 0x9c, 0x29, 0x24, 0x21, 0x90, 0x94, 0xc5, 0x59, 0x91, 0x60, 0xf2,
0x14, 0xe5, 0xa8, 0x44, 0x14, 0xa3, 0xf2, 0x9c, 0x79, 0x77, 0xe1, 0xd2, 0x33, 0x8a, 0xe1, 0x71,
0xdf, 0xe2, 0x3b, 0x25, 0xdf, 0x56, 0xc8, 0x35, 0x4c, 0xea, 0x2e, 0x14, 0x15, 0x2f, 0xa4, 0xc1,
0xbb, 0x16, 0x6f, 0x0b, 0x86, 0xae, 0x7b, 0x1c, 0xe8, 0x5e, 0x49, 0xb7, 0x04, 0xe2, 0xc3, 0x5f,
0x85, 0x19, 0xc6, 0x9a, 0x4b, 0xaf, 0x3f, 0x77, 0x16, 0x2e, 0x6d, 0xe2, 0xe0, 0x0e, 0xfe, 0xaf,
0xf7, 0x18, 0x17, 0x1a, 0x29, 0x7e, 0x14, 0xa8, 0x34, 0x21, 0xd0, 0x4b, 0x35, 0xe6, 0x9e, 0x33,
0x77, 0x16, 0x43, 0x6a, 0xbf, 0xc9, 0x14, 0x06, 0x5b, 0x6b, 0xa3, 0xd7, 0xb1, 0xd9, 0x2a, 0x0a,
0x18, 0x8c, 0x9a, 0xea, 0xca, 0xa8, 0x73, 0xe5, 0x8f, 0x30, 0x8a, 0x92, 0x24, 0x35, 0xee, 0x47,
0x99, 0xd9, 0x44, 0xe9, 0xc4, 0xbf, 0xe5, 0x55, 0xd8, 0x6c, 0x21, 0xac, 0xdf, 0xbb, 0x49, 0x90,
0xe9, 0xf4, 0x35, 0x45, 0x49, 0x4f, 0xab, 0x82, 0x3d, 0x90, 0x36, 0x46, 0x2e, 0xa0, 0xbf, 0x93,
0xbc, 0x10, 0xf6, 0x9f, 0x2e, 0x2d, 0x03, 0x33, 0xb5, 0xac, 0x58, 0xfb, 0x6a, 0x97, 0x36, 0x31,
0x99, 0x81, 0xcb, 0x6a, 0xef, 0xbd, 0xae, 0x15, 0x0f, 0x09, 0x33, 0x82, 0x09, 0xbc, 0x9e, 0x15,
0xec, 0xf7, 0xf2, 0xcb, 0x81, 0xf1, 0xe9, 0x25, 0x91, 0x5b, 0x70, 0x9b, 0x4b, 0x21, 0xe3, 0xa3,
0x59, 0xd6, 0xb9, 0xd0, 0x9f, 0xfe, 0xec, 0x28, 0xd3, 0xbe, 0xa8, 0x7b, 0xf8, 0x53, 0x79, 0x47,
0x2e, 0x8f, 0x4b, 0x7f, 0x6d, 0xc3, 0xf7, 0xcf, 0x49, 0x65, 0x87, 0xed, 0xc0, 0x1e, 0xec, 0xcd,
0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x20, 0xf8, 0x53, 0xe3, 0x02, 0x00, 0x00,
}

View File

@@ -1,39 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: BlockStore.proto
/*
Package generated is a generated protocol buffer package.
It is generated from these files:
BlockStore.proto
ObjectStore.proto
Shared.proto
It has these top-level messages:
CreateVolumeRequest
CreateVolumeResponse
GetVolumeInfoRequest
GetVolumeInfoResponse
IsVolumeReadyRequest
IsVolumeReadyResponse
ListSnapshotsRequest
ListSnapshotsResponse
CreateSnapshotRequest
CreateSnapshotResponse
DeleteSnapshotRequest
PutObjectRequest
GetObjectRequest
Bytes
ListCommonPrefixesRequest
ListCommonPrefixesResponse
ListObjectsRequest
ListObjectsResponse
DeleteObjectRequest
CreateSignedURLRequest
CreateSignedURLResponse
Empty
InitRequest
*/
package generated
import proto "github.com/golang/protobuf/proto"
@@ -50,12 +17,6 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type CreateVolumeRequest struct {
SnapshotID string `protobuf:"bytes,1,opt,name=snapshotID" json:"snapshotID,omitempty"`
VolumeType string `protobuf:"bytes,2,opt,name=volumeType" json:"volumeType,omitempty"`
@@ -66,7 +27,7 @@ type CreateVolumeRequest struct {
func (m *CreateVolumeRequest) Reset() { *m = CreateVolumeRequest{} }
func (m *CreateVolumeRequest) String() string { return proto.CompactTextString(m) }
func (*CreateVolumeRequest) ProtoMessage() {}
func (*CreateVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (*CreateVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} }
func (m *CreateVolumeRequest) GetSnapshotID() string {
if m != nil {
@@ -103,7 +64,7 @@ type CreateVolumeResponse struct {
func (m *CreateVolumeResponse) Reset() { *m = CreateVolumeResponse{} }
func (m *CreateVolumeResponse) String() string { return proto.CompactTextString(m) }
func (*CreateVolumeResponse) ProtoMessage() {}
func (*CreateVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (*CreateVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} }
func (m *CreateVolumeResponse) GetVolumeID() string {
if m != nil {
@@ -120,7 +81,7 @@ type GetVolumeInfoRequest struct {
func (m *GetVolumeInfoRequest) Reset() { *m = GetVolumeInfoRequest{} }
func (m *GetVolumeInfoRequest) String() string { return proto.CompactTextString(m) }
func (*GetVolumeInfoRequest) ProtoMessage() {}
func (*GetVolumeInfoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (*GetVolumeInfoRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} }
func (m *GetVolumeInfoRequest) GetVolumeID() string {
if m != nil {
@@ -144,7 +105,7 @@ type GetVolumeInfoResponse struct {
func (m *GetVolumeInfoResponse) Reset() { *m = GetVolumeInfoResponse{} }
func (m *GetVolumeInfoResponse) String() string { return proto.CompactTextString(m) }
func (*GetVolumeInfoResponse) ProtoMessage() {}
func (*GetVolumeInfoResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (*GetVolumeInfoResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} }
func (m *GetVolumeInfoResponse) GetVolumeType() string {
if m != nil {
@@ -168,7 +129,7 @@ type IsVolumeReadyRequest struct {
func (m *IsVolumeReadyRequest) Reset() { *m = IsVolumeReadyRequest{} }
func (m *IsVolumeReadyRequest) String() string { return proto.CompactTextString(m) }
func (*IsVolumeReadyRequest) ProtoMessage() {}
func (*IsVolumeReadyRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (*IsVolumeReadyRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} }
func (m *IsVolumeReadyRequest) GetVolumeID() string {
if m != nil {
@@ -191,7 +152,7 @@ type IsVolumeReadyResponse struct {
func (m *IsVolumeReadyResponse) Reset() { *m = IsVolumeReadyResponse{} }
func (m *IsVolumeReadyResponse) String() string { return proto.CompactTextString(m) }
func (*IsVolumeReadyResponse) ProtoMessage() {}
func (*IsVolumeReadyResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (*IsVolumeReadyResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
func (m *IsVolumeReadyResponse) GetReady() bool {
if m != nil {
@@ -207,7 +168,7 @@ type ListSnapshotsRequest struct {
func (m *ListSnapshotsRequest) Reset() { *m = ListSnapshotsRequest{} }
func (m *ListSnapshotsRequest) String() string { return proto.CompactTextString(m) }
func (*ListSnapshotsRequest) ProtoMessage() {}
func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} }
func (m *ListSnapshotsRequest) GetTagFilters() map[string]string {
if m != nil {
@@ -223,7 +184,7 @@ type ListSnapshotsResponse struct {
func (m *ListSnapshotsResponse) Reset() { *m = ListSnapshotsResponse{} }
func (m *ListSnapshotsResponse) String() string { return proto.CompactTextString(m) }
func (*ListSnapshotsResponse) ProtoMessage() {}
func (*ListSnapshotsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (*ListSnapshotsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{7} }
func (m *ListSnapshotsResponse) GetSnapshotIDs() []string {
if m != nil {
@@ -241,7 +202,7 @@ type CreateSnapshotRequest struct {
func (m *CreateSnapshotRequest) Reset() { *m = CreateSnapshotRequest{} }
func (m *CreateSnapshotRequest) String() string { return proto.CompactTextString(m) }
func (*CreateSnapshotRequest) ProtoMessage() {}
func (*CreateSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (*CreateSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{8} }
func (m *CreateSnapshotRequest) GetVolumeID() string {
if m != nil {
@@ -271,7 +232,7 @@ type CreateSnapshotResponse struct {
func (m *CreateSnapshotResponse) Reset() { *m = CreateSnapshotResponse{} }
func (m *CreateSnapshotResponse) String() string { return proto.CompactTextString(m) }
func (*CreateSnapshotResponse) ProtoMessage() {}
func (*CreateSnapshotResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
func (*CreateSnapshotResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{9} }
func (m *CreateSnapshotResponse) GetSnapshotID() string {
if m != nil {
@@ -287,7 +248,7 @@ type DeleteSnapshotRequest struct {
func (m *DeleteSnapshotRequest) Reset() { *m = DeleteSnapshotRequest{} }
func (m *DeleteSnapshotRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteSnapshotRequest) ProtoMessage() {}
func (*DeleteSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (*DeleteSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{10} }
func (m *DeleteSnapshotRequest) GetSnapshotID() string {
if m != nil {
@@ -580,9 +541,9 @@ var _BlockStore_serviceDesc = grpc.ServiceDesc{
Metadata: "BlockStore.proto",
}
func init() { proto.RegisterFile("BlockStore.proto", fileDescriptor0) }
func init() { proto.RegisterFile("BlockStore.proto", fileDescriptor1) }
var fileDescriptor0 = []byte{
var fileDescriptor1 = []byte{
// 539 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xd5, 0xc6, 0x06, 0x35, 0x53, 0x5a, 0xa2, 0xc5, 0xae, 0x2c, 0x1f, 0x8a, 0xf1, 0x29, 0x42,

View File

@@ -26,7 +26,7 @@ type PutObjectRequest struct {
func (m *PutObjectRequest) Reset() { *m = PutObjectRequest{} }
func (m *PutObjectRequest) String() string { return proto.CompactTextString(m) }
func (*PutObjectRequest) ProtoMessage() {}
func (*PutObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} }
func (*PutObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} }
func (m *PutObjectRequest) GetBucket() string {
if m != nil {
@@ -57,7 +57,7 @@ type GetObjectRequest struct {
func (m *GetObjectRequest) Reset() { *m = GetObjectRequest{} }
func (m *GetObjectRequest) String() string { return proto.CompactTextString(m) }
func (*GetObjectRequest) ProtoMessage() {}
func (*GetObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} }
func (*GetObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} }
func (m *GetObjectRequest) GetBucket() string {
if m != nil {
@@ -80,7 +80,7 @@ type Bytes struct {
func (m *Bytes) Reset() { *m = Bytes{} }
func (m *Bytes) String() string { return proto.CompactTextString(m) }
func (*Bytes) ProtoMessage() {}
func (*Bytes) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} }
func (*Bytes) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} }
func (m *Bytes) GetData() []byte {
if m != nil {
@@ -97,7 +97,7 @@ type ListCommonPrefixesRequest struct {
func (m *ListCommonPrefixesRequest) Reset() { *m = ListCommonPrefixesRequest{} }
func (m *ListCommonPrefixesRequest) String() string { return proto.CompactTextString(m) }
func (*ListCommonPrefixesRequest) ProtoMessage() {}
func (*ListCommonPrefixesRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} }
func (*ListCommonPrefixesRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{3} }
func (m *ListCommonPrefixesRequest) GetBucket() string {
if m != nil {
@@ -120,7 +120,7 @@ type ListCommonPrefixesResponse struct {
func (m *ListCommonPrefixesResponse) Reset() { *m = ListCommonPrefixesResponse{} }
func (m *ListCommonPrefixesResponse) String() string { return proto.CompactTextString(m) }
func (*ListCommonPrefixesResponse) ProtoMessage() {}
func (*ListCommonPrefixesResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} }
func (*ListCommonPrefixesResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{4} }
func (m *ListCommonPrefixesResponse) GetPrefixes() []string {
if m != nil {
@@ -137,7 +137,7 @@ type ListObjectsRequest struct {
func (m *ListObjectsRequest) Reset() { *m = ListObjectsRequest{} }
func (m *ListObjectsRequest) String() string { return proto.CompactTextString(m) }
func (*ListObjectsRequest) ProtoMessage() {}
func (*ListObjectsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
func (*ListObjectsRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{5} }
func (m *ListObjectsRequest) GetBucket() string {
if m != nil {
@@ -160,7 +160,7 @@ type ListObjectsResponse struct {
func (m *ListObjectsResponse) Reset() { *m = ListObjectsResponse{} }
func (m *ListObjectsResponse) String() string { return proto.CompactTextString(m) }
func (*ListObjectsResponse) ProtoMessage() {}
func (*ListObjectsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} }
func (*ListObjectsResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{6} }
func (m *ListObjectsResponse) GetKeys() []string {
if m != nil {
@@ -177,7 +177,7 @@ type DeleteObjectRequest struct {
func (m *DeleteObjectRequest) Reset() { *m = DeleteObjectRequest{} }
func (m *DeleteObjectRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteObjectRequest) ProtoMessage() {}
func (*DeleteObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{7} }
func (*DeleteObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{7} }
func (m *DeleteObjectRequest) GetBucket() string {
if m != nil {
@@ -202,7 +202,7 @@ type CreateSignedURLRequest struct {
func (m *CreateSignedURLRequest) Reset() { *m = CreateSignedURLRequest{} }
func (m *CreateSignedURLRequest) String() string { return proto.CompactTextString(m) }
func (*CreateSignedURLRequest) ProtoMessage() {}
func (*CreateSignedURLRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{8} }
func (*CreateSignedURLRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{8} }
func (m *CreateSignedURLRequest) GetBucket() string {
if m != nil {
@@ -232,7 +232,7 @@ type CreateSignedURLResponse struct {
func (m *CreateSignedURLResponse) Reset() { *m = CreateSignedURLResponse{} }
func (m *CreateSignedURLResponse) String() string { return proto.CompactTextString(m) }
func (*CreateSignedURLResponse) ProtoMessage() {}
func (*CreateSignedURLResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{9} }
func (*CreateSignedURLResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{9} }
func (m *CreateSignedURLResponse) GetUrl() string {
if m != nil {
@@ -586,9 +586,9 @@ var _ObjectStore_serviceDesc = grpc.ServiceDesc{
Metadata: "ObjectStore.proto",
}
func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor1) }
func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor2) }
var fileDescriptor1 = []byte{
var fileDescriptor2 = []byte{
// 444 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xdf, 0x8b, 0xd3, 0x40,
0x10, 0xc7, 0x89, 0xa9, 0xc5, 0xcc, 0x15, 0x8c, 0x73, 0x50, 0x6b, 0x4e, 0xa5, 0x2e, 0x0a, 0x15,

View File

@@ -18,7 +18,7 @@ type Empty struct {
func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} }
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{0} }
type InitRequest struct {
Config map[string]string `protobuf:"bytes,1,rep,name=config" json:"config,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
@@ -27,7 +27,7 @@ type InitRequest struct {
func (m *InitRequest) Reset() { *m = InitRequest{} }
func (m *InitRequest) String() string { return proto.CompactTextString(m) }
func (*InitRequest) ProtoMessage() {}
func (*InitRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} }
func (*InitRequest) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{1} }
func (m *InitRequest) GetConfig() map[string]string {
if m != nil {
@@ -41,9 +41,9 @@ func init() {
proto.RegisterType((*InitRequest)(nil), "generated.InitRequest")
}
func init() { proto.RegisterFile("Shared.proto", fileDescriptor2) }
func init() { proto.RegisterFile("Shared.proto", fileDescriptor3) }
var fileDescriptor2 = []byte{
var fileDescriptor3 = []byte{
// 156 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x09, 0xce, 0x48, 0x2c,
0x4a, 0x4d, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0x4f, 0xcd, 0x4b, 0x2d, 0x4a,

View File

@@ -18,15 +18,17 @@ package plugin
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/cloudprovider"
)
@@ -38,6 +40,13 @@ func (k PluginKind) String() string {
return string(k)
}
func baseConfig() *plugin.ClientConfig {
return &plugin.ClientConfig{
HandshakeConfig: Handshake,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
}
}
const (
// PluginKindObjectStore is the Kind string for
// an Object Store plugin.
@@ -47,12 +56,36 @@ const (
// a Block Store plugin.
PluginKindBlockStore PluginKind = "blockstore"
// PluginKindCloudProvider is the Kind string for
// a CloudProvider plugin (i.e. an Object & Block
// store).
//
// NOTE that it is highly likely that in subsequent
// versions of Ark this kind of plugin will be replaced
// with a different mechanism for providing multiple
// plugin impls within a single binary. This should
// probably not be used.
PluginKindCloudProvider PluginKind = "cloudprovider"
// PluginKindBackupItemAction is the Kind string for
// a Backup ItemAction plugin.
PluginKindBackupItemAction PluginKind = "backupitemaction"
pluginDir = "/plugins"
)
var AllPluginKinds = []PluginKind{
PluginKindObjectStore,
PluginKindBlockStore,
PluginKindCloudProvider,
PluginKindBackupItemAction,
}
type pluginInfo struct {
kind PluginKind
name string
kinds []PluginKind
name string
commandName string
commandArgs []string
}
// Manager exposes functions for getting implementations of the pluggable
@@ -65,86 +98,58 @@ type Manager interface {
// GetBlockStore returns the plugin implementation of the
// cloudprovider.BlockStore interface with the specified name.
GetBlockStore(name string) (cloudprovider.BlockStore, error)
// GetBackupItemActions returns all backup.ItemAction plugins.
// These plugin instances should ONLY be used for a single backup
// (mainly because each one outputs to a per-backup log),
// and should be terminated upon completion of the backup with
// CloseBackupItemActions().
GetBackupItemActions(backupName string, logger logrus.FieldLogger, level logrus.Level) ([]backup.ItemAction, error)
// CloseBackupItemActions terminates the plugin sub-processes that
// are hosting BackupItemAction plugins for the given backup name.
CloseBackupItemActions(backupName string) error
}
type manager struct {
logger hclog.Logger
clients map[pluginInfo]*plugin.Client
internalPlugins map[pluginInfo]interface{}
logger hclog.Logger
pluginRegistry *registry
clientStore *clientStore
}
// NewManager constructs a manager for getting plugin implementations.
func NewManager(logger logrus.FieldLogger, level logrus.Level) Manager {
return &manager{
logger: (&logrusAdapter{impl: logger, level: level}),
clients: make(map[pluginInfo]*plugin.Client),
internalPlugins: map[pluginInfo]interface{}{
{kind: PluginKindObjectStore, name: "aws"}: struct{}{},
{kind: PluginKindBlockStore, name: "aws"}: struct{}{},
func NewManager(logger logrus.FieldLogger, level logrus.Level) (Manager, error) {
m := &manager{
logger: &logrusAdapter{impl: logger, level: level},
pluginRegistry: newRegistry(),
clientStore: newClientStore(),
}
{kind: PluginKindObjectStore, name: "gcp"}: struct{}{},
{kind: PluginKindBlockStore, name: "gcp"}: struct{}{},
if err := m.registerPlugins(); err != nil {
return nil, err
}
{kind: PluginKindObjectStore, name: "azure"}: struct{}{},
{kind: PluginKindBlockStore, name: "azure"}: struct{}{},
},
return m, nil
}
func pluginForKind(kind PluginKind) plugin.Plugin {
switch kind {
case PluginKindObjectStore:
return &ObjectStorePlugin{}
case PluginKindBlockStore:
return &BlockStorePlugin{}
default:
return nil
}
}
func addPlugins(config *plugin.ClientConfig, kinds ...PluginKind) {
for _, kind := range kinds {
if kind == PluginKindObjectStore {
config.Plugins[kind.String()] = &ObjectStorePlugin{}
} else if kind == PluginKindBlockStore {
config.Plugins[kind.String()] = &BlockStorePlugin{}
}
}
}
func (m *manager) getPlugin(descriptor pluginInfo, logger hclog.Logger) (interface{}, error) {
client, found := m.clients[descriptor]
if !found {
var (
externalPath = filepath.Join(pluginDir, fmt.Sprintf("ark-%s-%s", descriptor.kind, descriptor.name))
config = &plugin.ClientConfig{
HandshakeConfig: Handshake,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Plugins: make(map[string]plugin.Plugin),
Logger: logger,
}
)
// First check to see if there's an external plugin for this kind and name. this
// is so users can override the built-in plugins if they want. If it doesn't exist,
// see if there's an internal one.
if _, err := os.Stat(externalPath); err == nil {
addPlugins(config, descriptor.kind)
config.Cmd = exec.Command(externalPath)
client = plugin.NewClient(config)
m.clients[descriptor] = client
} else if _, found := m.internalPlugins[descriptor]; found {
addPlugins(config, PluginKindObjectStore, PluginKindBlockStore)
config.Cmd = exec.Command("/ark", "plugin", "cloudprovider", descriptor.name)
client = plugin.NewClient(config)
// since a single sub-process will serve both an object and block store
// for a given cloud-provider, record this client as being valid for both
m.clients[pluginInfo{PluginKindObjectStore, descriptor.name}] = client
m.clients[pluginInfo{PluginKindBlockStore, descriptor.name}] = client
} else {
return nil, errors.Errorf("plugin not found for kind=%s, name=%s", descriptor.kind, descriptor.name)
}
}
func getPluginInstance(client *plugin.Client, kind PluginKind) (interface{}, error) {
protocolClient, err := client.Client()
if err != nil {
return nil, errors.WithStack(err)
}
plugin, err := protocolClient.Dispense(descriptor.kind.String())
plugin, err := protocolClient.Dispense(string(kind))
if err != nil {
return nil, errors.WithStack(err)
}
@@ -152,10 +157,56 @@ func (m *manager) getPlugin(descriptor pluginInfo, logger hclog.Logger) (interfa
return plugin, nil
}
func (m *manager) registerPlugins() error {
// first, register internal plugins
for _, provider := range []string{"aws", "gcp", "azure"} {
m.pluginRegistry.register(provider, "/ark", []string{"plugin", "cloudprovider", provider}, PluginKindObjectStore, PluginKindBlockStore)
}
m.pluginRegistry.register("backup_pv", "/ark", []string{"plugin", string(PluginKindBackupItemAction), "backup_pv"}, PluginKindBackupItemAction)
// second, register external plugins (these will override internal plugins, if applicable)
if _, err := os.Stat(pluginDir); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
files, err := ioutil.ReadDir(pluginDir)
if err != nil {
return err
}
for _, file := range files {
name, kind, err := parse(file.Name())
if err != nil {
continue
}
if kind == PluginKindCloudProvider {
m.pluginRegistry.register(name, filepath.Join(pluginDir, file.Name()), nil, PluginKindObjectStore, PluginKindBlockStore)
} else {
m.pluginRegistry.register(name, filepath.Join(pluginDir, file.Name()), nil, kind)
}
}
return nil
}
func parse(filename string) (string, PluginKind, error) {
for _, kind := range AllPluginKinds {
if prefix := fmt.Sprintf("ark-%s-", kind); strings.Index(filename, prefix) == 0 {
return strings.Replace(filename, prefix, "", -1), kind, nil
}
}
return "", "", errors.New("invalid file name")
}
// GetObjectStore returns the plugin implementation of the cloudprovider.ObjectStore
// interface with the specified name.
func (m *manager) GetObjectStore(name string) (cloudprovider.ObjectStore, error) {
pluginObj, err := m.getPlugin(pluginInfo{PluginKindObjectStore, name}, m.logger)
pluginObj, err := m.getCloudProviderPlugin(name, PluginKindObjectStore)
if err != nil {
return nil, err
}
@@ -171,7 +222,7 @@ func (m *manager) GetObjectStore(name string) (cloudprovider.ObjectStore, error)
// GetBlockStore returns the plugin implementation of the cloudprovider.BlockStore
// interface with the specified name.
func (m *manager) GetBlockStore(name string) (cloudprovider.BlockStore, error) {
pluginObj, err := m.getPlugin(pluginInfo{PluginKindBlockStore, name}, m.logger)
pluginObj, err := m.getCloudProviderPlugin(name, PluginKindBlockStore)
if err != nil {
return nil, err
}
@@ -183,3 +234,99 @@ func (m *manager) GetBlockStore(name string) (cloudprovider.BlockStore, error) {
return blockStore, nil
}
func (m *manager) getCloudProviderPlugin(name string, kind PluginKind) (interface{}, error) {
client, err := m.clientStore.get(kind, name, "")
if err != nil {
pluginInfo, err := m.pluginRegistry.get(kind, name)
if err != nil {
return nil, err
}
// build a plugin client that can dispense all of the PluginKinds it's registered for
clientBuilder := newClientBuilder(baseConfig()).
withCommand(pluginInfo.commandName, pluginInfo.commandArgs...)
for _, kind := range pluginInfo.kinds {
clientBuilder.withPlugin(kind, pluginForKind(kind))
}
client = clientBuilder.client()
// register the plugin client for the appropriate kinds
for _, kind := range pluginInfo.kinds {
m.clientStore.add(client, kind, name, "")
}
}
pluginObj, err := getPluginInstance(client, kind)
if err != nil {
return nil, err
}
return pluginObj, nil
}
// GetBackupActions returns all backup.BackupAction plugins.
// These plugin instances should ONLY be used for a single backup
// (mainly because each one outputs to a per-backup log),
// and should be terminated upon completion of the backup with
// CloseBackupActions().
func (m *manager) GetBackupItemActions(backupName string, logger logrus.FieldLogger, level logrus.Level) ([]backup.ItemAction, error) {
clients, err := m.clientStore.list(PluginKindBackupItemAction, backupName)
if err != nil {
pluginInfo, err := m.pluginRegistry.list(PluginKindBackupItemAction)
if err != nil {
return nil, err
}
// create clients for each, using the provided logger
log := &logrusAdapter{impl: logger, level: level}
for _, plugin := range pluginInfo {
client := newClientBuilder(baseConfig()).
withCommand(plugin.commandName, plugin.commandArgs...).
withPlugin(PluginKindBackupItemAction, &BackupItemActionPlugin{log: log}).
withLogger(log).
client()
m.clientStore.add(client, PluginKindBackupItemAction, plugin.name, backupName)
clients = append(clients, client)
}
}
var backupActions []backup.ItemAction
for _, client := range clients {
plugin, err := getPluginInstance(client, PluginKindBackupItemAction)
if err != nil {
return nil, err
}
backupAction, ok := plugin.(backup.ItemAction)
if !ok {
return nil, errors.New("could not convert gRPC client to backup.BackupAction")
}
backupActions = append(backupActions, backupAction)
}
return backupActions, nil
}
// CloseBackupItemActions terminates the plugin sub-processes that
// are hosting BackupItemAction plugins for the given backup name.
func (m *manager) CloseBackupItemActions(backupName string) error {
clients, err := m.clientStore.list(PluginKindBackupItemAction, backupName)
if err != nil {
return err
}
for _, client := range clients {
client.Kill()
}
m.clientStore.deleteAll(PluginKindBackupItemAction, backupName)
return nil
}

View File

@@ -0,0 +1,34 @@
syntax = "proto3";
package generated;
import "Shared.proto";
message AppliesToResponse {
repeated string includedNamespaces = 1;
repeated string excludedNamespaces = 2;
repeated string includedResources = 3;
repeated string excludedResources = 4;
string selector = 5;
}
message ExecuteRequest {
bytes item = 1;
bytes backup = 2;
}
message ExecuteResponse {
bytes item = 1;
repeated ResourceIdentifier additionalItems = 2;
}
message ResourceIdentifier {
string group = 1;
string resource = 2;
string namespace = 3;
string name = 4;
}
service BackupItemAction {
rpc AppliesTo(Empty) returns (AppliesToResponse);
rpc Execute(ExecuteRequest) returns (ExecuteResponse);
}

68
pkg/plugin/registry.go Normal file
View File

@@ -0,0 +1,68 @@
package plugin
import (
"github.com/pkg/errors"
)
// registry is a simple store of plugin binary information. If a binary
// is registered as supporting multiple PluginKinds, it will be
// gettable/listable for all of those kinds.
type registry struct {
// plugins is a nested map, keyed first by PluginKind,
// and second by name. this is to allow easy listing
// of plugins for a kind, as well as efficient lookup
// of a plugin by kind+name.
plugins map[PluginKind]map[string]pluginInfo
}
func newRegistry() *registry {
return &registry{
plugins: make(map[PluginKind]map[string]pluginInfo),
}
}
// register adds a binary to the registry. If the binary supports multiple
// PluginKinds, it will be stored for each of those kinds so subsequent gets/lists
// for any supported kind will return it.
func (r *registry) register(name, commandName string, commandArgs []string, kinds ...PluginKind) {
for _, kind := range kinds {
if r.plugins[kind] == nil {
r.plugins[kind] = make(map[string]pluginInfo)
}
r.plugins[kind][name] = pluginInfo{
kinds: kinds,
name: name,
commandName: commandName,
commandArgs: commandArgs,
}
}
}
// list returns info about all plugin binaries that implement the given
// PluginKind.
func (r *registry) list(kind PluginKind) ([]pluginInfo, error) {
var res []pluginInfo
if plugins, found := r.plugins[kind]; found {
for _, itm := range plugins {
res = append(res, itm)
}
return res, nil
}
return nil, errors.New("plugins not found")
}
// get returns info about a plugin with the given name and kind, or an
// error if one cannot be found.
func (r *registry) get(kind PluginKind, name string) (pluginInfo, error) {
if forKind := r.plugins[kind]; forKind != nil {
if plugin, found := r.plugins[kind][name]; found {
return plugin, nil
}
}
return pluginInfo{}, errors.New("plugin not found")
}