Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: clean up on forwarded aborted mds request
  ceph: fix leak of osd authorizer
  ceph: close out mds, osd connections before stopping auth
  ceph: make lease code DN specific
  fs/ceph: Use ERR_CAST
  ceph: renew auth tickets before they expire
  ceph: do not resend mon requests on auth ticket renewal
  ceph: removed duplicated #includes
  ceph: avoid possible null dereference
  ceph: make mds requests killable, not interruptible
  sched: add wait_for_completion_killable_timeout
This commit is contained in:
Linus Torvalds 2010-05-30 08:56:39 -07:00
commit b612a05537
19 changed files with 104 additions and 32 deletions

View file

@ -1,7 +1,6 @@
#include "ceph_debug.h" #include "ceph_debug.h"
#include <linux/module.h> #include <linux/module.h>
#include <linux/slab.h>
#include <linux/err.h> #include <linux/err.h>
#include <linux/slab.h> #include <linux/slab.h>
@ -217,8 +216,8 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
if (ac->protocol != protocol) { if (ac->protocol != protocol) {
ret = ceph_auth_init_protocol(ac, protocol); ret = ceph_auth_init_protocol(ac, protocol);
if (ret) { if (ret) {
pr_err("error %d on auth method %s init\n", pr_err("error %d on auth protocol %d init\n",
ret, ac->ops->name); ret, protocol);
goto out; goto out;
} }
} }
@ -247,7 +246,7 @@ int ceph_build_auth(struct ceph_auth_client *ac,
if (!ac->protocol) if (!ac->protocol)
return ceph_auth_build_hello(ac, msg_buf, msg_len); return ceph_auth_build_hello(ac, msg_buf, msg_len);
BUG_ON(!ac->ops); BUG_ON(!ac->ops);
if (!ac->ops->is_authenticated(ac)) if (ac->ops->should_authenticate(ac))
return ceph_build_auth_request(ac, msg_buf, msg_len); return ceph_build_auth_request(ac, msg_buf, msg_len);
return 0; return 0;
} }

View file

