mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 18:21:28 +00:00
fix(redis3): prevent filer crash from inconsistent skiplist ends (#9602)
* fix(redis3): prevent filer crash from inconsistent skiplist ends DeleteByKey updated the two ends asymmetrically: the start side decided whether to clear StartLevels[index] by comparing a cached reference key, while the end side cleared EndLevels[index] structurally. The redis3 ItemList re-keys a node while keeping its id, so that cached key drifts. When such a node was the only one at a level and got deleted, the stale key comparison left StartLevels dangling while EndLevels was cleared to nil. The next InsertByKey then dereferenced a nil EndLevels[0] and took down the whole filer during a rename or delete. Match the deleted node by its unique id so both ends stay consistent, and guard each end in InsertByKey so an already-corrupted skiplist persisted in Redis self-heals instead of crashing on load. * fix(redis3): propagate errors from WriteName node split The case 2.3 split path returned nil instead of the error in seven branches. Because the split runs a multi-step sequence (DeleteByKey on the skiplist, ItemAdd, redis range-delete, ItemAdd), a swallowed failure let WriteName report success while the skiplist was half-updated, which the caller then persisted - silently corrupting the directory listing and setting up the very inconsistent-ends state that crashes the filer.
This commit is contained in:
@@ -127,7 +127,7 @@ func (nl *ItemList) WriteName(name string) error {
|
||||
// collect names before name, add them to X
|
||||
namesToX, err := nl.NodeRangeBeforeExclusive(prevNodeReference, name)
|
||||
if err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
// delete skiplist reference to old node
|
||||
if _, err := nl.skipList.DeleteByKey(prevNodeReference.Key); err != nil {
|
||||
@@ -136,32 +136,32 @@ func (nl *ItemList) WriteName(name string) error {
|
||||
// add namesToY and name to a new X
|
||||
namesToX = append(namesToX, name)
|
||||
if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
// remove names less than name from current Y
|
||||
if err := nl.NodeDeleteBeforeExclusive(prevNodeReference, name); err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// point skip list to current Y
|
||||
if err := nl.ItemAdd(lookupKey, prevNodeReference.ElementPointer); err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
// collect names after name, add them to Y
|
||||
namesToY, err := nl.NodeRangeAfterExclusive(prevNodeReference, name)
|
||||
if err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
// add namesToY and name to a new Y
|
||||
namesToY = append(namesToY, name)
|
||||
if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
// remove names after name from current X
|
||||
if err := nl.NodeDeleteAfterExclusive(prevNodeReference, name); err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -240,9 +240,10 @@ func (t *SkipList) DeleteByKey(key []byte) (id int64, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Link from start needs readjustments.
|
||||
startNextKey := t.StartLevels[index].Key
|
||||
if compareElement(nextNode, startNextKey) == 0 {
|
||||
// Match by id, not the cached key: redis3 re-keys nodes in place,
|
||||
// so StartLevels[index].Key can be stale and skip this update while
|
||||
// the end readjustment below still clears EndLevels[index].
|
||||
if t.StartLevels[index] != nil && t.StartLevels[index].ElementPointer == nextNode.Id {
|
||||
t.HasChanges = true
|
||||
t.StartLevels[index] = nextNode.Next[index]
|
||||
// This was our currently highest node!
|
||||
@@ -307,8 +308,14 @@ func (t *SkipList) InsertByKey(key []byte, idIfKnown int64, value []byte) (id in
|
||||
newFirst := true
|
||||
newLast := true
|
||||
if !t.IsEmpty() {
|
||||
newFirst = compareElement(elem, t.StartLevels[0].Key) < 0
|
||||
newLast = compareElement(elem, t.EndLevels[0].Key) > 0
|
||||
// Guard each end: a persisted skiplist may have one end nil, so
|
||||
// self-heal instead of dereferencing it.
|
||||
if t.StartLevels[0] != nil {
|
||||
newFirst = compareElement(elem, t.StartLevels[0].Key) < 0
|
||||
}
|
||||
if t.EndLevels[0] != nil {
|
||||
newLast = compareElement(elem, t.EndLevels[0].Key) > 0
|
||||
}
|
||||
}
|
||||
|
||||
normallyInserted := false
|
||||
|
||||
@@ -29,6 +29,49 @@ func TestReverseInsert(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
// TestDeleteRekeyedNode reproduces the redis3 filer crash: a node re-keyed in
|
||||
// place leaves a stale StartLevels key, so deleting it must still clear both
|
||||
// ends together rather than leaving EndLevels[0] nil for the next insert.
|
||||
func TestDeleteRekeyedNode(t *testing.T) {
|
||||
list := New(memStore)
|
||||
|
||||
list.InsertByKey([]byte("b"), 1, []byte("b"))
|
||||
|
||||
// Drifted start reference: same element id, stale cached key.
|
||||
list.StartLevels[0] = &SkipListElementReference{ElementPointer: 1, Key: []byte("stale")}
|
||||
|
||||
if _, err := list.DeleteByKey([]byte("b")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if (list.StartLevels[0] == nil) != (list.EndLevels[0] == nil) {
|
||||
t.Fatalf("inconsistent ends after delete: start=%v end=%v", list.StartLevels[0], list.EndLevels[0])
|
||||
}
|
||||
|
||||
if _, err := list.InsertByKey([]byte("c"), 2, []byte("c")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, _, ok, _ := list.Find([]byte("c")); !ok {
|
||||
t.Fatal("expected to find inserted key")
|
||||
}
|
||||
}
|
||||
|
||||
// TestInsertByKeyRecoversInconsistentEnds: a persisted skiplist with
|
||||
// StartLevels[0] set but EndLevels[0] nil must self-heal, not crash on load.
|
||||
func TestInsertByKeyRecoversInconsistentEnds(t *testing.T) {
|
||||
list := New(memStore)
|
||||
list.InsertByKey([]byte("m"), 1, []byte("m"))
|
||||
|
||||
list.EndLevels[0] = nil
|
||||
|
||||
if _, err := list.InsertByKey([]byte("n"), 2, []byte("n")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if list.IsEmpty() {
|
||||
t.Fatal("list should not be empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsertAndFind(t *testing.T) {
|
||||
|
||||
k0 := []byte("0")
|
||||
|
||||
Reference in New Issue
Block a user