diff --git a/kmod/src/lock_server.c b/kmod/src/lock_server.c index fb33fd80..09ce48d7 100644 --- a/kmod/src/lock_server.c +++ b/kmod/src/lock_server.c @@ -486,7 +486,7 @@ static int process_waiting_requests(struct super_block *sb, /* processing waits for all invalidation responses or recovery */ if (!list_empty(&snode->invalidated) || - scoutfs_recov_next_pending(sb, SCOUTFS_RECOV_LOCKS) != 0) { + scoutfs_recov_next_pending(sb, 0, SCOUTFS_RECOV_LOCKS) != 0) { ret = 0; goto out; } diff --git a/kmod/src/omap.c b/kmod/src/omap.c index 37893914..3dfcbea8 100644 --- a/kmod/src/omap.c +++ b/kmod/src/omap.c @@ -619,7 +619,7 @@ static int handle_requests(struct super_block *sb) int ret; int err; - if (scoutfs_recov_next_pending(sb, SCOUTFS_RECOV_GREETING)) + if (scoutfs_recov_next_pending(sb, 0, SCOUTFS_RECOV_GREETING)) return 0; ret = 0; diff --git a/kmod/src/recov.c b/kmod/src/recov.c index b0d894c2..3829760b 100644 --- a/kmod/src/recov.c +++ b/kmod/src/recov.c @@ -16,9 +16,11 @@ #include #include #include +#include #include "super.h" #include "recov.h" +#include "cmp.h" /* * There are a few server messages which can't be processed until they @@ -47,18 +49,41 @@ struct recov_pending { int which; }; -static struct recov_pending *find_pending(struct recov_info *recinf, u64 rid, int which) +static struct recov_pending *next_pending(struct recov_info *recinf, u64 rid, int which) { struct recov_pending *pend; list_for_each_entry(pend, &recinf->pending, head) { - if ((rid == 0 || pend->rid == rid) && (pend->which & which)) + if (pend->rid > rid && pend->which & which) return pend; } return NULL; } +static struct recov_pending *lookup_pending(struct recov_info *recinf, u64 rid, int which) +{ + struct recov_pending *pend; + + pend = next_pending(recinf, rid - 1, which); + if (pend && pend->rid == rid) + return pend; + + return NULL; +} + +/* + * We keep the pending list sorted by rid so that we can iterate over + * them. The list should be small and shouldn't be used often. + */ +static int cmp_pending_rid(void *priv, struct list_head *A, struct list_head *B) +{ + struct recov_pending *a = list_entry(A, struct recov_pending, head); + struct recov_pending *b = list_entry(B, struct recov_pending, head); + + return scoutfs_cmp_u64s(a->rid, b->rid); +} + /* * Record that we'll be waiting for a client to recover something. * _finished will eventually be called for every _prepare, either @@ -80,14 +105,15 @@ int scoutfs_recov_prepare(struct super_block *sb, u64 rid, int which) spin_lock(&recinf->lock); - pend = find_pending(recinf, rid, SCOUTFS_RECOV_ALL); + pend = lookup_pending(recinf, rid, SCOUTFS_RECOV_ALL); if (pend) { pend->which |= which; } else { swap(pend, alloc); pend->rid = rid; pend->which = which; - list_add(&pend->head, &recinf->pending); + list_add_tail(&pend->head, &recinf->pending); + list_sort(NULL, &recinf->pending, cmp_pending_rid); } spin_unlock(&recinf->lock); @@ -159,7 +185,7 @@ int scoutfs_recov_finish(struct super_block *sb, u64 rid, int which) spin_lock(&recinf->lock); - pend = find_pending(recinf, rid, which); + pend = lookup_pending(recinf, rid, which); if (pend) { pend->which &= ~which; if (pend->which) { @@ -190,29 +216,28 @@ bool scoutfs_recov_is_pending(struct super_block *sb, u64 rid, int which) bool is_pending; spin_lock(&recinf->lock); - is_pending = find_pending(recinf, rid, which) != NULL; + is_pending = lookup_pending(recinf, rid, which) != NULL; spin_unlock(&recinf->lock); return is_pending; } /* - * Returns 0 if there are no rids waiting for the given state to be - * recovered. Returns the rid of a client still waiting if there are - * any, in no specified order. + * Return the next rid after the given rid of a client waiting for the + * given state to be recovered. Start with rid 0, returns 0 when there + * are no more clients waiting for recovery. * * This is inherently racey. Callers are responsible for resolving any * actions taken based on pending with the recovery finishing, perhaps * before we return. */ -u64 scoutfs_recov_next_pending(struct super_block *sb, int which) +u64 scoutfs_recov_next_pending(struct super_block *sb, u64 rid, int which) { DECLARE_RECOV_INFO(sb, recinf); struct recov_pending *pend; - u64 rid; spin_lock(&recinf->lock); - pend = find_pending(recinf, 0, which); + pend = next_pending(recinf, rid, which); rid = pend ? pend->rid : 0; spin_unlock(&recinf->lock); diff --git a/kmod/src/recov.h b/kmod/src/recov.h index cfdca30e..9be58b3a 100644 --- a/kmod/src/recov.h +++ b/kmod/src/recov.h @@ -14,7 +14,7 @@ int scoutfs_recov_begin(struct super_block *sb, void (*timeout_fn)(struct super_ unsigned int timeout_ms); int scoutfs_recov_finish(struct super_block *sb, u64 rid, int which); bool scoutfs_recov_is_pending(struct super_block *sb, u64 rid, int which); -u64 scoutfs_recov_next_pending(struct super_block *sb, int which); +u64 scoutfs_recov_next_pending(struct super_block *sb, u64 rid, int which); void scoutfs_recov_shutdown(struct super_block *sb); int scoutfs_recov_setup(struct super_block *sb);