Store PodVolumeBackups in obj storage & use as source of truth (#1577)

* Store PodVolumeBackups in object storage

Signed-off-by: Carlisia <carlisiac@vmware.com>
This commit is contained in:
KubeKween
2019-07-24 12:51:20 -07:00
committed by Nolan Brubaker
parent f80e1dc390
commit 3b9af8c654
23 changed files with 617 additions and 395 deletions

View File

@@ -43,8 +43,10 @@ type backupSyncController struct {
backupClient velerov1client.BackupsGetter
backupLocationClient velerov1client.BackupStorageLocationsGetter
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter
backupLister listers.BackupLister
backupStorageLocationLister listers.BackupStorageLocationLister
podVolumeBackupLister listers.PodVolumeBackupLister
namespace string
defaultBackupLocation string
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
@@ -54,8 +56,10 @@ type backupSyncController struct {
func NewBackupSyncController(
backupClient velerov1client.BackupsGetter,
backupLocationClient velerov1client.BackupStorageLocationsGetter,
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter,
backupInformer informers.BackupInformer,
backupStorageLocationInformer informers.BackupStorageLocationInformer,
podVolumeBackupInformer informers.PodVolumeBackupInformer,
syncPeriod time.Duration,
namespace string,
defaultBackupLocation string,
@@ -71,10 +75,12 @@ func NewBackupSyncController(
genericController: newGenericController("backup-sync", logger),
backupClient: backupClient,
backupLocationClient: backupLocationClient,
podVolumeBackupClient: podVolumeBackupClient,
namespace: namespace,
defaultBackupLocation: defaultBackupLocation,
backupLister: backupInformer.Lister(),
backupStorageLocationLister: backupStorageLocationInformer.Lister(),
podVolumeBackupLister: podVolumeBackupInformer.Lister(),
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
@@ -159,7 +165,7 @@ func (c *backupSyncController) run() {
backupStore, err := c.newBackupStore(location, pluginManager, log)
if err != nil {
log.WithError(err).Error("Error getting backup store for location")
log.WithError(err).Error("Error getting backup store for this location")
continue
}
@@ -167,7 +173,7 @@ func (c *backupSyncController) run() {
if !ok {
continue
}
log.Infof("Syncing contents of backup store into cluster")
log.Info("Syncing contents of backup store into cluster")
res, err := backupStore.ListBackups()
if err != nil {
@@ -179,26 +185,16 @@ func (c *backupSyncController) run() {
for backupName := range backupStoreBackups {
log = log.WithField("backup", backupName)
log.Debug("Checking backup store backup to see if it needs to be synced into the cluster")
log.Debug("Checking this backup to see if it needs to be synced into the cluster")
// use the controller's namespace when getting the backup because that's where we
// are syncing backups to, regardless of the namespace of the cloud backup.
backup, err := c.backupClient.Backups(c.namespace).Get(backupName, metav1.GetOptions{})
if err == nil {
log.Debug("Backup already exists in cluster")
if backup.Spec.StorageLocation != "" {
continue
}
// pre-v0.10 backups won't initially have a .spec.storageLocation so fill it in
log.Debug("Patching backup's .spec.storageLocation because it's missing")
if err := patchStorageLocation(backup, c.backupClient.Backups(c.namespace), location.Name); err != nil {
log.WithError(err).Error("Error patching backup's .spec.storageLocation")
}
continue
}
if !kuberrs.IsNotFound(err) {
log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster")
}
@@ -220,8 +216,8 @@ func (c *backupSyncController) run() {
backup.Labels = make(map[string]string)
}
backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation)
_, err = c.backupClient.Backups(backup.Namespace).Create(backup)
// process the regular velero backup
backup, err = c.backupClient.Backups(backup.Namespace).Create(backup)
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debug("Backup already exists in cluster")
@@ -232,6 +228,40 @@ func (c *backupSyncController) run() {
default:
log.Debug("Synced backup into cluster")
}
// process the pod volume backups from object store, if any
podVolumeBackups, err := backupStore.GetPodVolumeBackups(backupName)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting pod volumes for this backup from backup store")
continue
}
for _, podVolumeBackup := range podVolumeBackups {
log = log.WithField("podVolumeBackup", podVolumeBackup.Name)
log.Debug("Checking this pod volume backup to see if it needs to be synced into the cluster")
for _, or := range podVolumeBackup.ObjectMeta.OwnerReferences {
if or.Name == backup.Name {
or.UID = backup.UID
}
}
if _, ok := podVolumeBackup.Labels[velerov1api.BackupUIDLabel]; ok {
podVolumeBackup.Labels[velerov1api.BackupUIDLabel] = string(backup.UID)
}
_, err = c.podVolumeBackupClient.PodVolumeBackups(backup.Namespace).Create(podVolumeBackup)
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debug("Pod volume backup already exists in cluster")
continue
case err != nil && !kuberrs.IsAlreadyExists(err):
log.WithError(errors.WithStack(err)).Error("Error syncing pod volume backup into cluster")
continue
default:
log.Debug("Synced pod volume backup into cluster")
}
}
}
c.deleteOrphanedBackups(location.Name, backupStoreBackups, log)
@@ -280,9 +310,9 @@ func patchStorageLocation(backup *velerov1api.Backup, client velerov1client.Back
return nil
}
// deleteOrphanedBackups deletes backup objects from Kubernetes that have the specified location
// deleteOrphanedBackups deletes backup objects (CRDs) from Kubernetes that have the specified location
// and a phase of Completed, but no corresponding backup in object storage.
func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudBackupNames sets.String, log logrus.FieldLogger) {
func (c *backupSyncController) deleteOrphanedBackups(locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) {
locationSelector := labels.Set(map[string]string{
velerov1api.StorageLocationLabel: label.GetValidName(locationName),
}).AsSelector()
@@ -298,7 +328,7 @@ func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudB
for _, backup := range backups {
log = log.WithField("backup", backup.Name)
if backup.Status.Phase != velerov1api.BackupPhaseCompleted || cloudBackupNames.Has(backup.Name) {
if backup.Status.Phase != velerov1api.BackupPhaseCompleted || backupStoreBackups.Has(backup.Name) {
continue
}