diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index e15fc7d50827..6cdeaa76f27f 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -248,6 +248,10 @@ static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { .flags = 0, }; +static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { + .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, +}; + static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { .get_osb = ocfs2_get_dentry_osb, .post_unlock = ocfs2_dentry_post_unlock, @@ -637,6 +641,19 @@ static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, &ocfs2_nfs_sync_lops, osb); } +static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, + struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan_lvb *lvb; + + ocfs2_lock_res_init_once(res); + ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); + ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, + &ocfs2_orphan_scan_lops, osb); + lvb = ocfs2_dlm_lvb(&res->l_lksb); + lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; +} + void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, struct ocfs2_file_private *fp) { @@ -2352,6 +2369,37 @@ void ocfs2_inode_unlock(struct inode *inode, mlog_exit_void(); } +int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex) +{ + struct ocfs2_lock_res *lockres; + struct ocfs2_orphan_scan_lvb *lvb; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; + int status = 0; + + lockres = &osb->osb_orphan_scan.os_lockres; + status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); + if (status < 0) + return status; + + lvb = ocfs2_dlm_lvb(&lockres->l_lksb); + if (lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) + *seqno = be32_to_cpu(lvb->lvb_os_seqno); + return status; +} + +void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex) +{ + struct ocfs2_lock_res *lockres; + struct ocfs2_orphan_scan_lvb *lvb; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; + + lockres = &osb->osb_orphan_scan.os_lockres; + lvb = ocfs2_dlm_lvb(&lockres->l_lksb); + lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; + lvb->lvb_os_seqno = cpu_to_be32(seqno); + ocfs2_cluster_unlock(osb, lockres, level); +} + int ocfs2_super_lock(struct ocfs2_super *osb, int ex) { @@ -2842,6 +2890,7 @@ local: ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); + ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); osb->cconn = conn; @@ -2878,6 +2927,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb, ocfs2_lock_res_free(&osb->osb_super_lockres); ocfs2_lock_res_free(&osb->osb_rename_lockres); ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); + ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); ocfs2_cluster_disconnect(osb->cconn, hangup_pending); osb->cconn = NULL; @@ -3061,6 +3111,7 @@ static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); + ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); } int ocfs2_drop_inode_locks(struct inode *inode) diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h index e1fd5721cd7f..31b90d7b8f51 100644 --- a/fs/ocfs2/dlmglue.h +++ b/fs/ocfs2/dlmglue.h @@ -62,6 +62,14 @@ struct ocfs2_qinfo_lvb { __be32 lvb_free_entry; }; +#define OCFS2_ORPHAN_LVB_VERSION 1 + +struct ocfs2_orphan_scan_lvb { + __u8 lvb_version; + __u8 lvb_reserved[3]; + __be32 lvb_os_seqno; +}; + /* ocfs2_inode_lock_full() 'arg_flags' flags */ /* don't wait on recovery. */ #define OCFS2_META_LOCK_RECOVERY (0x01) @@ -113,6 +121,9 @@ int ocfs2_super_lock(struct ocfs2_super *osb, int ex); void ocfs2_super_unlock(struct ocfs2_super *osb, int ex); +int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex); +void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex); + int ocfs2_rename_lock(struct ocfs2_super *osb); void ocfs2_rename_unlock(struct ocfs2_super *osb); int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex); diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index a20a0f1e37fd..44ed768782ed 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -28,6 +28,8 @@ #include #include #include +#include +#include #define MLOG_MASK_PREFIX ML_JOURNAL #include @@ -52,6 +54,8 @@ DEFINE_SPINLOCK(trans_inc_lock); +#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000 + static int ocfs2_force_read_journal(struct inode *inode); static int ocfs2_recover_node(struct ocfs2_super *osb, int node_num, int slot_num); @@ -1841,6 +1845,109 @@ bail: return status; } +/* + * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some + * randomness to the timeout to minimize multple nodes firing the timer at the + * same time. + */ +static inline unsigned long ocfs2_orphan_scan_timeout(void) +{ + unsigned long time; + + get_random_bytes(&time, sizeof(time)); + time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000); + return msecs_to_jiffies(time); +} + +/* + * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for + * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This + * is done to catch any orphans that are left over in orphan directories. + * + * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT + * seconds. It gets an EX lock on os_lockres and checks sequence number + * stored in LVB. If the sequence number has changed, it means some other + * node has done the scan. This node skips the scan and tracks the + * sequence number. If the sequence number didn't change, it means a scan + * hasn't happened. The node queues a scan and increments the + * sequence number in the LVB. + */ +void ocfs2_queue_orphan_scan(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + int status, i; + u32 seqno = 0; + + os = &osb->osb_orphan_scan; + + status = ocfs2_orphan_scan_lock(osb, &seqno, DLM_LOCK_EX); + if (status < 0) { + if (status != -EAGAIN) + mlog_errno(status); + goto out; + } + + if (os->os_seqno != seqno) { + os->os_seqno = seqno; + goto unlock; + } + + for (i = 0; i < osb->max_slots; i++) + ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL, + NULL); + /* + * We queued a recovery on orphan slots, increment the sequence + * number and update LVB so other node will skip the scan for a while + */ + seqno++; +unlock: + ocfs2_orphan_scan_unlock(osb, seqno, DLM_LOCK_EX); +out: + return; +} + +/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */ +void ocfs2_orphan_scan_work(struct work_struct *work) +{ + struct ocfs2_orphan_scan *os; + struct ocfs2_super *osb; + + os = container_of(work, struct ocfs2_orphan_scan, + os_orphan_scan_work.work); + osb = os->os_osb; + + mutex_lock(&os->os_lock); + ocfs2_queue_orphan_scan(osb); + schedule_delayed_work(&os->os_orphan_scan_work, + ocfs2_orphan_scan_timeout()); + mutex_unlock(&os->os_lock); +} + +void ocfs2_orphan_scan_stop(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + + os = &osb->osb_orphan_scan; + mutex_lock(&os->os_lock); + cancel_delayed_work(&os->os_orphan_scan_work); + mutex_unlock(&os->os_lock); +} + +int ocfs2_orphan_scan_init(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + + os = &osb->osb_orphan_scan; + os->os_osb = osb; + mutex_init(&os->os_lock); + + INIT_DELAYED_WORK(&os->os_orphan_scan_work, + ocfs2_orphan_scan_work); + schedule_delayed_work(&os->os_orphan_scan_work, + ocfs2_orphan_scan_timeout()); + return 0; +} + struct ocfs2_orphan_filldir_priv { struct inode *head; struct ocfs2_super *osb; diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h index eb7b76331eb7..61045eeb3f6e 100644 --- a/fs/ocfs2/journal.h +++ b/fs/ocfs2/journal.h @@ -144,6 +144,10 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb, } /* Exported only for the journal struct init code in super.c. Do not call. */ +int ocfs2_orphan_scan_init(struct ocfs2_super *osb); +void ocfs2_orphan_scan_stop(struct ocfs2_super *osb); +void ocfs2_orphan_scan_exit(struct ocfs2_super *osb); + void ocfs2_complete_recovery(struct work_struct *work); void ocfs2_wait_for_recovery(struct ocfs2_super *osb); diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index 1386281950db..1fde52c96d25 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -151,6 +151,14 @@ struct ocfs2_lock_res { #endif }; +struct ocfs2_orphan_scan { + struct mutex os_lock; + struct ocfs2_super *os_osb; + struct ocfs2_lock_res os_lockres; /* lock to synchronize scans */ + struct delayed_work os_orphan_scan_work; + u32 os_seqno; /* incremented on every scan */ +}; + struct ocfs2_dlm_debug { struct kref d_refcnt; struct dentry *d_locking_state; @@ -341,6 +349,8 @@ struct ocfs2_super unsigned int *osb_orphan_wipes; wait_queue_head_t osb_wipe_event; + struct ocfs2_orphan_scan osb_orphan_scan; + /* used to protect metaecc calculation check of xattr. */ spinlock_t osb_xattr_lock; diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h index a53ce87481bf..fcdba091af3d 100644 --- a/fs/ocfs2/ocfs2_lockid.h +++ b/fs/ocfs2/ocfs2_lockid.h @@ -48,6 +48,7 @@ enum ocfs2_lock_type { OCFS2_LOCK_TYPE_FLOCK, OCFS2_LOCK_TYPE_QINFO, OCFS2_LOCK_TYPE_NFS_SYNC, + OCFS2_LOCK_TYPE_ORPHAN_SCAN, OCFS2_NUM_LOCK_TYPES }; @@ -85,6 +86,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type) case OCFS2_LOCK_TYPE_NFS_SYNC: c = 'Y'; break; + case OCFS2_LOCK_TYPE_ORPHAN_SCAN: + c = 'P'; + break; default: c = '\0'; } @@ -104,6 +108,7 @@ static char *ocfs2_lock_type_strings[] = { [OCFS2_LOCK_TYPE_OPEN] = "Open", [OCFS2_LOCK_TYPE_FLOCK] = "Flock", [OCFS2_LOCK_TYPE_QINFO] = "Quota", + [OCFS2_LOCK_TYPE_ORPHAN_SCAN] = "OrphanScan", }; static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type) diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index 79ff8d9d37e0..44ac27e2d1f5 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -1802,6 +1802,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err) ocfs2_truncate_log_shutdown(osb); + ocfs2_orphan_scan_stop(osb); + /* This will disable recovery and flush any recovery work. */ ocfs2_recovery_exit(osb); @@ -1957,6 +1959,13 @@ static int ocfs2_initialize_super(struct super_block *sb, goto bail; } + status = ocfs2_orphan_scan_init(osb); + if (status) { + mlog(ML_ERROR, "Unable to initialize delayed orphan scan\n"); + mlog_errno(status); + goto bail; + } + init_waitqueue_head(&osb->checkpoint_event); atomic_set(&osb->needs_checkpoint, 0);