mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-05 04:55:22 +00:00
Merge pull request #825 from skriss/sync-controller
sync controller improvements
This commit is contained in:
@@ -16,7 +16,10 @@ limitations under the License.
|
||||
|
||||
package v1
|
||||
|
||||
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
// +genclient
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
@@ -89,6 +92,8 @@ const (
|
||||
|
||||
// BackupStorageLocationStatus describes the current status of an Ark BackupStorageLocation.
|
||||
type BackupStorageLocationStatus struct {
|
||||
Phase BackupStorageLocationPhase `json:"phase,omitempty"`
|
||||
AccessMode BackupStorageLocationAccessMode `json:"accessMode,omitempty"`
|
||||
Phase BackupStorageLocationPhase `json:"phase,omitempty"`
|
||||
AccessMode BackupStorageLocationAccessMode `json:"accessMode,omitempty"`
|
||||
LastSyncedRevision types.UID `json:"lastSyncedRevision,omitempty"`
|
||||
LastSyncedTime metav1.Time `json:"lastSyncedTime,omitempty"`
|
||||
}
|
||||
|
||||
@@ -307,7 +307,7 @@ func (in *BackupStorageLocation) DeepCopyInto(out *BackupStorageLocation) {
|
||||
out.TypeMeta = in.TypeMeta
|
||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
||||
in.Spec.DeepCopyInto(&out.Spec)
|
||||
out.Status = in.Status
|
||||
in.Status.DeepCopyInto(&out.Status)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -389,6 +389,7 @@ func (in *BackupStorageLocationSpec) DeepCopy() *BackupStorageLocationSpec {
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *BackupStorageLocationStatus) DeepCopyInto(out *BackupStorageLocationStatus) {
|
||||
*out = *in
|
||||
in.LastSyncedTime.DeepCopyInto(&out.LastSyncedTime)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -450,7 +450,7 @@ func (s *server) loadConfig() (*api.Config, error) {
|
||||
}
|
||||
|
||||
const (
|
||||
defaultBackupSyncPeriod = 60 * time.Minute
|
||||
defaultBackupSyncPeriod = time.Minute
|
||||
defaultPodVolumeOperationTimeout = 60 * time.Minute
|
||||
)
|
||||
|
||||
@@ -601,6 +601,7 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
|
||||
}
|
||||
|
||||
backupSyncController := controller.NewBackupSyncController(
|
||||
s.arkClient.ArkV1(),
|
||||
s.arkClient.ArkV1(),
|
||||
s.sharedInformerFactory.Ark().V1().Backups(),
|
||||
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
kuberrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
@@ -34,14 +36,14 @@ import (
|
||||
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
||||
"github.com/heptio/ark/pkg/persistence"
|
||||
"github.com/heptio/ark/pkg/plugin"
|
||||
"github.com/heptio/ark/pkg/util/kube"
|
||||
"github.com/heptio/ark/pkg/util/stringslice"
|
||||
)
|
||||
|
||||
type backupSyncController struct {
|
||||
*genericController
|
||||
|
||||
client arkv1client.BackupsGetter
|
||||
backupClient arkv1client.BackupsGetter
|
||||
backupLocationClient arkv1client.BackupStorageLocationsGetter
|
||||
backupLister listers.BackupLister
|
||||
backupStorageLocationLister listers.BackupStorageLocationLister
|
||||
namespace string
|
||||
@@ -51,7 +53,8 @@ type backupSyncController struct {
|
||||
}
|
||||
|
||||
func NewBackupSyncController(
|
||||
client arkv1client.BackupsGetter,
|
||||
backupClient arkv1client.BackupsGetter,
|
||||
backupLocationClient arkv1client.BackupStorageLocationsGetter,
|
||||
backupInformer informers.BackupInformer,
|
||||
backupStorageLocationInformer informers.BackupStorageLocationInformer,
|
||||
syncPeriod time.Duration,
|
||||
@@ -67,7 +70,8 @@ func NewBackupSyncController(
|
||||
|
||||
c := &backupSyncController{
|
||||
genericController: newGenericController("backup-sync", logger),
|
||||
client: client,
|
||||
backupClient: backupClient,
|
||||
backupLocationClient: backupLocationClient,
|
||||
namespace: namespace,
|
||||
defaultBackupLocation: defaultBackupLocation,
|
||||
backupLister: backupInformer.Lister(),
|
||||
@@ -91,8 +95,35 @@ func NewBackupSyncController(
|
||||
|
||||
const gcFinalizer = "gc.ark.heptio.com"
|
||||
|
||||
func shouldSync(location *arkv1api.BackupStorageLocation, now time.Time, backupStore persistence.BackupStore, log logrus.FieldLogger) (bool, string) {
|
||||
log = log.WithFields(map[string]interface{}{
|
||||
"lastSyncedRevision": location.Status.LastSyncedRevision,
|
||||
"lastSyncedTime": location.Status.LastSyncedTime.Time.Format(time.RFC1123Z),
|
||||
})
|
||||
|
||||
revision, err := backupStore.GetRevision()
|
||||
if err != nil {
|
||||
log.WithError(err).Info("Error getting backup store's revision, syncing")
|
||||
return true, ""
|
||||
}
|
||||
log = log.WithField("revision", revision)
|
||||
|
||||
if location.Status.LastSyncedTime.Add(time.Hour).Before(now) {
|
||||
log.Infof("Backup location hasn't been synced in more than %s, syncing", time.Hour)
|
||||
return true, revision
|
||||
}
|
||||
|
||||
if string(location.Status.LastSyncedRevision) != revision {
|
||||
log.Info("Backup location hasn't been synced since its last modification, syncing")
|
||||
return true, revision
|
||||
}
|
||||
|
||||
log.Debug("Backup location's contents haven't changed since last sync, not syncing")
|
||||
return false, ""
|
||||
}
|
||||
|
||||
func (c *backupSyncController) run() {
|
||||
c.logger.Info("Syncing backups from backup storage into cluster")
|
||||
c.logger.Info("Checking for backup storage locations to sync into cluster")
|
||||
|
||||
locations, err := c.backupStorageLocationLister.BackupStorageLocations(c.namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
@@ -103,10 +134,10 @@ func (c *backupSyncController) run() {
|
||||
locations = orderedBackupLocations(locations, c.defaultBackupLocation)
|
||||
|
||||
pluginManager := c.newPluginManager(c.logger)
|
||||
defer pluginManager.CleanupClients()
|
||||
|
||||
for _, location := range locations {
|
||||
log := c.logger.WithField("backupLocation", location.Name)
|
||||
log.Info("Syncing backups from backup location")
|
||||
|
||||
backupStore, err := c.newBackupStore(location, pluginManager, log)
|
||||
if err != nil {
|
||||
@@ -114,24 +145,26 @@ func (c *backupSyncController) run() {
|
||||
continue
|
||||
}
|
||||
|
||||
backupsInBackupStore, err := backupStore.ListBackups()
|
||||
ok, revision := shouldSync(location, time.Now().UTC(), backupStore, log)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := backupStore.ListBackups()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error listing backups in backup store")
|
||||
continue
|
||||
}
|
||||
backupStoreBackups := sets.NewString(res...)
|
||||
log.WithField("backupCount", len(backupStoreBackups)).Info("Got backups from backup store")
|
||||
|
||||
log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from backup store")
|
||||
|
||||
cloudBackupNames := sets.NewString()
|
||||
for _, cloudBackup := range backupsInBackupStore {
|
||||
log = log.WithField("backup", kube.NamespaceAndName(cloudBackup))
|
||||
log.Debug("Checking cloud backup to see if it needs to be synced into the cluster")
|
||||
|
||||
cloudBackupNames.Insert(cloudBackup.Name)
|
||||
for backupName := range backupStoreBackups {
|
||||
log = log.WithField("backup", backupName)
|
||||
log.Debug("Checking backup store backup to see if it needs to be synced into the cluster")
|
||||
|
||||
// use the controller's namespace when getting the backup because that's where we
|
||||
// are syncing backups to, regardless of the namespace of the cloud backup.
|
||||
_, err := c.client.Backups(c.namespace).Get(cloudBackup.Name, metav1.GetOptions{})
|
||||
_, err := c.backupClient.Backups(c.namespace).Get(backupName, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
log.Debug("Backup already exists in cluster")
|
||||
continue
|
||||
@@ -140,22 +173,28 @@ func (c *backupSyncController) run() {
|
||||
log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster")
|
||||
}
|
||||
|
||||
backup, err := backupStore.GetBackupMetadata(backupName)
|
||||
if err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store")
|
||||
continue
|
||||
}
|
||||
|
||||
// remove the pre-v0.8.0 gcFinalizer if it exists
|
||||
// TODO(1.0): remove this
|
||||
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
|
||||
cloudBackup.Namespace = c.namespace
|
||||
cloudBackup.ResourceVersion = ""
|
||||
backup.Finalizers = stringslice.Except(backup.Finalizers, gcFinalizer)
|
||||
backup.Namespace = c.namespace
|
||||
backup.ResourceVersion = ""
|
||||
|
||||
// update the StorageLocation field and label since the name of the location
|
||||
// may be different in this cluster than in the cluster that created the
|
||||
// backup.
|
||||
cloudBackup.Spec.StorageLocation = location.Name
|
||||
if cloudBackup.Labels == nil {
|
||||
cloudBackup.Labels = make(map[string]string)
|
||||
backup.Spec.StorageLocation = location.Name
|
||||
if backup.Labels == nil {
|
||||
backup.Labels = make(map[string]string)
|
||||
}
|
||||
cloudBackup.Labels[arkv1api.StorageLocationLabel] = cloudBackup.Spec.StorageLocation
|
||||
backup.Labels[arkv1api.StorageLocationLabel] = backup.Spec.StorageLocation
|
||||
|
||||
_, err = c.client.Backups(cloudBackup.Namespace).Create(cloudBackup)
|
||||
_, err = c.backupClient.Backups(backup.Namespace).Create(backup)
|
||||
switch {
|
||||
case err != nil && kuberrs.IsAlreadyExists(err):
|
||||
log.Debug("Backup already exists in cluster")
|
||||
@@ -166,7 +205,30 @@ func (c *backupSyncController) run() {
|
||||
}
|
||||
}
|
||||
|
||||
c.deleteOrphanedBackups(location.Name, cloudBackupNames, log)
|
||||
c.deleteOrphanedBackups(location.Name, backupStoreBackups, log)
|
||||
|
||||
// update the location's status's last-synced fields
|
||||
patch := map[string]interface{}{
|
||||
"status": map[string]interface{}{
|
||||
"lastSyncedTime": time.Now().UTC(),
|
||||
"lastSyncedRevision": revision,
|
||||
},
|
||||
}
|
||||
|
||||
patchBytes, err := json.Marshal(patch)
|
||||
if err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error marshaling last-synced patch to JSON")
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err = c.backupLocationClient.BackupStorageLocations(c.namespace).Patch(
|
||||
location.Name,
|
||||
types.MergePatchType,
|
||||
patchBytes,
|
||||
); err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time and revision")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,7 +254,7 @@ func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudB
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
|
||||
if err := c.backupClient.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster")
|
||||
} else {
|
||||
log.Debug("Deleted orphaned backup from cluster")
|
||||
|
||||
@@ -20,11 +20,14 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
core "k8s.io/client-go/testing"
|
||||
|
||||
@@ -169,6 +172,7 @@ func TestBackupSyncControllerRun(t *testing.T) {
|
||||
)
|
||||
|
||||
c := NewBackupSyncController(
|
||||
client.ArkV1(),
|
||||
client.ArkV1(),
|
||||
sharedInformers.Ark().V1().Backups(),
|
||||
sharedInformers.Ark().V1().BackupStorageLocations(),
|
||||
@@ -195,7 +199,14 @@ func TestBackupSyncControllerRun(t *testing.T) {
|
||||
backupStore, ok := backupStores[location.Name]
|
||||
require.True(t, ok, "no mock backup store for location %s", location.Name)
|
||||
|
||||
backupStore.On("ListBackups").Return(test.cloudBackups[location.Spec.ObjectStorage.Bucket], nil)
|
||||
backupStore.On("GetRevision").Return("foo", nil)
|
||||
|
||||
var backupNames []string
|
||||
for _, b := range test.cloudBackups[location.Spec.ObjectStorage.Bucket] {
|
||||
backupNames = append(backupNames, b.Name)
|
||||
backupStore.On("GetBackupMetadata", b.Name).Return(b, nil)
|
||||
}
|
||||
backupStore.On("ListBackups").Return(backupNames, nil)
|
||||
}
|
||||
|
||||
for _, existingBackup := range test.existingBackups {
|
||||
@@ -335,6 +346,7 @@ func TestDeleteOrphanedBackups(t *testing.T) {
|
||||
)
|
||||
|
||||
c := NewBackupSyncController(
|
||||
client.ArkV1(),
|
||||
client.ArkV1(),
|
||||
sharedInformers.Ark().V1().Backups(),
|
||||
sharedInformers.Ark().V1().BackupStorageLocations(),
|
||||
@@ -379,6 +391,95 @@ func TestDeleteOrphanedBackups(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldSync(t *testing.T) {
|
||||
c := clock.NewFakeClock(time.Now())
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
location *arkv1api.BackupStorageLocation
|
||||
backupStoreRevision string
|
||||
now time.Time
|
||||
expectSync bool
|
||||
expectedRevision string
|
||||
}{
|
||||
{
|
||||
name: "BSL with no last-synced metadata should sync",
|
||||
location: &arkv1api.BackupStorageLocation{},
|
||||
backupStoreRevision: "foo",
|
||||
now: c.Now(),
|
||||
expectSync: true,
|
||||
expectedRevision: "foo",
|
||||
},
|
||||
{
|
||||
name: "BSL with unchanged revision last synced more than an hour ago should sync",
|
||||
location: &arkv1api.BackupStorageLocation{
|
||||
Status: arkv1api.BackupStorageLocationStatus{
|
||||
LastSyncedRevision: types.UID("foo"),
|
||||
LastSyncedTime: metav1.Time{Time: c.Now().Add(-61 * time.Minute)},
|
||||
},
|
||||
},
|
||||
backupStoreRevision: "foo",
|
||||
now: c.Now(),
|
||||
expectSync: true,
|
||||
expectedRevision: "foo",
|
||||
},
|
||||
{
|
||||
name: "BSL with unchanged revision last synced less than an hour ago should not sync",
|
||||
location: &arkv1api.BackupStorageLocation{
|
||||
Status: arkv1api.BackupStorageLocationStatus{
|
||||
LastSyncedRevision: types.UID("foo"),
|
||||
LastSyncedTime: metav1.Time{Time: c.Now().Add(-59 * time.Minute)},
|
||||
},
|
||||
},
|
||||
backupStoreRevision: "foo",
|
||||
now: c.Now(),
|
||||
expectSync: false,
|
||||
},
|
||||
{
|
||||
name: "BSL with different revision than backup store last synced less than an hour ago should sync",
|
||||
location: &arkv1api.BackupStorageLocation{
|
||||
Status: arkv1api.BackupStorageLocationStatus{
|
||||
LastSyncedRevision: types.UID("foo"),
|
||||
LastSyncedTime: metav1.Time{Time: c.Now().Add(-time.Minute)},
|
||||
},
|
||||
},
|
||||
backupStoreRevision: "bar",
|
||||
now: c.Now(),
|
||||
expectSync: true,
|
||||
expectedRevision: "bar",
|
||||
},
|
||||
{
|
||||
name: "BSL with different revision than backup store last synced more than an hour ago should sync",
|
||||
location: &arkv1api.BackupStorageLocation{
|
||||
Status: arkv1api.BackupStorageLocationStatus{
|
||||
LastSyncedRevision: types.UID("foo"),
|
||||
LastSyncedTime: metav1.Time{Time: c.Now().Add(-61 * time.Minute)},
|
||||
},
|
||||
},
|
||||
backupStoreRevision: "bar",
|
||||
now: c.Now(),
|
||||
expectSync: true,
|
||||
expectedRevision: "bar",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
backupStore := new(persistencemocks.BackupStore)
|
||||
if test.backupStoreRevision != "" {
|
||||
backupStore.On("GetRevision").Return(test.backupStoreRevision, nil)
|
||||
} else {
|
||||
backupStore.On("GetRevision").Return("", errors.New("object revision not found"))
|
||||
}
|
||||
|
||||
shouldSync, rev := shouldSync(test.location, test.now, backupStore, arktest.NewLogger())
|
||||
assert.Equal(t, test.expectSync, shouldSync)
|
||||
assert.Equal(t, test.expectedRevision, rev)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func getDeleteActions(actions []core.Action) []core.Action {
|
||||
var deleteActions []core.Action
|
||||
for _, action := range actions {
|
||||
|
||||
@@ -120,16 +120,37 @@ func (_m *BackupStore) IsValid() error {
|
||||
return r0
|
||||
}
|
||||
|
||||
// ListBackups provides a mock function with given fields:
|
||||
func (_m *BackupStore) ListBackups() ([]*v1.Backup, error) {
|
||||
// GetRevision provides a mock function with given fields:
|
||||
func (_m *BackupStore) GetRevision() (string, error) {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 []*v1.Backup
|
||||
if rf, ok := ret.Get(0).(func() []*v1.Backup); ok {
|
||||
var r0 string
|
||||
if rf, ok := ret.Get(0).(func() string); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(string)
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func() error); ok {
|
||||
r1 = rf()
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// ListBackups provides a mock function with given fields:
|
||||
func (_m *BackupStore) ListBackups() ([]string, error) {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 []string
|
||||
if rf, ok := ret.Get(0).(func() []string); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*v1.Backup)
|
||||
r0 = ret.Get(0).([]string)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/satori/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
kerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@@ -36,8 +37,9 @@ import (
|
||||
// Ark backup and restore data in/from a persistent backup store.
|
||||
type BackupStore interface {
|
||||
IsValid() error
|
||||
GetRevision() (string, error)
|
||||
|
||||
ListBackups() ([]*arkv1api.Backup, error)
|
||||
ListBackups() ([]string, error)
|
||||
|
||||
PutBackup(name string, metadata, contents, log io.Reader) error
|
||||
GetBackupMetadata(name string) (*arkv1api.Backup, error)
|
||||
@@ -133,16 +135,16 @@ func (s *objectBackupStore) IsValid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *objectBackupStore) ListBackups() ([]*arkv1api.Backup, error) {
|
||||
func (s *objectBackupStore) ListBackups() ([]string, error) {
|
||||
prefixes, err := s.objectStore.ListCommonPrefixes(s.bucket, s.layout.subdirs["backups"], "/")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(prefixes) == 0 {
|
||||
return []*arkv1api.Backup{}, nil
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
output := make([]*arkv1api.Backup, 0, len(prefixes))
|
||||
output := make([]string, 0, len(prefixes))
|
||||
|
||||
for _, prefix := range prefixes {
|
||||
// values returned from a call to cloudprovider.ObjectStore's
|
||||
@@ -151,13 +153,7 @@ func (s *objectBackupStore) ListBackups() ([]*arkv1api.Backup, error) {
|
||||
// each of those off to get the backup name.
|
||||
backupName := strings.TrimSuffix(strings.TrimPrefix(prefix, s.layout.subdirs["backups"]), "/")
|
||||
|
||||
backup, err := s.GetBackupMetadata(backupName)
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithField("dir", backupName).Error("Error reading backup directory")
|
||||
continue
|
||||
}
|
||||
|
||||
output = append(output, backup)
|
||||
output = append(output, backupName)
|
||||
}
|
||||
|
||||
return output, nil
|
||||
@@ -187,6 +183,10 @@ func (s *objectBackupStore) PutBackup(name string, metadata io.Reader, contents
|
||||
return kerrors.NewAggregate([]error{err, deleteErr})
|
||||
}
|
||||
|
||||
if err := s.putRevision(); err != nil {
|
||||
s.logger.WithField("backup", name).WithError(err).Warn("Error updating backup store revision")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -239,6 +239,10 @@ func (s *objectBackupStore) DeleteBackup(name string) error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.putRevision(); err != nil {
|
||||
s.logger.WithField("backup", name).WithError(err).Warn("Error updating backup store revision")
|
||||
}
|
||||
|
||||
return errors.WithStack(kerrors.NewAggregate(errs))
|
||||
}
|
||||
|
||||
@@ -258,6 +262,10 @@ func (s *objectBackupStore) DeleteRestore(name string) error {
|
||||
}
|
||||
}
|
||||
|
||||
if err = s.putRevision(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
return errors.WithStack(kerrors.NewAggregate(errs))
|
||||
}
|
||||
|
||||
@@ -284,6 +292,30 @@ func (s *objectBackupStore) GetDownloadURL(target arkv1api.DownloadTarget) (stri
|
||||
}
|
||||
}
|
||||
|
||||
func (s *objectBackupStore) GetRevision() (string, error) {
|
||||
rdr, err := s.objectStore.GetObject(s.bucket, s.layout.getRevisionKey())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
bytes, err := ioutil.ReadAll(rdr)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error reading contents of revision file")
|
||||
}
|
||||
|
||||
return string(bytes), nil
|
||||
}
|
||||
|
||||
func (s *objectBackupStore) putRevision() error {
|
||||
rdr := strings.NewReader(uuid.NewV4().String())
|
||||
|
||||
if err := seekAndPutObject(s.objectStore, s.bucket, s.layout.getRevisionKey(), rdr); err != nil {
|
||||
return errors.Wrap(err, "error updating revision file")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func seekToBeginning(r io.Reader) error {
|
||||
seeker, ok := r.(io.Seeker)
|
||||
if !ok {
|
||||
|
||||
@@ -38,6 +38,7 @@ func NewObjectStoreLayout(prefix string) *ObjectStoreLayout {
|
||||
"backups": path.Join(prefix, "backups") + "/",
|
||||
"restores": path.Join(prefix, "restores") + "/",
|
||||
"restic": path.Join(prefix, "restic") + "/",
|
||||
"metadata": path.Join(prefix, "metadata") + "/",
|
||||
}
|
||||
|
||||
return &ObjectStoreLayout{
|
||||
@@ -58,6 +59,10 @@ func (l *ObjectStoreLayout) isValidSubdir(name string) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (l *ObjectStoreLayout) getRevisionKey() string {
|
||||
return path.Join(l.subdirs["metadata"], "revision")
|
||||
}
|
||||
|
||||
func (l *ObjectStoreLayout) getBackupDir(backup string) string {
|
||||
return path.Join(l.subdirs["backups"], backup) + "/"
|
||||
}
|
||||
|
||||
@@ -21,11 +21,13 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -160,7 +162,7 @@ func TestListBackups(t *testing.T) {
|
||||
name string
|
||||
prefix string
|
||||
storageData cloudprovider.BucketData
|
||||
expectedRes []*api.Backup
|
||||
expectedRes []string
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
@@ -169,16 +171,7 @@ func TestListBackups(t *testing.T) {
|
||||
"backups/backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
|
||||
"backups/backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}),
|
||||
},
|
||||
expectedRes: []*api.Backup{
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
|
||||
},
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "backup-2"},
|
||||
},
|
||||
},
|
||||
expectedRes: []string{"backup-1", "backup-2"},
|
||||
},
|
||||
{
|
||||
name: "normal case with backup store prefix",
|
||||
@@ -187,29 +180,7 @@ func TestListBackups(t *testing.T) {
|
||||
"ark-backups/backups/backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
|
||||
"ark-backups/backups/backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}),
|
||||
},
|
||||
expectedRes: []*api.Backup{
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
|
||||
},
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "backup-2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "backup that can't be decoded is ignored",
|
||||
storageData: map[string][]byte{
|
||||
"backups/backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
|
||||
"backups/backup-2/ark-backup.json": []byte("this is not valid backup JSON"),
|
||||
},
|
||||
expectedRes: []*api.Backup{
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
|
||||
},
|
||||
},
|
||||
expectedRes: []string{"backup-1", "backup-2"},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -225,22 +196,8 @@ func TestListBackups(t *testing.T) {
|
||||
|
||||
arktest.AssertErrorMatches(t, tc.expectedErr, err)
|
||||
|
||||
getComparer := func(obj []*api.Backup) func(i, j int) bool {
|
||||
return func(i, j int) bool {
|
||||
switch strings.Compare(obj[i].Namespace, obj[j].Namespace) {
|
||||
case -1:
|
||||
return true
|
||||
case 1:
|
||||
return false
|
||||
default:
|
||||
// namespaces are the same: compare by name
|
||||
return obj[i].Name < obj[j].Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(tc.expectedRes, getComparer(tc.expectedRes))
|
||||
sort.Slice(res, getComparer(res))
|
||||
sort.Strings(tc.expectedRes)
|
||||
sort.Strings(res)
|
||||
|
||||
assert.Equal(t, tc.expectedRes, res)
|
||||
})
|
||||
@@ -263,7 +220,7 @@ func TestPutBackup(t *testing.T) {
|
||||
contents: newStringReadSeeker("contents"),
|
||||
log: newStringReadSeeker("log"),
|
||||
expectedErr: "",
|
||||
expectedKeys: []string{"backups/backup-1/ark-backup.json", "backups/backup-1/backup-1.tar.gz", "backups/backup-1/backup-1-logs.gz"},
|
||||
expectedKeys: []string{"backups/backup-1/ark-backup.json", "backups/backup-1/backup-1.tar.gz", "backups/backup-1/backup-1-logs.gz", "metadata/revision"},
|
||||
},
|
||||
{
|
||||
name: "normal case with backup store prefix",
|
||||
@@ -272,7 +229,7 @@ func TestPutBackup(t *testing.T) {
|
||||
contents: newStringReadSeeker("contents"),
|
||||
log: newStringReadSeeker("log"),
|
||||
expectedErr: "",
|
||||
expectedKeys: []string{"prefix-1/backups/backup-1/ark-backup.json", "prefix-1/backups/backup-1/backup-1.tar.gz", "prefix-1/backups/backup-1/backup-1-logs.gz"},
|
||||
expectedKeys: []string{"prefix-1/backups/backup-1/ark-backup.json", "prefix-1/backups/backup-1/backup-1.tar.gz", "prefix-1/backups/backup-1/backup-1-logs.gz", "prefix-1/metadata/revision"},
|
||||
},
|
||||
{
|
||||
name: "error on metadata upload does not upload data",
|
||||
@@ -296,7 +253,7 @@ func TestPutBackup(t *testing.T) {
|
||||
contents: newStringReadSeeker("bar"),
|
||||
log: new(errorReader),
|
||||
expectedErr: "",
|
||||
expectedKeys: []string{"backups/backup-1/ark-backup.json", "backups/backup-1/backup-1.tar.gz"},
|
||||
expectedKeys: []string{"backups/backup-1/ark-backup.json", "backups/backup-1/backup-1.tar.gz", "metadata/revision"},
|
||||
},
|
||||
{
|
||||
name: "don't upload data when metadata is nil",
|
||||
@@ -309,15 +266,17 @@ func TestPutBackup(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
harness := newObjectBackupStoreTestHarness("foo", tc.prefix)
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
harness := newObjectBackupStoreTestHarness("foo", tc.prefix)
|
||||
|
||||
err := harness.PutBackup("backup-1", tc.metadata, tc.contents, tc.log)
|
||||
err := harness.PutBackup("backup-1", tc.metadata, tc.contents, tc.log)
|
||||
|
||||
arktest.AssertErrorMatches(t, tc.expectedErr, err)
|
||||
assert.Len(t, harness.objectStore.Data[harness.bucket], len(tc.expectedKeys))
|
||||
for _, key := range tc.expectedKeys {
|
||||
assert.Contains(t, harness.objectStore.Data[harness.bucket], key)
|
||||
}
|
||||
arktest.AssertErrorMatches(t, tc.expectedErr, err)
|
||||
assert.Len(t, harness.objectStore.Data[harness.bucket], len(tc.expectedKeys))
|
||||
for _, key := range tc.expectedKeys {
|
||||
assert.Contains(t, harness.objectStore.Data[harness.bucket], key)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -378,6 +337,7 @@ func TestDeleteBackup(t *testing.T) {
|
||||
}
|
||||
|
||||
objectStore.On("DeleteObject", backupStore.bucket, obj).Return(err)
|
||||
objectStore.On("PutObject", "test-bucket", path.Join(test.prefix, "metadata", "revision"), mock.Anything).Return(nil)
|
||||
}
|
||||
|
||||
err := backupStore.DeleteBackup("bak")
|
||||
|
||||
Reference in New Issue
Block a user