@ -23,6 +23,12 @@ struct ceph_auth_client_ops {
*/ */
int (*is_authenticated)(struct ceph_auth_client *ac); int (*is_authenticated)(struct ceph_auth_client *ac);
/*
* true if we should (re)authenticate, e.g., when our tickets
* are getting old and crusty.
*/
int (*should_authenticate)(struct ceph_auth_client *ac);
/* /*
* build requests and process replies during monitor * build requests and process replies during monitor
* handshake. if handle_reply returns -EAGAIN, we build * handshake. if handle_reply returns -EAGAIN, we build

View file

@ -31,6 +31,13 @@ static int is_authenticated(struct ceph_auth_client *ac)
return !xi->starting; return !xi->starting;
} }
static int should_authenticate(struct ceph_auth_client *ac)
{
struct ceph_auth_none_info *xi = ac->private;
return xi->starting;
}
/* /*
* the generic auth code decode the global_id, and we carry no actual * the generic auth code decode the global_id, and we carry no actual
* authenticate state, so nothing happens here. * authenticate state, so nothing happens here.
@ -98,6 +105,7 @@ static const struct ceph_auth_client_ops ceph_auth_none_ops = {
.reset = reset, .reset = reset,
.destroy = destroy, .destroy = destroy,
.is_authenticated = is_authenticated, .is_authenticated = is_authenticated,
.should_authenticate = should_authenticate,
.handle_reply = handle_reply, .handle_reply = handle_reply,
.create_authorizer = ceph_auth_none_create_authorizer, .create_authorizer = ceph_auth_none_create_authorizer,
.destroy_authorizer = ceph_auth_none_destroy_authorizer, .destroy_authorizer = ceph_auth_none_destroy_authorizer,

View file

@ -27,6 +27,17 @@ static int ceph_x_is_authenticated(struct ceph_auth_client *ac)
return (ac->want_keys & xi->have_keys) == ac->want_keys; return (ac->want_keys & xi->have_keys) == ac->want_keys;
} }
static int ceph_x_should_authenticate(struct ceph_auth_client *ac)
{
struct ceph_x_info *xi = ac->private;
int need;
ceph_x_validate_tickets(ac, &need);
dout("ceph_x_should_authenticate want=%d need=%d have=%d\n",
ac->want_keys, need, xi->have_keys);
return need != 0;
}
static int ceph_x_encrypt_buflen(int ilen) static int ceph_x_encrypt_buflen(int ilen)
{ {
return sizeof(struct ceph_x_encrypt_header) + ilen + 16 + return sizeof(struct ceph_x_encrypt_header) + ilen + 16 +
@ -620,6 +631,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
static const struct ceph_auth_client_ops ceph_x_ops = { static const struct ceph_auth_client_ops ceph_x_ops = {
.name = "x", .name = "x",
.is_authenticated = ceph_x_is_authenticated, .is_authenticated = ceph_x_is_authenticated,
.should_authenticate = ceph_x_should_authenticate,
.build_request = ceph_x_build_request, .build_request = ceph_x_build_request,
.handle_reply = ceph_x_handle_reply, .handle_reply = ceph_x_handle_reply,
.create_authorizer = ceph_x_create_authorizer, .create_authorizer = ceph_x_create_authorizer,

View file

@ -265,16 +265,17 @@ extern const char *ceph_mds_state_name(int s);
* - they also define the lock ordering by the MDS * - they also define the lock ordering by the MDS
* - a few of these are internal to the mds * - a few of these are internal to the mds
*/ */
#define CEPH_LOCK_DN 1 #define CEPH_LOCK_DVERSION 1
#define CEPH_LOCK_ISNAP 2 #define CEPH_LOCK_DN 2
#define CEPH_LOCK_IVERSION 4 /* mds internal */ #define CEPH_LOCK_ISNAP 16
#define CEPH_LOCK_IFILE 8 /* mds internal */ #define CEPH_LOCK_IVERSION 32 /* mds internal */
#define CEPH_LOCK_IAUTH 32 #define CEPH_LOCK_IFILE 64
#define CEPH_LOCK_ILINK 64 #define CEPH_LOCK_IAUTH 128
#define CEPH_LOCK_IDFT 128 /* dir frag tree */ #define CEPH_LOCK_ILINK 256
#define CEPH_LOCK_INEST 256 /* mds internal */ #define CEPH_LOCK_IDFT 512 /* dir frag tree */
#define CEPH_LOCK_IXATTR 512 #define CEPH_LOCK_INEST 1024 /* mds internal */
#define CEPH_LOCK_INO 2048 /* immutable inode bits; not a lock */ #define CEPH_LOCK_IXATTR 2048
#define CEPH_LOCK_INO 8192 /* immutable inode bits; not a lock */
/* client_session ops */ /* client_session ops */
enum { enum {

View file

@ -587,7 +587,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
if (IS_ERR(req)) if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req)); return ERR_CAST(req);
req->r_dentry = dget(dentry); req->r_dentry = dget(dentry);
req->r_num_caps = 2; req->r_num_caps = 2;
/* we only need inode linkage */ /* we only need inode linkage */

View file

@ -133,7 +133,7 @@ static struct dentry *__cfh_to_dentry(struct super_block *sb,
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPHASH, req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPHASH,
USE_ANY_MDS); USE_ANY_MDS);
if (IS_ERR(req)) if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req)); return ERR_CAST(req);
req->r_ino1 = vino; req->r_ino1 = vino;
req->r_ino2.ino = cfh->parent_ino; req->r_ino2.ino = cfh->parent_ino;

View file

