feat: Implement automatic indexing after archival

This commit is contained in:
Felix Pojtinger
2021-11-21 21:20:46 +01:00
parent 9b63130c5e
commit 7d53509a73
3 changed files with 357 additions and 270 deletions

View File

@@ -3,6 +3,7 @@ package cmd
import (
"archive/tar"
"bufio"
"context"
"io"
"io/fs"
"os"
@@ -12,6 +13,7 @@ import (
"github.com/pojntfx/stfs/pkg/controllers"
"github.com/pojntfx/stfs/pkg/counters"
"github.com/pojntfx/stfs/pkg/formatting"
"github.com/pojntfx/stfs/pkg/persisters"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -31,157 +33,199 @@ var archiveCmd = &cobra.Command{
return err
}
isRegular := true
stat, err := os.Stat(viper.GetString(tapeFlag))
if err == nil {
isRegular = stat.Mode().IsRegular()
} else {
if os.IsNotExist(err) {
isRegular = true
} else {
return err
}
metadataPersister := persisters.NewMetadataPersister(viper.GetString(metadataFlag))
if err := metadataPersister.Open(); err != nil {
return err
}
var f *os.File
if isRegular {
if viper.GetBool(overwriteFlag) {
f, err = os.OpenFile(viper.GetString(tapeFlag), os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
if err := f.Truncate(0); err != nil {
return err
}
} else {
f, err = os.OpenFile(viper.GetString(tapeFlag), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
}
// No need to go to end manually due to `os.O_APPEND`
} else {
f, err = os.OpenFile(viper.GetString(tapeFlag), os.O_APPEND|os.O_WRONLY, os.ModeCharDevice)
lastIndexedRecord := int64(0)
lastIndexedBlock := int64(0)
if !viper.GetBool(overwriteFlag) {
r, b, err := metadataPersister.GetLastIndexedRecordAndBlock(context.Background(), viper.GetInt(recordSizeFlag))
if err != nil {
return err
}
if viper.GetBool(overwriteFlag) {
// Go to start of tape
if err := controllers.SeekToRecordOnTape(f, 0); err != nil {
return err
}
} else {
// Go to end of tape
if err := controllers.GoToEndOfTape(f); err != nil {
return err
}
lastIndexedRecord = r
lastIndexedBlock = b
}
if err := archive(
viper.GetString(tapeFlag),
viper.GetInt(recordSizeFlag),
viper.GetString(srcFlag),
viper.GetBool(overwriteFlag),
); err != nil {
return err
}
return index(
viper.GetString(tapeFlag),
viper.GetString(metadataFlag),
viper.GetInt(recordSizeFlag),
int(lastIndexedRecord),
int(lastIndexedBlock),
viper.GetBool(overwriteFlag),
)
},
}
func archive(
tape string,
recordSize int,
src string,
overwrite bool,
) error {
isRegular := true
stat, err := os.Stat(tape)
if err == nil {
isRegular = stat.Mode().IsRegular()
} else {
if os.IsNotExist(err) {
isRegular = true
} else {
return err
}
}
var f *os.File
if isRegular {
if overwrite {
f, err = os.OpenFile(tape, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
if err := f.Truncate(0); err != nil {
return err
}
} else {
f, err = os.OpenFile(tape, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
}
defer f.Close()
dirty := false
var tw *tar.Writer
var bw *bufio.Writer
var counter *counters.CounterWriter
if isRegular {
tw = tar.NewWriter(f)
} else {
bw = bufio.NewWriterSize(f, controllers.BlockSize*viper.GetInt(recordSizeFlag))
counter = &counters.CounterWriter{Writer: bw, BytesRead: 0}
tw = tar.NewWriter(counter)
// No need to go to end manually due to `os.O_APPEND`
} else {
f, err = os.OpenFile(tape, os.O_APPEND|os.O_WRONLY, os.ModeCharDevice)
if err != nil {
return err
}
defer func() {
// Only write the trailer if we wrote to the archive
if dirty {
if err := tw.Close(); err != nil {
panic(err)
}
if !isRegular {
if controllers.BlockSize*viper.GetInt(recordSizeFlag)-counter.BytesRead > 0 {
// Fill the rest of the record with zeros
if _, err := bw.Write(make([]byte, controllers.BlockSize*viper.GetInt(recordSizeFlag)-counter.BytesRead)); err != nil {
panic(err)
}
}
if overwrite {
// Go to start of tape
if err := controllers.SeekToRecordOnTape(f, 0); err != nil {
return err
}
} else {
// Go to end of tape
if err := controllers.GoToEndOfTape(f); err != nil {
return err
}
}
}
defer f.Close()
if err := bw.Flush(); err != nil {
dirty := false
var tw *tar.Writer
var bw *bufio.Writer
var counter *counters.CounterWriter
if isRegular {
tw = tar.NewWriter(f)
} else {
bw = bufio.NewWriterSize(f, controllers.BlockSize*recordSize)
counter = &counters.CounterWriter{Writer: bw, BytesRead: 0}
tw = tar.NewWriter(counter)
}
defer func() {
// Only write the trailer if we wrote to the archive
if dirty {
if err := tw.Close(); err != nil {
panic(err)
}
if !isRegular {
if controllers.BlockSize*recordSize-counter.BytesRead > 0 {
// Fill the rest of the record with zeros
if _, err := bw.Write(make([]byte, controllers.BlockSize*recordSize-counter.BytesRead)); err != nil {
panic(err)
}
}
}
}()
first := true
return filepath.Walk(viper.GetString(srcFlag), func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
link := ""
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
if link, err = os.Readlink(path); err != nil {
return err
if err := bw.Flush(); err != nil {
panic(err)
}
}
}
}()
hdr, err := tar.FileInfoHeader(info, link)
if err != nil {
first := true
return filepath.Walk(src, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
link := ""
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
if link, err = os.Readlink(path); err != nil {
return err
}
}
hdr, err := tar.FileInfoHeader(info, link)
if err != nil {
return err
}
if err := adapters.EnhanceHeader(path, hdr); err != nil {
return err
}
hdr.Name = path
hdr.Format = tar.FormatPAX
if first {
if err := formatting.PrintCSV(formatting.TARHeaderCSV); err != nil {
return err
}
if err := adapters.EnhanceHeader(path, hdr); err != nil {
return err
}
first = false
}
hdr.Name = path
hdr.Format = tar.FormatPAX
if err := formatting.PrintCSV(formatting.GetTARHeaderAsCSV(-1, -1, hdr)); err != nil {
return err
}
if first {
if err := formatting.PrintCSV(formatting.TARHeaderCSV); err != nil {
return err
}
first = false
}
if err := formatting.PrintCSV(formatting.GetTARHeaderAsCSV(-1, -1, hdr)); err != nil {
return err
}
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
if isRegular {
if _, err := io.Copy(tw, file); err != nil {
return err
}
} else {
buf := make([]byte, controllers.BlockSize*viper.GetInt(recordSizeFlag))
if _, err := io.CopyBuffer(tw, file, buf); err != nil {
return err
}
}
dirty = true
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
})
},
}
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
if isRegular {
if _, err := io.Copy(tw, file); err != nil {
return err
}
} else {
buf := make([]byte, controllers.BlockSize*recordSize)
if _, err := io.CopyBuffer(tw, file, buf); err != nil {
return err
}
}
dirty = true
return nil
})
}
func init() {

View File

@@ -27,181 +27,199 @@ var indexCmd = &cobra.Command{
return err
}
if viper.GetBool(overwriteFlag) {
f, err := os.OpenFile(viper.GetString(metadataFlag), os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
return index(
viper.GetString(tapeFlag),
viper.GetString(metadataFlag),
viper.GetInt(recordSizeFlag),
viper.GetInt(recordFlag),
viper.GetInt(blockFlag),
viper.GetBool(overwriteFlag),
)
},
}
if err := f.Truncate(0); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
}
metadataPersister := persisters.NewMetadataPersister(viper.GetString(metadataFlag))
if err := metadataPersister.Open(); err != nil {
return err
}
fileDescription, err := os.Stat(viper.GetString(tapeFlag))
func index(
tape string,
metadata string,
recordSize int,
record int,
block int,
overwrite bool,
) error {
if overwrite {
f, err := os.OpenFile(metadata, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
var f *os.File
if fileDescription.Mode().IsRegular() {
f, err = os.Open(viper.GetString(tapeFlag))
if err != nil {
return err
}
} else {
f, err = os.OpenFile(viper.GetString(tapeFlag), os.O_RDONLY, os.ModeCharDevice)
if err != nil {
return err
}
if err := f.Truncate(0); err != nil {
return err
}
defer f.Close()
if fileDescription.Mode().IsRegular() {
// Seek to record and block
if _, err := f.Seek(int64((viper.GetInt(recordSizeFlag)*controllers.BlockSize*viper.GetInt(recordFlag))+viper.GetInt(blockFlag)*controllers.BlockSize), 0); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
}
tr := tar.NewReader(f)
metadataPersister := persisters.NewMetadataPersister(metadata)
if err := metadataPersister.Open(); err != nil {
return err
}
record := viper.GetInt64(recordFlag)
block := viper.GetInt64(blockFlag)
fileDescription, err := os.Stat(tape)
if err != nil {
return err
}
for {
hdr, err := tr.Next()
if err != nil {
// Seek right after the next two blocks to skip the trailer
if _, err := f.Seek((controllers.BlockSize * 2), io.SeekCurrent); err == nil {
curr, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
var f *os.File
if fileDescription.Mode().IsRegular() {
f, err = os.Open(tape)
if err != nil {
return err
}
} else {
f, err = os.OpenFile(tape, os.O_RDONLY, os.ModeCharDevice)
if err != nil {
return err
}
}
defer f.Close()
nextTotalBlocks := math.Ceil(float64((curr)) / float64(controllers.BlockSize))
record = int64(nextTotalBlocks) / int64(viper.GetInt(recordSizeFlag))
block = int64(nextTotalBlocks) - (record * int64(viper.GetInt(recordSizeFlag))) - 2
if fileDescription.Mode().IsRegular() {
// Seek to record and block
if _, err := f.Seek(int64((recordSize*controllers.BlockSize*record)+block*controllers.BlockSize), 0); err != nil {
return err
}
if block < 0 {
record--
block = int64(viper.GetInt(recordSizeFlag)) - 1
} else if block >= int64(viper.GetInt(recordSizeFlag)) {
record++
block = 0
}
tr := tar.NewReader(f)
// Seek to record and block
if _, err := f.Seek(int64((viper.GetInt(recordSizeFlag)*controllers.BlockSize*int(record))+int(block)*controllers.BlockSize), io.SeekStart); err != nil {
return err
}
record := int64(record)
block := int64(block)
tr = tar.NewReader(f)
hdr, err = tr.Next()
if err != nil {
if err == io.EOF {
break
}
return err
}
} else {
for {
hdr, err := tr.Next()
if err != nil {
// Seek right after the next two blocks to skip the trailer
if _, err := f.Seek((controllers.BlockSize * 2), io.SeekCurrent); err == nil {
curr, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
}
if err := indexHeader(record, block, hdr, metadataPersister); err != nil {
return nil
}
nextTotalBlocks := math.Ceil(float64((curr)) / float64(controllers.BlockSize))
record = int64(nextTotalBlocks) / int64(recordSize)
block = int64(nextTotalBlocks) - (record * int64(recordSize)) - 2
curr, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
if block < 0 {
record--
block = int64(recordSize) - 1
} else if block >= int64(recordSize) {
record++
block = 0
}
nextTotalBlocks := math.Ceil(float64((curr + hdr.Size)) / float64(controllers.BlockSize))
record = int64(nextTotalBlocks) / int64(viper.GetInt(recordSizeFlag))
block = int64(nextTotalBlocks) - (record * int64(viper.GetInt(recordSizeFlag)))
// Seek to record and block
if _, err := f.Seek(int64((recordSize*controllers.BlockSize*int(record))+int(block)*controllers.BlockSize), io.SeekStart); err != nil {
return err
}
if block > int64(viper.GetInt(recordSizeFlag)) {
record++
block = 0
}
}
} else {
// Seek to record
if err := controllers.SeekToRecordOnTape(f, int32(viper.GetInt(recordFlag))); err != nil {
return err
}
// Seek to block
br := bufio.NewReaderSize(f, controllers.BlockSize*viper.GetInt(recordSizeFlag))
if _, err := br.Read(make([]byte, viper.GetInt(blockFlag)*controllers.BlockSize)); err != nil {
return err
}
record := viper.GetInt64(recordFlag)
block := viper.GetInt64(blockFlag)
curr := int64((viper.GetInt(recordSizeFlag) * controllers.BlockSize * viper.GetInt(recordFlag)) + (viper.GetInt(blockFlag) * controllers.BlockSize))
counter := &counters.CounterReader{Reader: br, BytesRead: int(curr)}
tr := tar.NewReader(counter)
for {
hdr, err := tr.Next()
if err != nil {
if err == io.EOF {
if err := controllers.GoToNextFileOnTape(f); err != nil {
// EOD
tr = tar.NewReader(f)
hdr, err = tr.Next()
if err != nil {
if err == io.EOF {
break
}
record, err = controllers.GetCurrentRecordFromTape(f)
if err != nil {
return err
}
block = 0
br = bufio.NewReaderSize(f, controllers.BlockSize*viper.GetInt(recordSizeFlag))
curr := int64(int64(viper.GetInt(recordSizeFlag)) * controllers.BlockSize * record)
counter := &counters.CounterReader{Reader: br, BytesRead: int(curr)}
tr = tar.NewReader(counter)
continue
} else {
return err
}
}
if err := indexHeader(record, block, hdr, metadataPersister); err != nil {
return nil
}
curr = int64(counter.BytesRead)
nextTotalBlocks := math.Ceil(float64((curr + hdr.Size)) / float64(controllers.BlockSize))
record = int64(nextTotalBlocks) / int64(viper.GetInt(recordSizeFlag))
block = int64(nextTotalBlocks) - (record * int64(viper.GetInt(recordSizeFlag)))
if block > int64(viper.GetInt(recordSizeFlag)) {
record++
block = 0
} else {
return err
}
}
if err := indexHeader(record, block, hdr, metadataPersister); err != nil {
return nil
}
curr, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
nextTotalBlocks := math.Ceil(float64((curr + hdr.Size)) / float64(controllers.BlockSize))
record = int64(nextTotalBlocks) / int64(recordSize)
block = int64(nextTotalBlocks) - (record * int64(recordSize))
if block > int64(recordSize) {
record++
block = 0
}
}
} else {
// Seek to record
if err := controllers.SeekToRecordOnTape(f, int32(record)); err != nil {
return err
}
return nil
},
// Seek to block
br := bufio.NewReaderSize(f, controllers.BlockSize*recordSize)
if _, err := br.Read(make([]byte, block*controllers.BlockSize)); err != nil {
return err
}
record := int64(record)
block := int64(block)
curr := int64((recordSize * controllers.BlockSize * int(record)) + (int(block) * controllers.BlockSize))
counter := &counters.CounterReader{Reader: br, BytesRead: int(curr)}
tr := tar.NewReader(counter)
for {
hdr, err := tr.Next()
if err != nil {
if err == io.EOF {
if err := controllers.GoToNextFileOnTape(f); err != nil {
// EOD
break
}
record, err = controllers.GetCurrentRecordFromTape(f)
if err != nil {
return err
}
block = 0
br = bufio.NewReaderSize(f, controllers.BlockSize*recordSize)
curr = int64(int64(recordSize) * controllers.BlockSize * record)
counter = &counters.CounterReader{Reader: br, BytesRead: int(curr)}
tr = tar.NewReader(counter)
continue
} else {
return err
}
}
if err := indexHeader(record, block, hdr, metadataPersister); err != nil {
return nil
}
curr = int64(counter.BytesRead)
nextTotalBlocks := math.Ceil(float64((curr + hdr.Size)) / float64(controllers.BlockSize))
record = int64(nextTotalBlocks) / int64(recordSize)
block = int64(nextTotalBlocks) - (record * int64(recordSize))
if block > int64(recordSize) {
record++
block = 0
}
}
}
return nil
}
func init() {

View File

@@ -6,11 +6,13 @@ package persisters
import (
"context"
"database/sql"
"fmt"
"github.com/pojntfx/stfs/pkg/db/sqlite/migrations/metadata"
models "github.com/pojntfx/stfs/pkg/db/sqlite/models/metadata"
migrate "github.com/rubenv/sql-migrate"
"github.com/volatiletech/sqlboiler/v4/boil"
"github.com/volatiletech/sqlboiler/v4/queries"
)
type MetadataPersister struct {
@@ -70,3 +72,26 @@ func (p *MetadataPersister) DeleteHeader(ctx context.Context, name string, ignor
return hdr, nil
}
func (p *MetadataPersister) GetLastIndexedRecordAndBlock(ctx context.Context, recordSize int) (int64, int64, error) {
var header models.Header
if err := queries.Raw(
fmt.Sprintf(
`select %v, %v, ((%v*$1)+%v) as location from %v order by location desc limit 1`,
models.HeaderColumns.Record,
models.HeaderColumns.Block,
models.HeaderColumns.Record,
models.HeaderColumns.Block,
models.TableNames.Headers,
),
recordSize,
).Bind(ctx, p.db, &header); err != nil {
if err == sql.ErrNoRows {
return 0, 0, nil
}
return 0, 0, err
}
return header.Record, header.Block, nil
}