feat: add library

Samuel N Cui
2022-09-07 16:54:54 +08:00
parent cbcc6cc575
commit af8c37b18e
11 changed files with 352 additions and 79 deletions

cmd/argvtest/main.go Normal file

@@ -0,0 +1,11 @@
package main
import (
"os"
"github.com/davecgh/go-spew/spew"
)
func main() {
spew.Dump(os.Args)
}


@@ -2,14 +2,20 @@ package main
import (
"context"
"encoding/json"
"fmt"
"hash"
"io"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/abc950309/tapewriter/library"
"github.com/abc950309/tapewriter/mmap"
"github.com/minio/sha256-simd"
"github.com/schollz/progressbar/v3"
"github.com/sirupsen/logrus"
)
@@ -19,61 +25,103 @@ const (
batchSize = 1024 * 1024
)
var (
shaPool = &sync.Pool{New: func() interface{} { return sha256.New() }}
)
func main() {
src, dst := os.Args[1], os.Args[2]
c, err := NewCopyer(dst, src)
if err != nil {
panic(err)
}
c.Run()
if p := os.Getenv("ORDERCP_REPORT_PATH"); p != "" {
errs := make([]string, 0, len(c.errs))
for _, e := range c.errs {
errs = append(errs, e.Error())
}
report, _ := json.Marshal(map[string]interface{}{"errors": errs, "files": c.results})
n := os.Getenv("ORDERCP_REPORT_FILENAME")
if n == "" {
n = time.Now().Format("2006-01-02T15:04:05.999999.csv")
}
r, err := os.Create(fmt.Sprintf("%s/%s", p, n))
if err != nil {
logrus.Warnf("open report fail, path= '%s', err= %v", fmt.Sprintf("%s/%s", p, n), err)
logrus.Infof("report: %s", report)
return
}
defer r.Close()
r.Write(report)
}
}
type Copyer struct {
bar *progressbar.ProgressBar
dst, src string
fromTape bool
src []string
dst string
copyed int64
num int64
files []*Job
errs []error
copyPipe chan *CopyJob
changePipe chan *Job
results []*library.TapeFile
}
func NewCopyer(dst, src string) (*Copyer, error) {
dst, src = strings.TrimSpace(dst), strings.TrimSpace(src)
func NewCopyer(dst string, src ...string) (*Copyer, error) {
dst = strings.TrimSpace(dst)
if dst == "" {
return nil, fmt.Errorf("dst not found")
}
if src == "" {
return nil, fmt.Errorf("src not found")
}
if dst[len(dst)-1] != '/' {
dst = dst + "/"
}
filtered := make([]string, 0, len(src))
for _, s := range src {
s = strings.TrimSpace(s)
if s == "" {
continue
}
srcStat, err := os.Stat(s)
if err != nil {
return nil, fmt.Errorf("check src path '%s', %w", src, err)
}
if srcStat.IsDir() && s[len(s)-1] != '/' {
s = s + "/"
}
filtered = append(filtered, s)
}
if len(filtered) == 0 {
return nil, fmt.Errorf("src not found")
}
src = filtered
dstStat, err := os.Stat(dst)
if err != nil {
return nil, fmt.Errorf("dst path '%s', %w", dst, err)
return nil, fmt.Errorf("check dst path '%s', %w", dst, err)
}
if !dstStat.IsDir() {
return nil, fmt.Errorf("dst path is not a dir")
}
srcStat, err := os.Stat(src)
if err != nil {
return nil, fmt.Errorf("src path '%s', %w", src, err)
}
if srcStat.IsDir() && src[len(src)-1] != '/' {
src = src + "/"
}
c := &Copyer{
dst: dst, src: src,
copyPipe: make(chan *CopyJob, 32),
changePipe: make(chan *Job, 8),
}
c.walk("", true)
for _, s := range c.src {
c.walk(s, "", true)
}
var total int64
for _, file := range c.files {
@@ -86,6 +134,7 @@ func NewCopyer(dst, src string) (*Copyer, error) {
func (c *Copyer) Run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
@@ -98,6 +147,25 @@ func (c *Copyer) Run() {
}
}()
go func() {
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
last := int64(0)
for range ticker.C {
current := atomic.LoadInt64(&c.copyed)
c.bar.Add(int(current - last))
last = current
select {
case <-ctx.Done():
close(c.copyPipe)
return
default:
}
}
}()
go func() {
for _, file := range c.files {
c.prepare(ctx, file)
@@ -114,11 +182,23 @@ func (c *Copyer) Run() {
go func() {
for copyer := range c.copyPipe {
if err := c.copy(ctx, copyer); err != nil {
hash, err := c.copy(ctx, copyer)
if err != nil {
c.ReportError(c.dst+copyer.Path, err)
if err := os.Remove(c.dst + copyer.Path); err != nil {
c.ReportError(c.dst+copyer.Path, fmt.Errorf("delete file with error fail, %w", err))
}
} else {
if !copyer.Mode.IsDir() {
c.results = append(c.results, &library.TapeFile{
Path: copyer.Path,
Size: copyer.Size,
Mode: copyer.Mode,
ModTime: copyer.ModTime,
WriteTime: time.Now(),
Hash: hash,
})
}
}
select {
@@ -141,14 +221,16 @@ func (c *Copyer) ReportError(file string, err error) {
c.errs = append(c.errs, fmt.Errorf("'%s': %w", file, err))
}
func (c *Copyer) walk(path string, first bool) {
stat, err := os.Stat(c.src + path)
func (c *Copyer) walk(src, path string, first bool) {
name := src + path
stat, err := os.Stat(name)
if err != nil {
c.ReportError(c.src+path, fmt.Errorf("walk get stat, %w", err))
c.ReportError(name, fmt.Errorf("walk get stat, %w", err))
return
}
job := NewJobFromFileInfo(path, stat)
job := NewJobFromFileInfo(src, path, stat)
if job.Mode&unexpectFileMode != 0 {
return
}
@@ -160,14 +242,14 @@ func (c *Copyer) walk(path string, first bool) {
return
}
if first {
files, err := os.ReadDir(c.src + path)
files, err := os.ReadDir(name)
if err != nil {
c.ReportError(c.src+path, fmt.Errorf("walk read dir, %w", err))
c.ReportError(name, fmt.Errorf("walk read dir, %w", err))
return
}
for _, file := range files {
c.walk(file.Name(), false)
c.walk(src, file.Name(), false)
}
return
}
@@ -177,18 +259,14 @@ func (c *Copyer) walk(path string, first bool) {
enterJob.Type = JobTypeEnterDir
c.files = append(c.files, enterJob)
files, err := os.ReadDir(c.src + path)
files, err := os.ReadDir(name)
if err != nil {
c.ReportError(c.src+path, fmt.Errorf("walk read dir, %w", err))
c.ReportError(name, fmt.Errorf("walk read dir, %w", err))
return
}
for _, file := range files {
if first {
c.walk(file.Name(), false)
continue
}
c.walk(path+"/"+file.Name(), false)
c.walk(src, path+"/"+file.Name(), false)
}
exitJob := new(Job)
@@ -212,7 +290,7 @@ func (c *Copyer) prepare(ctx context.Context, job *Job) {
return
}
name := c.src + job.Path
name := job.Source + job.Path
file, err := mmap.Open(name)
if err != nil {
c.ReportError(name, fmt.Errorf("open src file fail, %w", err))
@@ -222,27 +300,28 @@ func (c *Copyer) prepare(ctx context.Context, job *Job) {
c.copyPipe <- &CopyJob{Job: job, src: file}
}
func (c *Copyer) copy(ctx context.Context, job *CopyJob) error {
func (c *Copyer) copy(ctx context.Context, job *CopyJob) ([]byte, error) {
if job.src == nil {
c.changePipe <- job.Job
return nil
return nil, nil
}
defer job.src.Close()
name := c.dst + job.Path
file, err := os.Create(name)
if err != nil {
return fmt.Errorf("open dst file fail, %w", err)
return nil, fmt.Errorf("open dst file fail, %w", err)
}
defer file.Close()
c.bar.Describe(fmt.Sprintf("[%d/%d]: %s", job.Number, c.num, job.Path))
if err := c.streamCopy(ctx, file, job.src); err != nil {
return fmt.Errorf("copy file fail, %w", err)
hash, err := c.streamCopy(ctx, file, job.src)
if err != nil {
return nil, fmt.Errorf("copy file fail, %w", err)
}
c.changePipe <- job.Job
return nil
return hash, nil
}
func (c *Copyer) changeInfo(info *Job) {
@@ -256,37 +335,69 @@ func (c *Copyer) changeInfo(info *Job) {
}
}
func (c *Copyer) streamCopy(ctx context.Context, dst io.Writer, src *mmap.ReaderAt) error {
for idx := int64(0); ; idx += batchSize {
buf, err := src.Slice(idx, batchSize)
if err != nil {
return fmt.Errorf("slice mmap fail, %w", err)
}
nr := len(buf)
nw, ew := dst.Write(buf)
if nw < 0 || nr < nw {
nw = 0
if ew == nil {
return fmt.Errorf("write fail, unexpected return, byte_num= %d", nw)
}
return fmt.Errorf("write fail, %w", ew)
}
if nr != nw {
return fmt.Errorf("write fail, write and read bytes not equal, read= %d write= %d", nr, nw)
}
c.bar.Add(nr)
if len(buf) < batchSize {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
func (c *Copyer) streamCopy(ctx context.Context, dst io.Writer, src *mmap.ReaderAt) (h []byte, err error) {
if src.Len() == 0 {
return nil, nil
}
sha := shaPool.Get().(hash.Hash)
sha.Reset()
defer shaPool.Put(sha)
var wg sync.WaitGroup
hashChan := make(chan []byte, 4)
defer func() {
close(hashChan)
if err != nil {
return
}
wg.Wait()
h = sha.Sum(nil)
}()
wg.Add(1)
go func() {
defer wg.Done()
for buf := range hashChan {
sha.Write(buf)
}
}()
err = func() error {
for idx := int64(0); ; idx += batchSize {
buf, err := src.Slice(idx, batchSize)
if err != nil {
return fmt.Errorf("slice mmap fail, %w", err)
}
nr := len(buf)
hashChan <- buf
nw, ew := dst.Write(buf)
if nw < 0 || nr < nw {
nw = 0
if ew == nil {
return fmt.Errorf("write fail, unexpected return, byte_num= %d", nw)
}
return fmt.Errorf("write fail, %w", ew)
}
if nr != nw {
return fmt.Errorf("write fail, write and read bytes not equal, read= %d write= %d", nr, nw)
}
atomic.AddInt64(&c.copyed, int64(nr))
if len(buf) < batchSize {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}()
return
}
type JobType uint8
@@ -298,6 +409,7 @@ const (
)
type Job struct {
Source string
Path string
Type JobType
Number int64
@@ -307,8 +419,9 @@ type Job struct {
ModTime time.Time // modification time
}
func NewJobFromFileInfo(path string, info os.FileInfo) *Job {
func NewJobFromFileInfo(src, path string, info os.FileInfo) *Job {
job := &Job{
Source: src,
Path: path,
Name: info.Name(),
Size: info.Size(),

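Note on the new streamCopy above: it hashes concurrently with the write by pushing each mmap slice into a small buffered channel that feeds a SHA-256 hasher taken from shaPool, so copying is not blocked on hashing. Below is a minimal, self-contained sketch of that pattern, using the standard crypto/sha256 instead of minio/sha256-simd and a plain io.Reader/io.Writer pair instead of the mmap reader (so the buffer must be copied before reuse, which the mmap version does not need):

package main

import (
	"crypto/sha256" // stand-in for github.com/minio/sha256-simd in this sketch
	"fmt"
	"io"
	"strings"
	"sync"
)

// hashingCopy copies src to dst while hashing the stream in a separate
// goroutine, mirroring the channel-based approach used by streamCopy.
func hashingCopy(dst io.Writer, src io.Reader) ([]byte, error) {
	sha := sha256.New()
	hashChan := make(chan []byte, 4)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for buf := range hashChan {
			sha.Write(buf) // hash.Hash writes never return an error
		}
	}()

	buf := make([]byte, 1024*1024) // batchSize
	for {
		n, err := src.Read(buf)
		if n > 0 {
			// Copy before sending: buf is reused on the next Read. The mmap
			// version skips this because each slice stays valid.
			chunk := make([]byte, n)
			copy(chunk, buf[:n])
			hashChan <- chunk
			if _, werr := dst.Write(buf[:n]); werr != nil {
				close(hashChan)
				return nil, fmt.Errorf("write fail, %w", werr)
			}
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			close(hashChan)
			return nil, fmt.Errorf("read fail, %w", err)
		}
	}
	close(hashChan)
	wg.Wait()
	return sha.Sum(nil), nil
}

func main() {
	var out strings.Builder
	sum, err := hashingCopy(&out, strings.NewReader("hello tape"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("copied %q, sha256=%x\n", out.String(), sum)
}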
go.mod

@@ -8,10 +8,19 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/schollz/progressbar/v3 v3.10.1
github.com/sirupsen/logrus v1.9.0
gorm.io/driver/mysql v1.3.6
gorm.io/driver/sqlite v1.3.6
gorm.io/gorm v1.23.8
)
require (
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/cpuid/v2 v2.0.4 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-sqlite3 v1.14.12 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/rivo/uniseg v0.3.4 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect

go.sum

@@ -5,10 +5,23 @@ github.com/benmcclelland/sgio v0.0.0-20180629175614-f710aebf64c1/go.mod h1:Wdrap
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0=
github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -34,3 +47,10 @@ golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuX
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/mysql v1.3.6 h1:BhX1Y/RyALb+T9bZ3t07wLnPZBukt+IRkMn8UZSNbGM=
gorm.io/driver/mysql v1.3.6/go.mod h1:sSIebwZAVPiT+27jK9HIwvsqOGKx3YMPmrA3mBJR10c=
gorm.io/driver/sqlite v1.3.6 h1:Fi8xNYCUplOqWiPa3/GuCeowRNBRGTf62DEmhMDHeQQ=
gorm.io/driver/sqlite v1.3.6/go.mod h1:Sg1/pvnKtbQ7jLXxfZa+jSHvoX8hoZA8cn4xllOMTgE=
gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/gorm v1.23.8 h1:h8sGJ+biDgBA1AD1Ha9gFCx7h8npU7AsLdlkX0n2TpE=
gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=

library/file.go Normal file

@@ -0,0 +1,10 @@
package library
type File struct {
ID int64 `gorm:"primaryKey;autoIncrement"`
Path string `gorm:"type:varchar(4096)"`
Name string `gorm:"type:varchar(256)"`
Hash []byte `gorm:"type:varbinary(32)"` // sha256
Size int64
}

library/library.go Normal file

@@ -0,0 +1,14 @@
package library
import (
"gorm.io/gorm"
)
type Library struct {
db *gorm.DB
prefix string
}
func NewLibrary(db *gorm.DB, prefix string) *Library {
return &Library{db: db, prefix: prefix}
}

library/position.go Normal file

@@ -0,0 +1,27 @@
package library
import (
"time"
"gorm.io/gorm"
)
type Position struct {
ID int64 `gorm:"primaryKey;autoIncrement"`
FileID int64
TapeID int64
Path string `gorm:"type:varchar(4096)"`
Mode uint32
ModTime time.Time
WriteTime time.Time
Size int64
Hash []byte `gorm:"type:varbinary(32)"` // sha256
}
func (l *Library) PositionScope(db *gorm.DB) *gorm.DB {
if l.prefix == "" {
return db
}
return db.Table(l.prefix + "_position")
}
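PositionScope only redirects a query to the prefixed position table; the commit ships no accessors yet. The following is a hypothetical helper, sketched here only to show how the scope is meant to be combined with gorm (the method name and query are assumptions, not part of this commit):

package library

import "context"

// PositionsForTape is a hypothetical query helper showing how PositionScope
// routes a query to the "<prefix>_position" table.
func (l *Library) PositionsForTape(ctx context.Context, tapeID int64) ([]*Position, error) {
	var positions []*Position
	r := l.db.WithContext(ctx).Scopes(l.PositionScope).Where("tape_id = ?", tapeID).Find(&positions)
	if r.Error != nil {
		return nil, r.Error
	}
	return positions, nil
}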

library/tape.go Normal file

@@ -0,0 +1,45 @@
package library
import (
"os"
"time"
"gorm.io/gorm"
)
type Tape struct {
ID int64 `gorm:"primaryKey;autoIncrement"`
Barcode string
Name string
Encryption string
CreateTimestamp int64
DestroyTimestamp int64
}
func (l *Library) TapeScope(db *gorm.DB) *gorm.DB {
if l.prefix == "" {
return db
}
return db.Table(l.prefix + "_tape")
}
type TapeFile struct {
Path string `json:"path"`
Size int64 `json:"size"`
Mode os.FileMode `json:"mode"`
ModTime time.Time `json:"mod_time"`
WriteTime time.Time `json:"write_time"`
Hash []byte `json:"hash"` // sha256
}
// func (l *Library) SaveTape(ctx context.Context, tape *Tape, files []*TapeFile) (*Tape, error) {
// if r := l.db.WithContext(ctx).Scopes(l.TapeScope).Save(tape); r.Error != nil {
// return nil, fmt.Errorf("save tape fail, err= %w", r.Error)
// }
// positions := make([]*Position, 0, len(files))
// for _, file := range files {
// }
// l.db.WithContext(ctx).Scopes(l.PositionScope).CreateBatchSize()
// }
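One way to finish the commented-out SaveTape above, sketched under the assumption that each TapeFile maps to one Position row on the saved tape: gorm's batched insert is CreateInBatches (the CreateBatchSize call in the comment does not exist as written), the batch size of 100 is arbitrary, and FileID is left unset because no File rows are created here.

package library

import (
	"context"
	"fmt"
)

// SaveTape is a sketch of the commented-out function above: it persists the
// tape record, then converts each copied file into a Position row on that tape.
func (l *Library) SaveTape(ctx context.Context, tape *Tape, files []*TapeFile) (*Tape, error) {
	if r := l.db.WithContext(ctx).Scopes(l.TapeScope).Save(tape); r.Error != nil {
		return nil, fmt.Errorf("save tape fail, err= %w", r.Error)
	}

	positions := make([]*Position, 0, len(files))
	for _, file := range files {
		positions = append(positions, &Position{
			TapeID:    tape.ID,
			Path:      file.Path,
			Mode:      uint32(file.Mode),
			ModTime:   file.ModTime,
			WriteTime: file.WriteTime,
			Size:      file.Size,
			Hash:      file.Hash,
		})
	}

	// 100 is an arbitrary batch size; the original snippet left it unspecified.
	if r := l.db.WithContext(ctx).Scopes(l.PositionScope).CreateInBatches(positions, 100); r.Error != nil {
		return nil, fmt.Errorf("save positions fail, err= %w", r.Error)
	}
	return tape, nil
}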


@@ -5,12 +5,11 @@ echo "format tape as number '$1', name '$2'"
echo "copy '$3' to tape"
stenc -f /dev/st0 -e on -k /root/tape.key -a 1 --ckod
sleep 3
mkltfs -f -d /dev/st0 -s $1 -n $2
sleep 3
ltfs -o noatime -o sync_type=unmount -o work_directory=/opt/ltfs -o capture_index -o min_pool_size=256 -o max_pool_size=1024 -o eject /ltfs
sleep 3
ordercp $3 /ltfs/
sleep 3
umount /ltfs
until mt -f /dev/st0 rewoffl; do
echo 'waiting for unmount write index...'
sleep 5
done


@@ -12,12 +12,15 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"runtime"
"syscall"
)
const (
prefetchMaxSize = 16 * 1024 * 1024
)
// debug is whether to print debugging messages for manual testing.
//
// The runtime.SetFinalizer documentation says that, "The finalizer for x is
@@ -123,8 +126,10 @@ func Open(filename string) (*ReaderAt, error) {
if err != nil {
return nil, fmt.Errorf("create mmap fail, %q, %w", filename, err)
}
if err := syscall.Madvise(mem, syscall.MADV_SEQUENTIAL|syscall.MADV_WILLNEED); err != nil {
return nil, fmt.Errorf("madvise fail, %q, %w", filename, err)
if size <= prefetchMaxSize {
if err := syscall.Madvise(data, syscall.MADV_SEQUENTIAL|syscall.MADV_WILLNEED); err != nil {
return nil, fmt.Errorf("madvise fail, %q, %w", filename, err)
}
}
r := &ReaderAt{data}

resource/db.go Normal file

@@ -0,0 +1,20 @@
package resource
import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func NewDBConn(dialect, dsn string) (*gorm.DB, error) {
var dialector gorm.Dialector
switch dialect {
case "mysql":
dialector = mysql.Open(dsn)
case "sqlite":
dialector = sqlite.Open(dsn)
}
return gorm.Open(dialector)
}
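resource.NewDBConn and library.NewLibrary are meant to be composed; a minimal wiring sketch with placeholder DSN and prefix values (schema creation/migration is not part of this commit and is omitted):

package main

import (
	"github.com/abc950309/tapewriter/library"
	"github.com/abc950309/tapewriter/resource"
)

func main() {
	// "sqlite" keeps the example self-contained; a MySQL DSN works the same way.
	db, err := resource.NewDBConn("sqlite", "tape-library.db")
	if err != nil {
		panic(err)
	}

	// The prefix selects the "<prefix>_tape" / "<prefix>_position" tables via
	// the scope helpers in the library package; "main" is a placeholder value.
	lib := library.NewLibrary(db, "main")
	_ = lib
}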