diff --git a/weed/filer/redis3/ItemList.go b/weed/filer/redis3/ItemList.go index b4043d01c..7a700f435 100644 --- a/weed/filer/redis3/ItemList.go +++ b/weed/filer/redis3/ItemList.go @@ -127,7 +127,7 @@ func (nl *ItemList) WriteName(name string) error { // collect names before name, add them to X namesToX, err := nl.NodeRangeBeforeExclusive(prevNodeReference, name) if err != nil { - return nil + return err } // delete skiplist reference to old node if _, err := nl.skipList.DeleteByKey(prevNodeReference.Key); err != nil { @@ -136,32 +136,32 @@ func (nl *ItemList) WriteName(name string) error { // add namesToY and name to a new X namesToX = append(namesToX, name) if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil { - return nil + return err } // remove names less than name from current Y if err := nl.NodeDeleteBeforeExclusive(prevNodeReference, name); err != nil { - return nil + return err } // point skip list to current Y if err := nl.ItemAdd(lookupKey, prevNodeReference.ElementPointer); err != nil { - return nil + return err } return nil } else { // collect names after name, add them to Y namesToY, err := nl.NodeRangeAfterExclusive(prevNodeReference, name) if err != nil { - return nil + return err } // add namesToY and name to a new Y namesToY = append(namesToY, name) if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil { - return nil + return err } // remove names after name from current X if err := nl.NodeDeleteAfterExclusive(prevNodeReference, name); err != nil { - return nil + return err } return nil } diff --git a/weed/util/skiplist/skiplist.go b/weed/util/skiplist/skiplist.go index befb0389c..8bd62bf7a 100644 --- a/weed/util/skiplist/skiplist.go +++ b/weed/util/skiplist/skiplist.go @@ -240,9 +240,10 @@ func (t *SkipList) DeleteByKey(key []byte) (id int64, err error) { } } - // Link from start needs readjustments. - startNextKey := t.StartLevels[index].Key - if compareElement(nextNode, startNextKey) == 0 { + // Match by id, not the cached key: redis3 re-keys nodes in place, + // so StartLevels[index].Key can be stale and skip this update while + // the end readjustment below still clears EndLevels[index]. + if t.StartLevels[index] != nil && t.StartLevels[index].ElementPointer == nextNode.Id { t.HasChanges = true t.StartLevels[index] = nextNode.Next[index] // This was our currently highest node! @@ -307,8 +308,14 @@ func (t *SkipList) InsertByKey(key []byte, idIfKnown int64, value []byte) (id in newFirst := true newLast := true if !t.IsEmpty() { - newFirst = compareElement(elem, t.StartLevels[0].Key) < 0 - newLast = compareElement(elem, t.EndLevels[0].Key) > 0 + // Guard each end: a persisted skiplist may have one end nil, so + // self-heal instead of dereferencing it. + if t.StartLevels[0] != nil { + newFirst = compareElement(elem, t.StartLevels[0].Key) < 0 + } + if t.EndLevels[0] != nil { + newLast = compareElement(elem, t.EndLevels[0].Key) > 0 + } } normallyInserted := false diff --git a/weed/util/skiplist/skiplist_test.go b/weed/util/skiplist/skiplist_test.go index c5116a49a..b4c6405f8 100644 --- a/weed/util/skiplist/skiplist_test.go +++ b/weed/util/skiplist/skiplist_test.go @@ -29,6 +29,49 @@ func TestReverseInsert(t *testing.T) { } +// TestDeleteRekeyedNode reproduces the redis3 filer crash: a node re-keyed in +// place leaves a stale StartLevels key, so deleting it must still clear both +// ends together rather than leaving EndLevels[0] nil for the next insert. +func TestDeleteRekeyedNode(t *testing.T) { + list := New(memStore) + + list.InsertByKey([]byte("b"), 1, []byte("b")) + + // Drifted start reference: same element id, stale cached key. + list.StartLevels[0] = &SkipListElementReference{ElementPointer: 1, Key: []byte("stale")} + + if _, err := list.DeleteByKey([]byte("b")); err != nil { + t.Fatal(err) + } + + if (list.StartLevels[0] == nil) != (list.EndLevels[0] == nil) { + t.Fatalf("inconsistent ends after delete: start=%v end=%v", list.StartLevels[0], list.EndLevels[0]) + } + + if _, err := list.InsertByKey([]byte("c"), 2, []byte("c")); err != nil { + t.Fatal(err) + } + if _, _, ok, _ := list.Find([]byte("c")); !ok { + t.Fatal("expected to find inserted key") + } +} + +// TestInsertByKeyRecoversInconsistentEnds: a persisted skiplist with +// StartLevels[0] set but EndLevels[0] nil must self-heal, not crash on load. +func TestInsertByKeyRecoversInconsistentEnds(t *testing.T) { + list := New(memStore) + list.InsertByKey([]byte("m"), 1, []byte("m")) + + list.EndLevels[0] = nil + + if _, err := list.InsertByKey([]byte("n"), 2, []byte("n")); err != nil { + t.Fatal(err) + } + if list.IsEmpty() { + t.Fatal("list should not be empty") + } +} + func TestInsertAndFind(t *testing.T) { k0 := []byte("0")