mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-05-22 07:01:32 +00:00
Merge pull request #9817 from Lyndon-Li/metadata-operator-for-kopia-repo
Some checks failed
Run the E2E test on kind / get-go-version (push) Failing after 1m0s
Run the E2E test on kind / build (push) Has been skipped
Run the E2E test on kind / setup-test-matrix (push) Successful in 3s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / get-go-version (push) Successful in 12s
Main CI / Build (push) Failing after 33s
Some checks failed
Run the E2E test on kind / get-go-version (push) Failing after 1m0s
Run the E2E test on kind / build (push) Has been skipped
Run the E2E test on kind / setup-test-matrix (push) Successful in 3s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / get-go-version (push) Successful in 12s
Main CI / Build (push) Failing after 33s
Metadata operation for kopia repo
This commit is contained in:
1
changelogs/unreleased/9817-Lyndon-Li
Normal file
1
changelogs/unreleased/9817-Lyndon-Li
Normal file
@@ -0,0 +1 @@
|
||||
Add metadata operation to Kopia repo for block data mover
|
||||
@@ -25,12 +25,15 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/compression"
|
||||
"github.com/kopia/kopia/repo/content/index"
|
||||
"github.com/kopia/kopia/repo/maintenance"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/repo/object"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -410,14 +413,74 @@ func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.Obje
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TODO add implementation in following PRs
|
||||
const kopiaDirStreamType = "kopia:directory"
|
||||
|
||||
func (kr *kopiaRepository) WriteMetadata(ctx context.Context, meta *udmrepo.Metadata, opt udmrepo.ObjectWriteOptions) (udmrepo.ID, error) {
|
||||
return "", errors.New("not supported")
|
||||
if kr.rawWriter == nil {
|
||||
return "", errors.New("repo writer is closed or not open")
|
||||
}
|
||||
|
||||
dirEntries := []*snapshot.DirEntry{}
|
||||
if meta.SubObjects != nil {
|
||||
for _, sub := range meta.SubObjects {
|
||||
rawID, err := object.ParseID(string(sub.ID))
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "error parsing object ID from %v", sub)
|
||||
}
|
||||
|
||||
dirEntries = append(dirEntries, &snapshot.DirEntry{
|
||||
Name: sub.Name,
|
||||
ObjectID: rawID,
|
||||
Type: getKopiaObjectType(sub.Type),
|
||||
FileSize: sub.Size,
|
||||
Permissions: snapshot.Permissions(sub.Permissions),
|
||||
ModTime: fs.UTCTimestampFromTime(sub.ModTime),
|
||||
UserID: sub.UserID,
|
||||
GroupID: sub.GroupID,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
dirManifest := snapshot.DirManifest{
|
||||
StreamType: kopiaDirStreamType,
|
||||
Entries: dirEntries,
|
||||
}
|
||||
|
||||
oid, err := snapshotfs.WriteDirManifest(ctx, kr.rawWriter, opt.Description, &dirManifest, getMetadataCompressor())
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "error writing dir manifest: %v", opt.Description)
|
||||
}
|
||||
|
||||
return udmrepo.ID(oid.String()), nil
|
||||
}
|
||||
|
||||
// TODO add implementation in following PRs
|
||||
func (kr *kopiaRepository) ReadMetadata(ctx context.Context, id udmrepo.ID) (*udmrepo.Metadata, error) {
|
||||
return nil, errors.New("not supported")
|
||||
reader, err := kr.OpenObject(ctx, id)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error to open metadata object %v", id)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
dirManifest := snapshot.DirManifest{}
|
||||
if err := json.NewDecoder(reader).Decode(&dirManifest); err != nil {
|
||||
return nil, errors.Wrap(err, "unable to parse directory object")
|
||||
}
|
||||
|
||||
meta := udmrepo.Metadata{}
|
||||
for _, sub := range dirManifest.Entries {
|
||||
meta.SubObjects = append(meta.SubObjects, udmrepo.ObjectMetadata{
|
||||
ID: udmrepo.ID(sub.ObjectID.String()),
|
||||
Name: sub.Name,
|
||||
Type: getObjectDataType(sub.Type),
|
||||
Size: sub.FileSize,
|
||||
ModTime: sub.ModTime.ToTime(),
|
||||
Permissions: int(sub.Permissions),
|
||||
UserID: sub.UserID,
|
||||
GroupID: sub.GroupID,
|
||||
})
|
||||
}
|
||||
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
func (kr *kopiaRepository) PutManifest(ctx context.Context, manifest udmrepo.RepoManifest) (udmrepo.ID, error) {
|
||||
@@ -676,3 +739,25 @@ func openKopiaRepo(ctx context.Context, configFile string, password string, opti
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func getKopiaObjectType(tp int) snapshot.EntryType {
|
||||
switch tp {
|
||||
case udmrepo.ObjectDataTypeMetadata:
|
||||
return snapshot.EntryTypeDirectory
|
||||
case udmrepo.ObjectDataTypeData:
|
||||
return snapshot.EntryTypeFile
|
||||
default:
|
||||
return snapshot.EntryTypeUnknown
|
||||
}
|
||||
}
|
||||
|
||||
func getObjectDataType(tp snapshot.EntryType) int {
|
||||
switch tp {
|
||||
case snapshot.EntryTypeDirectory:
|
||||
return udmrepo.ObjectDataTypeMetadata
|
||||
case snapshot.EntryTypeFile:
|
||||
return udmrepo.ObjectDataTypeData
|
||||
default:
|
||||
return udmrepo.ObjectDataTypeUnknown
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package kopialib
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"math"
|
||||
"os"
|
||||
@@ -1282,3 +1283,170 @@ func TestIsReady(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeObjectReader struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func (f *fakeObjectReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeObjectReader) Length() int64 {
|
||||
return int64(f.Reader.Len())
|
||||
}
|
||||
|
||||
func TestWriteMetadata(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
rawWriter *repomocks.MockRepositoryWriter
|
||||
rawObjWriter *repomocks.Writer
|
||||
meta *udmrepo.Metadata
|
||||
rawWriterRetErr error
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "raw writer is nil",
|
||||
expectedErr: "repo writer is closed or not open",
|
||||
},
|
||||
{
|
||||
name: "invalid object id",
|
||||
rawWriter: repomocks.NewMockRepositoryWriter(t),
|
||||
meta: &udmrepo.Metadata{
|
||||
SubObjects: []udmrepo.ObjectMetadata{
|
||||
{
|
||||
ID: "fake-id",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "error parsing object ID from {fake-id 0 0 0001-01-01 00:00:00 +0000 UTC 0 0 0}: malformed content ID: \"fake-id\": invalid content prefix",
|
||||
},
|
||||
{
|
||||
name: "write dir manifest fail",
|
||||
rawWriter: repomocks.NewMockRepositoryWriter(t),
|
||||
rawObjWriter: repomocks.NewWriter(t),
|
||||
meta: &udmrepo.Metadata{
|
||||
SubObjects: []udmrepo.ObjectMetadata{
|
||||
{
|
||||
ID: "I123456",
|
||||
},
|
||||
},
|
||||
},
|
||||
rawWriterRetErr: errors.New("fake-write-error"),
|
||||
expectedErr: "error writing dir manifest: : unable to encode directory JSON: fake-write-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
rawWriter: repomocks.NewMockRepositoryWriter(t),
|
||||
rawObjWriter: repomocks.NewWriter(t),
|
||||
meta: &udmrepo.Metadata{
|
||||
SubObjects: []udmrepo.ObjectMetadata{
|
||||
{
|
||||
ID: "I123456",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
kr := &kopiaRepository{}
|
||||
|
||||
if tc.rawWriter != nil {
|
||||
if tc.rawObjWriter != nil {
|
||||
tc.rawWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(tc.rawObjWriter)
|
||||
if tc.rawWriterRetErr != nil {
|
||||
tc.rawObjWriter.On("Write", mock.Anything).Return(0, tc.rawWriterRetErr)
|
||||
tc.rawObjWriter.On("Close").Return(nil)
|
||||
} else {
|
||||
tc.rawObjWriter.On("Write", mock.Anything).Return(10, nil)
|
||||
tc.rawObjWriter.On("Result").Return(object.ID{}, nil)
|
||||
tc.rawObjWriter.On("Close").Return(nil)
|
||||
}
|
||||
}
|
||||
kr.rawWriter = tc.rawWriter
|
||||
}
|
||||
|
||||
_, err := kr.WriteMetadata(t.Context(), tc.meta, udmrepo.ObjectWriteOptions{})
|
||||
|
||||
if tc.expectedErr == "" {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.EqualError(t, err, tc.expectedErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadMetadata(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
rawRepo *repomocks.MockRepository
|
||||
objectID udmrepo.ID
|
||||
openErr error
|
||||
readData []byte
|
||||
expectedErr string
|
||||
expected *udmrepo.Metadata
|
||||
}{
|
||||
{
|
||||
name: "open object fail",
|
||||
rawRepo: repomocks.NewMockRepository(t),
|
||||
objectID: "I123456",
|
||||
openErr: errors.New("fake-open-error"),
|
||||
expectedErr: "error to open metadata object I123456: error to open object: fake-open-error",
|
||||
},
|
||||
{
|
||||
name: "invalid json",
|
||||
rawRepo: repomocks.NewMockRepository(t),
|
||||
objectID: "I123456",
|
||||
readData: []byte("invalid json"),
|
||||
expectedErr: "unable to parse directory object: invalid character 'i' looking for beginning of value",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
rawRepo: repomocks.NewMockRepository(t),
|
||||
objectID: "I123456",
|
||||
readData: []byte(`{"stream":"kopia:directory","entries":[{"name":"file1","type":"f","mode":"0644","size":100,"uid":1000,"gid":1000,"mtime":"2023-01-01T00:00:00Z","obj":"I123456"}]}`),
|
||||
expected: &udmrepo.Metadata{
|
||||
SubObjects: []udmrepo.ObjectMetadata{
|
||||
{
|
||||
ID: "I123456",
|
||||
Name: "file1",
|
||||
Type: udmrepo.ObjectDataTypeData,
|
||||
Size: 100,
|
||||
ModTime: time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC).Local(),
|
||||
Permissions: 420,
|
||||
UserID: 1000,
|
||||
GroupID: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
kr := &kopiaRepository{}
|
||||
|
||||
if tc.rawRepo != nil {
|
||||
if tc.openErr != nil {
|
||||
tc.rawRepo.On("OpenObject", mock.Anything, mock.Anything).Return(nil, tc.openErr)
|
||||
} else {
|
||||
reader := &fakeObjectReader{Reader: bytes.NewReader(tc.readData)}
|
||||
tc.rawRepo.On("OpenObject", mock.Anything, mock.Anything).Return(reader, nil)
|
||||
}
|
||||
kr.rawRepo = tc.rawRepo
|
||||
}
|
||||
|
||||
meta, err := kr.ReadMetadata(t.Context(), tc.objectID)
|
||||
|
||||
if tc.expectedErr == "" {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expected, meta)
|
||||
} else {
|
||||
assert.EqualError(t, err, tc.expectedErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,15 +77,19 @@ type AdvancedFeatureInfo struct {
|
||||
}
|
||||
|
||||
type ObjectMetadata struct {
|
||||
ID ID
|
||||
Type int // OBJECT_DATA_TYPE_*
|
||||
Size int64
|
||||
ID ID
|
||||
Name string
|
||||
Type int // OBJECT_DATA_TYPE_*
|
||||
Size int64
|
||||
ModTime time.Time
|
||||
Permissions int
|
||||
UserID uint32
|
||||
GroupID uint32
|
||||
}
|
||||
|
||||
type Metadata struct {
|
||||
SubObjects []ObjectMetadata // For dir metadata only, the sub objects in this dir.
|
||||
ExtraDataLen int // Extra data associated to this metadata.
|
||||
ExtraData []byte
|
||||
SubObjects []ObjectMetadata
|
||||
Summary string
|
||||
}
|
||||
|
||||
type Snapshot struct {
|
||||
|
||||
Reference in New Issue
Block a user