Split plugin framework into its own package

Signed-off-by: Carlisia <carlisiac@vmware.com>
This commit is contained in:
Carlisia
2019-03-14 18:25:52 -07:00
parent 73514a003b
commit 7dfe58d37f
48 changed files with 1114 additions and 824 deletions

View File

@@ -0,0 +1,43 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
// BackupItemActionPlugin is an implementation of go-plugin's Plugin
// interface with support for gRPC for the backup/ItemAction
// interface.
type BackupItemActionPlugin struct {
plugin.NetRPCUnsupportedPlugin
*pluginBase
}
// GRPCClient returns a clientDispenser for BackupItemAction gRPC clients.
func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
return newClientDispenser(p.clientLogger, c, newBackupItemActionGRPCClient), nil
}
// GRPCServer registers a BackupItemAction gRPC server.
func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error {
proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{mux: p.serverMux})
return nil
}

View File

@@ -0,0 +1,112 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
api "github.com/heptio/velero/pkg/apis/velero/v1"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
)
// NewBackupItemActionPlugin constructs a BackupItemActionPlugin.
func NewBackupItemActionPlugin(options ...PluginOption) *BackupItemActionPlugin {
return &BackupItemActionPlugin{
pluginBase: newPluginBase(options...),
}
}
// BackupItemActionGRPCClient implements the backup/ItemAction interface and uses a
// gRPC client to make calls to the plugin server.
type BackupItemActionGRPCClient struct {
*clientBase
grpcClient proto.BackupItemActionClient
}
func newBackupItemActionGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} {
return &BackupItemActionGRPCClient{
clientBase: base,
grpcClient: proto.NewBackupItemActionClient(clientConn),
}
}
func (c *BackupItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error) {
res, err := c.grpcClient.AppliesTo(context.Background(), &proto.AppliesToRequest{Plugin: c.plugin})
if err != nil {
return velero.ResourceSelector{}, err
}
return velero.ResourceSelector{
IncludedNamespaces: res.IncludedNamespaces,
ExcludedNamespaces: res.ExcludedNamespaces,
IncludedResources: res.IncludedResources,
ExcludedResources: res.ExcludedResources,
LabelSelector: res.Selector,
}, nil
}
func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
itemJSON, err := json.Marshal(item.UnstructuredContent())
if err != nil {
return nil, nil, err
}
backupJSON, err := json.Marshal(backup)
if err != nil {
return nil, nil, err
}
req := &proto.ExecuteRequest{
Plugin: c.plugin,
Item: itemJSON,
Backup: backupJSON,
}
res, err := c.grpcClient.Execute(context.Background(), req)
if err != nil {
return nil, nil, err
}
var updatedItem unstructured.Unstructured
if err := json.Unmarshal(res.Item, &updatedItem); err != nil {
return nil, nil, err
}
var additionalItems []velero.ResourceIdentifier
for _, itm := range res.AdditionalItems {
newItem := velero.ResourceIdentifier{
GroupResource: schema.GroupResource{
Group: itm.Group,
Resource: itm.Resource,
},
Namespace: itm.Namespace,
Name: itm.Name,
}
additionalItems = append(additionalItems, newItem)
}
return &updatedItem, additionalItems, nil
}

View File

@@ -0,0 +1,134 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"github.com/pkg/errors"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
api "github.com/heptio/velero/pkg/apis/velero/v1"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
)
// BackupItemActionGRPCServer implements the proto-generated BackupItemAction interface, and accepts
// gRPC calls and forwards them to an implementation of the pluggable interface.
type BackupItemActionGRPCServer struct {
mux *serverMux
}
func (s *BackupItemActionGRPCServer) getImpl(name string) (velero.BackupItemAction, error) {
impl, err := s.mux.getHandler(name)
if err != nil {
return nil, err
}
itemAction, ok := impl.(velero.BackupItemAction)
if !ok {
return nil, errors.Errorf("%T is not a backup item action", impl)
}
return itemAction, nil
}
func (s *BackupItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.AppliesToRequest) (response *proto.AppliesToResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
resourceSelector, err := impl.AppliesTo()
if err != nil {
return nil, err
}
return &proto.AppliesToResponse{
IncludedNamespaces: resourceSelector.IncludedNamespaces,
ExcludedNamespaces: resourceSelector.ExcludedNamespaces,
IncludedResources: resourceSelector.IncludedResources,
ExcludedResources: resourceSelector.ExcludedResources,
Selector: resourceSelector.LabelSelector,
}, nil
}
func (s *BackupItemActionGRPCServer) Execute(ctx context.Context, req *proto.ExecuteRequest) (response *proto.ExecuteResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
var item unstructured.Unstructured
var backup api.Backup
if err := json.Unmarshal(req.Item, &item); err != nil {
return nil, err
}
if err := json.Unmarshal(req.Backup, &backup); err != nil {
return nil, err
}
updatedItem, additionalItems, err := impl.Execute(&item, &backup)
if err != nil {
return nil, err
}
// If the plugin implementation returned a nil updatedItem (meaning no modifications), reset updatedItem to the
// original item.
var updatedItemJSON []byte
if updatedItem == nil {
updatedItemJSON = req.Item
} else {
updatedItemJSON, err = json.Marshal(updatedItem.UnstructuredContent())
if err != nil {
return nil, err
}
}
res := &proto.ExecuteResponse{
Item: updatedItemJSON,
}
for _, item := range additionalItems {
res.AdditionalItems = append(res.AdditionalItems, backupResourceIdentifierToProto(item))
}
return res, nil
}
func backupResourceIdentifierToProto(id velero.ResourceIdentifier) *proto.ResourceIdentifier {
return &proto.ResourceIdentifier{
Group: id.Group,
Resource: id.Resource,
Namespace: id.Namespace,
Name: id.Name,
}
}

View File

