From 087630710c3deee23c2796e7fd724ebc93c569ca Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Fri, 24 Dec 2021 16:55:14 +0100 Subject: [PATCH] feat: Add interchangeable write buffers based on `io.File` and `filebuffer.Buffer` --- cmd/stfs/cmd/serve_ftp.go | 39 +++++------ cmd/stfs/cmd/serve_http.go | 39 ++++------- internal/cache/check.go | 22 ++++++- internal/cache/constants.go | 11 +++- internal/cache/error.go | 7 +- internal/cache/{cache.go => filesystem.go} | 8 +-- internal/cache/write.go | 76 ++++++++++++++++++++++ internal/fs/file.go | 22 +++++-- internal/fs/filesystem.go | 4 +- 9 files changed, 159 insertions(+), 69 deletions(-) rename internal/cache/{cache.go => filesystem.go} (87%) create mode 100644 internal/cache/write.go diff --git a/cmd/stfs/cmd/serve_ftp.go b/cmd/stfs/cmd/serve_ftp.go index 1cc2512..5dc5d8a 100644 --- a/cmd/stfs/cmd/serve_ftp.go +++ b/cmd/stfs/cmd/serve_ftp.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "io/ioutil" "log" "os" "path/filepath" @@ -19,7 +18,6 @@ import ( "github.com/pojntfx/stfs/pkg/config" "github.com/pojntfx/stfs/pkg/operations" "github.com/pojntfx/stfs/pkg/tape" - "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -32,6 +30,8 @@ const ( signatureIdentityFlag = "signature-identity" signaturePasswordFlag = "signature-password" signatureRecipientFlag = "signature-recipient" + + cacheWriteFlag = "cache-write-type" ) var ( @@ -47,7 +47,11 @@ var serveFTPCmd = &cobra.Command{ return err } - if err := cache.CheckCacheType(viper.GetString(cacheFlag)); err != nil { + if err := cache.CheckFileSystemCacheType(viper.GetString(cacheFileSystemFlag)); err != nil { + return err + } + + if err := cache.CheckWriteCacheType(viper.GetString(cacheWriteFlag)); err != nil { return err } @@ -193,32 +197,22 @@ var serveFTPCmd = &cobra.Command{ }, viper.GetString(compressionLevelFlag), - func() (afero.File, func() error, error) { - tmpdir := filepath.Join(viper.GetString(cacheDirFlag), "io") - - if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil { - return nil, nil, err - } - - f, err := ioutil.TempFile(tmpdir, "*") - if err != nil { - return nil, nil, err - } - - return f, func() error { - return os.Remove(filepath.Join(tmpdir, f.Name())) - }, nil + func() (sfs.WriteCache, func() error, error) { + return cache.NewCacheWrite( + filepath.Join(viper.GetString(cacheDirFlag), "write"), + viper.GetString(cacheWriteFlag), + ) }, logger.PrintHeader, ) - fs, err := cache.Cache( + fs, err := cache.NewCacheFilesystem( stfs, root, - viper.GetString(cacheFlag), + viper.GetString(cacheFileSystemFlag), viper.GetDuration(cacheDurationFlag), - filepath.Join(viper.GetString(cacheDirFlag), "cache"), + filepath.Join(viper.GetString(cacheDirFlag), "filesystem"), ) if err != nil { return err @@ -256,7 +250,8 @@ func init() { serveFTPCmd.PersistentFlags().StringP(compressionLevelFlag, "l", config.CompressionLevelBalanced, fmt.Sprintf("Compression level to use (default %v, available are %v)", config.CompressionLevelBalanced, config.KnownCompressionLevels)) serveFTPCmd.PersistentFlags().StringP(laddrFlag, "a", "localhost:1337", "Listen address") - serveFTPCmd.PersistentFlags().StringP(cacheFlag, "n", config.NoneKey, fmt.Sprintf("Cache to use (default %v, available are %v)", config.NoneKey, cache.KnownCacheTypes)) + serveFTPCmd.PersistentFlags().StringP(cacheFileSystemFlag, "n", config.NoneKey, fmt.Sprintf("File system cache to use (default %v, available are %v)", config.NoneKey, cache.KnownFileSystemCacheTypes)) + serveFTPCmd.PersistentFlags().StringP(cacheWriteFlag, "q", cache.WriteCacheTypeFile, fmt.Sprintf("Write cache to use (default %v, available are %v)", cache.WriteCacheTypeFile, cache.KnownWriteCacheTypes)) serveFTPCmd.PersistentFlags().DurationP(cacheDurationFlag, "u", time.Hour, "Duration until cache is invalidated") serveFTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", cacheDir, "Directory to use if dir cache is enabled") diff --git a/cmd/stfs/cmd/serve_http.go b/cmd/stfs/cmd/serve_http.go index ebdedaf..0feeebd 100644 --- a/cmd/stfs/cmd/serve_http.go +++ b/cmd/stfs/cmd/serve_http.go @@ -3,10 +3,8 @@ package cmd import ( "context" "fmt" - "io/ioutil" "log" "net/http" - "os" "path/filepath" "time" @@ -25,10 +23,10 @@ import ( ) const ( - laddrFlag = "laddr" - cacheFlag = "cache" - cacheDirFlag = "cache-dir" - cacheDurationFlag = "cache-duration" + laddrFlag = "laddr" + cacheFileSystemFlag = "cache-filesystem-type" + cacheDirFlag = "cache-dir" + cacheDurationFlag = "cache-duration" ) var serveHTTPCmd = &cobra.Command{ @@ -40,7 +38,7 @@ var serveHTTPCmd = &cobra.Command{ return err } - if err := cache.CheckCacheType(viper.GetString(cacheFlag)); err != nil { + if err := cache.CheckFileSystemCacheType(viper.GetString(cacheFileSystemFlag)); err != nil { return err } @@ -127,33 +125,18 @@ var serveHTTPCmd = &cobra.Command{ Metadata: metadataPersister, }, - "", // We never write - func() (afero.File, func() error, error) { - tmpdir := filepath.Join(viper.GetString(cacheDirFlag), "io") - - if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil { - return nil, nil, err - } - - f, err := ioutil.TempFile(tmpdir, "*") - if err != nil { - return nil, nil, err - } - - return f, func() error { - return os.Remove(filepath.Join(tmpdir, f.Name())) - }, nil - }, + "", // We never write + nil, // We never write logger.PrintHeader, ) - fs, err := cache.Cache( + fs, err := cache.NewCacheFilesystem( stfs, root, - viper.GetString(cacheFlag), + viper.GetString(cacheFileSystemFlag), viper.GetDuration(cacheDurationFlag), - filepath.Join(viper.GetString(cacheDirFlag), "cache"), + filepath.Join(viper.GetString(cacheDirFlag), "filesystem"), ) if err != nil { return err @@ -178,7 +161,7 @@ func init() { serveHTTPCmd.PersistentFlags().StringP(passwordFlag, "p", "", "Password for the private key") serveHTTPCmd.PersistentFlags().StringP(recipientFlag, "r", "", "Path to the public key to verify with") serveHTTPCmd.PersistentFlags().StringP(laddrFlag, "a", "localhost:1337", "Listen address") - serveHTTPCmd.PersistentFlags().StringP(cacheFlag, "n", config.NoneKey, fmt.Sprintf("Cache to use (default %v, available are %v)", config.NoneKey, cache.KnownCacheTypes)) + serveHTTPCmd.PersistentFlags().StringP(cacheFileSystemFlag, "n", config.NoneKey, fmt.Sprintf("File system cache to use (default %v, available are %v)", config.NoneKey, cache.KnownFileSystemCacheTypes)) serveHTTPCmd.PersistentFlags().DurationP(cacheDurationFlag, "u", time.Hour, "Duration until cache is invalidated") serveHTTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", cacheDir, "Directory to use if dir cache is enabled") diff --git a/internal/cache/check.go b/internal/cache/check.go index 577d209..9778e33 100644 --- a/internal/cache/check.go +++ b/internal/cache/check.go @@ -1,16 +1,32 @@ package cache -func CheckCacheType(cacheType string) error { +func CheckFileSystemCacheType(cacheType string) error { cacheTypeIsKnown := false - for _, candidate := range KnownCacheTypes { + for _, candidate := range KnownFileSystemCacheTypes { if cacheType == candidate { cacheTypeIsKnown = true } } if !cacheTypeIsKnown { - return ErrCacheTypeUnknown + return ErrFileSystemCacheTypeUnknown + } + + return nil +} + +func CheckWriteCacheType(cacheType string) error { + cacheTypeIsKnown := false + + for _, candidate := range KnownWriteCacheTypes { + if cacheType == candidate { + cacheTypeIsKnown = true + } + } + + if !cacheTypeIsKnown { + return ErrWriteCacheTypeUnknown } return nil diff --git a/internal/cache/constants.go b/internal/cache/constants.go index b6a5a91..32bf841 100644 --- a/internal/cache/constants.go +++ b/internal/cache/constants.go @@ -5,10 +5,15 @@ import ( ) const ( - CacheTypeMemory = "memory" - CacheTypeDir = "dir" + FileSystemCacheTypeMemory = "memory" + FileSystemCacheTypeDir = "dir" + + WriteCacheTypeMemory = "memory" + WriteCacheTypeFile = "file" ) var ( - KnownCacheTypes = []string{config.NoneKey, CacheTypeMemory, CacheTypeDir} + KnownFileSystemCacheTypes = []string{config.NoneKey, FileSystemCacheTypeMemory, FileSystemCacheTypeDir} + + KnownWriteCacheTypes = []string{WriteCacheTypeMemory, WriteCacheTypeFile} ) diff --git a/internal/cache/error.go b/internal/cache/error.go index 7722600..e408ecb 100644 --- a/internal/cache/error.go +++ b/internal/cache/error.go @@ -3,6 +3,9 @@ package cache import "errors" var ( - ErrCacheTypeUnsupported = errors.New("cache type unsupported") - ErrCacheTypeUnknown = errors.New("cache type unknown") + ErrFileSystemCacheTypeUnsupported = errors.New("file system cache type unsupported") + ErrFileSystemCacheTypeUnknown = errors.New("file system cache type unknown") + + ErrWriteCacheTypeUnsupported = errors.New("write cache type unsupported") + ErrWriteCacheTypeUnknown = errors.New("write cache type unknown") ) diff --git a/internal/cache/cache.go b/internal/cache/filesystem.go similarity index 87% rename from internal/cache/cache.go rename to internal/cache/filesystem.go index e5af60d..c0c7626 100644 --- a/internal/cache/cache.go +++ b/internal/cache/filesystem.go @@ -9,7 +9,7 @@ import ( "github.com/spf13/afero" ) -func Cache( +func NewCacheFilesystem( base afero.Fs, root string, cacheType string, @@ -17,13 +17,13 @@ func Cache( cacheDir string, ) (afero.Fs, error) { switch cacheType { - case CacheTypeMemory: + case FileSystemCacheTypeMemory: if pathext.IsRoot(root) { return afero.NewCacheOnReadFs(base, afero.NewMemMapFs(), ttl), nil } return afero.NewCacheOnReadFs(afero.NewBasePathFs(base, root), afero.NewMemMapFs(), ttl), nil - case CacheTypeDir: + case FileSystemCacheTypeDir: if err := os.MkdirAll(cacheDir, os.ModePerm); err != nil { return nil, err } @@ -40,6 +40,6 @@ func Cache( return afero.NewBasePathFs(base, root), nil default: - return nil, ErrCacheTypeUnsupported + return nil, ErrFileSystemCacheTypeUnsupported } } diff --git a/internal/cache/write.go b/internal/cache/write.go new file mode 100644 index 0000000..00a61c8 --- /dev/null +++ b/internal/cache/write.go @@ -0,0 +1,76 @@ +package cache + +import ( + "io/ioutil" + "os" + "path/filepath" + + "github.com/mattetti/filebuffer" + "github.com/pojntfx/stfs/internal/fs" + "github.com/spf13/afero" +) + +type fileWithSize struct { + afero.File +} + +func (f fileWithSize) Size() (int64, error) { + info, err := f.Stat() + if err != nil { + return -1, err + } + + return info.Size(), nil +} + +type filebufferWithSize struct { + *filebuffer.Buffer +} + +func (f filebufferWithSize) Size() (int64, error) { + return int64(f.Buff.Len()), nil +} + +func (f filebufferWithSize) Sync() error { + // No need to sync a in-memory buffer + return nil +} + +func (f filebufferWithSize) Truncate(size int64) error { + f.Buff.Truncate(int(size)) + + return nil +} + +func NewCacheWrite( + root string, + cacheType string, +) (cache fs.WriteCache, cleanup func() error, err error) { + switch cacheType { + case WriteCacheTypeMemory: + buff := &filebufferWithSize{filebuffer.New([]byte{})} + + return buff, func() error { + buff = nil + + return nil + }, nil + case WriteCacheTypeFile: + tmpdir := filepath.Join(root, "io") + + if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil { + return nil, nil, err + } + + f, err := ioutil.TempFile(tmpdir, "*") + if err != nil { + return nil, nil, err + } + + return fileWithSize{f}, func() error { + return os.Remove(filepath.Join(tmpdir, f.Name())) + }, nil + default: + return nil, nil, ErrWriteCacheTypeUnsupported + } +} diff --git a/internal/fs/file.go b/internal/fs/file.go index a38749d..7286be8 100644 --- a/internal/fs/file.go +++ b/internal/fs/file.go @@ -21,6 +21,17 @@ var ( ErrIsDirectory = errors.New("is a directory") ) +type WriteCache interface { + io.Closer + io.Reader + io.Seeker + io.Writer + + Truncate(size int64) error + Size() (int64, error) + Sync() error +} + type File struct { afero.File @@ -33,7 +44,7 @@ type File struct { link string compressionLevel string - getFileBuffer func() (afero.File, func() error, error) + getFileBuffer func() (WriteCache, func() error, error) name string info os.FileInfo @@ -44,7 +55,8 @@ type File struct { readOpWriter io.WriteCloser // TODO: Find a non-in-memory method to do this - writeBuf afero.File + writeBuf WriteCache + asdf afero.File cleanWriteBuf func() error onHeader func(hdr *models.Header) @@ -60,7 +72,7 @@ func NewFile( link string, compressionLevel string, - getFileBuffer func() (afero.File, func() error, error), + getFileBuffer func() (WriteCache, func() error, error), name string, info os.FileInfo, @@ -154,14 +166,14 @@ func (f *File) syncWithoutLocking() error { } done = true - stat, err := f.writeBuf.Stat() + size, err := f.writeBuf.Size() if err != nil { return config.FileConfig{}, err } f.info = &FileInfo{ name: f.info.Name(), - size: stat.Size(), + size: size, mode: f.info.Mode(), modTime: f.info.ModTime(), isDir: f.info.IsDir(), diff --git a/internal/fs/filesystem.go b/internal/fs/filesystem.go index 3716fe3..2c34c68 100644 --- a/internal/fs/filesystem.go +++ b/internal/fs/filesystem.go @@ -31,7 +31,7 @@ type FileSystem struct { metadata config.MetadataConfig compressionLevel string - getFileBuffer func() (afero.File, func() error, error) + getFileBuffer func() (WriteCache, func() error, error) onHeader func(hdr *models.Header) } @@ -43,7 +43,7 @@ func NewFileSystem( metadata config.MetadataConfig, compressionLevel string, - getFileBuffer func() (afero.File, func() error, error), + getFileBuffer func() (WriteCache, func() error, error), onHeader func(hdr *models.Header), ) afero.Fs {