add backups for usage-caches to rely on in case of error (#18029)
This allows the scanner to avoid lengthy scans, skip things appropriately, and not lose metrics in any manner. It also reduces the long deadlines for usage-cache loads/saves to match the disk timeout, which is now 2 minutes per IOP.
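In outline, the change has the scanner read the primary usage-cache object first and fall back to a `.bkp` copy when the primary fails in a retryable way, with jittered sleeps between attempts. Below is a minimal, self-contained sketch of that pattern, not the MinIO code; `fetch`, `errRetryable`, and `loadWithBackup` are hypothetical stand-ins:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// errRetryable stands in for quorum/storage errors that are worth retrying.
var errRetryable = errors.New("retryable read error")

// fetch is a hypothetical stand-in for reading one cache object; the real
// code calls GetObjectNInfo and deserializes the result.
func fetch(ctx context.Context, name string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	_ = ctx // a real implementation would issue the read with this ctx
	return errRetryable
}

// loadWithBackup tries the primary object, falls back to its ".bkp" copy on
// retryable failures, and sleeps with jitter before retrying.
func loadWithBackup(ctx context.Context, name string) error {
	for retries := 0; retries < 5; retries++ {
		err := fetch(ctx, name, time.Minute)
		if err == nil {
			return nil // primary copy loaded
		}
		if !errors.Is(err, errRetryable) {
			return err // permanent failure, surface it
		}
		// Primary failed in a retryable way: give the backup a shorter
		// deadline before sleeping and retrying the primary.
		if fetch(ctx, name+".bkp", 30*time.Second) == nil {
			return nil
		}
		time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
	}
	return fmt.Errorf("load %q: all retries exhausted", name)
}

func main() {
	fmt.Println(loadWithBackup(context.Background(), "usage-cache.bin"))
}
```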
@@ -38,6 +38,7 @@ import (
 	"github.com/minio/minio/internal/hash"
 	"github.com/minio/minio/internal/logger"
 	"github.com/tinylib/msgp/msgp"
+	"github.com/valyala/bytebufferpool"
 )
 
 //go:generate msgp -file $GOFILE -unexported
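The added `bytebufferpool` import is used by `save()` further down. For context, here is a minimal round-trip through the pool's public API (`Get`, `Reset`, `Put`); the string content is just a placeholder for the serialized cache:

```go
package main

import (
	"fmt"

	"github.com/valyala/bytebufferpool"
)

func main() {
	// Borrow a pooled buffer instead of allocating a bytes.Buffer per call.
	buf := bytebufferpool.Get()
	defer func() {
		buf.Reset()             // drop contents before recycling
		bytebufferpool.Put(buf) // return the buffer to the pool
	}()

	buf.WriteString("serialized usage-cache bytes")
	fmt.Println(buf.Len(), "bytes staged:", string(buf.Bytes()))
}
```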
@@ -927,34 +928,42 @@ type objectIO interface {
 // The loader is optimistic and has no locking, but tries 5 times before giving up.
 // If the object is not found or unable to deserialize d is cleared and nil error is returned.
 func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
-	// Abandon if more than 5 minutes, so we don't hold up scanner.
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
+	load := func(name string, timeout time.Duration) (bool, error) {
+		// Abandon if more than time.Minute, so we don't hold up scanner.
+		// drive timeout by default is 2 minutes, we do not need to wait longer.
+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
 
-	// Caches are read+written without locks,
-	retries := 0
-	for retries < 5 {
 		r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
 		if err != nil {
 			switch err.(type) {
 			case ObjectNotFound, BucketNotFound:
 			case InsufficientReadQuorum, StorageErr:
-				retries++
-				time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
-				continue
+				return true, nil
 			default:
-				return toObjectErr(err, dataUsageBucket, name)
+				return false, toObjectErr(err, dataUsageBucket, name)
 			}
 			*d = dataUsageCache{}
-			return nil
+			return false, nil
 		}
-		if err := d.deserialize(r); err != nil {
-			r.Close()
+		err = d.deserialize(r)
+		r.Close()
+		return err != nil, nil
+	}
+
+	// Caches are read+written without locks,
+	retries := 0
+	for retries < 5 {
+		retry, err := load(name, time.Minute)
+		if err != nil {
+			return err
+		}
+		if retry {
+			retry, _ = load(name+".bkp", 30*time.Second)
+			if !retry {
+				break
+			}
 			retries++
 			time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
 			continue
 		}
-		r.Close()
 		return nil
 	}
 	*d = dataUsageCache{}
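The key mechanism in the new `load` closure is the per-attempt deadline: each attempt derives its own child context from the caller's, so a stuck drive only burns that attempt's budget rather than the scanner's whole allowance. A minimal sketch of the pattern, with a hypothetical `attempt` helper in place of the real object read:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// attempt derives a child context with its own deadline from the caller's
// context, so one slow read cannot consume the whole outer budget.
func attempt(parent context.Context, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel() // release the timer as soon as this attempt finishes

	select {
	case <-time.After(50 * time.Millisecond): // stand-in for the object read
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	parent := context.Background()
	// Mirrors load(name, time.Minute) and load(name+".bkp", 30*time.Second).
	fmt.Println(attempt(parent, time.Minute))
	fmt.Println(attempt(parent, 30*time.Second))
}
```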
@@ -967,47 +976,52 @@ var maxConcurrentScannerSaves = make(chan struct{}, 4)
 // save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
 // Note that no locking is done when saving.
 func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
-	var r io.Reader
-	maxConcurrentScannerSaves <- struct{}{}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case maxConcurrentScannerSaves <- struct{}{}:
+	}
 	defer func() {
-		<-maxConcurrentScannerSaves
+		select {
+		case <-ctx.Done():
+		case <-maxConcurrentScannerSaves:
+		}
 	}()
-	// If big, do streaming...
-	size := int64(-1)
-	if len(d.Cache) > 10000 {
-		pr, pw := io.Pipe()
-		go func() {
-			pw.CloseWithError(d.serializeTo(pw))
-		}()
-		defer pr.Close()
-		r = pr
-	} else {
-		var buf bytes.Buffer
-		err := d.serializeTo(&buf)
-		if err != nil {
-			return err
-		}
-		r = &buf
-		size = int64(buf.Len())
-	}
+
+	buf := bytebufferpool.Get()
+	defer func() {
+		buf.Reset()
+		bytebufferpool.Put(buf)
+	}()
+
+	if err := d.serializeTo(buf); err != nil {
+		return err
+	}
 
-	hr, err := hash.NewReader(r, size, "", "", size)
+	hr, err := hash.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), "", "", int64(buf.Len()))
 	if err != nil {
 		return err
 	}
 
-	// Abandon if more than 5 minutes, so we don't hold up scanner.
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
-	_, err = store.PutObject(ctx,
-		dataUsageBucket,
-		name,
-		NewPutObjReader(hr),
-		ObjectOptions{NoLock: true})
-	if isErrBucketNotFound(err) {
-		return nil
-	}
-	return err
+	save := func(name string, timeout time.Duration) error {
+		// Abandon if more than a minute, so we don't hold up scanner.
+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
+
+		_, err = store.PutObject(ctx,
+			dataUsageBucket,
+			name,
+			NewPutObjReader(hr),
+			ObjectOptions{NoLock: true})
+		if isErrBucketNotFound(err) {
+			return nil
+		}
+		return err
+	}
+	defer save(name+".bkp", 30*time.Second) // Keep a backup as well
+
+	// drive timeout by default is 2 minutes, we do not need to wait longer.
+	return save(name, time.Minute)
 }
 
 // dataUsageCacheVer indicates the cache version.
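The reworked `save` acquires a slot from `maxConcurrentScannerSaves` with a `select`, so a canceled caller stops waiting for a slot instead of blocking forever. A stripped-down sketch of that context-aware semaphore, with a sleep standing in for serialization and `PutObject`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sem caps concurrent saves at 4, mirroring maxConcurrentScannerSaves.
var sem = make(chan struct{}, 4)

func save(ctx context.Context, name string) error {
	// Acquire a slot, but stop waiting if the caller gives up first.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case sem <- struct{}{}:
	}
	defer func() { <-sem }() // release the slot unconditionally

	time.Sleep(10 * time.Millisecond) // stand-in for serialize + PutObject
	fmt.Println("saved", name)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(save(ctx, "usage-cache.bin"))
}
```

Unlike this sketch, the commit's release path also selects on `ctx.Done()`; the unconditional release shown here is the more common form of the pattern.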