diff --git a/cmd/stfs/cmd/operation_update.go b/cmd/stfs/cmd/operation_update.go index 6f644dd..5aba9df 100644 --- a/cmd/stfs/cmd/operation_update.go +++ b/cmd/stfs/cmd/operation_update.go @@ -146,6 +146,7 @@ var operationUpdateCmd = &cobra.Command{ }, viper.GetString(compressionLevelFlag), viper.GetBool(overwriteFlag), + false, ); err != nil { return err } diff --git a/cmd/stfs/cmd/serve_ftp.go b/cmd/stfs/cmd/serve_ftp.go index ac90b3a..5a88265 100644 --- a/cmd/stfs/cmd/serve_ftp.go +++ b/cmd/stfs/cmd/serve_ftp.go @@ -85,12 +85,12 @@ var serveFTPCmd = &cobra.Command{ return err } - signatureIdentity, err := keys.ParseIdentity(viper.GetString(signatureFlag), signaturePrivkey, viper.GetString(signaturePasswordFlag)) + signatureIdentity, err := keys.ParseSignerIdentity(viper.GetString(signatureFlag), signaturePrivkey, viper.GetString(signaturePasswordFlag)) if err != nil { return err } - encryptionRecipient, err := keys.ParseSignerRecipient(viper.GetString(encryptionFlag), encryptionPubkey) + encryptionRecipient, err := keys.ParseRecipient(viper.GetString(encryptionFlag), encryptionPubkey) if err != nil { return err } diff --git a/go.mod b/go.mod index 51287b6..1f02593 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/friendsofgo/errors v0.9.2 github.com/klauspost/compress v1.13.6 github.com/klauspost/pgzip v1.2.5 + github.com/mattetti/filebuffer v1.0.1 github.com/mattn/go-sqlite3 v1.14.9 github.com/pierrec/lz4/v4 v4.1.12 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index a5d629a..5757816 100644 --- a/go.sum +++ b/go.sum @@ -394,6 +394,8 @@ github.com/markbates/oncer v1.0.0 h1:E83IaVAHygyndzPimgUYJjbshhDTALZyXxvk9FOlQRY github.com/markbates/oncer v1.0.0/go.mod h1:Z59JA581E9GP6w96jai+TGqafHPW+cPfRxz2aSZ0mcI= github.com/markbates/safe v1.0.1 h1:yjZkbvRM6IzKj9tlu/zMJLS0n/V351OZWRnF3QfaUxI= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= +github.com/mattetti/filebuffer v1.0.1 h1:gG7pyfnSIZCxdoKq+cPa8T0hhYtD9NxCdI4D7PTjRLM= +github.com/mattetti/filebuffer v1.0.1/go.mod h1:YdMURNDOttIiruleeVr6f56OrMc+MydEnTcXwtkxNVs= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/internal/compression/compress.go b/internal/compression/compress.go index 2c77f9a..62597ed 100644 --- a/internal/compression/compress.go +++ b/internal/compression/compress.go @@ -21,7 +21,7 @@ func Compress( compressionLevel string, isRegular bool, recordSize int, -) (ioext.Flusher, error) { +) (ioext.FlusherWriter, error) { switch compressionFormat { case config.CompressionFormatGZipKey: fallthrough @@ -104,7 +104,7 @@ func Compress( return nil, err } - return ioext.AddFlush(lz), nil + return ioext.AddFlushNop(lz), nil case config.CompressionFormatZStandardKey: l := zstd.SpeedDefault switch compressionLevel { @@ -171,9 +171,9 @@ func Compress( return nil, err } - return ioext.AddFlush(bz), nil + return ioext.AddFlushNop(bz), nil case config.NoneKey: - return ioext.AddFlush(ioext.AddClose(dst)), nil + return ioext.AddFlushNop(ioext.AddCloseNopToWriter(dst)), nil default: return nil, config.ErrCompressionFormatUnsupported } diff --git a/internal/encryption/encrypt.go b/internal/encryption/encrypt.go index 73432d7..dca3a09 100644 --- a/internal/encryption/encrypt.go +++ b/internal/encryption/encrypt.go @@ -35,7 +35,7 @@ func Encrypt( return openpgp.Encrypt(dst, recipient, nil, nil, nil) case config.NoneKey: - return ioext.AddClose(dst), nil + return ioext.AddCloseNopToWriter(dst), nil default: return nil, config.ErrEncryptionFormatUnsupported } diff --git a/internal/fs/file.go b/internal/fs/file.go index b536e33..b6e1797 100644 --- a/internal/fs/file.go +++ b/internal/fs/file.go @@ -9,6 +9,7 @@ import ( "os" "sync" + "github.com/mattetti/filebuffer" models "github.com/pojntfx/stfs/internal/db/sqlite/models/metadata" "github.com/pojntfx/stfs/internal/ioext" "github.com/pojntfx/stfs/pkg/config" @@ -24,28 +25,38 @@ var ( type File struct { afero.File - ops *operations.Operations + readOps *operations.Operations + writeOps *operations.Operations metadata config.MetadataConfig - path string + path string + link string + compressionLevel string name string info os.FileInfo ioLock sync.Mutex - reader *ioext.CounterReadCloser - writer io.WriteCloser + + readOpReader *ioext.CounterReadCloser + readOpWriter io.WriteCloser + + // TODO: Find a non-in-memory method to do this + writeBuf *filebuffer.Buffer onHeader func(hdr *models.Header) } func NewFile( - ops *operations.Operations, + readOps *operations.Operations, + writeOps *operations.Operations, metadata config.MetadataConfig, path string, + link string, + compressionLevel string, name string, info os.FileInfo, @@ -53,11 +64,14 @@ func NewFile( onHeader func(hdr *models.Header), ) *File { return &File{ - ops: ops, + readOps: readOps, + writeOps: writeOps, metadata: metadata, - path: path, + path: path, + link: link, + compressionLevel: compressionLevel, name: name, info: info, @@ -117,14 +131,59 @@ func (f *File) Readdirnames(n int) ([]string, error) { return names, err } -func (f *File) Sync() error { - log.Println("File.Sync", f.name) +func (f *File) syncWithoutLocking() error { + log.Println("File.syncWithoutLocking", f.name) if f.info.IsDir() { return ErrIsDirectory } - return ErrNotImplemented + if f.writeBuf != nil { + done := false + if _, err := f.writeOps.Update( + func() (config.FileConfig, error) { + // Exit after the first write + if done { + return config.FileConfig{}, io.EOF + } + done = true + + f.info = &FileInfo{ + name: f.info.Name(), + size: int64(f.writeBuf.Buff.Len()), + mode: f.info.Mode(), + modTime: f.info.ModTime(), + isDir: f.info.IsDir(), + } + + return config.FileConfig{ + GetFile: func() (io.ReadSeekCloser, error) { + if _, err := f.writeBuf.Seek(0, io.SeekStart); err != nil { + return nil, err + } + + return f.writeBuf, nil + }, + Info: f.info, + Path: f.path, + Link: f.link, + }, nil + }, + f.compressionLevel, + true, + true, + ); err != nil { + return err + } + } + + return nil +} + +func (f *File) Sync() error { + log.Println("File.Sync", f.name) + + return f.syncWithoutLocking() } func (f *File) Truncate(size int64) error { @@ -150,20 +209,28 @@ func (f *File) WriteString(s string) (ret int, err error) { func (f *File) closeWithoutLocking() error { log.Println("File.closeWithoutLocking", f.name) - if f.reader != nil { - if err := f.reader.Close(); err != nil { + if f.readOpReader != nil { + if err := f.readOpReader.Close(); err != nil { return err } } - if f.writer != nil { - if err := f.writer.Close(); err != nil { + if f.readOpWriter != nil { + if err := f.readOpWriter.Close(); err != nil { return err } } - f.reader = nil - f.writer = nil + if f.writeBuf != nil { + // No need to close write buffer, the `update` operation closes it itself + if err := f.syncWithoutLocking(); err != nil { + return err + } + } + + f.readOpReader = nil + f.readOpWriter = nil + f.writeBuf = nil return nil } @@ -187,7 +254,7 @@ func (f *File) Read(p []byte) (n int, err error) { f.ioLock.Lock() defer f.ioLock.Unlock() - if f.reader == nil || f.writer == nil { + if f.readOpReader == nil || f.readOpWriter == nil { r, writer := io.Pipe() reader := &ioext.CounterReadCloser{ Reader: r, @@ -195,7 +262,7 @@ func (f *File) Read(p []byte) (n int, err error) { } go func() { - if err := f.ops.Restore( + if err := f.readOps.Restore( func(path string, mode fs.FileMode) (io.WriteCloser, error) { return writer, nil }, @@ -217,12 +284,12 @@ func (f *File) Read(p []byte) (n int, err error) { } }() - f.reader = reader - f.writer = writer + f.readOpReader = reader + f.readOpWriter = writer } w := &bytes.Buffer{} - _, err = io.CopyN(w, f.reader, int64(len(p))) + _, err = io.CopyN(w, f.readOpReader, int64(len(p))) if err == io.EOF { return copy(p, w.Bytes()), io.EOF } @@ -264,8 +331,8 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { dst = offset case io.SeekCurrent: curr := 0 - if f.reader != nil { - curr = f.reader.BytesRead + if f.readOpReader != nil { + curr = f.readOpReader.BytesRead } dst = int64(curr) + offset case io.SeekEnd: @@ -274,7 +341,7 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { return -1, ErrNotImplemented } - if f.reader == nil || f.writer == nil || dst < int64(f.reader.BytesRead) { // We have to re-open as we can't seek backwards + if f.readOpReader == nil || f.readOpWriter == nil || dst < int64(f.readOpReader.BytesRead) { // We have to re-open as we can't seek backwards _ = f.closeWithoutLocking() // Ignore errors here as it might not be opened r, writer := io.Pipe() @@ -284,7 +351,7 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { } go func() { - if err := f.ops.Restore( + if err := f.readOps.Restore( func(path string, mode fs.FileMode) (io.WriteCloser, error) { return writer, nil }, @@ -306,11 +373,11 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { } }() - f.reader = reader - f.writer = writer + f.readOpReader = reader + f.readOpWriter = writer } - written, err := io.CopyN(io.Discard, f.reader, dst-int64(f.reader.BytesRead)) + written, err := io.CopyN(io.Discard, f.readOpReader, dst-int64(f.readOpReader.BytesRead)) if err == io.EOF { return written, io.EOF } @@ -323,13 +390,20 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { } func (f *File) Write(p []byte) (n int, err error) { - log.Println("File.Write", f.name, p) + log.Println("File.Write", f.name, len(p)) if f.info.IsDir() { return -1, ErrIsDirectory } - return -1, ErrNotImplemented + f.ioLock.Lock() + defer f.ioLock.Unlock() + + if f.writeBuf == nil { + f.writeBuf = filebuffer.New([]byte{}) + } + + return f.writeBuf.Write(p) } func (f *File) WriteAt(p []byte, off int64) (n int, err error) { diff --git a/internal/fs/filesystem.go b/internal/fs/filesystem.go index b92901c..d6b8374 100644 --- a/internal/fs/filesystem.go +++ b/internal/fs/filesystem.go @@ -188,10 +188,13 @@ func (f *FileSystem) Open(name string) (afero.File, error) { return NewFile( f.readOps, + f.writeOps, f.metadata, hdr.Name, + hdr.Linkname, + f.compressionLevel, path.Base(hdr.Name), NewFileInfo(hdr), @@ -274,6 +277,7 @@ func (f *FileSystem) updateMetadata(hdr *tar.Header) error { }, f.compressionLevel, false, + false, ); err != nil { return err } diff --git a/internal/ioext/counter.go b/internal/ioext/counter.go index 1dea264..340c16c 100644 --- a/internal/ioext/counter.go +++ b/internal/ioext/counter.go @@ -34,6 +34,28 @@ func (r *CounterReadCloser) Close() error { return r.Reader.Close() } +type CounterReadSeekCloser struct { + Reader io.ReadSeekCloser + + BytesRead int +} + +func (r *CounterReadSeekCloser) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + + r.BytesRead += n + + return n, err +} + +func (r *CounterReadSeekCloser) Close() error { + return r.Reader.Close() +} + +func (r *CounterReadSeekCloser) Seek(offset int64, whence int) (int64, error) { + return r.Reader.Seek(offset, whence) +} + type CounterWriter struct { Writer io.Writer diff --git a/internal/ioext/flusher.go b/internal/ioext/flusher.go index 9bade59..bfde028 100644 --- a/internal/ioext/flusher.go +++ b/internal/ioext/flusher.go @@ -2,18 +2,18 @@ package ioext import "io" -type Flusher interface { +type FlusherWriter interface { io.WriteCloser Flush() error } -type NopFlusher struct { +type NopFlusherWriter struct { io.WriteCloser } -func (NopFlusher) Flush() error { return nil } +func (NopFlusherWriter) Flush() error { return nil } -func AddFlush(w io.WriteCloser) NopFlusher { - return NopFlusher{w} +func AddFlushNop(w io.WriteCloser) NopFlusherWriter { + return NopFlusherWriter{w} } diff --git a/internal/ioext/write.go b/internal/ioext/write.go index 506a1c3..5d2d4ba 100644 --- a/internal/ioext/write.go +++ b/internal/ioext/write.go @@ -8,6 +8,6 @@ type NopCloser struct { func (NopCloser) Close() error { return nil } -func AddClose(w io.Writer) NopCloser { +func AddCloseNopToWriter(w io.Writer) NopCloser { return NopCloser{w} } diff --git a/pkg/operations/update.go b/pkg/operations/update.go index 5843786..f961f3f 100644 --- a/pkg/operations/update.go +++ b/pkg/operations/update.go @@ -25,6 +25,7 @@ func (o *Operations) Update( getSrc func() (config.FileConfig, error), compressionLevel string, replace bool, + skipSizeCheck bool, ) ([]*tar.Header, error) { o.diskOperationLock.Lock() defer o.diskOperationLock.Unlock() @@ -75,7 +76,7 @@ func (o *Operations) Update( hdr.PAXRecords[records.STFSRecordAction] = records.STFSRecordActionUpdate var f io.ReadSeekCloser - if file.Info.Mode().IsRegular() && replace && file.Info.Size() > 0 { + if file.Info.Mode().IsRegular() && replace && (file.Info.Size() > 0 || skipSizeCheck) { // Get the compressed size for the header fileSizeCounter := &ioext.CounterWriter{ Writer: io.Discard, @@ -183,7 +184,7 @@ func (o *Operations) Update( dirty = true - if !file.Info.Mode().IsRegular() || file.Info.Size() <= 0 { + if !file.Info.Mode().IsRegular() || (!skipSizeCheck && file.Info.Size() <= 0) { if f != nil { if err := f.Close(); err != nil { return []*tar.Header{}, err