@@ -0,0 +1,199 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
v1 "github.com/heptio/velero/pkg/apis/velero/v1"
"github.com/heptio/velero/pkg/backup/mocks"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
velerotest "github.com/heptio/velero/pkg/util/test"
)
func TestBackupItemActionGRPCServerExecute(t *testing.T) {
invalidItem := []byte("this is gibberish json")
validItem := []byte(`
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"namespace": "myns",
"name": "myconfigmap"
},
"data": {
"key": "value"
}
}`)
var validItemObject unstructured.Unstructured
err := json.Unmarshal(validItem, &validItemObject)
require.NoError(t, err)
updatedItem := []byte(`
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"namespace": "myns",
"name": "myconfigmap"
},
"data": {
"key": "changed!"
}
}`)
var updatedItemObject unstructured.Unstructured
err = json.Unmarshal(updatedItem, &updatedItemObject)
require.NoError(t, err)
invalidBackup := []byte("this is gibberish json")
validBackup := []byte(`
{
"apiVersion": "velero.io/v1",
"kind": "Backup",
"metadata": {
"namespace": "myns",
"name": "mybackup"
},
"spec": {
"includedNamespaces": ["*"],
"includedResources": ["*"],
"ttl": "60m"
}
}`)
var validBackupObject v1.Backup
err = json.Unmarshal(validBackup, &validBackupObject)
require.NoError(t, err)
tests := []struct {
name string
backup []byte
item []byte
implUpdatedItem runtime.Unstructured
implAdditionalItems []velero.ResourceIdentifier
implError error
expectError bool
skipMock bool
}{
{
name: "error unmarshaling item",
item: invalidItem,
backup: validBackup,
expectError: true,
skipMock: true,
},
{
name: "error unmarshaling backup",
item: validItem,
backup: invalidBackup,
expectError: true,
skipMock: true,
},
{
name: "error running impl",
item: validItem,
backup: validBackup,
implError: errors.New("impl error"),
expectError: true,
},
{
name: "nil updatedItem / no additionalItems",
item: validItem,
backup: validBackup,
},
{
name: "same updatedItem / some additionalItems",
item: validItem,
backup: validBackup,
implUpdatedItem: &validItemObject,
implAdditionalItems: []velero.ResourceIdentifier{
{
GroupResource: schema.GroupResource{Group: "v1", Resource: "pods"},
Namespace: "myns",
Name: "mypod",
},
},
},
{
name: "different updatedItem",
item: validItem,
backup: validBackup,
implUpdatedItem: &updatedItemObject,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
itemAction := &mocks.ItemAction{}
defer itemAction.AssertExpectations(t)
if !test.skipMock {
itemAction.On("Execute", &validItemObject, &validBackupObject).Return(test.implUpdatedItem, test.implAdditionalItems, test.implError)
}
s := &BackupItemActionGRPCServer{mux: &serverMux{
serverLog: velerotest.NewLogger(),
handlers: map[string]interface{}{
"xyz": itemAction,
},
}}
req := &proto.ExecuteRequest{
Plugin: "xyz",
Item: test.item,
Backup: test.backup,
}
resp, err := s.Execute(context.Background(), req)
// Verify error
assert.Equal(t, test.expectError, err != nil)
if err != nil {
return
}
require.NotNil(t, resp)
// Verify updated item
updatedItem := test.implUpdatedItem
if updatedItem == nil {
// If the impl returned nil for its updatedItem, we should expect the plugin to return the original item
updatedItem = &validItemObject
}
var respItem unstructured.Unstructured
err = json.Unmarshal(resp.Item, &respItem)
require.NoError(t, err)
assert.Equal(t, updatedItem, &respItem)
// Verify additional items
var expectedAdditionalItems []*proto.ResourceIdentifier
for _, item := range test.implAdditionalItems {
expectedAdditionalItems = append(expectedAdditionalItems, backupResourceIdentifierToProto(item))
}
assert.Equal(t, expectedAdditionalItems, resp.AdditionalItems)
})
}
}

View File

@@ -0,0 +1,43 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
// BlockStorePlugin is an implementation of go-plugin's Plugin
// interface with support for gRPC for the cloudprovider/BlockStore
// interface.
type BlockStorePlugin struct {
plugin.NetRPCUnsupportedPlugin
*pluginBase
}
// GRPCClient returns a BlockStore gRPC client.
func (p *BlockStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
return newClientDispenser(p.clientLogger, c, newBlockStoreGRPCClient), nil
}
// GRPCServer registers a BlockStore gRPC server.
func (p *BlockStorePlugin) GRPCServer(s *grpc.Server) error {
proto.RegisterBlockStoreServer(s, &BlockStoreGRPCServer{mux: p.serverMux})
return nil
}

View File

@@ -0,0 +1,168 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
// NewBlockStorePlugin constructs a BlockStorePlugin.
func NewBlockStorePlugin(options ...PluginOption) *BlockStorePlugin {
return &BlockStorePlugin{
pluginBase: newPluginBase(options...),
}
}
// BlockStoreGRPCClient implements the cloudprovider.BlockStore interface and uses a
// gRPC client to make calls to the plugin server.
type BlockStoreGRPCClient struct {
*clientBase
grpcClient proto.BlockStoreClient
}
func newBlockStoreGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} {
return &BlockStoreGRPCClient{
clientBase: base,
grpcClient: proto.NewBlockStoreClient(clientConn),
}
}
// Init prepares the BlockStore for usage using the provided map of
// configuration key-value pairs. It returns an error if the BlockStore
// cannot be initialized from the provided config.
func (c *BlockStoreGRPCClient) Init(config map[string]string) error {
_, err := c.grpcClient.Init(context.Background(), &proto.InitRequest{Plugin: c.plugin, Config: config})
return err
}
// CreateVolumeFromSnapshot creates a new block volume, initialized from the provided snapshot,
// and with the specified type and IOPS (if using provisioned IOPS).
func (c *BlockStoreGRPCClient) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) {
req := &proto.CreateVolumeRequest{
Plugin: c.plugin,
SnapshotID: snapshotID,
VolumeType: volumeType,
VolumeAZ: volumeAZ,
}
if iops == nil {
req.Iops = 0
} else {
req.Iops = *iops
}
res, err := c.grpcClient.CreateVolumeFromSnapshot(context.Background(), req)
if err != nil {
return "", err
}
return res.VolumeID, nil
}
// GetVolumeInfo returns the type and IOPS (if using provisioned IOPS) for a specified block
// volume.
func (c *BlockStoreGRPCClient) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) {
res, err := c.grpcClient.GetVolumeInfo(context.Background(), &proto.GetVolumeInfoRequest{Plugin: c.plugin, VolumeID: volumeID, VolumeAZ: volumeAZ})
if err != nil {
return "", nil, err
}
var iops *int64
if res.Iops != 0 {
iops = &res.Iops
}
return res.VolumeType, iops, nil
}
// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided
// set of tags to the snapshot.
func (c *BlockStoreGRPCClient) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
req := &proto.CreateSnapshotRequest{
Plugin: c.plugin,
VolumeID: volumeID,
VolumeAZ: volumeAZ,
Tags: tags,
}
res, err := c.grpcClient.CreateSnapshot(context.Background(), req)
if err != nil {
return "", err
}
return res.SnapshotID, nil
}
// DeleteSnapshot deletes the specified volume snapshot.
func (c *BlockStoreGRPCClient) DeleteSnapshot(snapshotID string) error {
_, err := c.grpcClient.DeleteSnapshot(context.Background(), &proto.DeleteSnapshotRequest{Plugin: c.plugin, SnapshotID: snapshotID})
return err
}
func (c *BlockStoreGRPCClient) GetVolumeID(pv runtime.Unstructured) (string, error) {
encodedPV, err := json.Marshal(pv.UnstructuredContent())
if err != nil {
return "", err
}
req := &proto.GetVolumeIDRequest{
Plugin: c.plugin,
PersistentVolume: encodedPV,
}
resp, err := c.grpcClient.GetVolumeID(context.Background(), req)
if err != nil {
return "", err
}
return resp.VolumeID, nil
}
func (c *BlockStoreGRPCClient) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) {
encodedPV, err := json.Marshal(pv.UnstructuredContent())
if err != nil {
return nil, err
}
req := &proto.SetVolumeIDRequest{
Plugin: c.plugin,
PersistentVolume: encodedPV,
VolumeID: volumeID,
}
resp, err := c.grpcClient.SetVolumeID(context.Background(), req)
if err != nil {
return nil, err
}
var updatedPV unstructured.Unstructured
if err := json.Unmarshal(resp.PersistentVolume, &updatedPV); err != nil {
return nil, err
}
return &updatedPV, nil
}

