scoutfs: don't lose items between segments

The refactoring of compaction to use the skip lists changed the nature
of item insertion.  Previously it would precisely count the number of
items to insert.  Now it discovers that the current output segment is
full by having _append_item() return false.

In this case the cursors are left pointing at the item that would have
been inserted but failed.  The compact_items() caller loops around to
allocate the next segment.  Then it calls compact_items() again, which
mistakenly advances *past* the current item that still needed to be
inserted.
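
For reference, the old flow looked roughly like this (paraphrased from
the compact_items() removed below, error handling trimmed):

	/* old: next_item() advances the cursors before every append */
	for (;;) {
		has_next = next_item(sb, curs, &item_key, item_val, &flags);
		if (has_next <= 0)
			break;
		if (!scoutfs_seg_append_item(sb, seg, &item_key, item_val,
					     flags, curs->links))
			break;	/* full: curs still points at this item */
	}
	/*
	 * The caller then allocates the next segment and calls back in,
	 * and the first next_item() above walks right past the item that
	 * never made it into the previous segment.
	 */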

Hiding next_item() away from the segment loop made this mechanism hard
to see.  Let's drop the compact_items() function and bring item
iteration and item appending into the main loop so we can decide more
carefully whether to advance as we write and allocate new segments.
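
With the iteration inlined, next_item() only runs after a successful
append and append_filled carries the unwritten item over to the next
output segment.  Roughly (paraphrased from the diff below):

	if (!append_filled)
		ret = next_item(sb, curs, &item_key, item_val, &flags);
	else
		ret = 1;	/* last segment filled; item still pending */
	if (ret <= 0)
		break;

	/* ... allocate the next output segment ... */

	for (;;) {
		if (!scoutfs_seg_append_item(sb, seg, &item_key, item_val,
					     flags, curs->links)) {
			append_filled = true;	/* retry in the next segment */
			break;
		}
		ret = next_item(sb, curs, &item_key, item_val, &flags);
		if (ret <= 0) {
			append_filled = false;
			break;
		}
	}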

This stops losing items at segment boundaries.

Signed-off-by: Zach Brown <zab@versity.com>
Author: Zach Brown
Date: 2017-06-22 22:44:43 -07:00
parent e7655f00ee
commit 76a73baefd

@@ -272,53 +272,21 @@ out:
 	return ret;
 }
 
-/*
- * Walk the input segments for items and append them to the output segment.
- * Items can exist in the input segments but not be written to the output
- * segment, for example if they're deletions. The output segment can be
- * full.
- *
- * Return -errno if something went wrong, then 1 or 0 indicating items written.
- */
-static int compact_items(struct super_block *sb, struct compact_cursor *curs,
-			  struct scoutfs_segment *seg)
-{
-	struct scoutfs_key_buf item_key;
-	SCOUTFS_DECLARE_KVEC(item_val);
-	int has_next;
-	int ret = 0;
-	u8 flags;
-
-	for (;;) {
-		has_next = next_item(sb, curs, &item_key, item_val, &flags);
-		if (has_next <= 0) {
-			if (has_next < 0)
-				ret = has_next;
-			break;
-		}
-
-		if (scoutfs_seg_append_item(sb, seg, &item_key, item_val, flags,
-					    curs->links))
-			ret = 1;
-		else
-			break;
-	}
-
-	return ret;
-}
-
 static int compact_segments(struct super_block *sb,
 			    struct compact_cursor *curs,
 			    struct scoutfs_bio_completion *comp,
 			    struct list_head *results)
 {
+	struct scoutfs_key_buf item_key;
+	SCOUTFS_DECLARE_KVEC(item_val);
 	struct scoutfs_segment *seg;
 	struct compact_seg *cseg;
 	struct compact_seg *upper;
 	struct compact_seg *lower;
 	unsigned next_segno = 0;
+	bool append_filled = false;
 	int ret = 0;
+	u8 flags;
 
 	scoutfs_inc_counter(sb, compact_operations);
 
 	if (curs->sticky)
@@ -394,6 +362,13 @@ static int compact_segments(struct super_block *sb,
 		if (ret)
 			break;
 
+		if (!append_filled)
+			ret = next_item(sb, curs, &item_key, item_val, &flags);
+		else
+			ret = 1;
+		if (ret <= 0)
+			break;
+
 		/* no cseg keys, manifest update uses seg item keys */
 		cseg = kzalloc(sizeof(struct compact_seg), GFP_NOFS);
 		if (!cseg) {
@@ -411,9 +386,7 @@ static int compact_segments(struct super_block *sb,
 		 * case.
 		 */
 		ret = scoutfs_seg_alloc(sb, cseg->segno, &seg);
-		if (ret == 0)
-			ret = compact_items(sb, curs, seg);
-		if (ret < 1) {
+		if (ret < 0) {
 			next_segno--;
 			curs->segnos[next_segno] = cseg->segno;
 			kfree(cseg);
@@ -436,6 +409,22 @@ static int compact_segments(struct super_block *sb,
 		cseg->seg = seg;
 		list_add_tail(&cseg->entry, results);
 
+		for (;;) {
+			if (!scoutfs_seg_append_item(sb, seg, &item_key, item_val,
+						     flags, curs->links)) {
+				append_filled = true;
+				ret = 0;
+				break;
+			}
+
+			ret = next_item(sb, curs, &item_key, item_val, &flags);
+			if (ret <= 0) {
+				append_filled = false;
+				break;
+			}
+		}
+		if (ret < 0)
+			break;
 		/* start a complete segment write now, we'll wait later */
 		ret = scoutfs_seg_submit_write(sb, seg, comp);
 		if (ret)