From f47dcba80c9431acb4b22d49ef95d4a031efa41e Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 28 Oct 2024 14:21:08 -0700 Subject: [PATCH] WIP --- restore/pkg/restore/restore.go | 163 +++++++++++++++++++++++++++++---- 1 file changed, 146 insertions(+), 17 deletions(-) diff --git a/restore/pkg/restore/restore.go b/restore/pkg/restore/restore.go index 19768ab7..b588c53a 100644 --- a/restore/pkg/restore/restore.go +++ b/restore/pkg/restore/restore.go @@ -26,14 +26,14 @@ import ( ) const batchSize = 1000 -const bufSize = 4096 +const bufSize = 2 * 1024 * 1024 type WorkerWriter struct { writer *C.struct_scoutfs_parallel_restore_writer progressCh chan *ScoutfsParallelWriterProgress fileCreated int64 devFd int - buf []byte + buf unsafe.Pointer wg *sync.WaitGroup } @@ -104,18 +104,22 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er var masterWriter MasterWriter masterWriter.progressCh = make(chan *ScoutfsParallelWriterProgress, numWriters*2) - masterWriter.workers = make([]*WorkerWriter, numWriters) + masterWriter.workers = make([]*WorkerWriter, 0, numWriters-1) masterWriter.devFd = devFd - masterWriter.super = (*C.struct_scoutfs_super_block)(C.malloc(C.size_t(unsafe.Sizeof(*masterWriter.super)))) - if masterWriter.super == nil { + var ret C.int + // Allocate aligned memory for superblock + var super unsafe.Pointer + ret = C.posix_memalign(&super, 4096, C.SCOUTFS_BLOCK_SM_SIZE) + if ret != 0 { masterWriter.Destroy() - return nil, nil, errors.New("failed to allocate memory for superblock") + return nil, nil, fmt.Errorf("failed to allocate aligned memory for superblock: %d", ret) } + masterWriter.super = (*C.struct_scoutfs_super_block)(super) - // Read the superblock from devFd. + // Read the superblock from devFd superOffset := C.SCOUTFS_SUPER_BLKNO << C.SCOUTFS_BLOCK_SM_SHIFT - count, err := syscall.Pread(devFd, (*[1 << 30]byte)(unsafe.Pointer(masterWriter.super))[:C.SCOUTFS_BLOCK_SM_SIZE], int64(superOffset)) + count, err := syscall.Pread(devFd, (*[1 << 30]byte)(super)[:C.SCOUTFS_BLOCK_SM_SIZE], int64(superOffset)) if err != nil { masterWriter.Destroy() return nil, nil, fmt.Errorf("failed to read superblock: %v", err) @@ -131,7 +135,6 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er return nil, nil, errors.New("superblock is not a metadata device") } - var ret C.int // Create master writer ret = C.scoutfs_parallel_restore_create_writer(&masterWriter.writer) if ret != 0 { @@ -169,9 +172,16 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er // Create worker writers for i := 1; i < numWriters; i++ { + var bufPtr unsafe.Pointer + if ret := C.posix_memalign(&bufPtr, 4096, bufSize); ret != 0 { + masterWriter.Destroy() + return nil, nil, fmt.Errorf("failed to allocate aligned worker buffer: %d", ret) + } + worker := &WorkerWriter{ progressCh: masterWriter.progressCh, - buf: make([]byte, bufSize), + buf: bufPtr, + wg: &masterWriter.wg, } ret = C.scoutfs_parallel_restore_create_writer(&worker.writer) if ret != 0 { @@ -191,7 +201,7 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er return nil, nil, errors.New("failed to add slice to worker writer") } - masterWriter.workers[i] = worker + masterWriter.workers = append(masterWriter.workers, worker) } go masterWriter.aggregateProgress() @@ -239,9 +249,9 @@ func (w *WorkerWriter) getProgress(withSlice bool) (*ScoutfsParallelWriterProgre }, nil } -// writeBufs writes data from the buffer to the device file descriptor. +// writeBuffer writes data from the buffer to the device file descriptor. // It uses scoutfs_parallel_restore_write_buf to get data and pwrite to write it. -func (w *WorkerWriter) writeBufs() (int64, error) { +func (w *WorkerWriter) writeBuffer() (int64, error) { var totalWritten int64 var count int64 var off int64 @@ -252,15 +262,15 @@ func (w *WorkerWriter) writeBufs() (int64, error) { countPtr := (*C.size_t)(unsafe.Pointer(&count)) for { - ret = C.scoutfs_parallel_restore_write_buf(w.writer, unsafe.Pointer(&w.buf[0]), - C.size_t(len(w.buf)), offPtr, countPtr) + ret = C.scoutfs_parallel_restore_write_buf(w.writer, w.buf, + C.size_t(bufSize), offPtr, countPtr) if ret != 0 { return totalWritten, fmt.Errorf("failed to write buffer: error code %d", ret) } if count > 0 { - n, err := syscall.Pwrite(w.devFd, w.buf[:count], off) + n, err := syscall.Pwrite(w.devFd, unsafe.Slice((*byte)(w.buf), count), off) if err != nil { return totalWritten, fmt.Errorf("pwrite failed: %v", err) } @@ -288,7 +298,7 @@ func (w *WorkerWriter) InsertEntry(entry *C.struct_scoutfs_parallel_restore_entr // Increment the fileCreated counter w.fileCreated++ if w.fileCreated >= batchSize { - _, err := w.writeBufs() + _, err := w.writeBuffer() if err != nil { return fmt.Errorf("error writing buffers: %v", err) } @@ -327,6 +337,10 @@ func (w *WorkerWriter) Destroy() error { defer w.wg.Done() // Send final progress if there are remaining entries if w.fileCreated > 0 { + _, err := w.writeBuffer() + if err != nil { + return err + } progress := &ScoutfsParallelWriterProgress{ Progress: (*C.struct_scoutfs_parallel_restore_progress)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_progress{})))), Slice: (*C.struct_scoutfs_parallel_restore_slice)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_slice{})))), @@ -335,6 +349,121 @@ func (w *WorkerWriter) Destroy() error { w.fileCreated = 0 } + if w.buf != nil { + C.free(w.buf) + w.buf = nil + } + C.scoutfs_parallel_restore_destroy_writer(&w.writer) return nil } + +// Add these new types and functions to the existing restore.go file + +type FileInfo struct { + Ino uint64 + Mode uint32 + Uid uint32 + Gid uint32 + Size uint64 + Rdev uint64 + AtimeSec int64 + AtimeNsec int64 + MtimeSec int64 + MtimeNsec int64 + CtimeSec int64 + CtimeNsec int64 + NrSubdirs uint64 + NameBytes uint64 + IsDir bool + IsRegular bool +} + +type XAttr struct { + Name string + Value []byte +} + +// CreateInode creates a C inode structure from FileInfo +func (w *WorkerWriter) CreateInode(info FileInfo) error { + inode := (*C.struct_scoutfs_parallel_restore_inode)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_inode{})))) + if inode == nil { + return fmt.Errorf("failed to allocate inode") + } + defer C.free(unsafe.Pointer(inode)) + + inode.ino = C.__u64(info.Ino) + inode.mode = C.__u32(info.Mode) + inode.uid = C.__u32(info.Uid) + inode.gid = C.__u32(info.Gid) + inode.size = C.__u64(info.Size) + inode.rdev = C.uint(info.Rdev) + + inode.atime.tv_sec = C.__time_t(info.AtimeSec) + inode.atime.tv_nsec = C.long(info.AtimeNsec) + inode.mtime.tv_sec = C.__time_t(info.MtimeSec) + inode.mtime.tv_nsec = C.long(info.MtimeNsec) + inode.ctime.tv_sec = C.__time_t(info.CtimeSec) + inode.ctime.tv_nsec = C.long(info.CtimeNsec) + inode.crtime = inode.ctime + + if info.IsRegular && info.Size > 0 { + inode.offline = C.bool(true) + } + + if info.IsDir { + inode.nr_subdirs = C.__u64(info.NrSubdirs) + inode.total_entry_name_bytes = C.__u64(info.NameBytes) + } + + return w.InsertInode(inode) +} + +// CreateEntry creates a directory entry +func (w *WorkerWriter) CreateEntry(dirIno uint64, pos uint64, ino uint64, mode uint32, name string) error { + entryC := (*C.struct_scoutfs_parallel_restore_entry)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_entry{})) + C.size_t(len(name)))) + + if entryC == nil { + return fmt.Errorf("failed to allocate entry") + } + defer C.free(unsafe.Pointer(entryC)) + + entryC.dir_ino = C.__u64(dirIno) + entryC.pos = C.__u64(pos) + entryC.ino = C.__u64(ino) + entryC.mode = C.__u32(mode) + entryC.name_len = C.uint(len(name)) + + entryC.name = (*C.char)(C.malloc(C.size_t(len(name)))) + if entryC.name == nil { + return fmt.Errorf("failed to allocate entry name") + } + defer C.free(unsafe.Pointer(entryC.name)) + copy((*[1 << 30]byte)(unsafe.Pointer(entryC.name))[:len(name)], []byte(name)) + + return w.InsertEntry(entryC) +} + +// CreateXAttr creates an extended attribute +func (w *WorkerWriter) CreateXAttr(ino uint64, pos uint64, xattr XAttr) error { + xattrC := (*C.struct_scoutfs_parallel_restore_xattr)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_xattr{})) + C.size_t(len(xattr.Name)) + C.size_t(len(xattr.Value)))) + if xattrC == nil { + return fmt.Errorf("failed to allocate xattr") + } + defer C.free(unsafe.Pointer(xattrC)) + + xattrC.ino = C.__u64(ino) + xattrC.pos = C.__u64(pos) + xattrC.name_len = C.uint(len(xattr.Name)) + xattrC.value_len = C.__u32(len(xattr.Value)) + + xattrC.name = (*C.char)(C.malloc(C.size_t(len(xattr.Name)))) + if xattrC.name == nil { + return fmt.Errorf("failed to allocate xattr name") + } + copy((*[1 << 30]byte)(unsafe.Pointer(xattrC.name))[:len(xattr.Name)], []byte(xattr.Name)) + + xattrC.value = unsafe.Pointer(&xattr.Value[0]) + + return w.InsertXattr(xattrC) +}