View File

@@ -0,0 +1,230 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"github.com/pkg/errors"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
)
// BlockStoreGRPCServer implements the proto-generated BlockStoreServer interface, and accepts
// gRPC calls and forwards them to an implementation of the pluggable interface.
type BlockStoreGRPCServer struct {
mux *serverMux
}
func (s *BlockStoreGRPCServer) getImpl(name string) (velero.BlockStore, error) {
impl, err := s.mux.getHandler(name)
if err != nil {
return nil, err
}
blockStore, ok := impl.(velero.BlockStore)
if !ok {
return nil, errors.Errorf("%T is not a block store", impl)
}
return blockStore, nil
}
// Init prepares the BlockStore for usage using the provided map of
// configuration key-value pairs. It returns an error if the BlockStore
// cannot be initialized from the provided config.
func (s *BlockStoreGRPCServer) Init(ctx context.Context, req *proto.InitRequest) (response *proto.Empty, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
if err := impl.Init(req.Config); err != nil {
return nil, err
}
return &proto.Empty{}, nil
}
// CreateVolumeFromSnapshot creates a new block volume, initialized from the provided snapshot,
// and with the specified type and IOPS (if using provisioned IOPS).
func (s *BlockStoreGRPCServer) CreateVolumeFromSnapshot(ctx context.Context, req *proto.CreateVolumeRequest) (response *proto.CreateVolumeResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
snapshotID := req.SnapshotID
volumeType := req.VolumeType
volumeAZ := req.VolumeAZ
var iops *int64
if req.Iops != 0 {
iops = &req.Iops
}
volumeID, err := impl.CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ, iops)
if err != nil {
return nil, err
}
return &proto.CreateVolumeResponse{VolumeID: volumeID}, nil
}
// GetVolumeInfo returns the type and IOPS (if using provisioned IOPS) for a specified block
// volume.
func (s *BlockStoreGRPCServer) GetVolumeInfo(ctx context.Context, req *proto.GetVolumeInfoRequest) (response *proto.GetVolumeInfoResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
volumeType, iops, err := impl.GetVolumeInfo(req.VolumeID, req.VolumeAZ)
if err != nil {
return nil, err
}
res := &proto.GetVolumeInfoResponse{
VolumeType: volumeType,
}
if iops != nil {
res.Iops = *iops
}
return res, nil
}
// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided
// set of tags to the snapshot.
func (s *BlockStoreGRPCServer) CreateSnapshot(ctx context.Context, req *proto.CreateSnapshotRequest) (response *proto.CreateSnapshotResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
snapshotID, err := impl.CreateSnapshot(req.VolumeID, req.VolumeAZ, req.Tags)
if err != nil {
return nil, err
}
return &proto.CreateSnapshotResponse{SnapshotID: snapshotID}, nil
}
// DeleteSnapshot deletes the specified volume snapshot.
func (s *BlockStoreGRPCServer) DeleteSnapshot(ctx context.Context, req *proto.DeleteSnapshotRequest) (response *proto.Empty, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
if err := impl.DeleteSnapshot(req.SnapshotID); err != nil {
return nil, err
}
return &proto.Empty{}, nil
}
func (s *BlockStoreGRPCServer) GetVolumeID(ctx context.Context, req *proto.GetVolumeIDRequest) (response *proto.GetVolumeIDResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
var pv unstructured.Unstructured
if err := json.Unmarshal(req.PersistentVolume, &pv); err != nil {
return nil, err
}
volumeID, err := impl.GetVolumeID(&pv)
if err != nil {
return nil, err
}
return &proto.GetVolumeIDResponse{VolumeID: volumeID}, nil
}
func (s *BlockStoreGRPCServer) SetVolumeID(ctx context.Context, req *proto.SetVolumeIDRequest) (response *proto.SetVolumeIDResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
var pv unstructured.Unstructured
if err := json.Unmarshal(req.PersistentVolume, &pv); err != nil {
return nil, err
}
updatedPV, err := impl.SetVolumeID(&pv, req.VolumeID)
if err != nil {
return nil, err
}
updatedPVBytes, err := json.Marshal(updatedPV.UnstructuredContent())
if err != nil {
return nil, err
}
return &proto.SetVolumeIDResponse{PersistentVolume: updatedPVBytes}, nil
}

View File

@@ -0,0 +1,75 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// clientBase implements client and contains shared fields common to all clients.
type clientBase struct {
plugin string
logger logrus.FieldLogger
}
type ClientDispenser interface {
ClientFor(name string) interface{}
}
// clientDispenser supports the initialization and retrieval of multiple implementations for a single plugin kind, such as
// "aws" and "azure" implementations of the object store plugin.
type clientDispenser struct {
// logger is the log the plugin should use.
logger logrus.FieldLogger
// clienConn is shared among all implementations for this client.
clientConn *grpc.ClientConn
// initFunc returns a client that implements a plugin interface, such as ObjectStore.
initFunc clientInitFunc
// clients keeps track of all the initialized implementations.
clients map[string]interface{}
}
type clientInitFunc func(base *clientBase, clientConn *grpc.ClientConn) interface{}
// newClientDispenser creates a new clientDispenser.
func newClientDispenser(logger logrus.FieldLogger, clientConn *grpc.ClientConn, initFunc clientInitFunc) *clientDispenser {
return &clientDispenser{
clientConn: clientConn,
logger: logger,
initFunc: initFunc,
clients: make(map[string]interface{}),
}
}
// ClientFor returns a gRPC client stub for the implementation of a plugin named name. If the client stub does not
// currently exist, clientFor creates it.
func (cd *clientDispenser) ClientFor(name string) interface{} {
if client, found := cd.clients[name]; found {
return client
}
base := &clientBase{
plugin: name,
logger: cd.logger,
}
// Initialize the plugin (e.g. newBackupItemActionGRPCClient())
client := cd.initFunc(base, cd.clientConn)
cd.clients[name] = client
return client
}

View File

@@ -0,0 +1,81 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/heptio/velero/pkg/util/test"
)
type fakeClient struct {
base *clientBase
clientConn *grpc.ClientConn
}
func TestNewClientDispenser(t *testing.T) {
logger := test.NewLogger()
clientConn := new(grpc.ClientConn)
c := 3
initFunc := func(base *clientBase, clientConn *grpc.ClientConn) interface{} {
return c
}
cd := newClientDispenser(logger, clientConn, initFunc)
assert.Equal(t, clientConn, cd.clientConn)
assert.NotNil(t, cd.clients)
assert.Empty(t, cd.clients)
}
func TestClientFor(t *testing.T) {
logger := test.NewLogger()
clientConn := new(grpc.ClientConn)
c := new(fakeClient)
count := 0
initFunc := func(base *clientBase, clientConn *grpc.ClientConn) interface{} {
c.base = base
c.clientConn = clientConn
count++
return c
}
cd := newClientDispenser(logger, clientConn, initFunc)
actual := cd.ClientFor("pod")
require.IsType(t, &fakeClient{}, actual)
typed := actual.(*fakeClient)
assert.Equal(t, 1, count)
assert.Equal(t, &typed, &c)
expectedBase := &clientBase{
plugin: "pod",
logger: logger,
}
assert.Equal(t, expectedBase, typed.base)
assert.Equal(t, clientConn, typed.clientConn)
// Make sure we reuse a previous client
actual = cd.ClientFor("pod")
require.IsType(t, &fakeClient{}, actual)
typed = actual.(*fakeClient)
assert.Equal(t, 1, count)
}

