BIAv2 async operations controller work

Signed-off-by: Scott Seago <sseago@redhat.com>
This commit is contained in:
Scott Seago
2023-01-24 12:59:53 -05:00
parent 94fec66bc8
commit c3d1d83da5
52 changed files with 3078 additions and 564 deletions

View File

@@ -76,15 +76,15 @@ func (c *BackupItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error
}, nil
}
func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, error) {
func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, []velero.ResourceIdentifier, error) {
itemJSON, err := json.Marshal(item.UnstructuredContent())
if err != nil {
return nil, nil, "", errors.WithStack(err)
return nil, nil, "", nil, errors.WithStack(err)
}
backupJSON, err := json.Marshal(backup)
if err != nil {
return nil, nil, "", errors.WithStack(err)
return nil, nil, "", nil, errors.WithStack(err)
}
req := &protobiav2.ExecuteRequest{
@@ -95,12 +95,12 @@ func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *
res, err := c.grpcClient.Execute(context.Background(), req)
if err != nil {
return nil, nil, "", common.FromGRPCError(err)
return nil, nil, "", nil, common.FromGRPCError(err)
}
var updatedItem unstructured.Unstructured
if err := json.Unmarshal(res.Item, &updatedItem); err != nil {
return nil, nil, "", errors.WithStack(err)
return nil, nil, "", nil, errors.WithStack(err)
}
var additionalItems []velero.ResourceIdentifier
@@ -118,7 +118,22 @@ func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *
additionalItems = append(additionalItems, newItem)
}
return &updatedItem, additionalItems, res.OperationID, nil
var itemsToUpdate []velero.ResourceIdentifier
for _, itm := range res.ItemsToUpdate {
newItem := velero.ResourceIdentifier{
GroupResource: schema.GroupResource{
Group: itm.Group,
Resource: itm.Resource,
},
Namespace: itm.Namespace,
Name: itm.Name,
}
itemsToUpdate = append(itemsToUpdate, newItem)
}
return &updatedItem, additionalItems, res.OperationID, itemsToUpdate, nil
}
func (c *BackupItemActionGRPCClient) Progress(operationID string, backup *api.Backup) (velero.OperationProgress, error) {
@@ -167,3 +182,9 @@ func (c *BackupItemActionGRPCClient) Cancel(operationID string, backup *api.Back
return nil
}
// This shouldn't be called on the GRPC client since the RestartableBackupItemAction won't delegate
// this method
func (c *BackupItemActionGRPCClient) Name() string {
return ""
}

View File

@@ -107,7 +107,7 @@ func (s *BackupItemActionGRPCServer) Execute(
return nil, common.NewGRPCError(errors.WithStack(err))
}
updatedItem, additionalItems, operationID, err := impl.Execute(&item, &backup)
updatedItem, additionalItems, operationID, itemsToUpdate, err := impl.Execute(&item, &backup)
if err != nil {
return nil, common.NewGRPCError(err)
}
@@ -132,6 +132,9 @@ func (s *BackupItemActionGRPCServer) Execute(
for _, item := range additionalItems {
res.AdditionalItems = append(res.AdditionalItems, backupResourceIdentifierToProto(item))
}
for _, item := range itemsToUpdate {
res.ItemsToUpdate = append(res.ItemsToUpdate, backupResourceIdentifierToProto(item))
}
return res, nil
}
@@ -210,3 +213,9 @@ func backupResourceIdentifierToProto(id velero.ResourceIdentifier) *proto.Resour
Name: id.Name,
}
}
// This shouldn't be called on the GRPC server since the server won't ever receive this request, as
// the RestartableBackupItemAction in Velero won't delegate this to the server
func (c *BackupItemActionGRPCServer) Name() string {
return ""
}

View File

@@ -97,6 +97,7 @@ func TestBackupItemActionGRPCServerExecute(t *testing.T) {
implUpdatedItem runtime.Unstructured
implAdditionalItems []velero.ResourceIdentifier
implOperationID string
implItemsToUpdate []velero.ResourceIdentifier
implError error
expectError bool
skipMock bool
@@ -153,7 +154,7 @@ func TestBackupItemActionGRPCServerExecute(t *testing.T) {
defer itemAction.AssertExpectations(t)
if !test.skipMock {
itemAction.On("Execute", &validItemObject, &validBackupObject).Return(test.implUpdatedItem, test.implAdditionalItems, test.implOperationID, test.implError)
itemAction.On("Execute", &validItemObject, &validBackupObject).Return(test.implUpdatedItem, test.implAdditionalItems, test.implOperationID, test.implItemsToUpdate, test.implError)
}
s := &BackupItemActionGRPCServer{mux: &common.ServerMux{