@ -230,7 +230,7 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
/* do the open */ /* do the open */
req = prepare_open_request(dir->i_sb, flags, mode); req = prepare_open_request(dir->i_sb, flags, mode);
if (IS_ERR(req)) if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req)); return ERR_CAST(req);
req->r_dentry = dget(dentry); req->r_dentry = dget(dentry);
req->r_num_caps = 2; req->r_num_caps = 2;
if (flags & O_CREAT) { if (flags & O_CREAT) {

View file

@ -69,7 +69,7 @@ struct inode *ceph_get_snapdir(struct inode *parent)
BUG_ON(!S_ISDIR(parent->i_mode)); BUG_ON(!S_ISDIR(parent->i_mode));
if (IS_ERR(inode)) if (IS_ERR(inode))
return ERR_PTR(PTR_ERR(inode)); return inode;
inode->i_mode = parent->i_mode; inode->i_mode = parent->i_mode;
inode->i_uid = parent->i_uid; inode->i_uid = parent->i_uid;
inode->i_gid = parent->i_gid; inode->i_gid = parent->i_gid;

View file

@ -1768,12 +1768,12 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
dout("do_request waiting\n"); dout("do_request waiting\n");
if (req->r_timeout) { if (req->r_timeout) {
err = (long)wait_for_completion_interruptible_timeout( err = (long)wait_for_completion_killable_timeout(
&req->r_completion, req->r_timeout); &req->r_completion, req->r_timeout);
if (err == 0) if (err == 0)
err = -EIO; err = -EIO;
} else { } else {
err = wait_for_completion_interruptible(&req->r_completion); err = wait_for_completion_killable(&req->r_completion);
} }
dout("do_request waited, got %d\n", err); dout("do_request waited, got %d\n", err);
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
@ -2014,16 +2014,21 @@ static void handle_forward(struct ceph_mds_client *mdsc,
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
req = __lookup_request(mdsc, tid); req = __lookup_request(mdsc, tid);
if (!req) { if (!req) {
dout("forward %llu to mds%d - req dne\n", tid, next_mds); dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
goto out; /* dup reply? */ goto out; /* dup reply? */
} }
if (fwd_seq <= req->r_num_fwd) { if (req->r_aborted) {
dout("forward %llu to mds%d - old seq %d <= %d\n", dout("forward tid %llu aborted, unregistering\n", tid);
__unregister_request(mdsc, req);
} else if (fwd_seq <= req->r_num_fwd) {
dout("forward tid %llu to mds%d - old seq %d <= %d\n",
tid, next_mds, req->r_num_fwd, fwd_seq); tid, next_mds, req->r_num_fwd, fwd_seq);
} else { } else {
/* resend. forward race not possible; mds would drop */ /* resend. forward race not possible; mds would drop */
dout("forward %llu to mds%d (we resend)\n", tid, next_mds); dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
BUG_ON(req->r_err);
BUG_ON(req->r_got_result);
req->r_num_fwd = fwd_seq; req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds; req->r_resend_mds = next_mds;
put_request_session(req); put_request_session(req);
@ -2541,7 +2546,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
return; return;
lease = msg->front.iov_base; lease = msg->front.iov_base;
lease->action = action; lease->action = action;
lease->mask = cpu_to_le16(CEPH_LOCK_DN); lease->mask = cpu_to_le16(1);
lease->ino = cpu_to_le64(ceph_vino(inode).ino); lease->ino = cpu_to_le64(ceph_vino(inode).ino);
lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
lease->seq = cpu_to_le32(seq); lease->seq = cpu_to_le32(seq);
@ -2571,7 +2576,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
BUG_ON(inode == NULL); BUG_ON(inode == NULL);
BUG_ON(dentry == NULL); BUG_ON(dentry == NULL);
BUG_ON(mask != CEPH_LOCK_DN); BUG_ON(mask == 0);
/* is dentry lease valid? */ /* is dentry lease valid? */
spin_lock(&dentry->d_lock); spin_lock(&dentry->d_lock);

View file

@ -120,6 +120,12 @@ void ceph_msgr_exit(void)
destroy_workqueue(ceph_msgr_wq); destroy_workqueue(ceph_msgr_wq);
} }
void ceph_msgr_flush()
{
flush_workqueue(ceph_msgr_wq);
}
/* /*
* socket callback functions * socket callback functions
*/ */

View file

@ -213,6 +213,7 @@ extern int ceph_parse_ips(const char *c, const char *end,
extern int ceph_msgr_init(void); extern int ceph_msgr_init(void);
extern void ceph_msgr_exit(void); extern void ceph_msgr_exit(void);
extern void ceph_msgr_flush(void);
extern struct ceph_messenger *ceph_messenger_create( extern struct ceph_messenger *ceph_messenger_create(
struct ceph_entity_addr *myaddr); struct ceph_entity_addr *myaddr);

View file

@ -704,8 +704,11 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg) struct ceph_msg *msg)
{ {
int ret; int ret;
int was_auth = 0;
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
if (monc->auth->ops)
was_auth = monc->auth->ops->is_authenticated(monc->auth);
monc->pending_auth = 0; monc->pending_auth = 0;
ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
msg->front.iov_len, msg->front.iov_len,
@ -716,7 +719,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
wake_up(&monc->client->auth_wq); wake_up(&monc->client->auth_wq);
} else if (ret > 0) { } else if (ret > 0) {
__send_prepared_auth_request(monc, ret); __send_prepared_auth_request(monc, ret);
} else if (monc->auth->ops->is_authenticated(monc->auth)) { } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
dout("authenticated, starting session\n"); dout("authenticated, starting session\n");
monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;

View file

@ -361,8 +361,13 @@ static void put_osd(struct ceph_osd *osd)
{ {
dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
atomic_read(&osd->o_ref) - 1); atomic_read(&osd->o_ref) - 1);
if (atomic_dec_and_test(&osd->o_ref)) if (atomic_dec_and_test(&osd->o_ref)) {
struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
if (osd->o_authorizer)
ac->ops->destroy_authorizer(ac, osd->o_authorizer);
kfree(osd); kfree(osd);
}
} }
/* /*

View file

@ -706,7 +706,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
len, *p, end); len, *p, end);
newcrush = crush_decode(*p, min(*p+len, end)); newcrush = crush_decode(*p, min(*p+len, end));
if (IS_ERR(newcrush)) if (IS_ERR(newcrush))
return ERR_PTR(PTR_ERR(newcrush)); return ERR_CAST(newcrush);
} }
/* new flags? */ /* new flags? */

View file

@ -669,9 +669,17 @@ static void ceph_destroy_client(struct ceph_client *client)
/* unmount */ /* unmount */
ceph_mdsc_stop(&client->mdsc); ceph_mdsc_stop(&client->mdsc);
ceph_monc_stop(&client->monc);
ceph_osdc_stop(&client->osdc); ceph_osdc_stop(&client->osdc);
/*
* make sure mds and osd connections close out before destroying
* the auth module, which is needed to free those connections'
* ceph_authorizers.
*/
ceph_msgr_flush();
ceph_monc_stop(&client->monc);
ceph_adjust_min_caps(-client->min_caps); ceph_adjust_min_caps(-client->min_caps);
ceph_debugfs_client_cleanup(client); ceph_debugfs_client_cleanup(client);
@ -738,7 +746,7 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
dout("open_root_inode opening '%s'\n", path); dout("open_root_inode opening '%s'\n", path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
if (IS_ERR(req)) if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req)); return ERR_CAST(req);
req->r_path1 = kstrdup(path, GFP_NOFS); req->r_path1 = kstrdup(path, GFP_NOFS);
req->r_ino1.ino = CEPH_INO_ROOT; req->r_ino1.ino = CEPH_INO_ROOT;
req->r_ino1.snap = CEPH_NOSNAP; req->r_ino1.snap = CEPH_NOSNAP;