View File

@@ -0,0 +1,3 @@
// Package framework is the common package that any plugin client
// will need to import, for example, both plugin authors and Velero core.
package framework

View File

@@ -0,0 +1,93 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime"
)
func ExampleNewServer_blockStore() {
NewServer(). // call the server
RegisterBlockStore("example-blockstore", newBlockStore). // register the plugin
Serve() // serve the plugin
}
func newBlockStore(logger logrus.FieldLogger) (interface{}, error) {
return &BlockStore{FieldLogger: logger}, nil
}
type BlockStore struct {
FieldLogger logrus.FieldLogger
}
// Implement all methods for the BlockStore interface...
func (b *BlockStore) Init(config map[string]string) error {
b.FieldLogger.Infof("BlockStore.Init called")
// ...
return nil
}
func (b *BlockStore) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (volumeID string, err error) {
b.FieldLogger.Infof("CreateVolumeFromSnapshot called")
// ...
return "volumeID", nil
}
func (b *BlockStore) GetVolumeID(pv runtime.Unstructured) (string, error) {
b.FieldLogger.Infof("GetVolumeID called")
// ...
return "volumeID", nil
}
func (b *BlockStore) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) {
b.FieldLogger.Infof("SetVolumeID called")
// ...
return nil, nil
}
func (b *BlockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) {
b.FieldLogger.Infof("GetVolumeInfo called")
// ...
return "volumeFilesystemType", nil, nil
}
func (b *BlockStore) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (snapshotID string, err error) {
b.FieldLogger.Infof("CreateSnapshot called")
// ...
return "snapshotID", nil
}
func (b *BlockStore) DeleteSnapshot(snapshotID string) error {
b.FieldLogger.Infof("DeleteSnapshot called")
// ...
return nil
}

View File

@@ -0,0 +1,31 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// handlePanic is a panic handler for the server half of velero plugins.
func handlePanic(p interface{}) error {
if p == nil {
return nil
}
return status.Errorf(codes.Aborted, "plugin panicked: %v", p)
}

View File

@@ -0,0 +1,28 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import plugin "github.com/hashicorp/go-plugin"
// Interface represents a Velero plugin.
type Interface interface {
plugin.Plugin
// names returns a list of all the registered implementations for this plugin (such as "pod" and "pvc" for
// BackupItemAction).
names() []string
}

View File

@@ -0,0 +1,59 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/sirupsen/logrus"
"github.com/heptio/velero/pkg/util/logging"
)
// newLogger returns a logger that is suitable for use within an
// Velero plugin.
func newLogger() *logrus.Logger {
logger := logrus.New()
/*
!!!DO NOT SET THE OUTPUT TO STDOUT!!!
go-plugin uses stdout for a communications protocol between client and server.
stderr is used for log messages from server to client. The velero server makes sure they are logged to stdout.
*/
// we use the JSON formatter because go-plugin will parse incoming
// JSON on stderr and use it to create structured log entries.
logger.Formatter = &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
// this is the hclog-compatible message field
logrus.FieldKeyMsg: "@message",
},
// Velero server already adds timestamps when emitting logs, so
// don't do it within the plugin.
DisableTimestamp: true,
}
// set a logger name for the location hook which will signal to the Velero
// server logger that the location has been set within a hook.
logger.Hooks.Add((&logging.LogLocationHook{}).WithLoggerName("plugin"))
// this hook adjusts the string representation of WarnLevel to "warn"
// rather than "warning" to make it parseable by go-plugin within the
// Velero server code
logger.Hooks.Add(&logging.HcLogLevelHook{})
return logger
}

View File

@@ -0,0 +1,46 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/heptio/velero/pkg/util/logging"
)
func TestNewLogger(t *testing.T) {
l := newLogger()
expectedFormatter := &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
logrus.FieldKeyMsg: "@message",
},
DisableTimestamp: true,
}
assert.Equal(t, expectedFormatter, l.Formatter)
expectedHooks := []logrus.Hook{
(&logging.LogLocationHook{}).WithLoggerName("plugin"),
&logging.HcLogLevelHook{},
}
for _, level := range logrus.AllLevels {
assert.Equal(t, expectedHooks, l.Hooks[level])
}
}

View File

@@ -0,0 +1,44 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
// ObjectStorePlugin is an implementation of go-plugin's Plugin
// interface with support for gRPC for the cloudprovider/ObjectStore
// interface.
type ObjectStorePlugin struct {
plugin.NetRPCUnsupportedPlugin
*pluginBase
}
// GRPCClient returns an ObjectStore gRPC client.
func (p *ObjectStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
return newClientDispenser(p.clientLogger, c, newObjectStoreGRPCClient), nil
}
// GRPCServer registers an ObjectStore gRPC server.
func (p *ObjectStorePlugin) GRPCServer(s *grpc.Server) error {
proto.RegisterObjectStoreServer(s, &ObjectStoreGRPCServer{mux: p.serverMux})
return nil
}

View File

@@ -0,0 +1,163 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"io"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
const byteChunkSize = 16384
// NewObjectStorePlugin construct an ObjectStorePlugin.
func NewObjectStorePlugin(options ...PluginOption) *ObjectStorePlugin {
return &ObjectStorePlugin{
pluginBase: newPluginBase(options...),
}
}
// ObjectStoreGRPCClient implements the ObjectStore interface and uses a
// gRPC client to make calls to the plugin server.
type ObjectStoreGRPCClient struct {
*clientBase
grpcClient proto.ObjectStoreClient
}
func newObjectStoreGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} {
return &ObjectStoreGRPCClient{
clientBase: base,
grpcClient: proto.NewObjectStoreClient(clientConn),
}
}
// Init prepares the ObjectStore for usage using the provided map of
// configuration key-value pairs. It returns an error if the ObjectStore
// cannot be initialized from the provided config.
func (c *ObjectStoreGRPCClient) Init(config map[string]string) error {
_, err := c.grpcClient.Init(context.Background(), &proto.InitRequest{Plugin: c.plugin, Config: config})
return err
}
// PutObject creates a new object using the data in body within the specified
// object storage bucket with the given key.
func (c *ObjectStoreGRPCClient) PutObject(bucket, key string, body io.Reader) error {
stream, err := c.grpcClient.PutObject(context.Background())
if err != nil {
return err
}
// read from the provider io.Reader into chunks, and send each one over
// the gRPC stream
chunk := make([]byte, byteChunkSize)
for {
n, err := body.Read(chunk)
if err == io.EOF {
_, resErr := stream.CloseAndRecv()
return resErr
}
if err != nil {
stream.CloseSend()
return err
}
if err := stream.Send(&proto.PutObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key, Body: chunk[0:n]}); err != nil {
return err
}
}
}
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) {
stream, err := c.grpcClient.GetObject(context.Background(), &proto.GetObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key})
if err != nil {
return nil, err
}
receive := func() ([]byte, error) {
data, err := stream.Recv()
if err != nil {
return nil, err
}
return data.Data, nil
}
close := func() error {
return stream.CloseSend()
}
return &StreamReadCloser{receive: receive, close: close}, nil
}
// ListCommonPrefixes gets a list of all object key prefixes that come
// after the provided prefix and before the provided delimiter (this is
// often used to simulate a directory hierarchy in object storage).
func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
req := &proto.ListCommonPrefixesRequest{
Plugin: c.plugin,
Bucket: bucket,
Prefix: prefix,
Delimiter: delimiter,
}
res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req)
if err != nil {
return nil, err
}
return res.Prefixes, nil
}
// ListObjects gets a list of all objects in bucket that have the same prefix.
func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) {
res, err := c.grpcClient.ListObjects(context.Background(), &proto.ListObjectsRequest{Plugin: c.plugin, Bucket: bucket, Prefix: prefix})
if err != nil {
return nil, err
}
return res.Keys, nil
}
// DeleteObject removes object with the specified key from the given
// bucket.
func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error {
_, err := c.grpcClient.DeleteObject(context.Background(), &proto.DeleteObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key})
return err
}
// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl.
func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
res, err := c.grpcClient.CreateSignedURL(context.Background(), &proto.CreateSignedURLRequest{
Plugin: c.plugin,
Bucket: bucket,
Key: key,
Ttl: int64(ttl),
})
if err != nil {
return "", nil
}
return res.Url, nil
}

