libceph: have cursor point to data

Rather than having a ceph message data item point to the cursor it's
associated with, have the cursor point to a data item.  This will
allow a message cursor to be used for more than one data item.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2013-03-14 14:09:06 -05:00 committed by Sage Weil
parent 36153ec9dd
commit 8ae4f4f5c0
2 changed files with 59 additions and 62 deletions

View File

@ -104,13 +104,13 @@ struct ceph_msg_data {
}; };
struct ceph_pagelist *pagelist; struct ceph_pagelist *pagelist;
}; };
struct ceph_msg_data_cursor *cursor;
}; };
struct ceph_msg_data_cursor { struct ceph_msg_data_cursor {
size_t resid; /* bytes not yet consumed */ struct ceph_msg_data *data; /* data item this describes */
bool last_piece; /* now at last piece of data item */ size_t resid; /* bytes not yet consumed */
bool need_crc; /* new piece; crc update needed */ bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
union { union {
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
struct { /* bio */ struct { /* bio */

View File

@ -722,10 +722,10 @@ static void con_out_kvec_add(struct ceph_connection *con,
* entry in the current bio iovec, or the first entry in the next * entry in the current bio iovec, or the first entry in the next
* bio in the list. * bio in the list.
*/ */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data, static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length) size_t length)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
struct bio *bio; struct bio *bio;
BUG_ON(data->type != CEPH_MSG_DATA_BIO); BUG_ON(data->type != CEPH_MSG_DATA_BIO);
@ -741,11 +741,11 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
cursor->last_piece = length <= bio->bi_io_vec[0].bv_len; cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
} }
static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data, static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset, size_t *page_offset,
size_t *length) size_t *length)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
struct bio *bio; struct bio *bio;
struct bio_vec *bio_vec; struct bio_vec *bio_vec;
unsigned int index; unsigned int index;
@ -772,14 +772,14 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
return bio_vec->bv_page; return bio_vec->bv_page;
} }
static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes) static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor;
struct bio *bio; struct bio *bio;
struct bio_vec *bio_vec; struct bio_vec *bio_vec;
unsigned int index; unsigned int index;
BUG_ON(data->type != CEPH_MSG_DATA_BIO); BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
bio = cursor->bio; bio = cursor->bio;
BUG_ON(!bio); BUG_ON(!bio);
@ -823,10 +823,10 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
* For a page array, a piece comes from the first page in the array * For a page array, a piece comes from the first page in the array
* that has not already been fully consumed. * that has not already been fully consumed.
*/ */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data, static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length) size_t length)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
int page_count; int page_count;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES); BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@ -845,11 +845,11 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE; cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
} }
static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data, static struct page *
size_t *page_offset, ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
size_t *length) size_t *page_offset, size_t *length)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES); BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@ -865,12 +865,10 @@ static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
return data->pages[cursor->page_index]; return data->pages[cursor->page_index];
} }
static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data, static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes) size_t bytes)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_offset + bytes > PAGE_SIZE); BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
@ -894,10 +892,11 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
* For a pagelist, a piece is whatever remains to be consumed in the * For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page. * first page in the list, or the front of the next page.
*/ */
static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data, static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length) size_t length)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist; struct ceph_pagelist *pagelist;
struct page *page; struct page *page;
@ -919,11 +918,11 @@ static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
cursor->last_piece = length <= PAGE_SIZE; cursor->last_piece = length <= PAGE_SIZE;
} }
static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data, static struct page *
size_t *page_offset, ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
size_t *length) size_t *page_offset, size_t *length)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist; struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@ -941,13 +940,13 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
else else
*length = PAGE_SIZE - *page_offset; *length = PAGE_SIZE - *page_offset;
return data->cursor->page; return cursor->page;
} }
static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data, static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes) size_t bytes)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor; struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist; struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@ -983,19 +982,21 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
* be processed in that piece. It also tracks whether the current * be processed in that piece. It also tracks whether the current
* piece is the last one in the data item. * piece is the last one in the data item.
*/ */
static void ceph_msg_data_cursor_init(struct ceph_msg_data *data, static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
size_t length)
{ {
switch (data->type) { struct ceph_msg_data_cursor *cursor = &msg->cursor;
cursor->data = msg->data;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(data, length); ceph_msg_data_pagelist_cursor_init(cursor, length);
break; break;
case CEPH_MSG_DATA_PAGES: case CEPH_MSG_DATA_PAGES:
ceph_msg_data_pages_cursor_init(data, length); ceph_msg_data_pages_cursor_init(cursor, length);
break; break;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO: case CEPH_MSG_DATA_BIO:
ceph_msg_data_bio_cursor_init(data, length); ceph_msg_data_bio_cursor_init(cursor, length);
break; break;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE: case CEPH_MSG_DATA_NONE:
@ -1003,7 +1004,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
/* BUG(); */ /* BUG(); */
break; break;
} }
data->cursor->need_crc = true; cursor->need_crc = true;
} }
/* /*
@ -1011,23 +1012,22 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
* data item, and supply the page offset and length of that piece. * data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item. * Indicate whether this is the last piece in this data item.
*/ */
static struct page *ceph_msg_data_next(struct ceph_msg_data *data, static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset, size_t *page_offset, size_t *length,
size_t *length,
bool *last_piece) bool *last_piece)
{ {
struct page *page; struct page *page;
switch (data->type) { switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
page = ceph_msg_data_pagelist_next(data, page_offset, length); page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
break; break;
case CEPH_MSG_DATA_PAGES: case CEPH_MSG_DATA_PAGES:
page = ceph_msg_data_pages_next(data, page_offset, length); page = ceph_msg_data_pages_next(cursor, page_offset, length);
break; break;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO: case CEPH_MSG_DATA_BIO:
page = ceph_msg_data_bio_next(data, page_offset, length); page = ceph_msg_data_bio_next(cursor, page_offset, length);
break; break;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE: case CEPH_MSG_DATA_NONE:
@ -1039,7 +1039,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
BUG_ON(*page_offset + *length > PAGE_SIZE); BUG_ON(*page_offset + *length > PAGE_SIZE);
BUG_ON(!*length); BUG_ON(!*length);
if (last_piece) if (last_piece)
*last_piece = data->cursor->last_piece; *last_piece = cursor->last_piece;
return page; return page;
} }
@ -1048,22 +1048,22 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
* Returns true if the result moves the cursor on to the next piece * Returns true if the result moves the cursor on to the next piece
* of the data item. * of the data item.
*/ */
static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{ {
struct ceph_msg_data_cursor *cursor = data->cursor;
bool new_piece; bool new_piece;
BUG_ON(bytes > cursor->resid); BUG_ON(bytes > cursor->resid);
switch (data->type) { switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(data, bytes); new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
break; break;
case CEPH_MSG_DATA_PAGES: case CEPH_MSG_DATA_PAGES:
new_piece = ceph_msg_data_pages_advance(data, bytes); new_piece = ceph_msg_data_pages_advance(cursor, bytes);
break; break;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO: case CEPH_MSG_DATA_BIO:
new_piece = ceph_msg_data_bio_advance(data, bytes); new_piece = ceph_msg_data_bio_advance(cursor, bytes);
break; break;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE: case CEPH_MSG_DATA_NONE:
@ -1071,7 +1071,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
BUG(); BUG();
break; break;
} }
data->cursor->need_crc = new_piece; cursor->need_crc = new_piece;
return new_piece; return new_piece;
} }
@ -1083,7 +1083,7 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
/* Initialize data cursor */ /* Initialize data cursor */
ceph_msg_data_cursor_init(msg->data, (size_t)data_len); ceph_msg_data_cursor_init(msg, (size_t)data_len);
} }
/* /*
@ -1404,7 +1404,7 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
static int write_partial_message_data(struct ceph_connection *con) static int write_partial_message_data(struct ceph_connection *con)
{ {
struct ceph_msg *msg = con->out_msg; struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = msg->data->cursor; struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !con->msgr->nocrc; bool do_datacrc = !con->msgr->nocrc;
u32 crc; u32 crc;
@ -1430,7 +1430,7 @@ static int write_partial_message_data(struct ceph_connection *con)
bool need_crc; bool need_crc;
int ret; int ret;
page = ceph_msg_data_next(msg->data, &page_offset, &length, page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
&last_piece); &last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset, ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece); length, last_piece);
@ -1442,7 +1442,7 @@ static int write_partial_message_data(struct ceph_connection *con)
} }
if (do_datacrc && cursor->need_crc) if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length); crc = ceph_crc32c_page(crc, page, page_offset, length);
need_crc = ceph_msg_data_advance(msg->data, (size_t)ret); need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
} }
dout("%s %p msg %p done\n", __func__, con, msg); dout("%s %p msg %p done\n", __func__, con, msg);
@ -2102,7 +2102,7 @@ static int read_partial_message_section(struct ceph_connection *con,
static int read_partial_msg_data(struct ceph_connection *con) static int read_partial_msg_data(struct ceph_connection *con)
{ {
struct ceph_msg *msg = con->in_msg; struct ceph_msg *msg = con->in_msg;
struct ceph_msg_data_cursor *cursor = msg->data->cursor; struct ceph_msg_data_cursor *cursor = &msg->cursor;
const bool do_datacrc = !con->msgr->nocrc; const bool do_datacrc = !con->msgr->nocrc;
struct page *page; struct page *page;
size_t page_offset; size_t page_offset;
@ -2117,7 +2117,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc) if (do_datacrc)
crc = con->in_data_crc; crc = con->in_data_crc;
while (cursor->resid) { while (cursor->resid) {
page = ceph_msg_data_next(msg->data, &page_offset, &length, page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
NULL); NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) { if (ret <= 0) {
@ -2129,7 +2129,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc) if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret); crc = ceph_crc32c_page(crc, page, page_offset, ret);
(void) ceph_msg_data_advance(msg->data, (size_t)ret); (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
} }
if (do_datacrc) if (do_datacrc)
con->in_data_crc = crc; con->in_data_crc = crc;
@ -2991,7 +2991,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data); BUG_ON(!data);
data->cursor = &msg->cursor;
data->pages = pages; data->pages = pages;
data->length = length; data->length = length;
data->alignment = alignment & ~PAGE_MASK; data->alignment = alignment & ~PAGE_MASK;
@ -3013,7 +3012,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data); BUG_ON(!data);
data->cursor = &msg->cursor;
data->pagelist = pagelist; data->pagelist = pagelist;
msg->data = data; msg->data = data;
@ -3033,7 +3031,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data); BUG_ON(!data);
data->cursor = &msg->cursor;
data->bio = bio; data->bio = bio;
data->bio_length = length; data->bio_length = length;