mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-05 21:14:56 +00:00
only sync a backup location if it's changed since last sync
Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
kuberrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
@@ -34,14 +36,14 @@ import (
|
||||
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
||||
"github.com/heptio/ark/pkg/persistence"
|
||||
"github.com/heptio/ark/pkg/plugin"
|
||||
"github.com/heptio/ark/pkg/util/kube"
|
||||
"github.com/heptio/ark/pkg/util/stringslice"
|
||||
)
|
||||
|
||||
type backupSyncController struct {
|
||||
*genericController
|
||||
|
||||
client arkv1client.BackupsGetter
|
||||
backupClient arkv1client.BackupsGetter
|
||||
backupLocationClient arkv1client.BackupStorageLocationsGetter
|
||||
backupLister listers.BackupLister
|
||||
backupStorageLocationLister listers.BackupStorageLocationLister
|
||||
namespace string
|
||||
@@ -51,7 +53,8 @@ type backupSyncController struct {
|
||||
}
|
||||
|
||||
func NewBackupSyncController(
|
||||
client arkv1client.BackupsGetter,
|
||||
backupClient arkv1client.BackupsGetter,
|
||||
backupLocationClient arkv1client.BackupStorageLocationsGetter,
|
||||
backupInformer informers.BackupInformer,
|
||||
backupStorageLocationInformer informers.BackupStorageLocationInformer,
|
||||
syncPeriod time.Duration,
|
||||
@@ -67,7 +70,8 @@ func NewBackupSyncController(
|
||||
|
||||
c := &backupSyncController{
|
||||
genericController: newGenericController("backup-sync", logger),
|
||||
client: client,
|
||||
backupClient: backupClient,
|
||||
backupLocationClient: backupLocationClient,
|
||||
namespace: namespace,
|
||||
defaultBackupLocation: defaultBackupLocation,
|
||||
backupLister: backupInformer.Lister(),
|
||||
@@ -91,8 +95,35 @@ func NewBackupSyncController(
|
||||
|
||||
const gcFinalizer = "gc.ark.heptio.com"
|
||||
|
||||
func shouldSync(location *arkv1api.BackupStorageLocation, now time.Time, backupStore persistence.BackupStore, log logrus.FieldLogger) (bool, string) {
|
||||
log = log.WithFields(map[string]interface{}{
|
||||
"lastSyncedRevision": location.Status.LastSyncedRevision,
|
||||
"lastSyncedTime": location.Status.LastSyncedTime.Time.Format(time.RFC1123Z),
|
||||
})
|
||||
|
||||
revision, err := backupStore.GetRevision()
|
||||
if err != nil {
|
||||
log.WithError(err).Info("Error getting backup store's revision, syncing")
|
||||
return true, ""
|
||||
}
|
||||
log = log.WithField("revision", revision)
|
||||
|
||||
if location.Status.LastSyncedTime.Add(time.Hour).Before(now) {
|
||||
log.Infof("Backup location hasn't been synced in more than %s, syncing", time.Hour)
|
||||
return true, revision
|
||||
}
|
||||
|
||||
if string(location.Status.LastSyncedRevision) != revision {
|
||||
log.Info("Backup location hasn't been synced since its last modification, syncing")
|
||||
return true, revision
|
||||
}
|
||||
|
||||
log.Debug("Backup location's contents haven't changed since last sync, not syncing")
|
||||
return false, ""
|
||||
}
|
||||
|
||||
func (c *backupSyncController) run() {
|
||||
c.logger.Info("Syncing backups from backup storage into cluster")
|
||||
c.logger.Info("Checking for backup storage locations to sync into cluster")
|
||||
|
||||
locations, err := c.backupStorageLocationLister.BackupStorageLocations(c.namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
@@ -103,10 +134,10 @@ func (c *backupSyncController) run() {
|
||||
locations = orderedBackupLocations(locations, c.defaultBackupLocation)
|
||||
|
||||
pluginManager := c.newPluginManager(c.logger)
|
||||
defer pluginManager.CleanupClients()
|
||||
|
||||
for _, location := range locations {
|
||||
log := c.logger.WithField("backupLocation", location.Name)
|
||||
log.Info("Syncing backups from backup location")
|
||||
|
||||
backupStore, err := c.newBackupStore(location, pluginManager, log)
|
||||
if err != nil {
|
||||
@@ -114,24 +145,26 @@ func (c *backupSyncController) run() {
|
||||
continue
|
||||
}
|
||||
|
||||
backupsInBackupStore, err := backupStore.ListBackups()
|
||||
ok, revision := shouldSync(location, time.Now().UTC(), backupStore, log)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := backupStore.ListBackups()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error listing backups in backup store")
|
||||
continue
|
||||
}
|
||||
backupStoreBackups := sets.NewString(res...)
|
||||
log.WithField("backupCount", len(backupStoreBackups)).Info("Got backups from backup store")
|
||||
|
||||
log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from backup store")
|
||||
|
||||
cloudBackupNames := sets.NewString()
|
||||
for _, cloudBackup := range backupsInBackupStore {
|
||||
log = log.WithField("backup", kube.NamespaceAndName(cloudBackup))
|
||||
log.Debug("Checking cloud backup to see if it needs to be synced into the cluster")
|
||||
|
||||
cloudBackupNames.Insert(cloudBackup.Name)
|
||||
for backupName := range backupStoreBackups {
|
||||
log = log.WithField("backup", backupName)
|
||||
log.Debug("Checking backup store backup to see if it needs to be synced into the cluster")
|
||||
|
||||
// use the controller's namespace when getting the backup because that's where we
|
||||
// are syncing backups to, regardless of the namespace of the cloud backup.
|
||||
_, err := c.client.Backups(c.namespace).Get(cloudBackup.Name, metav1.GetOptions{})
|
||||
_, err := c.backupClient.Backups(c.namespace).Get(backupName, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
log.Debug("Backup already exists in cluster")
|
||||
continue
|
||||
@@ -140,22 +173,28 @@ func (c *backupSyncController) run() {
|
||||
log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster")
|
||||
}
|
||||
|
||||
backup, err := backupStore.GetBackupMetadata(backupName)
|
||||
if err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store")
|
||||
continue
|
||||
}
|
||||
|
||||
// remove the pre-v0.8.0 gcFinalizer if it exists
|
||||
// TODO(1.0): remove this
|
||||
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
|
||||
cloudBackup.Namespace = c.namespace
|
||||
cloudBackup.ResourceVersion = ""
|
||||
backup.Finalizers = stringslice.Except(backup.Finalizers, gcFinalizer)
|
||||
backup.Namespace = c.namespace
|
||||
backup.ResourceVersion = ""
|
||||
|
||||
// update the StorageLocation field and label since the name of the location
|
||||
// may be different in this cluster than in the cluster that created the
|
||||
// backup.
|
||||
cloudBackup.Spec.StorageLocation = location.Name
|
||||
if cloudBackup.Labels == nil {
|
||||
cloudBackup.Labels = make(map[string]string)
|
||||
backup.Spec.StorageLocation = location.Name
|
||||
if backup.Labels == nil {
|
||||
backup.Labels = make(map[string]string)
|
||||
}
|
||||
cloudBackup.Labels[arkv1api.StorageLocationLabel] = cloudBackup.Spec.StorageLocation
|
||||
backup.Labels[arkv1api.StorageLocationLabel] = backup.Spec.StorageLocation
|
||||
|
||||
_, err = c.client.Backups(cloudBackup.Namespace).Create(cloudBackup)
|
||||
_, err = c.backupClient.Backups(backup.Namespace).Create(backup)
|
||||
switch {
|
||||
case err != nil && kuberrs.IsAlreadyExists(err):
|
||||
log.Debug("Backup already exists in cluster")
|
||||
@@ -166,7 +205,30 @@ func (c *backupSyncController) run() {
|
||||
}
|
||||
}
|
||||
|
||||
c.deleteOrphanedBackups(location.Name, cloudBackupNames, log)
|
||||
c.deleteOrphanedBackups(location.Name, backupStoreBackups, log)
|
||||
|
||||
// update the location's status's last-synced fields
|
||||
patch := map[string]interface{}{
|
||||
"status": map[string]interface{}{
|
||||
"lastSyncedTime": time.Now().UTC(),
|
||||
"lastSyncedRevision": revision,
|
||||
},
|
||||
}
|
||||
|
||||
patchBytes, err := json.Marshal(patch)
|
||||
if err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error marshaling last-synced patch to JSON")
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err = c.backupLocationClient.BackupStorageLocations(c.namespace).Patch(
|
||||
location.Name,
|
||||
types.MergePatchType,
|
||||
patchBytes,
|
||||
); err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time and revision")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,7 +254,7 @@ func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudB
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
|
||||
if err := c.backupClient.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
|
||||
log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster")
|
||||
} else {
|
||||
log.Debug("Deleted orphaned backup from cluster")
|
||||
|
||||
Reference in New Issue
Block a user