From 5e4e39988a0eb77b6dd825eae7a5cdc985b21cea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E7=AB=9E=E5=AE=81?= Date: Wed, 14 Sep 2022 12:19:32 +0800 Subject: [PATCH] feat: add wrap for panic --- acp.go | 2 +- copy.go | 24 ++++++++++++------------ progress_bar.go | 8 ++++---- recover.go | 30 ++++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 17 deletions(-) create mode 100644 recover.go diff --git a/acp.go b/acp.go index c7a29ce..959c509 100644 --- a/acp.go +++ b/acp.go @@ -85,7 +85,7 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) { } c.running.Add(1) - go c.run(ctx) + go wrap(ctx, func() { c.run(ctx) }) return c, nil } diff --git a/copy.go b/copy.go index f734dff..2e30a8d 100644 --- a/copy.go +++ b/copy.go @@ -30,7 +30,7 @@ func (c *Copyer) copy(ctx context.Context) { wg := new(sync.WaitGroup) wg.Add(1) - go func() { + go wrap(ctx, func() { defer wg.Done() defer close(c.writePipe) @@ -43,11 +43,11 @@ func (c *Copyer) copy(ctx context.Context) { default: } } - }() + }) for i := 0; i < c.threads; i++ { wg.Add(1) - go func() { + go wrap(ctx, func() { defer wg.Done() for { @@ -63,20 +63,20 @@ func (c *Copyer) copy(ctx context.Context) { return } } - }() + }) } - go func() { + go wrap(ctx, func() { for job := range c.postPipe { c.post(wg, job) } - }() + }) finished := make(chan struct{}, 1) - go func() { + go wrap(ctx, func() { wg.Wait() finished <- struct{}{} - }() + }) select { case <-finished: @@ -154,7 +154,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { chans = append(chans, ch) wg.Add(1) - go func() { + go wrap(ctx, func() { defer wg.Done() defer sha256Pool.Put(sha) @@ -163,7 +163,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { } job.setHash(sha.Sum(nil)) - }() + }) } var readErr error @@ -187,7 +187,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { chans = append(chans, ch) wg.Add(1) - go func() { + go wrap(ctx, func() { defer wg.Done() var rerr error @@ -230,7 +230,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { if readErr != nil { rerr = readErr } - }() + }) } if len(chans) == 0 { diff --git a/progress_bar.go b/progress_bar.go index 05f95a2..7089273 100644 --- a/progress_bar.go +++ b/progress_bar.go @@ -44,7 +44,7 @@ func (c *Copyer) startProgressBar(ctx context.Context) { ch <- func() { logrus.StandardLogger().Logf(l, format, args...) } } - go func() { + go wrap(ctx, func() { copying := make(map[int64]struct{}, c.threads) for f := range ch { @@ -77,9 +77,9 @@ func (c *Copyer) startProgressBar(ctx context.Context) { v() } } - }() + }) - go func() { + go wrap(ctx, func() { defer close(ch) ticker := time.NewTicker(barUpdateInterval) // around 255ms, avoid conflict with progress bar fresh by second @@ -112,5 +112,5 @@ func (c *Copyer) startProgressBar(ctx context.Context) { return } } - }() + }) } diff --git a/recover.go b/recover.go new file mode 100644 index 0000000..098f937 --- /dev/null +++ b/recover.go @@ -0,0 +1,30 @@ +package acp + +import ( + "context" + "fmt" + "runtime/debug" + + "github.com/sirupsen/logrus" +) + +func wrap(ctx context.Context, f func()) { + defer func() { + e := recover() + if e == nil { + return + } + + var err error + switch v := e.(type) { + case error: + err = v + default: + err = fmt.Errorf("%v", err) + } + + logrus.WithContext(ctx).WithError(err).Errorf("panic: %s", debug.Stack()) + }() + + f() +}