View File

@@ -0,0 +1,241 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"io"
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
)
// ObjectStoreGRPCServer implements the proto-generated ObjectStoreServer interface, and accepts
// gRPC calls and forwards them to an implementation of the pluggable interface.
type ObjectStoreGRPCServer struct {
mux *serverMux
}
func (s *ObjectStoreGRPCServer) getImpl(name string) (velero.ObjectStore, error) {
impl, err := s.mux.getHandler(name)
if err != nil {
return nil, err
}
itemAction, ok := impl.(velero.ObjectStore)
if !ok {
return nil, errors.Errorf("%T is not an object store", impl)
}
return itemAction, nil
}
// Init prepares the ObjectStore for usage using the provided map of
// configuration key-value pairs. It returns an error if the ObjectStore
// cannot be initialized from the provided config.
func (s *ObjectStoreGRPCServer) Init(ctx context.Context, req *proto.InitRequest) (response *proto.Empty, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
if err := impl.Init(req.Config); err != nil {
return nil, err
}
return &proto.Empty{}, nil
}
// PutObject creates a new object using the data in body within the specified
// object storage bucket with the given key.
func (s *ObjectStoreGRPCServer) PutObject(stream proto.ObjectStore_PutObjectServer) (err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
// we need to read the first chunk ahead of time to get the bucket and key;
// in our receive method, we'll use `first` on the first call
firstChunk, err := stream.Recv()
if err != nil {
return err
}
impl, err := s.getImpl(firstChunk.Plugin)
if err != nil {
return err
}
bucket := firstChunk.Bucket
key := firstChunk.Key
receive := func() ([]byte, error) {
if firstChunk != nil {
res := firstChunk.Body
firstChunk = nil
return res, nil
}
data, err := stream.Recv()
if err != nil {
return nil, err
}
return data.Body, nil
}
close := func() error {
return nil
}
if err := impl.PutObject(bucket, key, &StreamReadCloser{receive: receive, close: close}); err != nil {
return err
}
return stream.SendAndClose(&proto.Empty{})
}
// GetObject retrieves the object with the given key from the specified
// bucket in object storage.
func (s *ObjectStoreGRPCServer) GetObject(req *proto.GetObjectRequest, stream proto.ObjectStore_GetObjectServer) (err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return err
}
rdr, err := impl.GetObject(req.Bucket, req.Key)
if err != nil {
return err
}
defer rdr.Close()
chunk := make([]byte, byteChunkSize)
for {
n, err := rdr.Read(chunk)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
return nil
}
if err := stream.Send(&proto.Bytes{Data: chunk[0:n]}); err != nil {
return err
}
}
}
// ListCommonPrefixes gets a list of all object key prefixes that start with
// the specified prefix and stop at the next instance of the provided delimiter
// (this is often used to simulate a directory hierarchy in object storage).
func (s *ObjectStoreGRPCServer) ListCommonPrefixes(ctx context.Context, req *proto.ListCommonPrefixesRequest) (response *proto.ListCommonPrefixesResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
prefixes, err := impl.ListCommonPrefixes(req.Bucket, req.Prefix, req.Delimiter)
if err != nil {
return nil, err
}
return &proto.ListCommonPrefixesResponse{Prefixes: prefixes}, nil
}
// ListObjects gets a list of all objects in bucket that have the same prefix.
func (s *ObjectStoreGRPCServer) ListObjects(ctx context.Context, req *proto.ListObjectsRequest) (response *proto.ListObjectsResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
keys, err := impl.ListObjects(req.Bucket, req.Prefix)
if err != nil {
return nil, err
}
return &proto.ListObjectsResponse{Keys: keys}, nil
}
// DeleteObject removes object with the specified key from the given
// bucket.
func (s *ObjectStoreGRPCServer) DeleteObject(ctx context.Context, req *proto.DeleteObjectRequest) (response *proto.Empty, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
if err := impl.DeleteObject(req.Bucket, req.Key); err != nil {
return nil, err
}
return &proto.Empty{}, nil
}
// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl.
func (s *ObjectStoreGRPCServer) CreateSignedURL(ctx context.Context, req *proto.CreateSignedURLRequest) (response *proto.CreateSignedURLResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
url, err := impl.CreateSignedURL(req.Bucket, req.Key, time.Duration(req.Ttl))
if err != nil {
return nil, err
}
return &proto.CreateSignedURLResponse{Url: url}, nil
}

View File

@@ -0,0 +1,48 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/sirupsen/logrus"
)
type pluginBase struct {
clientLogger logrus.FieldLogger
*serverMux
}
func newPluginBase(options ...PluginOption) *pluginBase {
base := new(pluginBase)
for _, option := range options {
option(base)
}
return base
}
type PluginOption func(base *pluginBase)
func ClientLogger(logger logrus.FieldLogger) PluginOption {
return func(base *pluginBase) {
base.clientLogger = logger
}
}
func serverLogger(logger logrus.FieldLogger) PluginOption {
return func(base *pluginBase) {
base.serverMux = newServerMux(logger)
}
}

View File

@@ -0,0 +1,40 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/heptio/velero/pkg/util/test"
)
func TestClientLogger(t *testing.T) {
base := &pluginBase{}
logger := test.NewLogger()
f := ClientLogger(logger)
f(base)
assert.Equal(t, logger, base.clientLogger)
}
func TestServerLogger(t *testing.T) {
base := &pluginBase{}
logger := test.NewLogger()
f := serverLogger(logger)
f(base)
assert.Equal(t, newServerMux(logger), base.serverMux)
}

View File

