mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-24 02:31:28 +00:00
* fix(master): include GrpcPort in LookupEcVolume response LookupVolume already passes loc.GrpcPort through to the client; LookupEcVolume builds Location with only Url / PublicUrl / DataCenter, so callers fall back to ServerToGrpcAddress (httpPort + 10000). On any deployment where that convention does not hold — multi-disk integration tests, custom port layouts — EC reads dial the wrong port and quietly degrade to parity recovery. * fix(volume/ec): probe every DiskLocation when serving local shard reads reconcileEcShardsAcrossDisks (issue 9212) registers each .ec?? against the DiskLocation that physically owns it, so a multi-disk volume server can hold shards for the same vid in two separate ecVolumes — one per disk — with .ecx on whichever disk owned the original .dat. The read path only consulted the single EcVolume FindEcVolume picked, so requests for shards on the sibling disk fell through to errShardNotLocal and then to remote/loopback recovery. Walk all DiskLocations after the first probe in both readLocalEcShardInterval and the VolumeEcShardRead gRPC handler; the latter also covers the loopback that recoverOneRemoteEcShardInterval falls back to when a peer dial fails. * test(volume/ec): cover the multi-disk EC lifecycle end-to-end Two integration tests against a real volume server with two data dirs: TestEcLifecycleAcrossMultipleDisks drives encode -> mount -> HTTP read -> drop .dat -> stop -> redistribute shards across disks -> restart -> verify reconcileEcShardsAcrossDisks attached the orphan shards and reads still work -> blob delete -> stop -> drop a shard -> restart -> VolumeEcShardsRebuild pulls input from both disks -> reads still work. TestEcPartialShardsOnSiblingDiskCleanedUpOnRestart is the issue 9478 reproducer at the cluster level: seed a healthy .dat on disk 0, plant the on-disk footprint of an interrupted EC encode on disk 1, restart, and assert pruneIncompleteEcWithSiblingDat wipes disk 1 without touching disk 0. 
Framework gets RestartVolumeServer / StopVolumeServer helpers; the previous run's volume.log is rotated to volume.log.previous so a startup regression on the second run does not lose the first run's diagnostics. * review: trim verbose comments * review: drop racy fast-path, use locked findEcShard directly gemini-code-assist flagged the two-step lookup in readLocalEcShardInterval and VolumeEcShardRead: the first probe (ecVolume.FindEcVolumeShard) reads the EcVolume's Shards slice without holding ecVolumesLock, so a concurrent mount / unmount could race with it. findEcShard already walks every DiskLocation under the right lock, so the fast-path adds nothing but the race. Collapse both call sites to a single locked call. Also note in RestartVolumeServer why the log-rotation error is swallowed: absence on first call is benign; anything else surfaces in the next os.Create in startVolume.
403 lines
12 KiB
Go
403 lines
12 KiB
Go
package weed_server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"math/rand/v2"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/topology"
|
|
|
|
"github.com/seaweedfs/raft"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// volumeGrowStepCount is the number of writable volumes requested per
// incremental grow step; it is also added to the crowded-volume count when
// deciding whether a volume layout is crowded enough to grow.
const volumeGrowStepCount = 2
|
|
|
|
func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
|
|
if ms.option.VolumeGrowthDisabled {
|
|
glog.V(1).Infof("automatic volume grow disabled")
|
|
return
|
|
}
|
|
glog.V(1).Infoln("starting automatic volume grow")
|
|
start := time.Now()
|
|
newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
|
|
glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
|
|
if err != nil {
|
|
glog.V(1).Infof("automatic volume grow failed: %+v", err)
|
|
return
|
|
}
|
|
for _, newVidLocation := range newVidLocations {
|
|
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
|
|
}
|
|
}
|
|
|
|
func (ms *MasterServer) ProcessGrowRequest() {
|
|
go func() {
|
|
ctx := context.Background()
|
|
firstRun := true
|
|
for {
|
|
if firstRun {
|
|
firstRun = false
|
|
} else {
|
|
time.Sleep(5*time.Minute + time.Duration(30*rand.Float32())*time.Second)
|
|
}
|
|
if !ms.Topo.IsLeader() {
|
|
continue
|
|
}
|
|
dcs := ms.Topo.ListDCAndRacks()
|
|
for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
|
|
var err error
|
|
vl := vlc.VolumeLayout
|
|
lastGrowCount := vl.GetLastGrowCount()
|
|
if vl.HasGrowRequest() {
|
|
continue
|
|
}
|
|
writable, crowded := vl.GetWritableVolumeCount()
|
|
mustGrow := int(lastGrowCount) - writable
|
|
vgr := vlc.ToVolumeGrowRequest()
|
|
stats.MasterVolumeLayoutWritable.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(writable))
|
|
stats.MasterVolumeLayoutCrowded.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(crowded))
|
|
|
|
switch {
|
|
case mustGrow > 0:
|
|
if rp, rpErr := super_block.NewReplicaPlacementFromString(vgr.Replication); rpErr != nil {
|
|
glog.V(0).Infof("failed to parse replica placement %s: %v", vgr.Replication, rpErr)
|
|
} else {
|
|
vgr.WritableVolumeCount = uint32(mustGrow)
|
|
if ms.Topo.AvailableSpaceFor(&topology.VolumeGrowOption{DiskType: types.ToDiskType(vgr.DiskType)}) >= int64(vgr.WritableVolumeCount*uint32(rp.GetCopyCount())) {
|
|
_, err = ms.VolumeGrow(ctx, vgr)
|
|
}
|
|
}
|
|
case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold:
|
|
vgr.WritableVolumeCount = volumeGrowStepCount
|
|
_, err = ms.VolumeGrow(ctx, vgr)
|
|
}
|
|
if err != nil {
|
|
glog.V(0).Infof("volume grow request failed: %+v", err)
|
|
}
|
|
writableVolumes := vl.CloneWritableVolumes()
|
|
for dcId, racks := range dcs {
|
|
for _, rackId := range racks {
|
|
if vl.ShouldGrowVolumesByDcAndRack(&writableVolumes, dcId, rackId) {
|
|
vgr.DataCenter = string(dcId)
|
|
vgr.Rack = string(rackId)
|
|
if lastGrowCount > 0 {
|
|
vgr.WritableVolumeCount = uint32(math.Ceil(float64(lastGrowCount) / float64(len(dcs)*len(racks))))
|
|
} else {
|
|
vgr.WritableVolumeCount = volumeGrowStepCount
|
|
}
|
|
|
|
if _, err = ms.VolumeGrow(ctx, vgr); err != nil {
|
|
glog.V(0).Infof("volume grow request for dc:%s rack:%s failed: %+v", dcId, rackId, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
filter := sync.Map{}
|
|
for {
|
|
req, ok := <-ms.volumeGrowthRequestChan
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
option := req.Option
|
|
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
|
|
|
|
if !ms.Topo.IsLeader() {
|
|
//discard buffered requests
|
|
time.Sleep(time.Second * 1)
|
|
vl.DoneGrowRequest()
|
|
continue
|
|
}
|
|
|
|
// filter out identical requests being processed
|
|
found := false
|
|
filter.Range(func(k, v interface{}) bool {
|
|
existingReq := k.(*topology.VolumeGrowRequest)
|
|
if existingReq.Equals(req) {
|
|
found = true
|
|
}
|
|
return !found
|
|
})
|
|
|
|
// not atomic but it's okay
|
|
if found || (!req.Force && !vl.ShouldGrowVolumes()) {
|
|
glog.V(4).Infoln("discard volume grow request")
|
|
time.Sleep(time.Millisecond * 211)
|
|
vl.DoneGrowRequest()
|
|
continue
|
|
}
|
|
|
|
filter.Store(req, nil)
|
|
// we have lock called inside vg
|
|
glog.V(0).Infof("volume grow %+v", req)
|
|
go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) {
|
|
ms.DoAutomaticVolumeGrow(req)
|
|
vl.DoneGrowRequest()
|
|
filter.Delete(req)
|
|
}(req, vl)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) {
|
|
|
|
resp := &master_pb.LookupVolumeResponse{}
|
|
volumeLocations := ms.lookupVolumeId(req.VolumeOrFileIds, req.Collection)
|
|
|
|
notFoundCount := 0
|
|
for _, volumeOrFileId := range req.VolumeOrFileIds {
|
|
vid := volumeOrFileId
|
|
commaSep := strings.Index(vid, ",")
|
|
if commaSep > 0 {
|
|
vid = vid[0:commaSep]
|
|
}
|
|
if result, found := volumeLocations[vid]; found {
|
|
var locations []*master_pb.Location
|
|
for _, loc := range result.Locations {
|
|
locations = append(locations, &master_pb.Location{
|
|
Url: loc.Url,
|
|
PublicUrl: loc.PublicUrl,
|
|
DataCenter: loc.DataCenter,
|
|
GrpcPort: uint32(loc.GrpcPort),
|
|
})
|
|
}
|
|
var auth string
|
|
if commaSep > 0 { // this is a file id
|
|
auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId))
|
|
}
|
|
if result.NotFound {
|
|
notFoundCount++
|
|
}
|
|
resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
|
|
VolumeOrFileId: result.VolumeOrFileId,
|
|
Locations: locations,
|
|
Error: result.Error,
|
|
Auth: auth,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Only return Unavailable during warmup when every requested ID was a transient not-found
|
|
if len(req.VolumeOrFileIds) > 0 && notFoundCount == len(req.VolumeOrFileIds) && ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() {
|
|
glog.V(0).Infof("lookup volume warming up: topology is still loading (%d not found)", notFoundCount)
|
|
return nil, status.Errorf(codes.Unavailable, "master is warming up, topology is still loading")
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) {
|
|
|
|
if !ms.Topo.IsLeader() {
|
|
return nil, raft.NotLeaderError
|
|
}
|
|
|
|
if req.Replication == "" {
|
|
req.Replication = ms.option.DefaultReplicaPlacement
|
|
}
|
|
replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ttl, err := needle.ReadTTL(req.Ttl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
|
|
stats := volumeLayout.Stats()
|
|
totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
|
|
resp := &master_pb.StatisticsResponse{
|
|
TotalSize: uint64(totalSize),
|
|
UsedSize: stats.UsedSize,
|
|
FileCount: stats.FileCount,
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
|
|
|
|
if !ms.Topo.IsLeader() {
|
|
return nil, raft.NotLeaderError
|
|
}
|
|
|
|
resp := &master_pb.VolumeListResponse{
|
|
TopologyInfo: ms.Topo.ToTopologyInfo(),
|
|
VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) {
|
|
|
|
if !ms.Topo.IsLeader() {
|
|
return nil, raft.NotLeaderError
|
|
}
|
|
|
|
resp := &master_pb.LookupEcVolumeResponse{}
|
|
|
|
ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
|
|
|
|
if !found {
|
|
return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
|
|
}
|
|
|
|
resp.VolumeId = req.VolumeId
|
|
|
|
for shardId, shardLocations := range ecLocations.Locations {
|
|
var locations []*master_pb.Location
|
|
for _, dn := range shardLocations {
|
|
locations = append(locations, &master_pb.Location{
|
|
Url: dn.Url(),
|
|
PublicUrl: dn.PublicUrl,
|
|
DataCenter: dn.GetDataCenterId(),
|
|
// without this, clients derive grpc as httpPort+10000
|
|
GrpcPort: uint32(dn.GrpcPort),
|
|
})
|
|
}
|
|
resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
|
|
ShardId: uint32(shardId),
|
|
Locations: locations,
|
|
})
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) {
|
|
|
|
if !ms.Topo.IsLeader() {
|
|
return nil, raft.NotLeaderError
|
|
}
|
|
|
|
resp := &master_pb.VacuumVolumeResponse{}
|
|
|
|
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize, false)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) DisableVacuum(ctx context.Context, req *master_pb.DisableVacuumRequest) (*master_pb.DisableVacuumResponse, error) {
|
|
// The caller explicitly indicates whether this disable request comes
|
|
// from the vacuum plugin monitor. Track ownership so the safety net
|
|
// in the vacuum loop won't override an operator's intentional disable.
|
|
if req.GetByPlugin() {
|
|
ms.Topo.DisableVacuumByPlugin()
|
|
} else {
|
|
ms.Topo.DisableVacuum()
|
|
}
|
|
resp := &master_pb.DisableVacuumResponse{}
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) EnableVacuum(ctx context.Context, req *master_pb.EnableVacuumRequest) (*master_pb.EnableVacuumResponse, error) {
|
|
if req.GetByPlugin() {
|
|
ms.Topo.EnableVacuumByPlugin()
|
|
} else {
|
|
ms.Topo.EnableVacuum()
|
|
}
|
|
resp := &master_pb.EnableVacuumResponse{}
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) {
|
|
|
|
if !ms.Topo.IsLeader() {
|
|
return nil, raft.NotLeaderError
|
|
}
|
|
|
|
resp := &master_pb.VolumeMarkReadonlyResponse{}
|
|
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement))
|
|
vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType))
|
|
dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId))
|
|
|
|
for _, dn := range dataNodes {
|
|
if dn.Ip == req.Ip && dn.Port == int(req.Port) {
|
|
if req.IsReadonly {
|
|
vid := needle.VolumeId(req.VolumeId)
|
|
vl.SetVolumeReadOnly(dn, vid)
|
|
if pending := vl.GetPendingSize(vid); pending > 0 {
|
|
glog.V(0).Infof("volume %d marked readonly with %d pending bytes", vid, pending)
|
|
}
|
|
} else {
|
|
vl.SetVolumeWritable(dn, needle.VolumeId(req.VolumeId))
|
|
}
|
|
}
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (ms *MasterServer) VolumeGrow(ctx context.Context, req *master_pb.VolumeGrowRequest) (*master_pb.VolumeGrowResponse, error) {
|
|
if !ms.Topo.IsLeader() {
|
|
return nil, raft.NotLeaderError
|
|
}
|
|
if req.Replication == "" {
|
|
req.Replication = ms.option.DefaultReplicaPlacement
|
|
}
|
|
replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ttl, err := needle.ReadTTL(req.Ttl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if req.DataCenter != "" && !ms.Topo.DataCenterExists(req.DataCenter) {
|
|
return nil, fmt.Errorf("data center not exists")
|
|
}
|
|
|
|
ver := needle.GetCurrentVersion()
|
|
volumeGrowOption := topology.VolumeGrowOption{
|
|
Collection: req.Collection,
|
|
ReplicaPlacement: replicaPlacement,
|
|
Ttl: ttl,
|
|
DiskType: types.ToDiskType(req.DiskType),
|
|
Preallocate: ms.preallocateSize,
|
|
DataCenter: req.DataCenter,
|
|
Rack: req.Rack,
|
|
DataNode: req.DataNode,
|
|
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
|
|
Version: uint32(ver),
|
|
}
|
|
volumeGrowRequest := topology.VolumeGrowRequest{
|
|
Option: &volumeGrowOption,
|
|
Count: req.WritableVolumeCount,
|
|
Force: true,
|
|
Reason: "grpc volume grow",
|
|
}
|
|
replicaCount := int64(req.WritableVolumeCount * uint32(replicaPlacement.GetCopyCount()))
|
|
|
|
if ms.Topo.AvailableSpaceFor(&volumeGrowOption) < replicaCount {
|
|
return nil, fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(&volumeGrowOption), replicaCount)
|
|
}
|
|
|
|
ms.DoAutomaticVolumeGrow(&volumeGrowRequest)
|
|
|
|
return &master_pb.VolumeGrowResponse{}, nil
|
|
}
|