View file

@ -10,7 +10,6 @@
#include <linux/fs.h> #include <linux/fs.h>
#include <linux/mempool.h> #include <linux/mempool.h>
#include <linux/pagemap.h> #include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/wait.h> #include <linux/wait.h>
#include <linux/writeback.h> #include <linux/writeback.h>
#include <linux/slab.h> #include <linux/slab.h>

View file

@ -83,6 +83,8 @@ extern unsigned long wait_for_completion_timeout(struct completion *x,
unsigned long timeout); unsigned long timeout);
extern unsigned long wait_for_completion_interruptible_timeout( extern unsigned long wait_for_completion_interruptible_timeout(
struct completion *x, unsigned long timeout); struct completion *x, unsigned long timeout);
extern unsigned long wait_for_completion_killable_timeout(
struct completion *x, unsigned long timeout);
extern bool try_wait_for_completion(struct completion *x); extern bool try_wait_for_completion(struct completion *x);
extern bool completion_done(struct completion *x); extern bool completion_done(struct completion *x);

View file

@ -4053,6 +4053,23 @@ int __sched wait_for_completion_killable(struct completion *x)
} }
EXPORT_SYMBOL(wait_for_completion_killable); EXPORT_SYMBOL(wait_for_completion_killable);
/**
* wait_for_completion_killable_timeout: - waits for completion of a task (w/(to,killable))
* @x: holds the state of this particular completion
* @timeout: timeout value in jiffies
*
* This waits for either a completion of a specific task to be
* signaled or for a specified timeout to expire. It can be
* interrupted by a kill signal. The timeout is in jiffies.
*/
unsigned long __sched
wait_for_completion_killable_timeout(struct completion *x,
unsigned long timeout)
{
return wait_for_common(x, timeout, TASK_KILLABLE);
}
EXPORT_SYMBOL(wait_for_completion_killable_timeout);
/** /**
* try_wait_for_completion - try to decrement a completion without blocking * try_wait_for_completion - try to decrement a completion without blocking
* @x: completion structure * @x: completion structure