@@ -0,0 +1,56 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"k8s.io/apimachinery/pkg/util/sets"
)
// PluginKind is a type alias for a string that describes
// the kind of a Velero-supported plugin.
type PluginKind string
// String returns the string for k.
func (k PluginKind) String() string {
return string(k)
}
const (
// PluginKindObjectStore represents an object store plugin.
PluginKindObjectStore PluginKind = "ObjectStore"
// PluginKindBlockStore represents a block store plugin.
PluginKindBlockStore PluginKind = "BlockStore"
// PluginKindBackupItemAction represents a backup item action plugin.
PluginKindBackupItemAction PluginKind = "BackupItemAction"
// PluginKindRestoreItemAction represents a restore item action plugin.
PluginKindRestoreItemAction PluginKind = "RestoreItemAction"
// PluginKindPluginLister represents a plugin lister plugin.
PluginKindPluginLister PluginKind = "PluginLister"
)
// allPluginKinds contains all the valid plugin kinds that Velero supports, excluding PluginLister because that is not a
// kind that a developer would ever need to implement (it's handled by Velero and the Velero plugin library code).
var allPluginKinds = sets.NewString(
PluginKindObjectStore.String(),
PluginKindBlockStore.String(),
PluginKindBackupItemAction.String(),
PluginKindRestoreItemAction.String(),
)

View File

@@ -0,0 +1,34 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"
)
func TestAllPluginKinds(t *testing.T) {
expected := sets.NewString(
PluginKindObjectStore.String(),
PluginKindBlockStore.String(),
PluginKindBackupItemAction.String(),
PluginKindRestoreItemAction.String(),
)
assert.True(t, expected.Equal(allPluginKinds))
}

View File

@@ -0,0 +1,143 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
// PluginIdentifier uniquely identifies a plugin by command, kind, and name.
type PluginIdentifier struct {
Command string
Kind PluginKind
Name string
}
// PluginLister lists plugins.
type PluginLister interface {
ListPlugins() ([]PluginIdentifier, error)
}
// pluginLister implements PluginLister.
type pluginLister struct {
plugins []PluginIdentifier
}
// NewPluginLister returns a new PluginLister for plugins.
func NewPluginLister(plugins ...PluginIdentifier) PluginLister {
return &pluginLister{plugins: plugins}
}
// ListPlugins returns the pluginLister's plugins.
func (pl *pluginLister) ListPlugins() ([]PluginIdentifier, error) {
return pl.plugins, nil
}
// PluginListerPlugin is a go-plugin Plugin for a PluginLister.
type PluginListerPlugin struct {
plugin.NetRPCUnsupportedPlugin
impl PluginLister
}
// NewPluginListerPlugin creates a new PluginListerPlugin with impl as the server-side implementation.
func NewPluginListerPlugin(impl PluginLister) *PluginListerPlugin {
return &PluginListerPlugin{impl: impl}
}
//////////////////////////////////////////////////////////////////////////////
// client code
//////////////////////////////////////////////////////////////////////////////
// GRPCClient returns a PluginLister gRPC client.
func (p *PluginListerPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
return &PluginListerGRPCClient{grpcClient: proto.NewPluginListerClient(c)}, nil
}
// PluginListerGRPCClient implements PluginLister and uses a gRPC client to make calls to the plugin server.
type PluginListerGRPCClient struct {
grpcClient proto.PluginListerClient
}
// ListPlugins uses the gRPC client to request the list of plugins from the server. It translates the protobuf response
// to []PluginIdentifier.
func (c *PluginListerGRPCClient) ListPlugins() ([]PluginIdentifier, error) {
resp, err := c.grpcClient.ListPlugins(context.Background(), &proto.Empty{})
if err != nil {
return nil, err
}
ret := make([]PluginIdentifier, len(resp.Plugins))
for i, id := range resp.Plugins {
if !allPluginKinds.Has(id.Kind) {
return nil, errors.Errorf("invalid plugin kind: %s", id.Kind)
}
ret[i] = PluginIdentifier{
Command: id.Command,
Kind: PluginKind(id.Kind),
Name: id.Name,
}
}
return ret, nil
}
//////////////////////////////////////////////////////////////////////////////
// server code
//////////////////////////////////////////////////////////////////////////////
// GRPCServer registers a PluginLister gRPC server.
func (p *PluginListerPlugin) GRPCServer(s *grpc.Server) error {
proto.RegisterPluginListerServer(s, &PluginListerGRPCServer{impl: p.impl})
return nil
}
// PluginListerGRPCServer implements the proto-generated PluginLister gRPC service interface. It accepts gRPC calls,
// forwards them to impl, and translates the responses to protobuf.
type PluginListerGRPCServer struct {
impl PluginLister
}
// ListPlugins returns a list of registered plugins, delegating to s.impl to perform the listing.
func (s *PluginListerGRPCServer) ListPlugins(ctx context.Context, req *proto.Empty) (*proto.ListPluginsResponse, error) {
list, err := s.impl.ListPlugins()
if err != nil {
return nil, err
}
plugins := make([]*proto.PluginIdentifier, len(list))
for i, id := range list {
if !allPluginKinds.Has(id.Kind.String()) {
return nil, errors.Errorf("invalid plugin kind: %s", id.Kind)
}
plugins[i] = &proto.PluginIdentifier{
Command: id.Command,
Kind: id.Kind.String(),
Name: id.Name,
}
}
ret := &proto.ListPluginsResponse{
Plugins: plugins,
}
return ret, nil
}

View File

@@ -0,0 +1,43 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"
proto "github.com/heptio/velero/pkg/plugin/generated"
)
// RestoreItemActionPlugin is an implementation of go-plugin's Plugin
// interface with support for gRPC for the restore/ItemAction
// interface.
type RestoreItemActionPlugin struct {
plugin.NetRPCUnsupportedPlugin
*pluginBase
}
// GRPCClient returns a RestoreItemAction gRPC client.
func (p *RestoreItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
return newClientDispenser(p.clientLogger, c, newRestoreItemActionGRPCClient), nil
}
// GRPCServer registers a RestoreItemAction gRPC server.
func (p *RestoreItemActionPlugin) GRPCServer(s *grpc.Server) error {
proto.RegisterRestoreItemActionServer(s, &RestoreItemActionGRPCServer{mux: p.serverMux})
return nil
}

View File

