feat: Add interchangeable write buffers based on io.File and filebuffer.Buffer

This commit is contained in:
Felicitas Pojtinger
2021-12-24 16:55:14 +01:00
parent 31459e0efd
commit 087630710c
9 changed files with 159 additions and 69 deletions

View File

@@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
@@ -19,7 +18,6 @@ import (
"github.com/pojntfx/stfs/pkg/config"
"github.com/pojntfx/stfs/pkg/operations"
"github.com/pojntfx/stfs/pkg/tape"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -32,6 +30,8 @@ const (
signatureIdentityFlag = "signature-identity"
signaturePasswordFlag = "signature-password"
signatureRecipientFlag = "signature-recipient"
cacheWriteFlag = "cache-write-type"
)
var (
@@ -47,7 +47,11 @@ var serveFTPCmd = &cobra.Command{
return err
}
if err := cache.CheckCacheType(viper.GetString(cacheFlag)); err != nil {
if err := cache.CheckFileSystemCacheType(viper.GetString(cacheFileSystemFlag)); err != nil {
return err
}
if err := cache.CheckWriteCacheType(viper.GetString(cacheWriteFlag)); err != nil {
return err
}
@@ -193,32 +197,22 @@ var serveFTPCmd = &cobra.Command{
},
viper.GetString(compressionLevelFlag),
func() (afero.File, func() error, error) {
tmpdir := filepath.Join(viper.GetString(cacheDirFlag), "io")
if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil {
return nil, nil, err
}
f, err := ioutil.TempFile(tmpdir, "*")
if err != nil {
return nil, nil, err
}
return f, func() error {
return os.Remove(filepath.Join(tmpdir, f.Name()))
}, nil
func() (sfs.WriteCache, func() error, error) {
return cache.NewCacheWrite(
filepath.Join(viper.GetString(cacheDirFlag), "write"),
viper.GetString(cacheWriteFlag),
)
},
logger.PrintHeader,
)
fs, err := cache.Cache(
fs, err := cache.NewCacheFilesystem(
stfs,
root,
viper.GetString(cacheFlag),
viper.GetString(cacheFileSystemFlag),
viper.GetDuration(cacheDurationFlag),
filepath.Join(viper.GetString(cacheDirFlag), "cache"),
filepath.Join(viper.GetString(cacheDirFlag), "filesystem"),
)
if err != nil {
return err
@@ -256,7 +250,8 @@ func init() {
serveFTPCmd.PersistentFlags().StringP(compressionLevelFlag, "l", config.CompressionLevelBalanced, fmt.Sprintf("Compression level to use (default %v, available are %v)", config.CompressionLevelBalanced, config.KnownCompressionLevels))
serveFTPCmd.PersistentFlags().StringP(laddrFlag, "a", "localhost:1337", "Listen address")
serveFTPCmd.PersistentFlags().StringP(cacheFlag, "n", config.NoneKey, fmt.Sprintf("Cache to use (default %v, available are %v)", config.NoneKey, cache.KnownCacheTypes))
serveFTPCmd.PersistentFlags().StringP(cacheFileSystemFlag, "n", config.NoneKey, fmt.Sprintf("File system cache to use (default %v, available are %v)", config.NoneKey, cache.KnownFileSystemCacheTypes))
serveFTPCmd.PersistentFlags().StringP(cacheWriteFlag, "q", cache.WriteCacheTypeFile, fmt.Sprintf("Write cache to use (default %v, available are %v)", cache.WriteCacheTypeFile, cache.KnownWriteCacheTypes))
serveFTPCmd.PersistentFlags().DurationP(cacheDurationFlag, "u", time.Hour, "Duration until cache is invalidated")
serveFTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", cacheDir, "Directory to use if dir cache is enabled")

View File

@@ -3,10 +3,8 @@ package cmd
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"time"
@@ -25,10 +23,10 @@ import (
)
const (
laddrFlag = "laddr"
cacheFlag = "cache"
cacheDirFlag = "cache-dir"
cacheDurationFlag = "cache-duration"
laddrFlag = "laddr"
cacheFileSystemFlag = "cache-filesystem-type"
cacheDirFlag = "cache-dir"
cacheDurationFlag = "cache-duration"
)
var serveHTTPCmd = &cobra.Command{
@@ -40,7 +38,7 @@ var serveHTTPCmd = &cobra.Command{
return err
}
if err := cache.CheckCacheType(viper.GetString(cacheFlag)); err != nil {
if err := cache.CheckFileSystemCacheType(viper.GetString(cacheFileSystemFlag)); err != nil {
return err
}
@@ -127,33 +125,18 @@ var serveHTTPCmd = &cobra.Command{
Metadata: metadataPersister,
},
"", // We never write
func() (afero.File, func() error, error) {
tmpdir := filepath.Join(viper.GetString(cacheDirFlag), "io")
if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil {
return nil, nil, err
}
f, err := ioutil.TempFile(tmpdir, "*")
if err != nil {
return nil, nil, err
}
return f, func() error {
return os.Remove(filepath.Join(tmpdir, f.Name()))
}, nil
},
"", // We never write
nil, // We never write
logger.PrintHeader,
)
fs, err := cache.Cache(
fs, err := cache.NewCacheFilesystem(
stfs,
root,
viper.GetString(cacheFlag),
viper.GetString(cacheFileSystemFlag),
viper.GetDuration(cacheDurationFlag),
filepath.Join(viper.GetString(cacheDirFlag), "cache"),
filepath.Join(viper.GetString(cacheDirFlag), "filesystem"),
)
if err != nil {
return err
@@ -178,7 +161,7 @@ func init() {
serveHTTPCmd.PersistentFlags().StringP(passwordFlag, "p", "", "Password for the private key")
serveHTTPCmd.PersistentFlags().StringP(recipientFlag, "r", "", "Path to the public key to verify with")
serveHTTPCmd.PersistentFlags().StringP(laddrFlag, "a", "localhost:1337", "Listen address")
serveHTTPCmd.PersistentFlags().StringP(cacheFlag, "n", config.NoneKey, fmt.Sprintf("Cache to use (default %v, available are %v)", config.NoneKey, cache.KnownCacheTypes))
serveHTTPCmd.PersistentFlags().StringP(cacheFileSystemFlag, "n", config.NoneKey, fmt.Sprintf("File system cache to use (default %v, available are %v)", config.NoneKey, cache.KnownFileSystemCacheTypes))
serveHTTPCmd.PersistentFlags().DurationP(cacheDurationFlag, "u", time.Hour, "Duration until cache is invalidated")
serveHTTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", cacheDir, "Directory to use if dir cache is enabled")

View File

@@ -1,16 +1,32 @@
package cache
func CheckCacheType(cacheType string) error {
func CheckFileSystemCacheType(cacheType string) error {
cacheTypeIsKnown := false
for _, candidate := range KnownCacheTypes {
for _, candidate := range KnownFileSystemCacheTypes {
if cacheType == candidate {
cacheTypeIsKnown = true
}
}
if !cacheTypeIsKnown {
return ErrCacheTypeUnknown
return ErrFileSystemCacheTypeUnknown
}
return nil
}
func CheckWriteCacheType(cacheType string) error {
cacheTypeIsKnown := false
for _, candidate := range KnownWriteCacheTypes {
if cacheType == candidate {
cacheTypeIsKnown = true
}
}
if !cacheTypeIsKnown {
return ErrWriteCacheTypeUnknown
}
return nil

View File

@@ -5,10 +5,15 @@ import (
)
const (
CacheTypeMemory = "memory"
CacheTypeDir = "dir"
FileSystemCacheTypeMemory = "memory"
FileSystemCacheTypeDir = "dir"
WriteCacheTypeMemory = "memory"
WriteCacheTypeFile = "file"
)
var (
KnownCacheTypes = []string{config.NoneKey, CacheTypeMemory, CacheTypeDir}
KnownFileSystemCacheTypes = []string{config.NoneKey, FileSystemCacheTypeMemory, FileSystemCacheTypeDir}
KnownWriteCacheTypes = []string{WriteCacheTypeMemory, WriteCacheTypeFile}
)

View File

@@ -3,6 +3,9 @@ package cache
import "errors"
var (
ErrCacheTypeUnsupported = errors.New("cache type unsupported")
ErrCacheTypeUnknown = errors.New("cache type unknown")
ErrFileSystemCacheTypeUnsupported = errors.New("file system cache type unsupported")
ErrFileSystemCacheTypeUnknown = errors.New("file system cache type unknown")
ErrWriteCacheTypeUnsupported = errors.New("write cache type unsupported")
ErrWriteCacheTypeUnknown = errors.New("write cache type unknown")
)

View File

@@ -9,7 +9,7 @@ import (
"github.com/spf13/afero"
)
func Cache(
func NewCacheFilesystem(
base afero.Fs,
root string,
cacheType string,
@@ -17,13 +17,13 @@ func Cache(
cacheDir string,
) (afero.Fs, error) {
switch cacheType {
case CacheTypeMemory:
case FileSystemCacheTypeMemory:
if pathext.IsRoot(root) {
return afero.NewCacheOnReadFs(base, afero.NewMemMapFs(), ttl), nil
}
return afero.NewCacheOnReadFs(afero.NewBasePathFs(base, root), afero.NewMemMapFs(), ttl), nil
case CacheTypeDir:
case FileSystemCacheTypeDir:
if err := os.MkdirAll(cacheDir, os.ModePerm); err != nil {
return nil, err
}
@@ -40,6 +40,6 @@ func Cache(
return afero.NewBasePathFs(base, root), nil
default:
return nil, ErrCacheTypeUnsupported
return nil, ErrFileSystemCacheTypeUnsupported
}
}

76
internal/cache/write.go vendored Normal file
View File

@@ -0,0 +1,76 @@
package cache
import (
"io/ioutil"
"os"
"path/filepath"
"github.com/mattetti/filebuffer"
"github.com/pojntfx/stfs/internal/fs"
"github.com/spf13/afero"
)
type fileWithSize struct {
afero.File
}
func (f fileWithSize) Size() (int64, error) {
info, err := f.Stat()
if err != nil {
return -1, err
}
return info.Size(), nil
}
type filebufferWithSize struct {
*filebuffer.Buffer
}
func (f filebufferWithSize) Size() (int64, error) {
return int64(f.Buff.Len()), nil
}
func (f filebufferWithSize) Sync() error {
// No need to sync a in-memory buffer
return nil
}
func (f filebufferWithSize) Truncate(size int64) error {
f.Buff.Truncate(int(size))
return nil
}
func NewCacheWrite(
root string,
cacheType string,
) (cache fs.WriteCache, cleanup func() error, err error) {
switch cacheType {
case WriteCacheTypeMemory:
buff := &filebufferWithSize{filebuffer.New([]byte{})}
return buff, func() error {
buff = nil
return nil
}, nil
case WriteCacheTypeFile:
tmpdir := filepath.Join(root, "io")
if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil {
return nil, nil, err
}
f, err := ioutil.TempFile(tmpdir, "*")
if err != nil {
return nil, nil, err
}
return fileWithSize{f}, func() error {
return os.Remove(filepath.Join(tmpdir, f.Name()))
}, nil
default:
return nil, nil, ErrWriteCacheTypeUnsupported
}
}

View File

@@ -21,6 +21,17 @@ var (
ErrIsDirectory = errors.New("is a directory")
)
type WriteCache interface {
io.Closer
io.Reader
io.Seeker
io.Writer
Truncate(size int64) error
Size() (int64, error)
Sync() error
}
type File struct {
afero.File
@@ -33,7 +44,7 @@ type File struct {
link string
compressionLevel string
getFileBuffer func() (afero.File, func() error, error)
getFileBuffer func() (WriteCache, func() error, error)
name string
info os.FileInfo
@@ -44,7 +55,8 @@ type File struct {
readOpWriter io.WriteCloser
// TODO: Find a non-in-memory method to do this
writeBuf afero.File
writeBuf WriteCache
asdf afero.File
cleanWriteBuf func() error
onHeader func(hdr *models.Header)
@@ -60,7 +72,7 @@ func NewFile(
link string,
compressionLevel string,
getFileBuffer func() (afero.File, func() error, error),
getFileBuffer func() (WriteCache, func() error, error),
name string,
info os.FileInfo,
@@ -154,14 +166,14 @@ func (f *File) syncWithoutLocking() error {
}
done = true
stat, err := f.writeBuf.Stat()
size, err := f.writeBuf.Size()
if err != nil {
return config.FileConfig{}, err
}
f.info = &FileInfo{
name: f.info.Name(),
size: stat.Size(),
size: size,
mode: f.info.Mode(),
modTime: f.info.ModTime(),
isDir: f.info.IsDir(),

View File

@@ -31,7 +31,7 @@ type FileSystem struct {
metadata config.MetadataConfig
compressionLevel string
getFileBuffer func() (afero.File, func() error, error)
getFileBuffer func() (WriteCache, func() error, error)
onHeader func(hdr *models.Header)
}
@@ -43,7 +43,7 @@ func NewFileSystem(
metadata config.MetadataConfig,
compressionLevel string,
getFileBuffer func() (afero.File, func() error, error),
getFileBuffer func() (WriteCache, func() error, error),
onHeader func(hdr *models.Header),
) afero.Fs {