This commit is contained in:
Chao Wang
2024-10-28 14:21:08 -07:00
parent 1f345e350d
commit f47dcba80c

View File

@@ -26,14 +26,14 @@ import (
)
const batchSize = 1000
const bufSize = 4096
const bufSize = 2 * 1024 * 1024
type WorkerWriter struct {
writer *C.struct_scoutfs_parallel_restore_writer
progressCh chan *ScoutfsParallelWriterProgress
fileCreated int64
devFd int
buf []byte
buf unsafe.Pointer
wg *sync.WaitGroup
}
@@ -104,18 +104,22 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er
var masterWriter MasterWriter
masterWriter.progressCh = make(chan *ScoutfsParallelWriterProgress, numWriters*2)
masterWriter.workers = make([]*WorkerWriter, numWriters)
masterWriter.workers = make([]*WorkerWriter, 0, numWriters-1)
masterWriter.devFd = devFd
masterWriter.super = (*C.struct_scoutfs_super_block)(C.malloc(C.size_t(unsafe.Sizeof(*masterWriter.super))))
if masterWriter.super == nil {
var ret C.int
// Allocate aligned memory for superblock
var super unsafe.Pointer
ret = C.posix_memalign(&super, 4096, C.SCOUTFS_BLOCK_SM_SIZE)
if ret != 0 {
masterWriter.Destroy()
return nil, nil, errors.New("failed to allocate memory for superblock")
return nil, nil, fmt.Errorf("failed to allocate aligned memory for superblock: %d", ret)
}
masterWriter.super = (*C.struct_scoutfs_super_block)(super)
// Read the superblock from devFd.
// Read the superblock from devFd
superOffset := C.SCOUTFS_SUPER_BLKNO << C.SCOUTFS_BLOCK_SM_SHIFT
count, err := syscall.Pread(devFd, (*[1 << 30]byte)(unsafe.Pointer(masterWriter.super))[:C.SCOUTFS_BLOCK_SM_SIZE], int64(superOffset))
count, err := syscall.Pread(devFd, (*[1 << 30]byte)(super)[:C.SCOUTFS_BLOCK_SM_SIZE], int64(superOffset))
if err != nil {
masterWriter.Destroy()
return nil, nil, fmt.Errorf("failed to read superblock: %v", err)
@@ -131,7 +135,6 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er
return nil, nil, errors.New("superblock is not a metadata device")
}
var ret C.int
// Create master writer
ret = C.scoutfs_parallel_restore_create_writer(&masterWriter.writer)
if ret != 0 {
@@ -169,9 +172,16 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er
// Create worker writers
for i := 1; i < numWriters; i++ {
var bufPtr unsafe.Pointer
if ret := C.posix_memalign(&bufPtr, 4096, bufSize); ret != 0 {
masterWriter.Destroy()
return nil, nil, fmt.Errorf("failed to allocate aligned worker buffer: %d", ret)
}
worker := &WorkerWriter{
progressCh: masterWriter.progressCh,
buf: make([]byte, bufSize),
buf: bufPtr,
wg: &masterWriter.wg,
}
ret = C.scoutfs_parallel_restore_create_writer(&worker.writer)
if ret != 0 {
@@ -191,7 +201,7 @@ func NewWriters(path string, numWriters int) (*MasterWriter, []*WorkerWriter, er
return nil, nil, errors.New("failed to add slice to worker writer")
}
masterWriter.workers[i] = worker
masterWriter.workers = append(masterWriter.workers, worker)
}
go masterWriter.aggregateProgress()
@@ -239,9 +249,9 @@ func (w *WorkerWriter) getProgress(withSlice bool) (*ScoutfsParallelWriterProgre
}, nil
}
// writeBufs writes data from the buffer to the device file descriptor.
// writeBuffer writes data from the buffer to the device file descriptor.
// It uses scoutfs_parallel_restore_write_buf to get data and pwrite to write it.
func (w *WorkerWriter) writeBufs() (int64, error) {
func (w *WorkerWriter) writeBuffer() (int64, error) {
var totalWritten int64
var count int64
var off int64
@@ -252,15 +262,15 @@ func (w *WorkerWriter) writeBufs() (int64, error) {
countPtr := (*C.size_t)(unsafe.Pointer(&count))
for {
ret = C.scoutfs_parallel_restore_write_buf(w.writer, unsafe.Pointer(&w.buf[0]),
C.size_t(len(w.buf)), offPtr, countPtr)
ret = C.scoutfs_parallel_restore_write_buf(w.writer, w.buf,
C.size_t(bufSize), offPtr, countPtr)
if ret != 0 {
return totalWritten, fmt.Errorf("failed to write buffer: error code %d", ret)
}
if count > 0 {
n, err := syscall.Pwrite(w.devFd, w.buf[:count], off)
n, err := syscall.Pwrite(w.devFd, unsafe.Slice((*byte)(w.buf), count), off)
if err != nil {
return totalWritten, fmt.Errorf("pwrite failed: %v", err)
}
@@ -288,7 +298,7 @@ func (w *WorkerWriter) InsertEntry(entry *C.struct_scoutfs_parallel_restore_entr
// Increment the fileCreated counter
w.fileCreated++
if w.fileCreated >= batchSize {
_, err := w.writeBufs()
_, err := w.writeBuffer()
if err != nil {
return fmt.Errorf("error writing buffers: %v", err)
}
@@ -327,6 +337,10 @@ func (w *WorkerWriter) Destroy() error {
defer w.wg.Done()
// Send final progress if there are remaining entries
if w.fileCreated > 0 {
_, err := w.writeBuffer()
if err != nil {
return err
}
progress := &ScoutfsParallelWriterProgress{
Progress: (*C.struct_scoutfs_parallel_restore_progress)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_progress{})))),
Slice: (*C.struct_scoutfs_parallel_restore_slice)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_slice{})))),
@@ -335,6 +349,121 @@ func (w *WorkerWriter) Destroy() error {
w.fileCreated = 0
}
if w.buf != nil {
C.free(w.buf)
w.buf = nil
}
C.scoutfs_parallel_restore_destroy_writer(&w.writer)
return nil
}
// Add these new types and functions to the existing restore.go file
type FileInfo struct {
Ino uint64
Mode uint32
Uid uint32
Gid uint32
Size uint64
Rdev uint64
AtimeSec int64
AtimeNsec int64
MtimeSec int64
MtimeNsec int64
CtimeSec int64
CtimeNsec int64
NrSubdirs uint64
NameBytes uint64
IsDir bool
IsRegular bool
}
type XAttr struct {
Name string
Value []byte
}
// CreateInode creates a C inode structure from FileInfo
func (w *WorkerWriter) CreateInode(info FileInfo) error {
inode := (*C.struct_scoutfs_parallel_restore_inode)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_inode{}))))
if inode == nil {
return fmt.Errorf("failed to allocate inode")
}
defer C.free(unsafe.Pointer(inode))
inode.ino = C.__u64(info.Ino)
inode.mode = C.__u32(info.Mode)
inode.uid = C.__u32(info.Uid)
inode.gid = C.__u32(info.Gid)
inode.size = C.__u64(info.Size)
inode.rdev = C.uint(info.Rdev)
inode.atime.tv_sec = C.__time_t(info.AtimeSec)
inode.atime.tv_nsec = C.long(info.AtimeNsec)
inode.mtime.tv_sec = C.__time_t(info.MtimeSec)
inode.mtime.tv_nsec = C.long(info.MtimeNsec)
inode.ctime.tv_sec = C.__time_t(info.CtimeSec)
inode.ctime.tv_nsec = C.long(info.CtimeNsec)
inode.crtime = inode.ctime
if info.IsRegular && info.Size > 0 {
inode.offline = C.bool(true)
}
if info.IsDir {
inode.nr_subdirs = C.__u64(info.NrSubdirs)
inode.total_entry_name_bytes = C.__u64(info.NameBytes)
}
return w.InsertInode(inode)
}
// CreateEntry creates a directory entry
func (w *WorkerWriter) CreateEntry(dirIno uint64, pos uint64, ino uint64, mode uint32, name string) error {
entryC := (*C.struct_scoutfs_parallel_restore_entry)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_entry{})) + C.size_t(len(name))))
if entryC == nil {
return fmt.Errorf("failed to allocate entry")
}
defer C.free(unsafe.Pointer(entryC))
entryC.dir_ino = C.__u64(dirIno)
entryC.pos = C.__u64(pos)
entryC.ino = C.__u64(ino)
entryC.mode = C.__u32(mode)
entryC.name_len = C.uint(len(name))
entryC.name = (*C.char)(C.malloc(C.size_t(len(name))))
if entryC.name == nil {
return fmt.Errorf("failed to allocate entry name")
}
defer C.free(unsafe.Pointer(entryC.name))
copy((*[1 << 30]byte)(unsafe.Pointer(entryC.name))[:len(name)], []byte(name))
return w.InsertEntry(entryC)
}
// CreateXAttr creates an extended attribute
func (w *WorkerWriter) CreateXAttr(ino uint64, pos uint64, xattr XAttr) error {
xattrC := (*C.struct_scoutfs_parallel_restore_xattr)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_scoutfs_parallel_restore_xattr{})) + C.size_t(len(xattr.Name)) + C.size_t(len(xattr.Value))))
if xattrC == nil {
return fmt.Errorf("failed to allocate xattr")
}
defer C.free(unsafe.Pointer(xattrC))
xattrC.ino = C.__u64(ino)
xattrC.pos = C.__u64(pos)
xattrC.name_len = C.uint(len(xattr.Name))
xattrC.value_len = C.__u32(len(xattr.Value))
xattrC.name = (*C.char)(C.malloc(C.size_t(len(xattr.Name))))
if xattrC.name == nil {
return fmt.Errorf("failed to allocate xattr name")
}
copy((*[1 << 30]byte)(unsafe.Pointer(xattrC.name))[:len(xattr.Name)], []byte(xattr.Name))
xattrC.value = unsafe.Pointer(&xattr.Value[0])
return w.InsertXattr(xattrC)
}