feat: Add centralized tape access management to archive cmd

Author: Felicitas Pojtinger
Date:   2021-12-14 22:05:55 +01:00
Parent: 9d1c6fb731
Commit: 1a16fd2402
7 changed files with 213 additions and 32 deletions


@@ -63,36 +63,24 @@ var archiveCmd = &cobra.Command{
         return err
     }
-    writer, writerIsRegular, err := tape.OpenTapeWriteOnly(
+    tm := tape.NewTapeManager(
         viper.GetString(driveFlag),
         viper.GetInt(recordSizeFlag),
         viper.GetBool(overwriteFlag),
     )
-    if err != nil {
-        return err
-    }
-    defer writer.Close()
-
-    reader, readerIsRegular, err := tape.OpenTapeReadOnly(
-        viper.GetString(driveFlag),
-    )
-    if err != nil {
-        return err
-    }
-    defer reader.Close()
-
-    if _, err := operations.Archive(
-        config.DriveWriterConfig{
-            Drive:          writer,
-            DriveIsRegular: writerIsRegular,
-        },
-        config.DriveConfig{
-            Drive:          reader,
-            DriveIsRegular: readerIsRegular,
-        },
-        config.DriveReaderConfig{
-            Drive:          reader,
-            DriveIsRegular: readerIsRegular,
-        },
+
+    ops := operations.NewOperations(
+        tm.GetWriter,
+        tm.Close,
+
+        tm.GetReader,
+        tm.Close,
+
+        tm.GetDrive,
+        tm.Close,
+    )
+
+    if _, err := ops.Archive(
         config.MetadataConfig{
             Metadata: viper.GetString(metadataFlag),
         },
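
The wiring above relies on Go method values: an expression like tm.GetWriter binds the method to the tm receiver, so the function value handed to operations.NewOperations carries the manager's state with it. A minimal, self-contained sketch of the same pattern (all names here are illustrative, not from the repository):

    package main

    import "fmt"

    // manager stands in for TapeManager: it owns state that its methods need.
    type manager struct{ drive string }

    // open plays the role of GetWriter; called through a bound method value,
    // it still sees the receiver's fields.
    func (m *manager) open() (string, error) { return m.drive, nil }

    // ops stands in for Operations: it only holds opaque function values.
    type ops struct {
        open func() (string, error)
    }

    func main() {
        m := &manager{drive: "/dev/nst0"}
        o := &ops{open: m.open} // m.open captures the receiver m

        drive, _ := o.open()
        fmt.Println(drive) // prints /dev/nst0
    }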


@@ -31,10 +31,7 @@ var (
     errSocketsNotSupported = errors.New("archive/tar: sockets not supported")
 )
 
-func Archive(
-    writer config.DriveWriterConfig,
-    drive config.DriveConfig,
-    reader config.DriveReaderConfig,
+func (o *Operations) Archive(
     metadata config.MetadataConfig,
     pipes config.PipeConfig,
     crypto config.CryptoConfig,
@@ -46,12 +43,19 @@ func Archive(
     onHeader func(hdr *models.Header),
 ) ([]*tar.Header, error) {
+    o.writeLock.Lock()
+    defer o.writeLock.Unlock()
+
+    writer, err := o.getWriter()
+    if err != nil {
+        return []*tar.Header{}, err
+    }
+
     dirty := false
     tw, cleanup, err := tarext.NewTapeWriter(writer.Drive, writer.DriveIsRegular, recordSize)
     if err != nil {
         return []*tar.Header{}, err
     }
-    defer cleanup(&dirty)
 
     metadataPersister := persisters.NewMetadataPersister(metadata.Metadata)
     if err := metadataPersister.Open(); err != nil {
@@ -260,11 +264,31 @@ func Archive(
         return []*tar.Header{}, err
     }
 
+    if err := cleanup(&dirty); err != nil {
+        return []*tar.Header{}, err
+    }
+
     index := 1 // Ignore the first header, which is the last header which we already indexed
     if overwrite {
         index = 0 // If we are starting fresh, index from start
     }
 
+    if err := o.closeWriter(); err != nil {
+        return []*tar.Header{}, err
+    }
+
+    reader, err := o.getReader()
+    if err != nil {
+        return []*tar.Header{}, err
+    }
+    defer o.closeReader()
+
+    drive, err := o.getDrive()
+    if err != nil {
+        return []*tar.Header{}, err
+    }
+    defer o.closeDrive()
+
     return hdrs, recovery.Index(
         reader,
         drive,
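
Note the strict order in the tail of Archive: the explicit cleanup(&dirty) flushes the writer, closeWriter releases the drive, and only then are the read-only handles opened for recovery.Index. Because GetWriter holds the manager's driveLock until Close is called (see pkg/tape/manager.go below), requesting a reader before closing the writer would deadlock on that lock. A minimal sketch of the handoff, with the tape I/O replaced by prints and illustrative names:

    package main

    import (
        "fmt"
        "sync"
    )

    // manager mimics TapeManager's drive-lock discipline.
    type manager struct{ driveLock sync.Mutex }

    func (m *manager) getWriter() {
        m.driveLock.Lock() // held until close releases it
        fmt.Println("writer open")
    }

    func (m *manager) close() {
        fmt.Println("handle closed")
        m.driveLock.Unlock()
    }

    func (m *manager) getReader() {
        m.driveLock.Lock() // would block forever if the writer were still open
        fmt.Println("reader open")
    }

    func main() {
        m := &manager{}
        m.getWriter()
        m.close() // release the drive before reopening it read-only
        m.getReader()
        m.close()
    }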


@@ -33,7 +33,6 @@ func Delete(
     if err != nil {
         return err
     }
-    defer cleanup(&dirty)
 
     metadataPersister := persisters.NewMetadataPersister(metadata.Metadata)
     if err := metadataPersister.Open(); err != nil {
@@ -100,6 +99,10 @@ func Delete(
         dirty = true
     }
 
+    if err := cleanup(&dirty); err != nil {
+        return err
+    }
+
     return recovery.Index(
         reader,
         drive,
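
The change in Delete (and in Move and Update below) is one of ordering: the old defer ran cleanup(&dirty) only after the function returned, that is, after recovery.Index had already re-read the tape, while the explicit call flushes the writer before the index pass. A toy illustration of the difference, with prints standing in for the real work:

    package main

    import "fmt"

    // deferred mirrors the old code path: the cleanup fires after the
    // function body, so the index pass reads before the flush happens.
    func deferred() {
        defer fmt.Println("flush trailer (too late)")
        fmt.Println("read tape for index")
    }

    // explicit mirrors the new code path: flush first, then index.
    func explicit() {
        fmt.Println("flush trailer")
        fmt.Println("read tape for index")
    }

    func main() {
        deferred()
        explicit()
    }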


@@ -35,7 +35,6 @@ func Move(
     if err != nil {
         return err
     }
-    defer cleanup(&dirty)
 
     metadataPersister := persisters.NewMetadataPersister(metadata.Metadata)
     if err := metadataPersister.Open(); err != nil {
@@ -104,6 +103,10 @@ func Move(
         dirty = true
     }
 
+    if err := cleanup(&dirty); err != nil {
+        return err
+    }
+
     return recovery.Index(
         reader,
         drive,


@@ -0,0 +1,42 @@
+package operations
+
+import (
+    "sync"
+
+    "github.com/pojntfx/stfs/pkg/config"
+)
+
+type Operations struct {
+    getWriter   func() (config.DriveWriterConfig, error)
+    closeWriter func() error
+
+    getReader   func() (config.DriveReaderConfig, error)
+    closeReader func() error
+
+    getDrive   func() (config.DriveConfig, error)
+    closeDrive func() error
+
+    writeLock sync.Mutex
+}
+
+func NewOperations(
+    getWriter func() (config.DriveWriterConfig, error),
+    closeWriter func() error,
+
+    getReader func() (config.DriveReaderConfig, error),
+    closeReader func() error,
+
+    getDrive func() (config.DriveConfig, error),
+    closeDrive func() error,
+) *Operations {
+    return &Operations{
+        getWriter:   getWriter,
+        closeWriter: closeWriter,
+
+        getReader:   getReader,
+        closeReader: closeReader,
+
+        getDrive:   getDrive,
+        closeDrive: closeDrive,
+    }
+}
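
Because Operations sees the tape only through these six function values, a caller could swap the backend for something simpler, such as a regular file, without touching the operations themselves. A hypothetical wiring sketch along those lines, assuming only the config struct fields visible in this diff (Drive and DriveIsRegular) and that Drive accepts an *os.File:

    package main

    import (
        "os"

        "github.com/pojntfx/stfs/pkg/config"
        "github.com/pojntfx/stfs/pkg/operations"
    )

    func main() {
        // A temporary regular file stands in for the tape device.
        f, err := os.CreateTemp("", "stfs-*.tar")
        if err != nil {
            panic(err)
        }
        defer os.Remove(f.Name())

        noop := func() error { return nil } // keep the handle open across phases

        ops := operations.NewOperations(
            func() (config.DriveWriterConfig, error) {
                return config.DriveWriterConfig{Drive: f, DriveIsRegular: true}, nil
            },
            noop,

            func() (config.DriveReaderConfig, error) {
                return config.DriveReaderConfig{Drive: f, DriveIsRegular: true}, nil
            },
            noop,

            func() (config.DriveConfig, error) {
                return config.DriveConfig{Drive: f, DriveIsRegular: true}, nil
            },
            noop,
        )
        _ = ops // ops.Archive would now run against the regular file
    }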


@@ -280,6 +280,10 @@ func Update(
         return []*tar.Header{}, err
     }
 
+    if err := cleanup(&dirty); err != nil {
+        return []*tar.Header{}, err
+    }
+
     return hdrs, recovery.Index(
         reader,
         drive,

pkg/tape/manager.go (new file, 117 lines)

@@ -0,0 +1,117 @@
+package tape
+
+import (
+    "io"
+    "os"
+    "sync"
+    "syscall"
+
+    "github.com/pojntfx/stfs/pkg/config"
+)
+
+type TapeManager struct {
+    drive      string
+    recordSize int
+    overwrite  bool
+
+    driveLock sync.Mutex
+
+    readerLock      sync.Mutex
+    reader          *os.File
+    readerIsRegular bool
+
+    closer func() error
+}
+
+func NewTapeManager(
+    drive string,
+    recordSize int,
+    overwrite bool,
+) *TapeManager {
+    return &TapeManager{
+        drive:      drive,
+        recordSize: recordSize,
+        overwrite:  overwrite,
+    }
+}
+
+func (m *TapeManager) GetWriter() (config.DriveWriterConfig, error) {
+    m.driveLock.Lock()
+
+    writer, writerIsRegular, err := OpenTapeWriteOnly(
+        m.drive,
+        m.recordSize,
+        m.overwrite,
+    )
+    if err != nil {
+        return config.DriveWriterConfig{}, err
+    }
+    m.closer = writer.Close
+
+    return config.DriveWriterConfig{
+        Drive:          writer,
+        DriveIsRegular: writerIsRegular,
+    }, nil
+}
+
+func (m *TapeManager) GetReader() (config.DriveReaderConfig, error) {
+    if err := m.openOrReuseReader(); err != nil {
+        return config.DriveReaderConfig{}, err
+    }
+
+    return config.DriveReaderConfig{
+        Drive:          m.reader,
+        DriveIsRegular: m.readerIsRegular,
+    }, nil
+}
+
+func (m *TapeManager) GetDrive() (config.DriveConfig, error) {
+    if err := m.openOrReuseReader(); err != nil {
+        return config.DriveConfig{}, err
+    }
+
+    return config.DriveConfig{
+        Drive:          m.reader,
+        DriveIsRegular: m.readerIsRegular,
+    }, nil
+}
+
+func (m *TapeManager) Close() error {
+    if err := m.closer(); err != nil {
+        return err
+    }
+
+    m.driveLock.Unlock()
+
+    return nil
+}
+
+func (m *TapeManager) openOrReuseReader() error {
+    m.readerLock.Lock()
+    defer m.readerLock.Unlock()
+
+    reopen := false
+    if m.reader == nil {
+        reopen = true
+    } else if _, err := m.reader.Seek(0, io.SeekCurrent); err == syscall.EBADF {
+        // File is closed
+        reopen = true
+    }
+
+    if reopen {
+        m.driveLock.Lock()
+
+        r, rr, err := OpenTapeReadOnly(m.drive)
+        if err != nil {
+            return err
+        }
+
+        m.reader = r
+        m.readerIsRegular = rr
+        m.closer = r.Close
+    }
+
+    return nil
+}
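
openOrReuseReader probes handle liveness with a relative Seek and compares the error directly against syscall.EBADF. For reference, (*os.File).Seek wraps its failures in an *os.PathError, and a handle closed through (*os.File).Close surfaces as os.ErrClosed, so matching with errors.Is is the more robust form of the same probe; a standalone sketch:

    package main

    import (
        "errors"
        "fmt"
        "io"
        "os"
    )

    // Probe whether a handle is still usable, as openOrReuseReader does,
    // but unwrapping the *os.PathError that Seek returns.
    func main() {
        f, err := os.CreateTemp("", "probe-*")
        if err != nil {
            panic(err)
        }
        defer os.Remove(f.Name())

        f.Close()

        _, err = f.Seek(0, io.SeekCurrent)
        fmt.Println(errors.Is(err, os.ErrClosed)) // true: the handle needs reopening
    }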