rds: recycle FMRs through lockless lists

FMR allocation and recycling are performance critical and fairly lock
intensive.  The current code has a per-connection lock that all
processes contend on, and it becomes a major bottleneck on large systems.

This changes things to use a number of cmpxchg-based lists instead,
allowing us to go through the whole FMR lifecycle without locking inside
RDS.

Zach Brown pointed out that our usage of cmpxchg for xlist removal is
racy if someone manages to remove an FMR struct and add it back into the
list while another CPU still sees the FMR's address at the head of the list.

The second CPU might assume the list hasn't changed when in fact any
number of operations might have happened in between the deletion and
reinsertion.
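
To make the race concrete, here is an annotated copy of the removal loop
(the same shape as xlist_del_head() in the new net/rds/xlist.h below, with
the XLIST_PTR_TAIL check dropped for brevity); naive_pop() and the CPU A /
CPU B narration are only for illustration and are not part of the patch.

#include "xlist.h"	/* struct xlist_head, cmpxchg via asm/system.h */

static struct xlist_head *naive_pop(struct xlist_head *head)
{
	struct xlist_head *cur;
	struct xlist_head *next;

	while (1) {
		cur = head->next;	/* CPU A sees head -> X -> Y -> ... */
		if (!cur)
			return NULL;
		next = cur->next;	/* CPU A caches next = Y */

		/*
		 * Meanwhile CPU B pops X, pops Y, reuses or frees Y,
		 * and pushes X back onto the list.  head->next is X
		 * again, but X->next is no longer Y.
		 */
		if (cmpxchg(&head->next, cur, next) == cur)
			return cur;	/* "succeeds", publishing the stale Y
					 * as the new head of the list */
	}
}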

This commit maintains a per-CPU flag that is set while that CPU is inside
xlist removal, and establishes a grace period to make sure that
nobody can still see an entry we have just removed from the list.
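
In outline, the fix boils down to the pattern below, condensed from the
rds_ib_reuse_fmr() and wait_clean_list_grace() changes in the diff;
grace_protected_pop() is just a stand-in name for the reuse path.

#include <linux/percpu.h>
#include <linux/bitops.h>
#include <linux/cpumask.h>
#include "xlist.h"

static DEFINE_PER_CPU(unsigned long, clean_list_grace);
#define CLEAN_LIST_BUSY_BIT 0

/* reader side: flag this CPU while it may be dereferencing a list head */
static struct xlist_head *grace_protected_pop(struct xlist_head *head)
{
	struct xlist_head *ret;
	unsigned long *flag;

	preempt_disable();
	flag = &__get_cpu_var(clean_list_grace);
	set_bit(CLEAN_LIST_BUSY_BIT, flag);
	ret = xlist_del_head(head);
	clear_bit(CLEAN_LIST_BUSY_BIT, flag);
	preempt_enable();
	return ret;
}

/* writer side: before reinserting removed entries, wait until no CPU
 * is still inside the removal window above */
static void wait_clean_list_grace(void)
{
	int cpu;
	unsigned long *flag;

	for_each_online_cpu(cpu) {
		flag = &per_cpu(clean_list_grace, cpu);
		while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
			cpu_relax();
	}
}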

Signed-off-by: Chris Mason <chris.mason@oracle.com>
Author: Chris Mason, 2010-06-11 11:17:59 -07:00 (committed by Andy Grover)
parent 0f4b1c7e89
commit 6fa70da608
2 changed files with 282 additions and 42 deletions

net/rds/ib_rdma.c

@ -36,7 +36,10 @@
#include "rds.h"
#include "ib.h"
#include "xlist.h"
static DEFINE_PER_CPU(unsigned long, clean_list_grace);
#define CLEAN_LIST_BUSY_BIT 0
/*
* This is stored as mr->r_trans_private.
@ -45,7 +48,11 @@ struct rds_ib_mr {
struct rds_ib_device *device;
struct rds_ib_mr_pool *pool;
struct ib_fmr *fmr;
struct list_head list;
struct xlist_head xlist;
/* unmap_list is for freeing */
struct list_head unmap_list;
unsigned int remap_count;
struct scatterlist *sg;
@ -61,12 +68,14 @@ struct rds_ib_mr_pool {
struct mutex flush_lock; /* serialize fmr invalidate */
struct work_struct flush_worker; /* flush worker */
spinlock_t list_lock; /* protect variables below */
atomic_t item_count; /* total # of MRs */
atomic_t dirty_count; /* # dirty of MRs */
struct list_head drop_list; /* MRs that have reached their max_maps limit */
struct list_head free_list; /* unused MRs */
struct list_head clean_list; /* unused & unmapped MRs */
struct xlist_head drop_list; /* MRs that have reached their max_maps limit */
struct xlist_head free_list; /* unused MRs */
struct xlist_head clean_list; /* global unused & unmapped MRs */
wait_queue_head_t flush_wait;
atomic_t free_pinned; /* memory pinned by free MRs */
unsigned long max_items;
unsigned long max_items_soft;
@ -74,7 +83,7 @@ struct rds_ib_mr_pool {
struct ib_fmr_attr fmr_attr;
};
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
@ -212,11 +221,11 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
if (!pool)
return ERR_PTR(-ENOMEM);
INIT_LIST_HEAD(&pool->free_list);
INIT_LIST_HEAD(&pool->drop_list);
INIT_LIST_HEAD(&pool->clean_list);
INIT_XLIST_HEAD(&pool->free_list);
INIT_XLIST_HEAD(&pool->drop_list);
INIT_XLIST_HEAD(&pool->clean_list);
mutex_init(&pool->flush_lock);
spin_lock_init(&pool->list_lock);
init_waitqueue_head(&pool->flush_wait);
INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
pool->fmr_attr.max_pages = fmr_message_size;
@ -246,27 +255,50 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
{
cancel_work_sync(&pool->flush_worker);
rds_ib_flush_mr_pool(pool, 1);
rds_ib_flush_mr_pool(pool, 1, NULL);
WARN_ON(atomic_read(&pool->item_count));
WARN_ON(atomic_read(&pool->free_pinned));
kfree(pool);
}
static void refill_local(struct rds_ib_mr_pool *pool, struct xlist_head *xl,
struct rds_ib_mr **ibmr_ret)
{
struct xlist_head *ibmr_xl;
ibmr_xl = xlist_del_head_fast(xl);
*ibmr_ret = list_entry(ibmr_xl, struct rds_ib_mr, xlist);
}
static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
{
struct rds_ib_mr *ibmr = NULL;
unsigned long flags;
struct xlist_head *ret;
unsigned long *flag;
spin_lock_irqsave(&pool->list_lock, flags);
if (!list_empty(&pool->clean_list)) {
ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
list_del_init(&ibmr->list);
}
spin_unlock_irqrestore(&pool->list_lock, flags);
preempt_disable();
flag = &__get_cpu_var(clean_list_grace);
set_bit(CLEAN_LIST_BUSY_BIT, flag);
ret = xlist_del_head(&pool->clean_list);
if (ret)
ibmr = list_entry(ret, struct rds_ib_mr, xlist);
clear_bit(CLEAN_LIST_BUSY_BIT, flag);
preempt_enable();
return ibmr;
}
static inline void wait_clean_list_grace(void)
{
int cpu;
unsigned long *flag;
for_each_online_cpu(cpu) {
flag = &per_cpu(clean_list_grace, cpu);
while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
cpu_relax();
}
}
static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
{
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
@ -299,7 +331,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
/* We do have some empty MRs. Flush them out. */
rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, &ibmr);
if (ibmr)
return ibmr;
}
ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
@ -493,34 +527,110 @@ static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int fr
return 0;
}
/*
* given an xlist of mrs, put them all into the list_head for more processing
*/
static void xlist_append_to_list(struct xlist_head *xlist, struct list_head *list)
{
struct rds_ib_mr *ibmr;
struct xlist_head splice;
struct xlist_head *cur;
struct xlist_head *next;
splice.next = NULL;
xlist_splice(xlist, &splice);
cur = splice.next;
while (cur) {
next = cur->next;
ibmr = list_entry(cur, struct rds_ib_mr, xlist);
list_add_tail(&ibmr->unmap_list, list);
cur = next;
}
}
/*
* this takes a list head of mrs and turns it into an xlist of clusters.
* each cluster has an xlist of MR_CLUSTER_SIZE mrs that are ready for
* reuse.
*/
static void list_append_to_xlist(struct rds_ib_mr_pool *pool,
struct list_head *list, struct xlist_head *xlist,
struct xlist_head **tail_ret)
{
struct rds_ib_mr *ibmr;
struct xlist_head *cur_mr = xlist;
struct xlist_head *tail_mr = NULL;
list_for_each_entry(ibmr, list, unmap_list) {
tail_mr = &ibmr->xlist;
tail_mr->next = NULL;
cur_mr->next = tail_mr;
cur_mr = tail_mr;
}
*tail_ret = tail_mr;
}
/*
* Flush our pool of MRs.
* At a minimum, all currently unused MRs are unmapped.
* If the number of MRs allocated exceeds the limit, we also try
* to free as many MRs as needed to get back to this limit.
*/
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
int free_all, struct rds_ib_mr **ibmr_ret)
{
struct rds_ib_mr *ibmr, *next;
struct xlist_head clean_xlist;
struct xlist_head *clean_tail;
LIST_HEAD(unmap_list);
LIST_HEAD(fmr_list);
unsigned long unpinned = 0;
unsigned long flags;
unsigned int nfreed = 0, ncleaned = 0, free_goal;
int ret = 0;
rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
mutex_lock(&pool->flush_lock);
if (ibmr_ret) {
DEFINE_WAIT(wait);
while(!mutex_trylock(&pool->flush_lock)) {
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
finish_wait(&pool->flush_wait, &wait);
goto out_nolock;
}
prepare_to_wait(&pool->flush_wait, &wait,
TASK_UNINTERRUPTIBLE);
if (xlist_empty(&pool->clean_list))
schedule();
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
finish_wait(&pool->flush_wait, &wait);
goto out_nolock;
}
}
finish_wait(&pool->flush_wait, &wait);
} else
mutex_lock(&pool->flush_lock);
if (ibmr_ret) {
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
goto out;
}
}
spin_lock_irqsave(&pool->list_lock, flags);
/* Get the list of all MRs to be dropped. Ordering matters -
* we want to put drop_list ahead of free_list. */
list_splice_init(&pool->free_list, &unmap_list);
list_splice_init(&pool->drop_list, &unmap_list);
* we want to put drop_list ahead of free_list.
*/
xlist_append_to_list(&pool->drop_list, &unmap_list);
xlist_append_to_list(&pool->free_list, &unmap_list);
if (free_all)
list_splice_init(&pool->clean_list, &unmap_list);
spin_unlock_irqrestore(&pool->list_lock, flags);
xlist_append_to_list(&pool->clean_list, &unmap_list);
free_goal = rds_ib_flush_goal(pool, free_all);
@ -528,19 +638,20 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
goto out;
/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
list_for_each_entry(ibmr, &unmap_list, list)
list_for_each_entry(ibmr, &unmap_list, unmap_list)
list_add(&ibmr->fmr->list, &fmr_list);
ret = ib_unmap_fmr(&fmr_list);
if (ret)
printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
/* Now we can destroy the DMA mapping and unpin any pages */
list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
unpinned += ibmr->sg_len;
__rds_ib_teardown_mr(ibmr);
if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
rds_ib_stats_inc(s_ib_rdma_mr_free);
list_del(&ibmr->list);
list_del(&ibmr->unmap_list);
ib_dealloc_fmr(ibmr->fmr);
kfree(ibmr);
nfreed++;
@ -548,9 +659,27 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
ncleaned++;
}
spin_lock_irqsave(&pool->list_lock, flags);
list_splice(&unmap_list, &pool->clean_list);
spin_unlock_irqrestore(&pool->list_lock, flags);
if (!list_empty(&unmap_list)) {
/* we have to make sure that none of the things we're about
* to put on the clean list would race with other cpus trying
* to pull items off. The xlist would explode if we managed to
* remove something from the clean list and then add it back again
* while another CPU was spinning on that same item in xlist_del_head.
*
* This is pretty unlikely, but just in case wait for an xlist grace period
* here before adding anything back into the clean list.
*/
wait_clean_list_grace();
list_append_to_xlist(pool, &unmap_list, &clean_xlist, &clean_tail);
if (ibmr_ret)
refill_local(pool, &clean_xlist, ibmr_ret);
/* refill_local may have emptied our list */
if (!xlist_empty(&clean_xlist))
xlist_add(clean_xlist.next, clean_tail, &pool->clean_list);
}
atomic_sub(unpinned, &pool->free_pinned);
atomic_sub(ncleaned, &pool->dirty_count);
@ -558,6 +687,9 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
out:
mutex_unlock(&pool->flush_lock);
if (waitqueue_active(&pool->flush_wait))
wake_up(&pool->flush_wait);
out_nolock:
return ret;
}
@ -565,7 +697,7 @@ static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
{
struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker);
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, NULL);
}
void rds_ib_free_mr(void *trans_private, int invalidate)
@ -573,20 +705,17 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
struct rds_ib_mr *ibmr = trans_private;
struct rds_ib_device *rds_ibdev = ibmr->device;
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
unsigned long flags;
rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
/* Return it to the pool's free list */
spin_lock_irqsave(&pool->list_lock, flags);
if (ibmr->remap_count >= pool->fmr_attr.max_maps)
list_add(&ibmr->list, &pool->drop_list);
xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list);
else
list_add(&ibmr->list, &pool->free_list);
xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list);
atomic_add(ibmr->sg_len, &pool->free_pinned);
atomic_inc(&pool->dirty_count);
spin_unlock_irqrestore(&pool->list_lock, flags);
/* If we've pinned too many pages, request a flush */
if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
@ -595,7 +724,7 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
if (invalidate) {
if (likely(!in_interrupt())) {
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, NULL);
} else {
/* We get here if the user created a MR marked
* as use_once and invalidate at the same time. */
@ -614,7 +743,7 @@ void rds_ib_flush_mrs(void)
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
if (pool)
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, NULL);
}
}
@ -659,3 +788,4 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
rds_ib_dev_put(rds_ibdev);
return ibmr;
}

net/rds/xlist.h (new file, 110 lines)

@ -0,0 +1,110 @@
#ifndef _LINUX_XLIST_H
#define _LINUX_XLIST_H
#include <linux/stddef.h>
#include <linux/poison.h>
#include <linux/prefetch.h>
#include <asm/system.h>
struct xlist_head {
struct xlist_head *next;
};
/*
* XLIST_PTR_TAIL can be used to prevent double insertion. See
* xlist_protect()
*/
#define XLIST_PTR_TAIL ((struct xlist_head *)0x1)
static inline void xlist_add(struct xlist_head *new, struct xlist_head *tail, struct xlist_head *head)
{
struct xlist_head *cur;
struct xlist_head *check;
while (1) {
cur = head->next;
tail->next = cur;
check = cmpxchg(&head->next, cur, new);
if (check == cur)
break;
}
}
/*
* To avoid duplicate insertion by two CPUs of the same xlist item
* you can call xlist_protect. It will stuff XLIST_PTR_TAIL
* into the entry->next pointer with xchg, and only return 1
* if there was a NULL there before.
*
* if xlist_protect returns zero, someone else is busy working
* on this entry. Getting a NULL into the entry in a race
* free manner is the caller's job.
*/
static inline int xlist_protect(struct xlist_head *entry)
{
struct xlist_head *val;
val = xchg(&entry->next, XLIST_PTR_TAIL);
if (val == NULL)
return 1;
return 0;
}
static inline struct xlist_head *xlist_del_head(struct xlist_head *head)
{
struct xlist_head *cur;
struct xlist_head *check;
struct xlist_head *next;
while (1) {
cur = head->next;
if (!cur)
goto out;
if (cur == XLIST_PTR_TAIL) {
cur = NULL;
goto out;
}
next = cur->next;
check = cmpxchg(&head->next, cur, next);
if (check == cur)
goto out;
}
out:
return cur;
}
static inline struct xlist_head *xlist_del_head_fast(struct xlist_head *head)
{
struct xlist_head *cur;
cur = head->next;
if (!cur || cur == XLIST_PTR_TAIL)
return NULL;
head->next = cur->next;
return cur;
}
static inline void xlist_splice(struct xlist_head *list,
struct xlist_head *head)
{
struct xlist_head *cur;
WARN_ON(head->next);
cur = xchg(&list->next, NULL);
head->next = cur;
}
static inline void INIT_XLIST_HEAD(struct xlist_head *list)
{
list->next = NULL;
}
static inline int xlist_empty(struct xlist_head *head)
{
return head->next == NULL || head->next == XLIST_PTR_TAIL;
}
#endif
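
For reference, a minimal usage sketch of this header, assuming a caller
embeds an xlist_head in its own structure the way rds_ib_mr does above;
the item type and the push/pop wrappers here are hypothetical and not part
of the patch.

#include <linux/kernel.h>	/* container_of */
#include "xlist.h"

struct item {
	struct xlist_head xlist;
	int value;
};

static struct xlist_head item_stack;	/* INIT_XLIST_HEAD() before first use */

/* push a single entry: for one node, 'new' and 'tail' are the same */
static void item_push(struct item *it)
{
	xlist_add(&it->xlist, &it->xlist, &item_stack);
}

/* pop one entry, or NULL if the stack is empty */
static struct item *item_pop(void)
{
	struct xlist_head *node = xlist_del_head(&item_stack);

	if (!node)
		return NULL;
	return container_of(node, struct item, xlist);
}

As the ib_rdma.c changes show, a caller that can re-add a just-removed entry
while other CPUs may still be spinning on it in xlist_del_head() also needs a
grace period along the lines of clean_list_grace before the reinsertion.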