mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 18:21:28 +00:00
use batch key
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -39,21 +40,19 @@ type MasterClient struct {
|
||||
OnPeerUpdateLock sync.RWMutex
|
||||
|
||||
// Per-volume-ID in-flight tracking to prevent duplicate lookups
|
||||
vidLookupLock sync.Mutex
|
||||
vidLookupInFlight map[string]*singleflight.Group // volumeId -> singleflight group
|
||||
vidLookupGroup singleflight.Group
|
||||
}
|
||||
|
||||
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
|
||||
return &MasterClient{
|
||||
FilerGroup: filerGroup,
|
||||
clientType: clientType,
|
||||
clientHost: clientHost,
|
||||
rack: rack,
|
||||
masters: masters,
|
||||
grpcDialOption: grpcDialOption,
|
||||
vidMap: newVidMap(clientDataCenter),
|
||||
vidMapCacheSize: 5,
|
||||
vidLookupInFlight: make(map[string]*singleflight.Group),
|
||||
FilerGroup: filerGroup,
|
||||
clientType: clientType,
|
||||
clientHost: clientHost,
|
||||
rack: rack,
|
||||
masters: masters,
|
||||
grpcDialOption: grpcDialOption,
|
||||
vidMap: newVidMap(clientDataCenter),
|
||||
vidMapCacheSize: 5,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,10 +103,11 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
|
||||
}
|
||||
|
||||
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
|
||||
// Uses per-volume-ID singleflight to prevent duplicate lookups of the same volume
|
||||
// Uses per-volume-ID singleflight to prevent duplicate lookups, with batched master queries
|
||||
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
|
||||
result := make(map[string][]Location)
|
||||
var needsLookup []string
|
||||
var lookupErrors []error
|
||||
|
||||
// Check cache first and separate volumes that need lookup
|
||||
for _, vidString := range volumeIds {
|
||||
@@ -127,86 +127,103 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// For each volume that needs lookup, use per-volume singleflight
|
||||
// to prevent duplicate master queries for the same volume ID
|
||||
for _, vidString := range needsLookup {
|
||||
// Get or create singleflight group for this volume ID
|
||||
mc.vidLookupLock.Lock()
|
||||
group, exists := mc.vidLookupInFlight[vidString]
|
||||
if !exists {
|
||||
group = &singleflight.Group{}
|
||||
mc.vidLookupInFlight[vidString] = group
|
||||
}
|
||||
mc.vidLookupLock.Unlock()
|
||||
// Batch query all missing volumes using singleflight on the batch key
|
||||
// Sort for stable key to coalesce identical batches
|
||||
sort.Strings(needsLookup)
|
||||
batchKey := strings.Join(needsLookup, ",")
|
||||
|
||||
// Use singleflight to ensure only one lookup per volume ID
|
||||
sfResult, err, _ := group.Do(vidString, func() (interface{}, error) {
|
||||
// Double-check cache in case it was populated while we were waiting
|
||||
sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
|
||||
// Double-check cache for volumes that might have been populated while waiting
|
||||
stillNeedLookup := make([]string, 0, len(needsLookup))
|
||||
batchResult := make(map[string][]Location)
|
||||
|
||||
for _, vidString := range needsLookup {
|
||||
vid, _ := strconv.ParseUint(vidString, 10, 32)
|
||||
if locations, found := mc.GetLocations(uint32(vid)); found && len(locations) > 0 {
|
||||
return locations, nil
|
||||
batchResult[vidString] = locations
|
||||
} else {
|
||||
stillNeedLookup = append(stillNeedLookup, vidString)
|
||||
}
|
||||
}
|
||||
|
||||
// Query master for this volume
|
||||
glog.V(2).Infof("Looking up volume %s from master", vidString)
|
||||
var locations []Location
|
||||
if len(stillNeedLookup) == 0 {
|
||||
return batchResult, nil
|
||||
}
|
||||
|
||||
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
|
||||
VolumeOrFileIds: []string{vidString},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("master lookup failed: %v", err)
|
||||
}
|
||||
// Query master with batched volume IDs
|
||||
glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
|
||||
|
||||
for _, vidLoc := range resp.VolumeIdLocations {
|
||||
if vidLoc.Error != "" {
|
||||
return fmt.Errorf("volume %s lookup error: %s", vidString, vidLoc.Error)
|
||||
}
|
||||
|
||||
// Parse volume ID from response
|
||||
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
|
||||
vidOnly := parts[0]
|
||||
vid, err := strconv.ParseUint(vidOnly, 10, 32)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse volume id '%s': %v", vidOnly, err)
|
||||
}
|
||||
|
||||
for _, masterLoc := range vidLoc.Locations {
|
||||
loc := Location{
|
||||
Url: masterLoc.Url,
|
||||
PublicUrl: masterLoc.PublicUrl,
|
||||
GrpcPort: int(masterLoc.GrpcPort),
|
||||
DataCenter: masterLoc.DataCenter,
|
||||
}
|
||||
mc.vidMap.addLocation(uint32(vid), loc)
|
||||
locations = append(locations, loc)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
|
||||
VolumeOrFileIds: stillNeedLookup,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return fmt.Errorf("master lookup failed: %v", err)
|
||||
}
|
||||
return locations, nil
|
||||
|
||||
for _, vidLoc := range resp.VolumeIdLocations {
|
||||
if vidLoc.Error != "" {
|
||||
glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse volume ID from response
|
||||
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
|
||||
vidOnly := parts[0]
|
||||
vid, err := strconv.ParseUint(vidOnly, 10, 32)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
|
||||
continue
|
||||
}
|
||||
|
||||
var locations []Location
|
||||
for _, masterLoc := range vidLoc.Locations {
|
||||
loc := Location{
|
||||
Url: masterLoc.Url,
|
||||
PublicUrl: masterLoc.PublicUrl,
|
||||
GrpcPort: int(masterLoc.GrpcPort),
|
||||
DataCenter: masterLoc.DataCenter,
|
||||
}
|
||||
mc.vidMap.addLocation(uint32(vid), loc)
|
||||
locations = append(locations, loc)
|
||||
}
|
||||
|
||||
if len(locations) > 0 {
|
||||
batchResult[vidOnly] = locations
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Clean up the singleflight group for this volume
|
||||
mc.vidLookupLock.Lock()
|
||||
delete(mc.vidLookupInFlight, vidString)
|
||||
mc.vidLookupLock.Unlock()
|
||||
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to lookup volume %s: %v", vidString, err)
|
||||
continue // Continue with other volumes
|
||||
return batchResult, err
|
||||
}
|
||||
return batchResult, nil
|
||||
})
|
||||
|
||||
if locations, ok := sfResult.([]Location); ok && len(locations) > 0 {
|
||||
result[vidString] = locations
|
||||
if err != nil {
|
||||
lookupErrors = append(lookupErrors, err)
|
||||
}
|
||||
|
||||
// Merge singleflight batch results
|
||||
if batchLocations, ok := sfResult.(map[string][]Location); ok {
|
||||
for vid, locs := range batchLocations {
|
||||
result[vid] = locs
|
||||
}
|
||||
}
|
||||
|
||||
// Check for volumes that still weren't found
|
||||
for _, vidString := range needsLookup {
|
||||
if _, found := result[vidString]; !found {
|
||||
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
|
||||
}
|
||||
}
|
||||
|
||||
// Return aggregated errors
|
||||
if len(lookupErrors) > 0 {
|
||||
return result, fmt.Errorf("lookup errors: %v", lookupErrors)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user