@@ -0,0 +1,111 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
)
var _ velero.RestoreItemAction = &RestoreItemActionGRPCClient{}
// NewRestoreItemActionPlugin constructs a RestoreItemActionPlugin.
func NewRestoreItemActionPlugin(options ...PluginOption) *RestoreItemActionPlugin {
return &RestoreItemActionPlugin{
pluginBase: newPluginBase(options...),
}
}
// RestoreItemActionGRPCClient implements the backup/ItemAction interface and uses a
// gRPC client to make calls to the plugin server.
type RestoreItemActionGRPCClient struct {
*clientBase
grpcClient proto.RestoreItemActionClient
}
func newRestoreItemActionGRPCClient(base *clientBase, clientConn *grpc.ClientConn) interface{} {
return &RestoreItemActionGRPCClient{
clientBase: base,
grpcClient: proto.NewRestoreItemActionClient(clientConn),
}
}
func (c *RestoreItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error) {
res, err := c.grpcClient.AppliesTo(context.Background(), &proto.AppliesToRequest{Plugin: c.plugin})
if err != nil {
return velero.ResourceSelector{}, err
}
return velero.ResourceSelector{
IncludedNamespaces: res.IncludedNamespaces,
ExcludedNamespaces: res.ExcludedNamespaces,
IncludedResources: res.IncludedResources,
ExcludedResources: res.ExcludedResources,
LabelSelector: res.Selector,
}, nil
}
func (c *RestoreItemActionGRPCClient) Execute(input *velero.RestoreItemActionExecuteInput) (*velero.RestoreItemActionExecuteOutput, error) {
itemJSON, err := json.Marshal(input.Item.UnstructuredContent())
if err != nil {
return nil, err
}
itemFromBackupJSON, err := json.Marshal(input.ItemFromBackup.UnstructuredContent())
if err != nil {
return nil, err
}
restoreJSON, err := json.Marshal(input.Restore)
if err != nil {
return nil, err
}
req := &proto.RestoreExecuteRequest{
Plugin: c.plugin,
Item: itemJSON,
ItemFromBackup: itemFromBackupJSON,
Restore: restoreJSON,
}
res, err := c.grpcClient.Execute(context.Background(), req)
if err != nil {
return nil, err
}
var updatedItem unstructured.Unstructured
if err := json.Unmarshal(res.Item, &updatedItem); err != nil {
return nil, err
}
var warning error
if res.Warning != "" {
warning = errors.New(res.Warning)
}
return &velero.RestoreItemActionExecuteOutput{
UpdatedItem: &updatedItem,
Warning: warning,
}, nil
}

View File

@@ -0,0 +1,130 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"encoding/json"
"github.com/pkg/errors"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
api "github.com/heptio/velero/pkg/apis/velero/v1"
proto "github.com/heptio/velero/pkg/plugin/generated"
"github.com/heptio/velero/pkg/plugin/velero"
)
// RestoreItemActionGRPCServer implements the proto-generated RestoreItemActionServer interface, and accepts
// gRPC calls and forwards them to an implementation of the pluggable interface.
type RestoreItemActionGRPCServer struct {
mux *serverMux
}
func (s *RestoreItemActionGRPCServer) getImpl(name string) (velero.RestoreItemAction, error) {
impl, err := s.mux.getHandler(name)
if err != nil {
return nil, err
}
itemAction, ok := impl.(velero.RestoreItemAction)
if !ok {
return nil, errors.Errorf("%T is not a restore item action", impl)
}
return itemAction, nil
}
func (s *RestoreItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.AppliesToRequest) (response *proto.AppliesToResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
appliesTo, err := impl.AppliesTo()
if err != nil {
return nil, err
}
return &proto.AppliesToResponse{
IncludedNamespaces: appliesTo.IncludedNamespaces,
ExcludedNamespaces: appliesTo.ExcludedNamespaces,
IncludedResources: appliesTo.IncludedResources,
ExcludedResources: appliesTo.ExcludedResources,
Selector: appliesTo.LabelSelector,
}, nil
}
func (s *RestoreItemActionGRPCServer) Execute(ctx context.Context, req *proto.RestoreExecuteRequest) (response *proto.RestoreExecuteResponse, err error) {
defer func() {
if recoveredErr := handlePanic(recover()); recoveredErr != nil {
err = recoveredErr
}
}()
impl, err := s.getImpl(req.Plugin)
if err != nil {
return nil, err
}
var (
item unstructured.Unstructured
itemFromBackup unstructured.Unstructured
restoreObj api.Restore
)
if err := json.Unmarshal(req.Item, &item); err != nil {
return nil, err
}
if err := json.Unmarshal(req.ItemFromBackup, &itemFromBackup); err != nil {
return nil, err
}
if err := json.Unmarshal(req.Restore, &restoreObj); err != nil {
return nil, err
}
executeOutput, err := impl.Execute(&velero.RestoreItemActionExecuteInput{
Item: &item,
ItemFromBackup: &itemFromBackup,
Restore: &restoreObj,
})
if err != nil {
return nil, err
}
updatedItem, err := json.Marshal(executeOutput.UpdatedItem)
if err != nil {
return nil, err
}
var warnMessage string
if executeOutput.Warning != nil {
warnMessage = executeOutput.Warning.Error()
}
return &proto.RestoreExecuteResponse{
Item: updatedItem,
Warning: warnMessage,
}, nil
}

View File

