mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* fix(mount): remove fid pool to stop master over-allocating volumes
The writeback-cache fid pool pre-allocated file IDs with
ExpectedDataSize = ChunkSizeLimit (typically 8+ MB). The master's
PickForWrite charges count * expectedDataSize against the volume's
effectiveSize, so a full pool refill could charge hundreds of MB
against a single volume before any bytes were actually written.
That tripped RecordAssign's hard-limit path and eagerly removed
volumes from writable, causing the master to grow new volumes
even when the real data being written was tiny.
Drop the pool entirely. Every chunk upload goes through
UploadWithRetry -> AssignVolume with no ExpectedDataSize hint,
letting the master fall back to the 1 MB default estimate. The
mount->filer grpc connection is already cached in pb.WithGrpcClient
(non-streaming mode), so per-chunk AssignVolume is a unary RPC
over an existing HTTP/2 connection, not a full dial. Path-based
filer.conf storage rules now apply to mount chunk assigns again,
which the pool had to skip.
Also remove the now-unused operation.UploadWithAssignFunc and its
AssignFunc type.
* fix(upload): populate ExpectedDataSize from actual chunk bytes
UploadWithRetry already buffers the full chunk into `data` before
calling AssignVolume, so the real size is known. Previously the
assign request went out with ExpectedDataSize=0, making the master
fall back to the 1 MB DefaultNeedleSizeEstimate per fid — same
over-reservation symptom the pool had, just smaller per call.
Stamp ExpectedDataSize = len(data) before the assign RPC when the
caller hasn't already set it. This covers mount chunk uploads,
filer_copy, filersink, mq/logstore, broker_write, gateway_upload,
and nfs — all the UploadWithRetry paths.
* fix(assign): pass real ExpectedDataSize at every assign call site
After removing the mount fid pool, per-chunk AssignVolume calls went
out with ExpectedDataSize=0, making the master fall back to its 1 MB
DefaultNeedleSizeEstimate. That's still an over-estimate for small
writes. Thread the real payload size through every remaining assign
site so RecordAssign charges effectiveSize accurately and stops
prematurely marking volumes full.
- filer: assignNewFileInfo now takes expectedDataSize and stamps it
on both primary and alternate VolumeAssignRequests. Callers pass:
- SSE data-to-chunk: len(data)
- copy manifest save: len(data)
- streamCopyChunk: srcChunk.Size
- TUS sub-chunk: bytes read
- saveAsChunk (autochunk/manifestize): 0 (small, size unknown
until the reader is drained; master uses 1 MB default)
- filer gRPC remote fetch-and-write: ExpectedDataSize = chunkSize
after the adaptive chunkSize is computed.
- ChunkedUploadOption.AssignFunc gains an expectedDataSize parameter;
upload_chunked.go passes the buffered dataSize at the call site.
S3 PUT assignFunc stamps it on the AssignVolumeRequest.
- S3 copy: assignNewVolume / prepareChunkCopy take expectedDataSize;
all seven call sites pass the source chunk's Size.
- operation.SubmitFiles / FilePart.Upload: derive per-fid size from
FileSize (average for batched requests, real per-chunk size for
sequential chunk assigns).
- benchmark: pass fileSize.
- filer append-to-file: pass len(data).
* fix(assign): thread size through SaveDataAsChunkFunctionType
The saveAsChunk path (autochunk, filer_copy, webdav, mount) ran
AssignVolume before the reader was drained, so it had to pass
ExpectedDataSize=0 and fall back to the master's 1 MB default.
Add an expectedDataSize parameter to SaveDataAsChunkFunctionType.
- mergeIntoManifest already has the serialized manifest bytes, so
it passes uint64(len(data)) directly.
- Mount's saveDataAsChunk ignores the parameter because it uses
UploadWithRetry, which already stamps len(data) on the assign
after reading the payload.
- webdav and filer_copy saveDataAsChunk follow the same UploadWithRetry
path and also ignore the hint.
- Filer's saveAsChunk (used for manifestize) plumbs the value to
assignNewFileInfo so manifest-chunk assigns get a real size.
Callers of saveFunc-as-value (weedfs_file_sync, dirty_pages_chunked)
pass the chunk size they're about to upload.
320 lines
8.1 KiB
Go
320 lines
8.1 KiB
Go
package operation
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"math/rand/v2"
|
|
"mime"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
)
|
|
|
|
type FilePart struct {
|
|
Reader io.Reader
|
|
FileName string
|
|
FileSize int64
|
|
MimeType string
|
|
ModTime int64 //in seconds
|
|
Pref StoragePreference
|
|
Server string //this comes from assign result
|
|
Fid string //this comes from assign result, but customizable
|
|
Fsync bool
|
|
}
|
|
|
|
// SubmitResult reports the outcome of submitting a single file. Every field
// is omitted from the JSON encoding when it holds its zero value.
type SubmitResult struct {
	// FileName is the name of the submitted file.
	FileName string `json:"fileName,omitempty"`
	// FileUrl is the URL (without scheme) under which the file is reachable.
	FileUrl string `json:"url,omitempty"`
	// Fid is the file id the content was stored under.
	Fid string `json:"fid,omitempty"`
	// Size is the size reported by the upload.
	Size uint32 `json:"size,omitempty"`
	// Error is the error message when the assign or upload step failed.
	Error string `json:"error,omitempty"`
}
|
|
|
|
// StoragePreference bundles the placement options that are copied into
// volume assign requests, plus the chunk size limit used by uploads.
type StoragePreference struct {
	// Replication is forwarded as the assign request's Replication.
	Replication string
	// Collection is forwarded as the assign request's Collection.
	Collection string
	// DataCenter is forwarded as the assign request's DataCenter by
	// SubmitFiles; in FilePart.Upload a non-empty value selects a single
	// batched assign for all chunks.
	DataCenter string
	// Ttl is forwarded as the assign request's Ttl.
	Ttl string
	// DiskType is forwarded as the assign request's DiskType.
	DiskType string
	// MaxMB is the chunk size limit in MiB that SubmitFiles hands to
	// FilePart.Upload; values <= 0 disable chunking.
	MaxMB int
}
|
|
|
|
// GetMasterFn yields the address of a master server for the given context.
// The functions in this file pass it through to Assign and to chunk deletion.
type GetMasterFn func(ctx context.Context) pb.ServerAddress
|
|
|
|
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) {
|
|
results := make([]SubmitResult, len(files))
|
|
var totalBytes int64
|
|
for index, file := range files {
|
|
results[index].FileName = file.FileName
|
|
totalBytes += file.FileSize
|
|
}
|
|
var avgBytes uint64
|
|
if n := len(files); n > 0 {
|
|
avgBytes = uint64((totalBytes + int64(n) - 1) / int64(n))
|
|
}
|
|
ar := &VolumeAssignRequest{
|
|
Count: uint64(len(files)),
|
|
Replication: pref.Replication,
|
|
Collection: pref.Collection,
|
|
DataCenter: pref.DataCenter,
|
|
Ttl: pref.Ttl,
|
|
DiskType: pref.DiskType,
|
|
ExpectedDataSize: avgBytes,
|
|
}
|
|
ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar)
|
|
if err != nil {
|
|
for index := range files {
|
|
results[index].Error = err.Error()
|
|
}
|
|
return results, err
|
|
}
|
|
for index, file := range files {
|
|
file.Fid = ret.Fid
|
|
if index > 0 {
|
|
file.Fid = file.Fid + "_" + strconv.Itoa(index)
|
|
}
|
|
file.Server = ret.Url
|
|
if usePublicUrl {
|
|
file.Server = ret.PublicUrl
|
|
}
|
|
file.Pref = pref
|
|
results[index].Size, err = file.Upload(pref.MaxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
|
|
if err != nil {
|
|
results[index].Error = err.Error()
|
|
}
|
|
results[index].Fid = file.Fid
|
|
results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
func NewFileParts(fullPathFilenames []string) (ret []*FilePart, err error) {
|
|
ret = make([]*FilePart, len(fullPathFilenames))
|
|
for index, file := range fullPathFilenames {
|
|
if ret[index], err = newFilePart(file); err != nil {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
func newFilePart(fullPathFilename string) (ret *FilePart, err error) {
|
|
ret = &FilePart{}
|
|
fh, openErr := os.Open(fullPathFilename)
|
|
if openErr != nil {
|
|
glog.V(0).Info("Failed to open file: ", fullPathFilename)
|
|
return ret, openErr
|
|
}
|
|
ret.Reader = fh
|
|
|
|
fi, fiErr := fh.Stat()
|
|
if fiErr != nil {
|
|
glog.V(0).Info("Failed to stat file:", fullPathFilename)
|
|
return ret, fiErr
|
|
}
|
|
ret.ModTime = fi.ModTime().UTC().Unix()
|
|
ret.FileSize = fi.Size()
|
|
ext := strings.ToLower(path.Ext(fullPathFilename))
|
|
ret.FileName = fi.Name()
|
|
if ext != "" {
|
|
ret.MimeType = mime.TypeByExtension(ext)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
|
|
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
|
if fi.ModTime != 0 {
|
|
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
|
}
|
|
if fi.Fsync {
|
|
fileUrl += "?fsync=true"
|
|
}
|
|
if closer, ok := fi.Reader.(io.Closer); ok {
|
|
defer closer.Close()
|
|
}
|
|
baseName := path.Base(fi.FileName)
|
|
if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
|
|
chunkSize := int64(maxMB * 1024 * 1024)
|
|
chunks := fi.FileSize/chunkSize + 1
|
|
cm := ChunkManifest{
|
|
Name: baseName,
|
|
Size: fi.FileSize,
|
|
Mime: fi.MimeType,
|
|
Chunks: make([]*ChunkInfo, 0, chunks),
|
|
}
|
|
|
|
var ret *AssignResult
|
|
var id string
|
|
if fi.Pref.DataCenter != "" {
|
|
ar := &VolumeAssignRequest{
|
|
Count: uint64(chunks),
|
|
Replication: fi.Pref.Replication,
|
|
Collection: fi.Pref.Collection,
|
|
Ttl: fi.Pref.Ttl,
|
|
DiskType: fi.Pref.DiskType,
|
|
ExpectedDataSize: uint64(chunkSize),
|
|
}
|
|
ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
for i := int64(0); i < chunks; i++ {
|
|
if fi.Pref.DataCenter == "" {
|
|
remaining := fi.FileSize - i*chunkSize
|
|
thisChunk := chunkSize
|
|
if remaining < thisChunk {
|
|
thisChunk = remaining
|
|
}
|
|
ar := &VolumeAssignRequest{
|
|
Count: 1,
|
|
Replication: fi.Pref.Replication,
|
|
Collection: fi.Pref.Collection,
|
|
Ttl: fi.Pref.Ttl,
|
|
DiskType: fi.Pref.DiskType,
|
|
ExpectedDataSize: uint64(thisChunk),
|
|
}
|
|
ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
|
|
if err != nil {
|
|
// delete all uploaded chunks
|
|
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
|
|
return
|
|
}
|
|
id = ret.Fid
|
|
} else {
|
|
id = ret.Fid
|
|
if i > 0 {
|
|
id += "_" + strconv.FormatInt(i, 10)
|
|
}
|
|
}
|
|
fileUrl := genFileUrl(ret, id, usePublicUrl)
|
|
count, e := uploadOneChunk(
|
|
baseName+"-"+strconv.FormatInt(i+1, 10),
|
|
io.LimitReader(fi.Reader, chunkSize),
|
|
masterFn, fileUrl,
|
|
ret.Auth)
|
|
if e != nil {
|
|
// delete all uploaded chunks
|
|
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
|
|
return 0, e
|
|
}
|
|
cm.Chunks = append(cm.Chunks,
|
|
&ChunkInfo{
|
|
Offset: i * chunkSize,
|
|
Size: int64(count),
|
|
Fid: id,
|
|
},
|
|
)
|
|
retSize += count
|
|
}
|
|
err = uploadChunkedFileManifest(fileUrl, &cm, jwt)
|
|
if err != nil {
|
|
// delete all uploaded chunks
|
|
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
|
|
}
|
|
} else {
|
|
uploadOption := &UploadOption{
|
|
UploadUrl: fileUrl,
|
|
Filename: baseName,
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: fi.MimeType,
|
|
PairMap: nil,
|
|
Jwt: jwt,
|
|
}
|
|
|
|
uploader, e := NewUploader()
|
|
if e != nil {
|
|
return 0, e
|
|
}
|
|
|
|
ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption)
|
|
if e != nil {
|
|
return 0, e
|
|
}
|
|
return ret.Size, e
|
|
}
|
|
return
|
|
}
|
|
|
|
func genFileUrl(ret *AssignResult, id string, usePublicUrl bool) string {
|
|
fileUrl := "http://" + ret.Url + "/" + id
|
|
if usePublicUrl {
|
|
fileUrl = "http://" + ret.PublicUrl + "/" + id
|
|
}
|
|
for _, replica := range ret.Replicas {
|
|
if rand.IntN(len(ret.Replicas)+1) == 0 {
|
|
fileUrl = "http://" + replica.Url + "/" + id
|
|
if usePublicUrl {
|
|
fileUrl = "http://" + replica.PublicUrl + "/" + id
|
|
}
|
|
}
|
|
}
|
|
return fileUrl
|
|
}
|
|
|
|
func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
|
|
fileUrl string, jwt security.EncodedJwt,
|
|
) (size uint32, e error) {
|
|
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
|
uploadOption := &UploadOption{
|
|
UploadUrl: fileUrl,
|
|
Filename: filename,
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: "",
|
|
PairMap: nil,
|
|
Jwt: jwt,
|
|
}
|
|
|
|
uploader, uploaderError := NewUploader()
|
|
if uploaderError != nil {
|
|
return 0, uploaderError
|
|
}
|
|
|
|
uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption)
|
|
if uploadError != nil {
|
|
return 0, uploadError
|
|
}
|
|
return uploadResult.Size, nil
|
|
}
|
|
|
|
func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
|
|
buf, e := manifest.Marshal()
|
|
if e != nil {
|
|
return e
|
|
}
|
|
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
|
|
u, _ := url.Parse(fileUrl)
|
|
q := u.Query()
|
|
q.Set("cm", "true")
|
|
u.RawQuery = q.Encode()
|
|
uploadOption := &UploadOption{
|
|
UploadUrl: u.String(),
|
|
Filename: manifest.Name,
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: "application/json",
|
|
PairMap: nil,
|
|
Jwt: jwt,
|
|
}
|
|
|
|
uploader, e := NewUploader()
|
|
if e != nil {
|
|
return e
|
|
}
|
|
|
|
_, e = uploader.UploadData(context.Background(), buf, uploadOption)
|
|
return e
|
|
}
|