mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 17:51:30 +00:00
Guard collection deletion against duplicate volume IDs
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -2,12 +2,14 @@ package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/raft"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/topology"
|
||||
)
|
||||
|
||||
func (ms *MasterServer) CollectionList(ctx context.Context, req *master_pb.CollectionListRequest) (*master_pb.CollectionListResponse, error) {
|
||||
@@ -35,6 +37,10 @@ func (ms *MasterServer) CollectionDelete(ctx context.Context, req *master_pb.Col
|
||||
|
||||
resp := &master_pb.CollectionDeleteResponse{}
|
||||
|
||||
if err := ms.ensureCollectionDeleteSafe(req.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := ms.doDeleteNormalCollection(req.Name)
|
||||
|
||||
if err != nil {
|
||||
@@ -50,6 +56,19 @@ func (ms *MasterServer) CollectionDelete(ctx context.Context, req *master_pb.Col
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ms *MasterServer) ensureCollectionDeleteSafe(collectionName string) error {
|
||||
duplicates := ms.Topo.FindDuplicateVolumeIds(collectionName)
|
||||
if len(duplicates) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf(
|
||||
"refusing to delete collection %q: duplicate volume IDs exist across collections (%s). Run `volume.check.duplicates` to inspect and resolve them first",
|
||||
collectionName,
|
||||
topology.FormatDuplicateVolumeIds(duplicates),
|
||||
)
|
||||
}
|
||||
|
||||
func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
|
||||
|
||||
collection, ok := ms.Topo.FindCollection(collectionName)
|
||||
|
||||
@@ -4,6 +4,11 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/topology"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -35,3 +40,41 @@ func TestInitialLockRingUpdateSkipsNonFilers(t *testing.T) {
|
||||
|
||||
assert.Nil(t, ms.initialLockRingUpdate(cluster.BrokerType, "group-a"))
|
||||
}
|
||||
|
||||
func TestEnsureCollectionDeleteSafeRejectsDuplicateVolumeIds(t *testing.T) {
|
||||
topo := topology.NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
dc := topo.GetOrCreateDataCenter("dc1")
|
||||
rack := dc.GetOrCreateRack("rack1")
|
||||
maxVolumeCounts := map[string]uint32{"": 25}
|
||||
|
||||
dn1 := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
|
||||
dn2 := rack.GetOrCreateDataNode("127.0.0.2", 34535, 0, "127.0.0.2", "", maxVolumeCounts)
|
||||
|
||||
replicaPlacement := &super_block.ReplicaPlacement{}
|
||||
collectionA := storage.VolumeInfo{
|
||||
Id: needle.VolumeId(100),
|
||||
Collection: "collection-a",
|
||||
Version: needle.GetCurrentVersion(),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: needle.EMPTY_TTL,
|
||||
}
|
||||
collectionB := storage.VolumeInfo{
|
||||
Id: needle.VolumeId(100),
|
||||
Collection: "collection-b",
|
||||
Version: needle.GetCurrentVersion(),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: needle.EMPTY_TTL,
|
||||
}
|
||||
|
||||
dn1.UpdateVolumes([]storage.VolumeInfo{collectionA})
|
||||
dn2.UpdateVolumes([]storage.VolumeInfo{collectionB})
|
||||
topo.RegisterVolumeLayout(collectionA, dn1)
|
||||
topo.RegisterVolumeLayout(collectionB, dn2)
|
||||
|
||||
ms := &MasterServer{Topo: topo}
|
||||
|
||||
err := ms.ensureCollectionDeleteSafe("collection-a")
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "refusing to delete collection")
|
||||
assert.Contains(t, err.Error(), "100:[collection-a, collection-b]")
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
|
||||
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
|
||||
return
|
||||
}
|
||||
if err := ms.ensureCollectionDeleteSafe(collectionName); err != nil {
|
||||
writeJsonError(w, r, http.StatusConflict, err)
|
||||
return
|
||||
}
|
||||
for _, server := range collection.ListVolumeServers() {
|
||||
err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -366,6 +367,74 @@ func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (t *Topology) FindDuplicateVolumeIds(collectionName string) map[needle.VolumeId][]string {
|
||||
duplicates := make(map[needle.VolumeId][]string)
|
||||
collectionsByVid := make(map[needle.VolumeId]map[string]struct{})
|
||||
|
||||
t.collectionMap.RLock()
|
||||
defer t.collectionMap.RUnlock()
|
||||
|
||||
for _, item := range t.collectionMap.Items() {
|
||||
collection := item.(*Collection)
|
||||
for _, layout := range collection.GetAllVolumeLayouts() {
|
||||
layout.accessLock.RLock()
|
||||
for vid := range layout.vid2location {
|
||||
if collectionsByVid[vid] == nil {
|
||||
collectionsByVid[vid] = make(map[string]struct{})
|
||||
}
|
||||
collectionsByVid[vid][collection.Name] = struct{}{}
|
||||
}
|
||||
layout.accessLock.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
for vid, collectionSet := range collectionsByVid {
|
||||
if len(collectionSet) < 2 {
|
||||
continue
|
||||
}
|
||||
if collectionName != "" {
|
||||
if _, found := collectionSet[collectionName]; !found {
|
||||
continue
|
||||
}
|
||||
}
|
||||
duplicateCollections := make([]string, 0, len(collectionSet))
|
||||
for name := range collectionSet {
|
||||
duplicateCollections = append(duplicateCollections, name)
|
||||
}
|
||||
slices.Sort(duplicateCollections)
|
||||
duplicates[vid] = duplicateCollections
|
||||
}
|
||||
|
||||
return duplicates
|
||||
}
|
||||
|
||||
func FormatDuplicateVolumeIds(duplicates map[needle.VolumeId][]string) string {
|
||||
if len(duplicates) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
vids := make([]needle.VolumeId, 0, len(duplicates))
|
||||
for vid := range duplicates {
|
||||
vids = append(vids, vid)
|
||||
}
|
||||
slices.Sort(vids)
|
||||
|
||||
lines := make([]string, 0, len(vids))
|
||||
for _, vid := range vids {
|
||||
displayCollections := make([]string, 0, len(duplicates[vid]))
|
||||
for _, name := range duplicates[vid] {
|
||||
if name == "" {
|
||||
displayCollections = append(displayCollections, "(default)")
|
||||
} else {
|
||||
displayCollections = append(displayCollections, name)
|
||||
}
|
||||
}
|
||||
lines = append(lines, fmt.Sprintf("%d:[%s]", vid, strings.Join(displayCollections, ", ")))
|
||||
}
|
||||
|
||||
return strings.Join(lines, "; ")
|
||||
}
|
||||
|
||||
func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
|
||||
c, hasCollection := t.collectionMap.Find(collectionName)
|
||||
if !hasCollection {
|
||||
|
||||
@@ -14,6 +14,75 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFindDuplicateVolumeIds(t *testing.T) {
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
|
||||
dc := topo.GetOrCreateDataCenter("dc1")
|
||||
rack := dc.GetOrCreateRack("rack1")
|
||||
maxVolumeCounts := map[string]uint32{"": 25}
|
||||
|
||||
dn1 := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
|
||||
dn2 := rack.GetOrCreateDataNode("127.0.0.2", 34535, 0, "127.0.0.2", "", maxVolumeCounts)
|
||||
|
||||
replicaPlacement := &super_block.ReplicaPlacement{}
|
||||
|
||||
collectionA := storage.VolumeInfo{
|
||||
Id: needle.VolumeId(100),
|
||||
Collection: "collection-a",
|
||||
DiskType: "",
|
||||
ReadOnly: false,
|
||||
Version: needle.GetCurrentVersion(),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: needle.EMPTY_TTL,
|
||||
}
|
||||
collectionB := storage.VolumeInfo{
|
||||
Id: needle.VolumeId(100),
|
||||
Collection: "collection-b",
|
||||
DiskType: "",
|
||||
ReadOnly: false,
|
||||
Version: needle.GetCurrentVersion(),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: needle.EMPTY_TTL,
|
||||
}
|
||||
unique := storage.VolumeInfo{
|
||||
Id: needle.VolumeId(200),
|
||||
Collection: "collection-a",
|
||||
DiskType: "",
|
||||
ReadOnly: false,
|
||||
Version: needle.GetCurrentVersion(),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: needle.EMPTY_TTL,
|
||||
}
|
||||
|
||||
dn1.UpdateVolumes([]storage.VolumeInfo{collectionA, unique})
|
||||
dn2.UpdateVolumes([]storage.VolumeInfo{collectionB})
|
||||
topo.RegisterVolumeLayout(collectionA, dn1)
|
||||
topo.RegisterVolumeLayout(unique, dn1)
|
||||
topo.RegisterVolumeLayout(collectionB, dn2)
|
||||
|
||||
duplicates := topo.FindDuplicateVolumeIds("")
|
||||
got, ok := duplicates[needle.VolumeId(100)]
|
||||
if !ok {
|
||||
t.Fatalf("expected duplicate volume 100, got %+v", duplicates)
|
||||
}
|
||||
expected := []string{"collection-a", "collection-b"}
|
||||
if !reflect.DeepEqual(got, expected) {
|
||||
t.Fatalf("unexpected duplicate collections: got=%v want=%v", got, expected)
|
||||
}
|
||||
if _, exists := duplicates[needle.VolumeId(200)]; exists {
|
||||
t.Fatalf("did not expect unique volume 200 in duplicates: %+v", duplicates)
|
||||
}
|
||||
|
||||
scoped := topo.FindDuplicateVolumeIds("collection-a")
|
||||
if _, ok := scoped[needle.VolumeId(100)]; !ok {
|
||||
t.Fatalf("expected scoped duplicate for collection-a, got %+v", scoped)
|
||||
}
|
||||
scopedMissing := topo.FindDuplicateVolumeIds("collection-c")
|
||||
if len(scopedMissing) != 0 {
|
||||
t.Fatalf("expected no duplicates for unrelated collection, got %+v", scopedMissing)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveDataCenter(t *testing.T) {
|
||||
topo := setup(topologyLayout)
|
||||
topo.UnlinkChildNode(NodeId("dc2"))
|
||||
|
||||
Reference in New Issue
Block a user