@@ -0,0 +1,201 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"os"
"strings"
plugin "github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/heptio/velero/pkg/util/logging"
)
// Handshake is configuration information that allows go-plugin clients and servers to perform a handshake.
//
// TODO(ncdc): this should probably be a function so it can't be mutated, and we should probably move it to
// handshake.go.
var Handshake = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "ARK_PLUGIN",
MagicCookieValue: "hello",
}
// Server serves registered plugin implementations.
type Server interface {
// BindFlags defines the plugin server's command-line flags
// on the provided FlagSet. If you're not sure what flag set
// to use, pflag.CommandLine is the default set of command-line
// flags.
//
// This method must be called prior to calling .Serve().
BindFlags(flags *pflag.FlagSet) Server
// RegisterBackupItemAction registers a backup item action.
RegisterBackupItemAction(name string, initializer HandlerInitializer) Server
// RegisterBackupItemActions registers multiple backup item actions.
RegisterBackupItemActions(map[string]HandlerInitializer) Server
// RegisterBlockStore registers a block store.
RegisterBlockStore(name string, initializer HandlerInitializer) Server
// RegisterBlockStores registers multiple block stores.
RegisterBlockStores(map[string]HandlerInitializer) Server
// RegisterObjectStore registers an object store.
RegisterObjectStore(name string, initializer HandlerInitializer) Server
// RegisterObjectStores registers multiple object stores.
RegisterObjectStores(map[string]HandlerInitializer) Server
// RegisterRestoreItemAction registers a restore item action.
RegisterRestoreItemAction(name string, initializer HandlerInitializer) Server
// RegisterRestoreItemActions registers multiple restore item actions.
RegisterRestoreItemActions(map[string]HandlerInitializer) Server
// Server runs the plugin server.
Serve()
}
// server implements Server.
type server struct {
log *logrus.Logger
logLevelFlag *logging.LevelFlag
flagSet *pflag.FlagSet
backupItemAction *BackupItemActionPlugin
blockStore *BlockStorePlugin
objectStore *ObjectStorePlugin
restoreItemAction *RestoreItemActionPlugin
}
// NewServer returns a new Server
func NewServer() Server {
log := newLogger()
return &server{
log: log,
logLevelFlag: logging.LogLevelFlag(log.Level),
backupItemAction: NewBackupItemActionPlugin(serverLogger(log)),
blockStore: NewBlockStorePlugin(serverLogger(log)),
objectStore: NewObjectStorePlugin(serverLogger(log)),
restoreItemAction: NewRestoreItemActionPlugin(serverLogger(log)),
}
}
func (s *server) BindFlags(flags *pflag.FlagSet) Server {
flags.Var(s.logLevelFlag, "log-level", fmt.Sprintf("the level at which to log. Valid values are %s.", strings.Join(s.logLevelFlag.AllowedValues(), ", ")))
s.flagSet = flags
return s
}
func (s *server) RegisterBackupItemAction(name string, initializer HandlerInitializer) Server {
s.backupItemAction.register(name, initializer)
return s
}
func (s *server) RegisterBackupItemActions(m map[string]HandlerInitializer) Server {
for name := range m {
s.RegisterBackupItemAction(name, m[name])
}
return s
}
func (s *server) RegisterBlockStore(name string, initializer HandlerInitializer) Server {
s.blockStore.register(name, initializer)
return s
}
func (s *server) RegisterBlockStores(m map[string]HandlerInitializer) Server {
for name := range m {
s.RegisterBlockStore(name, m[name])
}
return s
}
func (s *server) RegisterObjectStore(name string, initializer HandlerInitializer) Server {
s.objectStore.register(name, initializer)
return s
}
func (s *server) RegisterObjectStores(m map[string]HandlerInitializer) Server {
for name := range m {
s.RegisterObjectStore(name, m[name])
}
return s
}
func (s *server) RegisterRestoreItemAction(name string, initializer HandlerInitializer) Server {
s.restoreItemAction.register(name, initializer)
return s
}
func (s *server) RegisterRestoreItemActions(m map[string]HandlerInitializer) Server {
for name := range m {
s.RegisterRestoreItemAction(name, m[name])
}
return s
}
// getNames returns a list of PluginIdentifiers registered with plugin.
func getNames(command string, kind PluginKind, plugin Interface) []PluginIdentifier {
var pluginIdentifiers []PluginIdentifier
for _, name := range plugin.names() {
id := PluginIdentifier{Command: command, Kind: kind, Name: name}
pluginIdentifiers = append(pluginIdentifiers, id)
}
return pluginIdentifiers
}
func (s *server) Serve() {
if s.flagSet != nil && !s.flagSet.Parsed() {
s.log.Infof("Parsing flags")
s.flagSet.Parse(os.Args[1:])
}
s.log.Level = s.logLevelFlag.Parse()
s.log.Infof("Setting log level to %s", strings.ToUpper(s.log.Level.String()))
command := os.Args[0]
var pluginIdentifiers []PluginIdentifier
pluginIdentifiers = append(pluginIdentifiers, getNames(command, PluginKindBackupItemAction, s.backupItemAction)...)
pluginIdentifiers = append(pluginIdentifiers, getNames(command, PluginKindBlockStore, s.blockStore)...)
pluginIdentifiers = append(pluginIdentifiers, getNames(command, PluginKindObjectStore, s.objectStore)...)
pluginIdentifiers = append(pluginIdentifiers, getNames(command, PluginKindRestoreItemAction, s.restoreItemAction)...)
pluginLister := NewPluginLister(pluginIdentifiers...)
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: Handshake,
Plugins: map[string]plugin.Plugin{
string(PluginKindBackupItemAction): s.backupItemAction,
string(PluginKindBlockStore): s.blockStore,
string(PluginKindObjectStore): s.objectStore,
string(PluginKindPluginLister): NewPluginListerPlugin(pluginLister),
string(PluginKindRestoreItemAction): s.restoreItemAction,
},
GRPCServer: plugin.DefaultGRPCServer,
})
}

View File

@@ -0,0 +1,77 @@
/*
Copyright 2018, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
)
// HandlerInitializer is a function that initializes and returns a new instance of one of Velero's plugin interfaces
// (ObjectStore, BlockStore, BackupItemAction, RestoreItemAction).
type HandlerInitializer func(logger logrus.FieldLogger) (interface{}, error)
// serverMux manages multiple implementations of a single plugin kind, such as pod and pvc BackupItemActions.
type serverMux struct {
kind PluginKind
initializers map[string]HandlerInitializer
handlers map[string]interface{}
serverLog logrus.FieldLogger
}
// newServerMux returns a new serverMux.
func newServerMux(logger logrus.FieldLogger) *serverMux {
return &serverMux{
initializers: make(map[string]HandlerInitializer),
handlers: make(map[string]interface{}),
serverLog: logger,
}
}
// register registers the initializer for name.
func (m *serverMux) register(name string, f HandlerInitializer) {
// TODO(ncdc): return an error on duplicate registrations for the same name.
m.initializers[name] = f
}
// names returns a list of all registered implementations.
func (m *serverMux) names() []string {
return sets.StringKeySet(m.initializers).List()
}
// getHandler returns the instance for a plugin with the given name. If an instance has already been initialized,
// that is returned. Otherwise, the instance is initialized by calling its initialization function.
func (m *serverMux) getHandler(name string) (interface{}, error) {
if instance, found := m.handlers[name]; found {
return instance, nil
}
initializer, found := m.initializers[name]
if !found {
return nil, errors.Errorf("unknown %v plugin: %s", m.kind, name)
}
instance, err := initializer(m.serverLog)
if err != nil {
return nil, err
}
m.handlers[name] = instance
return m.handlers[name], nil
}

View File

@@ -0,0 +1,75 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"bytes"
"io"
)
// ReceiveFunc is a function that either returns a slice
// of an arbitrary number of bytes OR an error. Returning
// an io.EOF means there is no more data to be read; any
// other error is considered an actual error.
type ReceiveFunc func() ([]byte, error)
// CloseFunc is used to signal to the source of data that
// the StreamReadCloser has been closed.
type CloseFunc func() error
// StreamReadCloser wraps a ReceiveFunc and a CloseSendFunc
// to implement io.ReadCloser.
type StreamReadCloser struct {
buf *bytes.Buffer
receive ReceiveFunc
close CloseFunc
}
func (s *StreamReadCloser) Read(p []byte) (n int, err error) {
for {
// if buf exists and holds at least as much as we're trying to read,
// read from the buffer
if s.buf != nil && s.buf.Len() >= len(p) {
return s.buf.Read(p)
}
// if buf is nil, create it
if s.buf == nil {
s.buf = new(bytes.Buffer)
}
// buf exists but doesn't hold enough data to fill p, so
// receive again. If we get an EOF, return what's in the
// buffer; else, write the new data to the buffer and
// try another read.
data, err := s.receive()
if err == io.EOF {
return s.buf.Read(p)
}
if err != nil {
return 0, err
}
if _, err := s.buf.Write(data); err != nil {
return 0, err
}
}
}
func (s *StreamReadCloser) Close() error {
return s.close()
}

View File

@@ -0,0 +1,66 @@
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"bytes"
"io/ioutil"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type stringByteReceiver struct {
buf *bytes.Buffer
chunkSize int
}
func (r *stringByteReceiver) Receive() ([]byte, error) {
chunk := make([]byte, r.chunkSize)
n, err := r.buf.Read(chunk)
if err != nil {
return nil, err
}
return chunk[0:n], nil
}
func (r *stringByteReceiver) CloseSend() error {
r.buf = nil
return nil
}
func TestStreamReader(t *testing.T) {
s := "hello world, it's me, streamreader!!!!!"
rdr := &stringByteReceiver{
buf: bytes.NewBufferString(s),
chunkSize: 3,
}
sr := &StreamReadCloser{
receive: rdr.Receive,
close: rdr.CloseSend,
}
res, err := ioutil.ReadAll(sr)
require.Nil(t, err)
assert.Equal(t, s, string(res))
}