feat: Implement basic overwriting, in-memory File.Write

Felicitas Pojtinger
2021-12-22 22:02:12 +01:00
parent 191896bcda
commit 92669961cc
12 changed files with 150 additions and 45 deletions
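At a glance, the new write path buffers data in memory with github.com/mattetti/filebuffer and only replays it to the tape when the file is synced or closed. Below is a minimal, self-contained sketch of that pattern, not the actual implementation; the getFile callback mirrors the config.FileConfig GetFile hook used in the diff further down, while the rest (main, the printed output) is purely illustrative:

package main

import (
	"fmt"
	"io"

	"github.com/mattetti/filebuffer"
)

func main() {
	// Collect writes in memory first (the commit stores this as File.writeBuf).
	buf := filebuffer.New([]byte{})
	if _, err := buf.Write([]byte("hello tape")); err != nil {
		panic(err)
	}

	// On Sync/Close, rewind the buffer and hand it over as the content source,
	// much like syncWithoutLocking does via GetFile.
	getFile := func() (io.ReadSeekCloser, error) {
		if _, err := buf.Seek(0, io.SeekStart); err != nil {
			return nil, err
		}
		return buf, nil
	}

	src, err := getFile()
	if err != nil {
		panic(err)
	}
	content, err := io.ReadAll(src)
	if err != nil {
		panic(err)
	}
	fmt.Printf("replaying %d buffered bytes: %q\n", buf.Buff.Len(), content)
}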

View File

@@ -146,6 +146,7 @@ var operationUpdateCmd = &cobra.Command{
},
viper.GetString(compressionLevelFlag),
viper.GetBool(overwriteFlag),
false,
); err != nil {
return err
}

View File

@@ -85,12 +85,12 @@ var serveFTPCmd = &cobra.Command{
return err
}
signatureIdentity, err := keys.ParseIdentity(viper.GetString(signatureFlag), signaturePrivkey, viper.GetString(signaturePasswordFlag))
signatureIdentity, err := keys.ParseSignerIdentity(viper.GetString(signatureFlag), signaturePrivkey, viper.GetString(signaturePasswordFlag))
if err != nil {
return err
}
encryptionRecipient, err := keys.ParseSignerRecipient(viper.GetString(encryptionFlag), encryptionPubkey)
encryptionRecipient, err := keys.ParseRecipient(viper.GetString(encryptionFlag), encryptionPubkey)
if err != nil {
return err
}

go.mod (1 changed line)
View File

@@ -15,6 +15,7 @@ require (
github.com/friendsofgo/errors v0.9.2
github.com/klauspost/compress v1.13.6
github.com/klauspost/pgzip v1.2.5
github.com/mattetti/filebuffer v1.0.1
github.com/mattn/go-sqlite3 v1.14.9
github.com/pierrec/lz4/v4 v4.1.12
github.com/pkg/errors v0.9.1

go.sum (2 changed lines)
View File

@@ -394,6 +394,8 @@ github.com/markbates/oncer v1.0.0 h1:E83IaVAHygyndzPimgUYJjbshhDTALZyXxvk9FOlQRY
github.com/markbates/oncer v1.0.0/go.mod h1:Z59JA581E9GP6w96jai+TGqafHPW+cPfRxz2aSZ0mcI=
github.com/markbates/safe v1.0.1 h1:yjZkbvRM6IzKj9tlu/zMJLS0n/V351OZWRnF3QfaUxI=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/mattetti/filebuffer v1.0.1 h1:gG7pyfnSIZCxdoKq+cPa8T0hhYtD9NxCdI4D7PTjRLM=
github.com/mattetti/filebuffer v1.0.1/go.mod h1:YdMURNDOttIiruleeVr6f56OrMc+MydEnTcXwtkxNVs=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=

View File

@@ -21,7 +21,7 @@ func Compress(
compressionLevel string,
isRegular bool,
recordSize int,
) (ioext.Flusher, error) {
) (ioext.FlusherWriter, error) {
switch compressionFormat {
case config.CompressionFormatGZipKey:
fallthrough
@@ -104,7 +104,7 @@ func Compress(
return nil, err
}
return ioext.AddFlush(lz), nil
return ioext.AddFlushNop(lz), nil
case config.CompressionFormatZStandardKey:
l := zstd.SpeedDefault
switch compressionLevel {
@@ -171,9 +171,9 @@ func Compress(
return nil, err
}
return ioext.AddFlush(bz), nil
return ioext.AddFlushNop(bz), nil
case config.NoneKey:
return ioext.AddFlush(ioext.AddClose(dst)), nil
return ioext.AddFlushNop(ioext.AddCloseNopToWriter(dst)), nil
default:
return nil, config.ErrCompressionFormatUnsupported
}

View File

@@ -35,7 +35,7 @@ func Encrypt(
return openpgp.Encrypt(dst, recipient, nil, nil, nil)
case config.NoneKey:
return ioext.AddClose(dst), nil
return ioext.AddCloseNopToWriter(dst), nil
default:
return nil, config.ErrEncryptionFormatUnsupported
}

View File

@@ -9,6 +9,7 @@ import (
"os"
"sync"
"github.com/mattetti/filebuffer"
models "github.com/pojntfx/stfs/internal/db/sqlite/models/metadata"
"github.com/pojntfx/stfs/internal/ioext"
"github.com/pojntfx/stfs/pkg/config"
@@ -24,28 +25,38 @@ var (
type File struct {
afero.File
ops *operations.Operations
readOps *operations.Operations
writeOps *operations.Operations
metadata config.MetadataConfig
path string
path string
link string
compressionLevel string
name string
info os.FileInfo
ioLock sync.Mutex
reader *ioext.CounterReadCloser
writer io.WriteCloser
readOpReader *ioext.CounterReadCloser
readOpWriter io.WriteCloser
// TODO: Find a non-in-memory method to do this
writeBuf *filebuffer.Buffer
onHeader func(hdr *models.Header)
}
func NewFile(
ops *operations.Operations,
readOps *operations.Operations,
writeOps *operations.Operations,
metadata config.MetadataConfig,
path string,
link string,
compressionLevel string,
name string,
info os.FileInfo,
@@ -53,11 +64,14 @@ func NewFile(
onHeader func(hdr *models.Header),
) *File {
return &File{
ops: ops,
readOps: readOps,
writeOps: writeOps,
metadata: metadata,
path: path,
path: path,
link: link,
compressionLevel: compressionLevel,
name: name,
info: info,
@@ -117,14 +131,59 @@ func (f *File) Readdirnames(n int) ([]string, error) {
return names, err
}
func (f *File) Sync() error {
log.Println("File.Sync", f.name)
func (f *File) syncWithoutLocking() error {
log.Println("File.syncWithoutLocking", f.name)
if f.info.IsDir() {
return ErrIsDirectory
}
return ErrNotImplemented
if f.writeBuf != nil {
done := false
if _, err := f.writeOps.Update(
func() (config.FileConfig, error) {
// Exit after the first write
if done {
return config.FileConfig{}, io.EOF
}
done = true
f.info = &FileInfo{
name: f.info.Name(),
size: int64(f.writeBuf.Buff.Len()),
mode: f.info.Mode(),
modTime: f.info.ModTime(),
isDir: f.info.IsDir(),
}
return config.FileConfig{
GetFile: func() (io.ReadSeekCloser, error) {
if _, err := f.writeBuf.Seek(0, io.SeekStart); err != nil {
return nil, err
}
return f.writeBuf, nil
},
Info: f.info,
Path: f.path,
Link: f.link,
}, nil
},
f.compressionLevel,
true,
true,
); err != nil {
return err
}
}
return nil
}
func (f *File) Sync() error {
log.Println("File.Sync", f.name)
return f.syncWithoutLocking()
}
func (f *File) Truncate(size int64) error {
@@ -150,20 +209,28 @@ func (f *File) WriteString(s string) (ret int, err error) {
func (f *File) closeWithoutLocking() error {
log.Println("File.closeWithoutLocking", f.name)
if f.reader != nil {
if err := f.reader.Close(); err != nil {
if f.readOpReader != nil {
if err := f.readOpReader.Close(); err != nil {
return err
}
}
if f.writer != nil {
if err := f.writer.Close(); err != nil {
if f.readOpWriter != nil {
if err := f.readOpWriter.Close(); err != nil {
return err
}
}
f.reader = nil
f.writer = nil
if f.writeBuf != nil {
// No need to close the write buffer; the `update` operation closes it itself
if err := f.syncWithoutLocking(); err != nil {
return err
}
}
f.readOpReader = nil
f.readOpWriter = nil
f.writeBuf = nil
return nil
}
@@ -187,7 +254,7 @@ func (f *File) Read(p []byte) (n int, err error) {
f.ioLock.Lock()
defer f.ioLock.Unlock()
if f.reader == nil || f.writer == nil {
if f.readOpReader == nil || f.readOpWriter == nil {
r, writer := io.Pipe()
reader := &ioext.CounterReadCloser{
Reader: r,
@@ -195,7 +262,7 @@ func (f *File) Read(p []byte) (n int, err error) {
}
go func() {
if err := f.ops.Restore(
if err := f.readOps.Restore(
func(path string, mode fs.FileMode) (io.WriteCloser, error) {
return writer, nil
},
@@ -217,12 +284,12 @@ func (f *File) Read(p []byte) (n int, err error) {
}
}()
f.reader = reader
f.writer = writer
f.readOpReader = reader
f.readOpWriter = writer
}
w := &bytes.Buffer{}
_, err = io.CopyN(w, f.reader, int64(len(p)))
_, err = io.CopyN(w, f.readOpReader, int64(len(p)))
if err == io.EOF {
return copy(p, w.Bytes()), io.EOF
}
@@ -264,8 +331,8 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
dst = offset
case io.SeekCurrent:
curr := 0
if f.reader != nil {
curr = f.reader.BytesRead
if f.readOpReader != nil {
curr = f.readOpReader.BytesRead
}
dst = int64(curr) + offset
case io.SeekEnd:
@@ -274,7 +341,7 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
return -1, ErrNotImplemented
}
if f.reader == nil || f.writer == nil || dst < int64(f.reader.BytesRead) { // We have to re-open as we can't seek backwards
if f.readOpReader == nil || f.readOpWriter == nil || dst < int64(f.readOpReader.BytesRead) { // We have to re-open as we can't seek backwards
_ = f.closeWithoutLocking() // Ignore errors here as it might not be opened
r, writer := io.Pipe()
@@ -284,7 +351,7 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
}
go func() {
if err := f.ops.Restore(
if err := f.readOps.Restore(
func(path string, mode fs.FileMode) (io.WriteCloser, error) {
return writer, nil
},
@@ -306,11 +373,11 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
}
}()
f.reader = reader
f.writer = writer
f.readOpReader = reader
f.readOpWriter = writer
}
written, err := io.CopyN(io.Discard, f.reader, dst-int64(f.reader.BytesRead))
written, err := io.CopyN(io.Discard, f.readOpReader, dst-int64(f.readOpReader.BytesRead))
if err == io.EOF {
return written, io.EOF
}
@@ -323,13 +390,20 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
}
func (f *File) Write(p []byte) (n int, err error) {
log.Println("File.Write", f.name, p)
log.Println("File.Write", f.name, len(p))
if f.info.IsDir() {
return -1, ErrIsDirectory
}
return -1, ErrNotImplemented
f.ioLock.Lock()
defer f.ioLock.Unlock()
if f.writeBuf == nil {
f.writeBuf = filebuffer.New([]byte{})
}
return f.writeBuf.Write(p)
}
func (f *File) WriteAt(p []byte, off int64) (n int, err error) {
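A side note on the Sync implementation above: judging by the done flag in syncWithoutLocking, Operations.Update keeps asking its getSrc callback for files and treats a returned io.EOF as the end-of-input signal rather than a failure. The sketch below illustrates that "yield once, then io.EOF" convention; drain is a hypothetical consumer standing in for Update's internal loop and is not part of the codebase:

package main

import (
	"errors"
	"fmt"
	"io"
)

// drain keeps asking getSrc for the next path until io.EOF is returned.
func drain(getSrc func() (string, error)) error {
	for {
		src, err := getSrc()
		if errors.Is(err, io.EOF) {
			return nil // io.EOF marks the end of input, not a failure
		}
		if err != nil {
			return err
		}
		fmt.Println("updating:", src)
	}
}

func main() {
	// Yield a single item, then io.EOF, mirroring syncWithoutLocking's done flag.
	done := false
	if err := drain(func() (string, error) {
		if done {
			return "", io.EOF
		}
		done = true
		return "/in-memory/buffered-file", nil
	}); err != nil {
		panic(err)
	}
}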

View File

@@ -188,10 +188,13 @@ func (f *FileSystem) Open(name string) (afero.File, error) {
return NewFile(
f.readOps,
f.writeOps,
f.metadata,
hdr.Name,
hdr.Linkname,
f.compressionLevel,
path.Base(hdr.Name),
NewFileInfo(hdr),
@@ -274,6 +277,7 @@ func (f *FileSystem) updateMetadata(hdr *tar.Header) error {
},
f.compressionLevel,
false,
false,
); err != nil {
return err
}

View File

@@ -34,6 +34,28 @@ func (r *CounterReadCloser) Close() error {
return r.Reader.Close()
}
type CounterReadSeekCloser struct {
Reader io.ReadSeekCloser
BytesRead int
}
func (r *CounterReadSeekCloser) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.BytesRead += n
return n, err
}
func (r *CounterReadSeekCloser) Close() error {
return r.Reader.Close()
}
func (r *CounterReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return r.Reader.Seek(offset, whence)
}
type CounterWriter struct {
Writer io.Writer
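The new CounterReadSeekCloser mirrors the existing CounterReadCloser, but for seekable sources. A quick sketch of how the byte counter behaves; note that ioext lives under internal/, so the import below only resolves from within the stfs module and is shown for illustration only:

package main

import (
	"fmt"
	"io"

	"github.com/mattetti/filebuffer"
	"github.com/pojntfx/stfs/internal/ioext"
)

func main() {
	// Any io.ReadSeekCloser works as the source; filebuffer is convenient here
	// since this commit already adds it as a dependency.
	src := filebuffer.New([]byte("0123456789"))

	r := &ioext.CounterReadSeekCloser{Reader: src}
	if _, err := io.CopyN(io.Discard, r, 4); err != nil {
		panic(err)
	}
	fmt.Println("bytes read so far:", r.BytesRead) // 4

	// Seek is forwarded to the underlying reader; the counter itself
	// only tracks bytes that went through Read.
	if _, err := r.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	_ = r.Close()
}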

View File

@@ -2,18 +2,18 @@ package ioext
import "io"
type Flusher interface {
type FlusherWriter interface {
io.WriteCloser
Flush() error
}
type NopFlusher struct {
type NopFlusherWriter struct {
io.WriteCloser
}
func (NopFlusher) Flush() error { return nil }
func (NopFlusherWriter) Flush() error { return nil }
func AddFlush(w io.WriteCloser) NopFlusher {
return NopFlusher{w}
func AddFlushNop(w io.WriteCloser) NopFlusherWriter {
return NopFlusherWriter{w}
}
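The rename from Flusher/AddFlush to FlusherWriter/AddFlushNop makes it explicit that the helper only bolts a no-op Flush onto a writer that has none, so it can be returned where a FlusherWriter is expected (e.g. by Compress above). A minimal sketch, again using the internal ioext import for illustration only:

package main

import (
	"os"

	"github.com/pojntfx/stfs/internal/ioext"
)

// write expects something it can flush, like the compressors returned by Compress.
func write(dst ioext.FlusherWriter) error {
	if _, err := dst.Write([]byte("hello\n")); err != nil {
		return err
	}
	return dst.Flush() // a no-op for writers wrapped with AddFlushNop
}

func main() {
	// *os.File is an io.WriteCloser without a Flush method; AddFlushNop
	// wraps it so it satisfies FlusherWriter anyway.
	if err := write(ioext.AddFlushNop(os.Stdout)); err != nil {
		panic(err)
	}
}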

View File

@@ -8,6 +8,6 @@ type NopCloser struct {
func (NopCloser) Close() error { return nil }
func AddClose(w io.Writer) NopCloser {
func AddCloseNopToWriter(w io.Writer) NopCloser {
return NopCloser{w}
}
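Likewise, AddCloseNopToWriter (formerly AddClose) attaches a no-op Close to a plain io.Writer so it can be passed where an io.WriteCloser is required, for example as the unencrypted destination in Encrypt's "none" branch above. A short illustrative sketch, with the same caveat about the internal import:

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/pojntfx/stfs/internal/ioext"
)

func main() {
	// bytes.Buffer implements io.Writer but has no Close method.
	buf := &bytes.Buffer{}

	var wc io.WriteCloser = ioext.AddCloseNopToWriter(buf)
	if _, err := wc.Write([]byte("plaintext passthrough")); err != nil {
		panic(err)
	}
	_ = wc.Close() // no-op; the underlying buffer needs no closing

	fmt.Println(buf.String())
}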

View File

@@ -25,6 +25,7 @@ func (o *Operations) Update(
getSrc func() (config.FileConfig, error),
compressionLevel string,
replace bool,
skipSizeCheck bool,
) ([]*tar.Header, error) {
o.diskOperationLock.Lock()
defer o.diskOperationLock.Unlock()
@@ -75,7 +76,7 @@ func (o *Operations) Update(
hdr.PAXRecords[records.STFSRecordAction] = records.STFSRecordActionUpdate
var f io.ReadSeekCloser
if file.Info.Mode().IsRegular() && replace && file.Info.Size() > 0 {
if file.Info.Mode().IsRegular() && replace && (file.Info.Size() > 0 || skipSizeCheck) {
// Get the compressed size for the header
fileSizeCounter := &ioext.CounterWriter{
Writer: io.Discard,
@@ -183,7 +184,7 @@ func (o *Operations) Update(
dirty = true
if !file.Info.Mode().IsRegular() || file.Info.Size() <= 0 {
if !file.Info.Mode().IsRegular() || (!skipSizeCheck && file.Info.Size() <= 0) {
if f != nil {
if err := f.Close(); err != nil {
return []*tar.Header{}, err