Mirror of https://github.com/vmware-tanzu/velero.git, synced 2026-01-05 13:05:17 +00:00
refine what gets enqueued in PVB/PVR controllers, and log better
Signed-off-by: Steve Kriss <steve@heptio.com>
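In short: instead of routing both handlers through the shared shouldProcessPod/shouldProcessPVR helpers, the commit gives each handler small single-purpose predicates (isPVRNew, isPodOnNode, isResticInitContainerRunning) and logs the specific reason whenever an item is skipped. The following is a minimal stand-alone sketch of the resulting enqueue gate; the types are hypothetical stand-ins, not the real arkv1api/corev1 ones, and only model the fields the checks read.

package main

import "fmt"

// Hypothetical stand-in for arkv1api.PodVolumeRestore.
type podVolumeRestore struct {
	Phase string // "" or "New" until a controller picks it up
}

// Hypothetical stand-ins for the corev1 pod fields the checks use.
type initContainer struct {
	Name    string
	Running bool
}

type pod struct {
	NodeName       string
	InitContainers []initContainer
}

const resticInitContainerName = "restic-wait"

// isPVRNew mirrors the diff: only restores with an empty or New phase
// are candidates for the work queue.
func isPVRNew(pvr podVolumeRestore) bool {
	return pvr.Phase == "" || pvr.Phase == "New"
}

// isPodOnNode mirrors the node failsafe: each daemon only handles pods
// scheduled on its own node.
func isPodOnNode(p pod, node string) bool {
	return p.NodeName == node
}

// isResticInitContainerRunning mirrors the third gate: the restic-wait
// init container must be first in the list and currently running.
func isResticInitContainerRunning(p pod) bool {
	return len(p.InitContainers) > 0 &&
		p.InitContainers[0].Name == resticInitContainerName &&
		p.InitContainers[0].Running
}

// shouldEnqueue chains the three gates the way the reworked pvrHandler
// does (pod lookup and error handling elided).
func shouldEnqueue(pvr podVolumeRestore, p pod, thisNode string) bool {
	return isPVRNew(pvr) && isPodOnNode(p, thisNode) && isResticInitContainerRunning(p)
}

func main() {
	p := pod{NodeName: "node-1", InitContainers: []initContainer{{Name: resticInitContainerName, Running: true}}}
	fmt.Println(shouldEnqueue(podVolumeRestore{Phase: "New"}, p, "node-1"))       // true
	fmt.Println(shouldEnqueue(podVolumeRestore{Phase: "Completed"}, p, "node-1")) // false
}

The real pvrHandler additionally has to look the pod up via the pod lister and distinguish a not-found pod (apierrors.IsNotFound, logged at debug) from any other lookup error (logged at error), as the first hunk below shows.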
@@ -111,13 +111,34 @@ func NewPodVolumeRestoreController(
 func (c *podVolumeRestoreController) pvrHandler(obj interface{}) {
 	pvr := obj.(*arkv1api.PodVolumeRestore)
-	log := c.logger.WithField("key", kube.NamespaceAndName(pvr))
+	log := loggerForPodVolumeRestore(c.logger, pvr)
 
-	if !shouldEnqueuePVR(pvr, c.podLister, c.nodeName, log) {
+	if !isPVRNew(pvr) {
+		log.Debugf("Restore is not new, not enqueuing")
 		return
 	}
 
-	log.Debug("enqueueing")
+	pod, err := c.podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
+	if apierrors.IsNotFound(err) {
+		log.WithError(err).Debugf("Restore's pod %s/%s not found, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
+		return
+	}
+	if err != nil {
+		log.WithError(err).Errorf("Unable to get restore's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
+		return
+	}
+
+	if !isPodOnNode(pod, c.nodeName) {
+		log.Debugf("Restore's pod is not on this node, not enqueuing")
+		return
+	}
+
+	if !isResticInitContainerRunning(pod) {
+		log.Debug("Restore's pod is not running restic-wait init container, not enqueuing")
+		return
+	}
+
+	log.Debug("Enqueueing")
 	c.enqueue(obj)
 }
 
@@ -125,84 +146,53 @@ func (c *podVolumeRestoreController) podHandler(obj interface{}) {
 	pod := obj.(*corev1api.Pod)
 	log := c.logger.WithField("key", kube.NamespaceAndName(pod))
 
-	for _, pvr := range pvrsToEnqueueForPod(pod, c.podVolumeRestoreLister, c.nodeName, log) {
-		c.enqueue(pvr)
-	}
-}
-
-func shouldProcessPod(pod *corev1api.Pod, nodeName string, log logrus.FieldLogger) bool {
-	// if the pod lister being used is filtered to pods on this node, this is superfluous,
-	// but retaining for safety.
-	if pod.Spec.NodeName != nodeName {
-		log.Debugf("Pod is scheduled on node %s, not enqueueing.", pod.Spec.NodeName)
-		return false
+	// the pod should always be for this node since the podInformer is filtered
+	// based on node, so this is just a failsafe.
+	if !isPodOnNode(pod, c.nodeName) {
+		return
 	}
 
-	// only process items for pods that have the restic initContainer running
-	if !resticInitContainerRunning(pod) {
-		log.Debugf("Pod is not running restic initContainer, not enqueueing.")
-		return false
-	}
-
-	return true
-}
-
-func shouldProcessPVR(pvr *arkv1api.PodVolumeRestore, log logrus.FieldLogger) bool {
-	// only process new items
-	if pvr.Status.Phase != "" && pvr.Status.Phase != arkv1api.PodVolumeRestorePhaseNew {
-		log.Debugf("Item has phase %s, not enqueueing.", pvr.Status.Phase)
-		return false
-	}
-
-	return true
-}
-
-func pvrsToEnqueueForPod(pod *corev1api.Pod, pvrLister listers.PodVolumeRestoreLister, nodeName string, log logrus.FieldLogger) []*arkv1api.PodVolumeRestore {
-	if !shouldProcessPod(pod, nodeName, log) {
-		return nil
+	if !isResticInitContainerRunning(pod) {
+		log.Debug("Pod is not running restic-wait init container, not enqueuing restores for pod")
+		return
 	}
 
 	selector, err := labels.Parse(fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
 	if err != nil {
 		log.WithError(err).Error("Unable to parse label selector %s", fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
-		return nil
+		return
 	}
 
-	pvrs, err := pvrLister.List(selector)
+	pvrs, err := c.podVolumeRestoreLister.List(selector)
 	if err != nil {
 		log.WithError(err).Error("Unable to list pod volume restores")
-		return nil
+		return
 	}
 
-	var res []*arkv1api.PodVolumeRestore
-	for i, pvr := range pvrs {
-		if shouldProcessPVR(pvr, log) {
-			res = append(res, pvrs[i])
+	if len(pvrs) == 0 {
+		return
+	}
+
+	for _, pvr := range pvrs {
+		log := loggerForPodVolumeRestore(log, pvr)
+		if !isPVRNew(pvr) {
+			log.Debug("Restore is not new, not enqueuing")
+			continue
 		}
+		log.Debug("Enqueuing")
+		c.enqueue(pvr)
 	}
-
-	return res
 }
 
-func shouldEnqueuePVR(pvr *arkv1api.PodVolumeRestore, podLister corev1listers.PodLister, nodeName string, log logrus.FieldLogger) bool {
-	if !shouldProcessPVR(pvr, log) {
-		return false
-	}
-
-	pod, err := podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
-	if err != nil {
-		log.WithError(err).Errorf("Unable to get item's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
-		return false
-	}
-
-	if !shouldProcessPod(pod, nodeName, log) {
-		return false
-	}
-
-	return true
+func isPVRNew(pvr *arkv1api.PodVolumeRestore) bool {
+	return pvr.Status.Phase == "" || pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseNew
 }
 
-func resticInitContainerRunning(pod *corev1api.Pod) bool {
+func isPodOnNode(pod *corev1api.Pod, node string) bool {
+	return pod.Spec.NodeName == node
+}
+
+func isResticInitContainerRunning(pod *corev1api.Pod) bool {
 	// no init containers, or the first one is not the ark restic one: return false
 	if len(pod.Spec.InitContainers) == 0 || pod.Spec.InitContainers[0].Name != restic.InitContainer {
 		return false
@@ -219,7 +209,7 @@ func resticInitContainerRunning(pod *corev1api.Pod) bool {
 
 func (c *podVolumeRestoreController) processQueueItem(key string) error {
 	log := c.logger.WithField("key", key)
-	log.Debug("Running processItem")
+	log.Debug("Running processQueueItem")
 
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
@@ -241,12 +231,24 @@ func (c *podVolumeRestoreController) processQueueItem(key string) error {
 	return c.processRestoreFunc(reqCopy)
 }
 
-func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error {
-	log := c.logger.WithFields(logrus.Fields{
+func loggerForPodVolumeRestore(baseLogger logrus.FieldLogger, req *arkv1api.PodVolumeRestore) logrus.FieldLogger {
+	log := baseLogger.WithFields(logrus.Fields{
 		"namespace": req.Namespace,
 		"name":      req.Name,
 	})
 
+	if len(req.OwnerReferences) == 1 {
+		log = log.WithField("restore", fmt.Sprintf("%s/%s", req.Namespace, req.OwnerReferences[0].Name))
+	}
+
+	return log
+}
+
+func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error {
+	log := loggerForPodVolumeRestore(c.logger, req)
+
 	log.Info("Restore starting")
 
 	var err error
 
 	// update status to InProgress
@@ -288,6 +290,8 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto
 		return err
 	}
 
+	log.Info("Restore completed")
+
 	return nil
 }
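The logging half of the change centers on loggerForPodVolumeRestore, which stamps every log line with the restore's namespace and name, plus the owning Restore when there is exactly one owner reference. Below is a rough sketch of the same pattern with logrus, using a hypothetical restoreMeta stand-in for arkv1api.PodVolumeRestore.

package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

// restoreMeta is a hypothetical stand-in for arkv1api.PodVolumeRestore;
// only the fields the logger helper reads are modeled.
type restoreMeta struct {
	Namespace, Name, OwnerRestore string
}

// loggerFor follows the shape of loggerForPodVolumeRestore in the diff:
// every line carries namespace/name, plus the owning Restore when known.
func loggerFor(base logrus.FieldLogger, r restoreMeta) logrus.FieldLogger {
	log := base.WithFields(logrus.Fields{
		"namespace": r.Namespace,
		"name":      r.Name,
	})
	if r.OwnerRestore != "" {
		log = log.WithField("restore", fmt.Sprintf("%s/%s", r.Namespace, r.OwnerRestore))
	}
	return log
}

func main() {
	log := loggerFor(logrus.New(), restoreMeta{Namespace: "heptio-ark", Name: "pvr-1", OwnerRestore: "nightly"})
	// emits: level=info msg="Restore starting" name=pvr-1 namespace=heptio-ark restore=heptio-ark/nightly
	log.Info("Restore starting")
}

Building the logger once per object, rather than re-attaching fields at each call site, is what lets processRestore's "Restore starting" and the new "Restore completed" lines carry identical context.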