feat: Add io.Reader-based API to update cmd

This commit is contained in:
Felicitas Pojtinger
2021-12-16 22:40:47 +01:00
parent 129cfab84a
commit e11a3bffcc
3 changed files with 106 additions and 69 deletions

View File

@@ -2,6 +2,10 @@ package cmd
import ( import (
"fmt" "fmt"
"io"
"io/fs"
"os"
"path/filepath"
"github.com/pojntfx/stfs/internal/compression" "github.com/pojntfx/stfs/internal/compression"
"github.com/pojntfx/stfs/internal/keys" "github.com/pojntfx/stfs/internal/keys"
@@ -95,8 +99,49 @@ var operationUpdateCmd = &cobra.Command{
logging.NewLogger().PrintHeaderEvent, logging.NewLogger().PrintHeaderEvent,
) )
files := make(chan config.FileConfig)
errs := make(chan error)
go func() {
if err := filepath.Walk(viper.GetString(fromFlag), func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
link := ""
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
if link, err = os.Readlink(path); err != nil {
return err
}
}
files <- config.FileConfig{
GetFile: func() (io.ReadSeekCloser, error) {
return os.Open(path)
},
Info: info,
Path: path,
Link: link,
}
return nil
}); err != nil {
errs <- err
return
}
errs <- io.EOF
}()
if _, err := ops.Update( if _, err := ops.Update(
viper.GetString(fromFlag), func() (config.FileConfig, error) {
select {
case file := <-files:
return file, err
case err := <-errs:
return config.FileConfig{}, err
}
},
viper.GetString(compressionLevelFlag), viper.GetString(compressionLevelFlag),
viper.GetBool(overwriteFlag), viper.GetBool(overwriteFlag),
); err != nil { ); err != nil {

View File

@@ -69,7 +69,7 @@ func (o *Operations) Archive(
if err != nil { if err != nil {
// Skip sockets // Skip sockets
if strings.Contains(err.Error(), errSocketsNotSupported.Error()) { if strings.Contains(err.Error(), errSocketsNotSupported.Error()) {
return []*tar.Header{}, nil continue
} }
return []*tar.Header{}, err return []*tar.Header{}, err

View File

@@ -4,10 +4,8 @@ import (
"archive/tar" "archive/tar"
"context" "context"
"io" "io"
"io/fs"
"os"
"path/filepath"
"strconv" "strconv"
"strings"
"github.com/pojntfx/stfs/internal/compression" "github.com/pojntfx/stfs/internal/compression"
"github.com/pojntfx/stfs/internal/converters" "github.com/pojntfx/stfs/internal/converters"
@@ -24,9 +22,9 @@ import (
) )
func (o *Operations) Update( func (o *Operations) Update(
from string, getSrc func() (config.FileConfig, error),
compressionLevel string, compressionLevel string,
overwrite bool, replace bool,
) ([]*tar.Header, error) { ) ([]*tar.Header, error) {
o.diskOperationLock.Lock() o.diskOperationLock.Lock()
defer o.diskOperationLock.Unlock() defer o.diskOperationLock.Unlock()
@@ -48,24 +46,27 @@ func (o *Operations) Update(
} }
hdrs := []*tar.Header{} hdrs := []*tar.Header{}
if err := filepath.Walk(from, func(path string, info fs.FileInfo, err error) error { for {
file, err := getSrc()
if err == io.EOF {
break
}
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
link := "" hdr, err := tar.FileInfoHeader(file.Info, file.Link)
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
if link, err = os.Readlink(path); err != nil {
return err
}
}
hdr, err := tar.FileInfoHeader(info, link)
if err != nil { if err != nil {
return err // Skip sockets
if strings.Contains(err.Error(), errSocketsNotSupported.Error()) {
continue
} }
hdr.Name = path return []*tar.Header{}, err
}
hdr.Name = file.Path
hdr.Format = tar.FormatPAX hdr.Format = tar.FormatPAX
if hdr.PAXRecords == nil { if hdr.PAXRecords == nil {
hdr.PAXRecords = map[string]string{} hdr.PAXRecords = map[string]string{}
@@ -73,7 +74,8 @@ func (o *Operations) Update(
hdr.PAXRecords[records.STFSRecordVersion] = records.STFSRecordVersion1 hdr.PAXRecords[records.STFSRecordVersion] = records.STFSRecordVersion1
hdr.PAXRecords[records.STFSRecordAction] = records.STFSRecordActionUpdate hdr.PAXRecords[records.STFSRecordAction] = records.STFSRecordActionUpdate
if info.Mode().IsRegular() && overwrite { var f io.ReadSeekCloser
if file.Info.Mode().IsRegular() && replace {
// Get the compressed size for the header // Get the compressed size for the header
fileSizeCounter := &ioext.CounterWriter{ fileSizeCounter := &ioext.CounterWriter{
Writer: io.Discard, Writer: io.Discard,
@@ -81,7 +83,7 @@ func (o *Operations) Update(
encryptor, err := encryption.Encrypt(fileSizeCounter, o.pipes.Encryption, o.crypto.Recipient) encryptor, err := encryption.Encrypt(fileSizeCounter, o.pipes.Encryption, o.crypto.Recipient)
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
compressor, err := compression.Compress( compressor, err := compression.Compress(
@@ -92,44 +94,41 @@ func (o *Operations) Update(
o.pipes.RecordSize, o.pipes.RecordSize,
) )
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
file, err := os.Open(path) f, err = file.GetFile()
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
defer f.Close()
signer, sign, err := signature.Sign(file, writer.DriveIsRegular, o.pipes.Signature, o.crypto.Identity) signer, sign, err := signature.Sign(f, writer.DriveIsRegular, o.pipes.Signature, o.crypto.Identity)
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
if writer.DriveIsRegular { if writer.DriveIsRegular {
if _, err := io.Copy(compressor, signer); err != nil { if _, err := io.Copy(compressor, signer); err != nil {
return err return []*tar.Header{}, err
} }
} else { } else {
buf := make([]byte, mtio.BlockSize*o.pipes.RecordSize) buf := make([]byte, mtio.BlockSize*o.pipes.RecordSize)
if _, err := io.CopyBuffer(compressor, signer, buf); err != nil { if _, err := io.CopyBuffer(compressor, signer, buf); err != nil {
return err return []*tar.Header{}, err
} }
} }
if err := file.Close(); err != nil {
return err
}
if err := compressor.Flush(); err != nil { if err := compressor.Flush(); err != nil {
return err return []*tar.Header{}, err
} }
if err := compressor.Close(); err != nil { if err := compressor.Close(); err != nil {
return err return []*tar.Header{}, err
} }
if err := encryptor.Close(); err != nil { if err := encryptor.Close(); err != nil {
return err return []*tar.Header{}, err
} }
if hdr.PAXRecords == nil { if hdr.PAXRecords == nil {
@@ -138,7 +137,7 @@ func (o *Operations) Update(
hdr.PAXRecords[records.STFSRecordUncompressedSize] = strconv.Itoa(int(hdr.Size)) hdr.PAXRecords[records.STFSRecordUncompressedSize] = strconv.Itoa(int(hdr.Size))
signature, err := sign() signature, err := sign()
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
if signature != "" { if signature != "" {
@@ -148,17 +147,17 @@ func (o *Operations) Update(
hdr.Name, err = suffix.AddSuffix(hdr.Name, o.pipes.Compression, o.pipes.Encryption) hdr.Name, err = suffix.AddSuffix(hdr.Name, o.pipes.Compression, o.pipes.Encryption)
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
} }
if overwrite { if replace {
hdr.PAXRecords[records.STFSRecordReplacesContent] = records.STFSRecordReplacesContentTrue hdr.PAXRecords[records.STFSRecordReplacesContent] = records.STFSRecordReplacesContentTrue
if o.onHeader != nil { if o.onHeader != nil {
dbhdr, err := converters.TarHeaderToDBHeader(-1, -1, -1, -1, hdr) dbhdr, err := converters.TarHeaderToDBHeader(-1, -1, -1, -1, hdr)
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
o.onHeader(&config.HeaderEvent{ o.onHeader(&config.HeaderEvent{
@@ -172,25 +171,27 @@ func (o *Operations) Update(
hdrs = append(hdrs, &hdrToAppend) hdrs = append(hdrs, &hdrToAppend)
if err := signature.SignHeader(hdr, writer.DriveIsRegular, o.pipes.Signature, o.crypto.Identity); err != nil { if err := signature.SignHeader(hdr, writer.DriveIsRegular, o.pipes.Signature, o.crypto.Identity); err != nil {
return err return []*tar.Header{}, err
} }
if err := encryption.EncryptHeader(hdr, o.pipes.Encryption, o.crypto.Recipient); err != nil { if err := encryption.EncryptHeader(hdr, o.pipes.Encryption, o.crypto.Recipient); err != nil {
return err return []*tar.Header{}, err
} }
if err := tw.WriteHeader(hdr); err != nil { if err := tw.WriteHeader(hdr); err != nil {
return err return []*tar.Header{}, err
} }
if !info.Mode().IsRegular() { dirty = true
return nil
if !file.Info.Mode().IsRegular() {
continue
} }
// Compress and write the file // Compress and write the file
encryptor, err := encryption.Encrypt(tw, o.pipes.Encryption, o.crypto.Recipient) encryptor, err := encryption.Encrypt(tw, o.pipes.Encryption, o.crypto.Recipient)
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
compressor, err := compression.Compress( compressor, err := compression.Compress(
@@ -201,39 +202,34 @@ func (o *Operations) Update(
o.pipes.RecordSize, o.pipes.RecordSize,
) )
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
file, err := os.Open(path) if _, err := f.Seek(0, io.SeekStart); err != nil {
if err != nil { return []*tar.Header{}, err
return err
} }
if writer.DriveIsRegular { if writer.DriveIsRegular {
if _, err := io.Copy(compressor, file); err != nil { if _, err := io.Copy(compressor, f); err != nil {
return err return []*tar.Header{}, err
} }
} else { } else {
buf := make([]byte, mtio.BlockSize*o.pipes.RecordSize) buf := make([]byte, mtio.BlockSize*o.pipes.RecordSize)
if _, err := io.CopyBuffer(compressor, file, buf); err != nil { if _, err := io.CopyBuffer(compressor, f, buf); err != nil {
return err return []*tar.Header{}, err
} }
} }
if err := file.Close(); err != nil {
return err
}
if err := compressor.Flush(); err != nil { if err := compressor.Flush(); err != nil {
return err return []*tar.Header{}, err
} }
if err := compressor.Close(); err != nil { if err := compressor.Close(); err != nil {
return err return []*tar.Header{}, err
} }
if err := encryptor.Close(); err != nil { if err := encryptor.Close(); err != nil {
return err return []*tar.Header{}, err
} }
} else { } else {
hdr.Size = 0 // Don't try to seek after the record hdr.Size = 0 // Don't try to seek after the record
@@ -241,7 +237,7 @@ func (o *Operations) Update(
if o.onHeader != nil { if o.onHeader != nil {
dbhdr, err := converters.TarHeaderToDBHeader(-1, -1, -1, -1, hdr) dbhdr, err := converters.TarHeaderToDBHeader(-1, -1, -1, -1, hdr)
if err != nil { if err != nil {
return err return []*tar.Header{}, err
} }
o.onHeader(&config.HeaderEvent{ o.onHeader(&config.HeaderEvent{
@@ -255,23 +251,19 @@ func (o *Operations) Update(
hdrs = append(hdrs, &hdrToAppend) hdrs = append(hdrs, &hdrToAppend)
if err := signature.SignHeader(hdr, writer.DriveIsRegular, o.pipes.Signature, o.crypto.Identity); err != nil { if err := signature.SignHeader(hdr, writer.DriveIsRegular, o.pipes.Signature, o.crypto.Identity); err != nil {
return err return []*tar.Header{}, err
} }
if err := encryption.EncryptHeader(hdr, o.pipes.Encryption, o.crypto.Recipient); err != nil { if err := encryption.EncryptHeader(hdr, o.pipes.Encryption, o.crypto.Recipient); err != nil {
return err return []*tar.Header{}, err
} }
if err := tw.WriteHeader(hdr); err != nil { if err := tw.WriteHeader(hdr); err != nil {
return err return []*tar.Header{}, err
} }
} }
dirty = true dirty = true
return nil
}); err != nil {
return []*tar.Header{}, err
} }
if err := cleanup(&dirty); err != nil { if err := cleanup(&dirty); err != nil {