feat: Implement disk-based IO cache

Author: Felicitas Pojtinger
Date:   2021-12-23 22:12:29 +01:00
parent 70aad4db5d
commit 31459e0efd
4 changed files with 93 additions and 17 deletions
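
The heart of this change is a buffer factory with the signature func() (afero.File, func() error, error): instead of buffering writes in memory, callers receive a temporary file on disk plus a closure that removes it again. Below is a minimal, self-contained sketch of that pattern; the helper name newDiskBufferFactory and the standalone main are illustrative only, since the commit itself wires the closure up inline in the serve commands below.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	"github.com/spf13/afero"
)

// newDiskBufferFactory returns a factory that hands out temporary files under
// baseDir/io together with a cleanup closure that removes them again.
// (Illustrative helper; the commit inlines this closure in the serve commands.)
func newDiskBufferFactory(baseDir string) func() (afero.File, func() error, error) {
	return func() (afero.File, func() error, error) {
		tmpdir := filepath.Join(baseDir, "io")
		if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil {
			return nil, nil, err
		}

		f, err := ioutil.TempFile(tmpdir, "*")
		if err != nil {
			return nil, nil, err
		}

		// TempFile already returns the full path from f.Name().
		return f, func() error {
			return os.Remove(f.Name())
		}, nil
	}
}

func main() {
	getFileBuffer := newDiskBufferFactory(filepath.Join(os.TempDir(), "stfs"))

	buf, clean, err := getFileBuffer()
	if err != nil {
		panic(err)
	}
	defer clean()

	if _, err := buf.WriteString("hello"); err != nil {
		panic(err)
	}
	fmt.Println("buffered to", buf.Name())
}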

View File

@@ -3,6 +3,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"os"
 	"path/filepath"
@@ -18,6 +19,7 @@ import (
 	"github.com/pojntfx/stfs/pkg/config"
 	"github.com/pojntfx/stfs/pkg/operations"
 	"github.com/pojntfx/stfs/pkg/tape"
+	"github.com/spf13/afero"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 )
@@ -32,6 +34,10 @@ const (
 	signatureRecipientFlag = "signature-recipient"
 )
+var (
+	cacheDir = filepath.Join(os.TempDir(), "stfs")
+)
 var serveFTPCmd = &cobra.Command{
 	Use:     "ftp",
 	Aliases: []string{"f"},
@@ -187,6 +193,22 @@ var serveFTPCmd = &cobra.Command{
 			},
 			viper.GetString(compressionLevelFlag),
+			func() (afero.File, func() error, error) {
+				tmpdir := filepath.Join(viper.GetString(cacheDirFlag), "io")
+				if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil {
+					return nil, nil, err
+				}
+				f, err := ioutil.TempFile(tmpdir, "*")
+				if err != nil {
+					return nil, nil, err
+				}
+				return f, func() error {
+					return os.Remove(filepath.Join(tmpdir, f.Name()))
+				}, nil
+			},
 			logger.PrintHeader,
 		)
@@ -196,7 +218,7 @@ var serveFTPCmd = &cobra.Command{
 			root,
 			viper.GetString(cacheFlag),
 			viper.GetDuration(cacheDurationFlag),
-			viper.GetString(cacheDirFlag),
+			filepath.Join(viper.GetString(cacheDirFlag), "cache"),
 		)
 		if err != nil {
 			return err
@@ -236,7 +258,7 @@ func init() {
 	serveFTPCmd.PersistentFlags().StringP(laddrFlag, "a", "localhost:1337", "Listen address")
 	serveFTPCmd.PersistentFlags().StringP(cacheFlag, "n", config.NoneKey, fmt.Sprintf("Cache to use (default %v, available are %v)", config.NoneKey, cache.KnownCacheTypes))
 	serveFTPCmd.PersistentFlags().DurationP(cacheDurationFlag, "u", time.Hour, "Duration until cache is invalidated")
-	serveFTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", filepath.Join(os.TempDir(), "stfs", "cache"), "Directory to use if dir cache is enabled")
+	serveFTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", cacheDir, "Directory to use if dir cache is enabled")
 	viper.AutomaticEnv()
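
With these defaults, the serve commands (the FTP one above and the HTTP one below) share a single base directory, os.TempDir()/stfs unless the cache directory flag overrides it, with an io subdirectory for the temporary write buffers and a cache subdirectory for the dir cache. A small, purely illustrative sketch of the resulting layout:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Mirrors the defaults from this commit: one base directory with
	// separate subdirectories for IO buffers and the dir cache.
	cacheDir := filepath.Join(os.TempDir(), "stfs")

	fmt.Println(filepath.Join(cacheDir, "io"))    // temporary write buffers
	fmt.Println(filepath.Join(cacheDir, "cache")) // dir cache, if enabled
}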

View File

@@ -3,6 +3,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
@@ -127,6 +128,22 @@ var serveHTTPCmd = &cobra.Command{
 			},
 			"", // We never write
+			func() (afero.File, func() error, error) {
+				tmpdir := filepath.Join(viper.GetString(cacheDirFlag), "io")
+				if err := os.MkdirAll(tmpdir, os.ModePerm); err != nil {
+					return nil, nil, err
+				}
+				f, err := ioutil.TempFile(tmpdir, "*")
+				if err != nil {
+					return nil, nil, err
+				}
+				return f, func() error {
+					return os.Remove(filepath.Join(tmpdir, f.Name()))
+				}, nil
+			},
 			logger.PrintHeader,
 		)
@@ -136,7 +153,7 @@ var serveHTTPCmd = &cobra.Command{
 			root,
 			viper.GetString(cacheFlag),
 			viper.GetDuration(cacheDurationFlag),
-			viper.GetString(cacheDirFlag),
+			filepath.Join(viper.GetString(cacheDirFlag), "cache"),
 		)
 		if err != nil {
 			return err
@@ -163,7 +180,7 @@ func init() {
 	serveHTTPCmd.PersistentFlags().StringP(laddrFlag, "a", "localhost:1337", "Listen address")
 	serveHTTPCmd.PersistentFlags().StringP(cacheFlag, "n", config.NoneKey, fmt.Sprintf("Cache to use (default %v, available are %v)", config.NoneKey, cache.KnownCacheTypes))
 	serveHTTPCmd.PersistentFlags().DurationP(cacheDurationFlag, "u", time.Hour, "Duration until cache is invalidated")
-	serveHTTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", filepath.Join(os.TempDir(), "stfs", "cache"), "Directory to use if dir cache is enabled")
+	serveHTTPCmd.PersistentFlags().StringP(cacheDirFlag, "w", cacheDir, "Directory to use if dir cache is enabled")
 	viper.AutomaticEnv()

View File

@@ -9,7 +9,6 @@ import (
 	"os"
 	"sync"
-	"github.com/mattetti/filebuffer"
 	models "github.com/pojntfx/stfs/internal/db/sqlite/models/metadata"
 	"github.com/pojntfx/stfs/internal/ioext"
 	"github.com/pojntfx/stfs/pkg/config"
@@ -32,7 +31,9 @@ type File struct {
 	path             string
 	link             string
 	compressionLevel string
+	getFileBuffer    func() (afero.File, func() error, error)
 	name string
 	info os.FileInfo
@@ -43,7 +44,8 @@ type File struct {
 	readOpWriter io.WriteCloser
 	// TODO: Find a non-in-memory method to do this
-	writeBuf *filebuffer.Buffer
+	writeBuf      afero.File
+	cleanWriteBuf func() error
 	onHeader func(hdr *models.Header)
 }
@@ -56,7 +58,9 @@ func NewFile(
 	path string,
 	link string,
 	compressionLevel string,
+	getFileBuffer func() (afero.File, func() error, error),
 	name string,
 	info os.FileInfo,
@@ -71,7 +75,9 @@ func NewFile(
 		path:             path,
 		link:             link,
 		compressionLevel: compressionLevel,
+		getFileBuffer:    getFileBuffer,
 		name: name,
 		info: info,
@@ -148,9 +154,14 @@ func (f *File) syncWithoutLocking() error {
 		}
 		done = true
+		stat, err := f.writeBuf.Stat()
+		if err != nil {
+			return config.FileConfig{}, err
+		}
 		f.info = &FileInfo{
 			name:    f.info.Name(),
-			size:    int64(f.writeBuf.Buff.Len()),
+			size:    stat.Size(),
 			mode:    f.info.Mode(),
 			modTime: f.info.ModTime(),
 			isDir:   f.info.IsDir(),
@@ -197,12 +208,16 @@ func (f *File) Truncate(size int64) error {
 	defer f.ioLock.Unlock()
 	if f.writeBuf == nil {
-		f.writeBuf = filebuffer.New([]byte{})
-	}
-	f.writeBuf.Buff.Truncate(f.writeBuf.Buff.Len())
-	return nil
+		writeBuf, cleanWriteBuf, err := f.getFileBuffer()
+		if err != nil {
+			return err
+		}
+		f.writeBuf = writeBuf
+		f.cleanWriteBuf = cleanWriteBuf
+	}
+	return f.writeBuf.Truncate(0)
 }
 func (f *File) WriteString(s string) (ret int, err error) {
@@ -235,6 +250,10 @@ func (f *File) closeWithoutLocking() error {
 		if err := f.syncWithoutLocking(); err != nil {
 			return err
 		}
+		// if err := f.writeBuf.Del os.Remove(f.writeBuf.Name()); err != nil {
+		// return err
+		// }
 	}
 	f.readOpReader = nil
@@ -409,10 +428,23 @@ func (f *File) Write(p []byte) (n int, err error) {
 	defer f.ioLock.Unlock()
 	if f.writeBuf == nil {
-		f.writeBuf = filebuffer.New([]byte{})
-	}
-	return f.writeBuf.Write(p)
+		writeBuf, cleanWriteBuf, err := f.getFileBuffer()
+		if err != nil {
+			return -1, err
+		}
+		f.writeBuf = writeBuf
+		f.cleanWriteBuf = cleanWriteBuf
+	}
+	n, err = f.writeBuf.Write(p)
+	if err != nil {
+		log.Fatal(err)
+		return -1, err
+	}
+	return n, f.writeBuf.Sync()
 }
 func (f *File) WriteAt(p []byte, off int64) (n int, err error) {
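
Write and Truncate now create the on-disk write buffer lazily through the injected factory and keep the cleanup closure next to it. A condensed, self-contained sketch of that lazy-initialization pattern follows; the type diskBufferedWriter and its wiring are illustrative, not the actual File implementation.

package main

import (
	"io/ioutil"
	"os"

	"github.com/spf13/afero"
)

// diskBufferedWriter is a hypothetical stand-in for the File type above: the
// write buffer is created on first use via the injected factory, and the
// cleanup closure is kept so the temporary file can be removed later.
type diskBufferedWriter struct {
	getFileBuffer func() (afero.File, func() error, error)

	writeBuf      afero.File
	cleanWriteBuf func() error
}

func (w *diskBufferedWriter) Write(p []byte) (int, error) {
	if w.writeBuf == nil {
		writeBuf, cleanWriteBuf, err := w.getFileBuffer()
		if err != nil {
			return -1, err
		}

		w.writeBuf = writeBuf
		w.cleanWriteBuf = cleanWriteBuf
	}

	n, err := w.writeBuf.Write(p)
	if err != nil {
		return -1, err
	}

	// Flush to disk after every write, as the commit does via Sync.
	return n, w.writeBuf.Sync()
}

func main() {
	w := &diskBufferedWriter{
		getFileBuffer: func() (afero.File, func() error, error) {
			f, err := ioutil.TempFile("", "*")
			if err != nil {
				return nil, nil, err
			}

			return f, func() error { return os.Remove(f.Name()) }, nil
		},
	}

	if _, err := w.Write([]byte("hello")); err != nil {
		panic(err)
	}
	defer w.cleanWriteBuf()
}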

View File

@@ -31,6 +31,7 @@ type FileSystem struct {
 	metadata         config.MetadataConfig
 	compressionLevel string
+	getFileBuffer    func() (afero.File, func() error, error)
 	onHeader func(hdr *models.Header)
 }
@@ -42,6 +43,7 @@ func NewFileSystem(
 	metadata config.MetadataConfig,
 	compressionLevel string,
+	getFileBuffer func() (afero.File, func() error, error),
 	onHeader func(hdr *models.Header),
 ) afero.Fs {
@@ -52,6 +54,7 @@ func NewFileSystem(
 		metadata:         metadata,
 		compressionLevel: compressionLevel,
+		getFileBuffer:    getFileBuffer,
 		onHeader: onHeader,
 	}
@@ -194,7 +197,9 @@ func (f *FileSystem) Open(name string) (afero.File, error) {
 		hdr.Name,
 		hdr.Linkname,
 		f.compressionLevel,
+		f.getFileBuffer,
 		path.Base(hdr.Name),
 		NewFileInfo(hdr),