Patchwork [BUG:1303,1/2] cluster/pump: Dynamically control sink connect and disconnect.

login
register
Submitter Pavan Sondur
Date 2010-08-12 10:19:15
Message ID <20100812101915.GA6133@gluster.com>
Download mbox | patch
Permalink /patch/4089/
State Accepted
Headers show

Comments

Pavan Sondur - 2010-08-12 10:19:15
Signed-off-by: Pavan Vilas Sondur <pavan@gluster.com>
---
 xlators/cluster/afr/src/afr.h  |    2 +
 xlators/cluster/afr/src/pump.c |  459 ++++++++++++++++++++++++++++------------
 xlators/cluster/afr/src/pump.h |   32 ++--
 3 files changed, 348 insertions(+), 145 deletions(-)

Patch

diff --git a/xlators/cluster/afr/src/afr.h b/xlators/cluster/afr/src/afr.h
index 3fa987e..284eb7a 100644
--- a/xlators/cluster/afr/src/afr.h
+++ b/xlators/cluster/afr/src/afr.h
@@ -245,6 +245,8 @@  typedef struct _afr_local {
 	int32_t  inodelk_count;
 	int32_t  entrylk_count;
 
+        dict_t  *dict;
+
         int (*up_down_flush_cbk) (call_frame_t *, xlator_t *);
 
 	/* 
diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c
index e69c028..b62b079 100644
--- a/xlators/cluster/afr/src/pump.c
+++ b/xlators/cluster/afr/src/pump.c
@@ -28,7 +28,47 @@ 
 
 #include "afr-common.c"
 
-pump_state_t
+static int
+pump_mark_start_pending (xlator_t *this)
+{
+        afr_private_t  *priv      = NULL;
+        pump_private_t *pump_priv = NULL;
+
+        priv      = this->private;
+        pump_priv = priv->pump_private;
+
+        pump_priv->pump_start_pending = 1;
+
+        return 0;
+}
+
+static int
+is_pump_start_pending (xlator_t *this)
+{
+        afr_private_t  *priv      = NULL;
+        pump_private_t *pump_priv = NULL;
+
+        priv      = this->private;
+        pump_priv = priv->pump_private;
+
+        return (pump_priv->pump_start_pending);
+}
+
+static int
+pump_remove_start_pending (xlator_t *this)
+{
+        afr_private_t  *priv      = NULL;
+        pump_private_t *pump_priv = NULL;
+
+        priv      = this->private;
+        pump_priv = priv->pump_private;
+
+        pump_priv->pump_start_pending = 0;
+
+        return 0;
+}
+
+static pump_state_t
 pump_get_state ()
 {
         xlator_t *this = NULL;
@@ -59,16 +99,11 @@  pump_change_state (xlator_t *this, pump_state_t state)
         pump_state_t state_old;
         pump_state_t state_new;
 
-	unsigned char *     child_up = NULL;
-        int i = 0;
-
 
         priv = this->private;
         pump_priv = priv->pump_private;
 
-	child_up = priv->child_up;
-
-        assert (pump_priv);
+        GF_ASSERT (pump_priv);
 
         LOCK (&pump_priv->pump_state_lock);
         {
@@ -77,48 +112,6 @@  pump_change_state (xlator_t *this, pump_state_t state)
 
                 pump_priv->pump_state = state;
 
-                switch (pump_priv->pump_state) {
-                case PUMP_STATE_RESUME:
-                case PUMP_STATE_RUNNING:
-                case PUMP_STATE_PAUSE:
-                {
-                        priv->pump_loaded = _gf_true;
-                        i = 1;
-
-                        child_up[i] = 1;
-
-                        LOCK (&priv->lock);
-                        {
-                                priv->up_count++;
-                        }
-                        UNLOCK (&priv->lock);
-
-                        break;
-                }
-                case PUMP_STATE_ABORT:
-                {
-                        priv->pump_loaded = _gf_false;
-                        i = 1;
-
-                        child_up[i] = 0;
-
-                        LOCK (&priv->lock);
-                        {
-                                priv->down_count++;
-                        }
-                        UNLOCK (&priv->lock);
-
-                        LOCK (&pump_priv->resume_path_lock);
-                        {
-                                pump_priv->number_files_pumped = 0;
-                        }
-                        UNLOCK (&pump_priv->resume_path_lock);
-
-
-                        break;
-                }
-
-                }
         }
         UNLOCK (&pump_priv->pump_state_lock);
 
@@ -338,67 +331,24 @@  is_pump_traversal_allowed (xlator_t *this, const char *path)
 }
 
 static int
-pump_update_file_stats (xlator_t *this, long source_blocks,
-                               long sink_blocks)
+pump_save_file_stats (xlator_t *this, const char *path)
 {
         afr_private_t  *priv        = NULL;
         pump_private_t *pump_priv   = NULL;
 
-        priv = this->private;
+        priv      = this->private;
         pump_priv = priv->pump_private;
 
         LOCK (&pump_priv->resume_path_lock);
         {
-                pump_priv->source_blocks = source_blocks;
-                pump_priv->sink_blocks   = sink_blocks;
-        }
-        UNLOCK (&pump_priv->resume_path_lock);
-
-        return 0;
-}
-
-static int
-pump_save_file_stats (xlator_t *this)
-{
-        afr_private_t  *priv        = NULL;
-        struct statvfs  source_buf  = {0, };
-        struct statvfs  sink_buf    = {0, };
-        loc_t loc;
-        int ret = -1;
-
-        priv = this->private;
-
-        assert (priv->root_inode);
-
-        build_root_loc (priv->root_inode, &loc);
-
-        ret = syncop_statfs (PUMP_SOURCE_CHILD (this),
-                             &loc, &source_buf);
-        if (ret < 0) {
-                gf_log (this->name, GF_LOG_DEBUG,
-                        "source statfs failed");
-        } else {
-                gf_log (this->name, GF_LOG_DEBUG,
-                        "source statfs succeeded");
-        }
+                pump_priv->number_files_pumped++;
 
-
-        ret = syncop_statfs (PUMP_SOURCE_CHILD (this),
-                             &loc, &sink_buf);
-        if (ret < 0) {
-                gf_log (this->name, GF_LOG_DEBUG,
-                        "sink statfs failed");
-        } else {
-                gf_log (this->name, GF_LOG_DEBUG,
-                        "sink statfs succeeded");
+                strncpy (pump_priv->current_file, path,
+                         PATH_MAX);
         }
-
-        pump_update_file_stats (this,
-                                source_buf.f_blocks,
-                                sink_buf.f_blocks);
+        UNLOCK (&pump_priv->resume_path_lock);
 
         return 0;
-
 }
 static int
 pump_save_path (xlator_t *this, const char *path)
@@ -435,16 +385,6 @@  pump_save_path (xlator_t *this, const char *path)
                         "setxattr succeeded - saved path=%s", path);
                 gf_log (this->name, GF_LOG_DEBUG,
                         "Saving path for status info");
-
-                LOCK (&pump_priv->resume_path_lock);
-                {
-                        pump_priv->number_files_pumped++;
-
-                        strncpy (pump_priv->current_file, path,
-                                 PATH_MAX);
-                }
-                UNLOCK (&pump_priv->resume_path_lock);
-
         }
 
         dict_unref (dict);
@@ -534,7 +474,7 @@  gf_pump_traverse_directory (loc_t *loc)
                         if (!IS_ENTRY_CWD(entry->d_name) &&
                             !IS_ENTRY_PARENT (entry->d_name)) {
                                 pump_save_path (this, entry_loc.path);
-                                pump_save_file_stats (this);
+                                pump_save_file_stats (this, entry_loc.path);
                         }
 
                         ret = pump_check_and_update_status (this);
@@ -726,19 +666,16 @@  pump_task_completion (int ret, void *data)
 }
 
 int
-pump_start (call_frame_t *frame, xlator_t *this)
+pump_start (call_frame_t *pump_frame, xlator_t *this)
 {
 	afr_private_t *priv = NULL;
 	pump_private_t *pump_priv = NULL;
-        call_frame_t *pump_frame = NULL;
 
 	int ret = -1;
 
 	priv = this->private;
         pump_priv = priv->pump_private;
 
-        pump_frame = copy_frame (frame);
-
         if (!pump_frame->root->lk_owner)
                 pump_frame->root->lk_owner = PUMP_LK_OWNER;
 
@@ -782,6 +719,212 @@  is_pump_loaded (xlator_t *this)
 
 }
 
+static int
+pump_start_synctask (xlator_t *this)
+{
+        call_frame_t *frame = NULL;
+        int ret = 0;
+
+        frame = create_frame (this, this->ctx->pool);
+        if (!frame) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Out of memory");
+                ret = -1;
+                goto out;
+        }
+
+        pump_change_state (this, PUMP_STATE_RUNNING);
+
+        ret = pump_start (frame, this);
+
+out:
+        return ret;
+}
+
+int32_t
+pump_cmd_start_setxattr_cbk (call_frame_t *frame,
+                             void *cookie,
+                             xlator_t *this,
+                             int32_t op_ret,
+                             int32_t op_errno)
+
+{
+        afr_local_t *local = NULL;
+        int ret = 0;
+
+        local = frame->local;
+
+        if (op_ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Could not initiate destination "
+                        "brick connect");
+                ret = op_ret;
+                goto out;
+        }
+
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Successfully initiated destination "
+                "brick connect");
+
+        pump_mark_start_pending (this);
+
+out:
+        local->op_ret = ret;
+        pump_command_reply (frame, this);
+
+        return 0;
+}
+
+static int
+pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this)
+{
+        afr_local_t   *local     = NULL;
+        afr_private_t *priv      = NULL;
+        dict_t        *dict      = NULL;
+        char          *dst_brick = NULL;
+        loc_t loc;
+
+        int ret = 0;
+
+        priv  = this->private;
+        local = frame->local;
+
+        GF_ASSERT (priv->root_inode);
+
+        build_root_loc (priv->root_inode, &loc);
+
+        ret = dict_get_str (local->dict, PUMP_CMD_START, &dst_brick);
+        if (ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Could not get destination brick value");
+                goto out;
+        }
+
+        dict = dict_new ();
+        if (!dict) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Out of memory");
+                ret = -1;
+                goto out;
+        }
+
+        GF_ASSERT (dst_brick);
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Got destination brick as %s", dst_brick);
+
+        ret = dict_set_str (dict, CLIENT_CMD_CONNECT, dst_brick);
+        if (ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Could not inititiate destination brick "
+                        "connect");
+                goto out;
+        }
+
+	STACK_WIND (frame,
+		    pump_cmd_start_setxattr_cbk,
+		    PUMP_SINK_CHILD(this),
+		    PUMP_SINK_CHILD(this)->fops->setxattr,
+		    &loc,
+		    dict,
+		    0);
+
+        ret = 0;
+
+        dict_unref (dict);
+out:
+        return ret;
+}
+
+int32_t
+pump_cmd_abort_setxattr_cbk (call_frame_t *frame,
+                             void *cookie,
+                             xlator_t *this,
+                             int32_t op_ret,
+                             int32_t op_errno)
+
+{
+        afr_local_t *local = NULL;
+        int ret = 0;
+
+        local = frame->local;
+
+        if (op_ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Could not initiate destination "
+                        "brick disconnect");
+                ret = op_ret;
+                goto out;
+        }
+
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Successfully initiated destination "
+                "brick disconnect");
+        ret = 0;
+
+out:
+        local->op_ret = ret;
+        pump_command_reply (frame, this);
+        return 0;
+}
+
+static int
+pump_initiate_sink_disconnect (call_frame_t *frame, xlator_t *this)
+{
+        afr_local_t   *local     = NULL;
+        afr_private_t *priv      = NULL;
+        dict_t        *dict      = NULL;
+        loc_t loc;
+
+        int ret = 0;
+
+        priv  = this->private;
+        local = frame->local;
+
+        GF_ASSERT (priv->root_inode);
+
+        build_root_loc (priv->root_inode, &loc);
+
+        dict = dict_new ();
+        if (!dict) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Out of memory");
+                ret = -1;
+                goto out;
+        }
+
+        ret = dict_set_str (dict, CLIENT_CMD_DISCONNECT, "jargon");
+        if (ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Could not inititiate destination brick "
+                        "disconnect");
+                goto out;
+        }
+
+	STACK_WIND (frame,
+		    pump_cmd_abort_setxattr_cbk,
+		    PUMP_SINK_CHILD(this),
+		    PUMP_SINK_CHILD(this)->fops->setxattr,
+		    &loc,
+		    dict,
+		    0);
+
+        ret = 0;
+
+        dict_unref (dict);
+out:
+        return ret;
+}
+
+static int
+is_pump_aborted (xlator_t *this)
+{
+        pump_state_t state;
+
+        state = pump_get_state ();
+
+        return ((state == PUMP_STATE_ABORT));
+}
+
 int32_t
 pump_cmd_start_getxattr_cbk (call_frame_t *frame,
                              void *cookie,
@@ -795,14 +938,17 @@  pump_cmd_start_getxattr_cbk (call_frame_t *frame,
 
         pump_state_t state;
         int ret = 0;
+        int need_unwind = 0;
         int dict_ret = -1;
 
         local = frame->local;
 
         if (op_ret < 0) {
                 gf_log (this->name, GF_LOG_DEBUG,
-                        "getxattr failed - changing pump state to RUNNING with '/'");
+                        "getxattr failed - changing pump "
+                        "state to RUNNING with '/'");
                 path = "/";
+                ret = op_ret;
         } else {
                 gf_log (this->name, GF_LOG_TRACE,
                         "getxattr succeeded");
@@ -822,13 +968,22 @@  pump_cmd_start_getxattr_cbk (call_frame_t *frame,
         }
 
         pump_set_resume_path (this, path);
-        pump_change_state (this, PUMP_STATE_RUNNING);
 
-        ret = pump_start (frame, this);
+        if (is_pump_aborted (this))
+                /* We're re-starting pump afresh */
+                ret = pump_initiate_sink_connect (frame, this);
+        else {
+                /* We're re-starting pump from a previous
+                   pause */
+                ret = pump_start_synctask (this);
+                need_unwind = 1;
+        }
 
 out:
-        local->op_ret = ret;
-        pump_command_reply (frame, this);
+        if ((ret < 0) || (need_unwind == 1)) {
+                local->op_ret = ret;
+                pump_command_reply (frame, this);
+        }
 	return 0;
 }
 
@@ -924,13 +1079,14 @@  pump_execute_start (call_frame_t *frame, xlator_t *this)
         local = frame->local;
 
         if (!priv->root_inode) {
-                gf_log (this->name, GF_LOG_NORMAL,
-                        "Pump xlator cannot be started without an initial lookup");
+                gf_log (this->name, GF_LOG_ERROR,
+                        "Pump xlator cannot be started without an initial "
+                        "lookup");
                 ret = -1;
                 goto out;
         }
 
-        assert (priv->root_inode);
+        GF_ASSERT (priv->root_inode);
 
         build_root_loc (priv->root_inode, &loc);
 
@@ -960,6 +1116,7 @@  pump_cmd_abort_removexattr_cbk (call_frame_t *frame,
                                 int32_t op_errno)
 {
         afr_local_t *local = NULL;
+        int ret = 0;
 
         local = frame->local;
 
@@ -967,16 +1124,23 @@  pump_cmd_abort_removexattr_cbk (call_frame_t *frame,
                 gf_log (this->name, GF_LOG_ERROR,
                         "Aborting pump failed. Please remove xattr"
                         PUMP_PATH "of the source child's '/'");
-                local->op_ret = -1;
-        } else {
-                gf_log (this->name, GF_LOG_DEBUG,
-                "remove xattr succeeded");
-                local->op_ret = 0;
+                ret = op_ret;
+                goto out;
         }
 
+        gf_log (this->name, GF_LOG_DEBUG,
+                "remove xattr succeeded");
+
+
         pump_change_state (this, PUMP_STATE_ABORT);
+        ret = pump_initiate_sink_disconnect (frame, this);
+
+out:
+        if (ret < 0) {
+                local->op_ret = ret;
+                pump_command_reply (frame, this);
+        }
 
-        pump_command_reply (frame, this);
 	return 0;
 }
 
@@ -1000,7 +1164,7 @@  pump_execute_abort (call_frame_t *frame, xlator_t *this)
                 goto out;
         }
 
-        assert (priv->root_inode);
+        GF_ASSERT (priv->root_inode);
 
         build_root_loc (priv->root_inode, &root_loc);
 
@@ -1446,6 +1610,8 @@  pump_command_reply (call_frame_t *frame, xlator_t *this)
                 gf_log (this->name, GF_LOG_NORMAL,
                         "Command succeeded");
 
+        dict_unref (local->dict);
+
         AFR_STACK_UNWIND (setxattr,
                           frame,
                           local->op_ret,
@@ -1463,14 +1629,17 @@  pump_parse_command (call_frame_t *frame, xlator_t *this,
 
         if (pump_command_start (this, dict)) {
                 frame->local = local;
+                local->dict = dict_ref (dict);
                 ret = pump_execute_start (frame, this);
 
         } else if (pump_command_pause (this, dict)) {
                 frame->local = local;
+                local->dict = dict_ref (dict);
                 ret = pump_execute_pause (frame, this);
 
         } else if (pump_command_abort (this, dict)) {
                 frame->local = local;
+                local->dict = dict_ref (dict);
                 ret = pump_execute_abort (frame, this);
         }
         return ret;
@@ -1566,19 +1735,47 @@  mem_acct_init (xlator_t *this)
         return ret;
 }
 
+static int
+is_xlator_pump_sink (xlator_t *child)
+{
+        return (child == PUMP_SINK_CHILD(THIS));
+}
+
+static int
+is_xlator_pump_source (xlator_t *child)
+{
+        return (child == PUMP_SOURCE_CHILD(THIS));
+}
+
 int32_t
 notify (xlator_t *this, int32_t event,
 	void *data, ...)
 {
         int ret = -1;
+        xlator_t *child_xl = NULL;
+
+        child_xl = (xlator_t *) data;
+
+        ret = afr_notify (this, event, data);
 
 	switch (event) {
 	case GF_EVENT_CHILD_DOWN:
-                pump_change_state (this, PUMP_STATE_ABORT);
+                if (is_xlator_pump_source (child_xl))
+                        pump_change_state (this, PUMP_STATE_ABORT);
                 break;
-        }
 
-        ret = afr_notify (this, event, data);
+        case GF_EVENT_CHILD_UP:
+                if (is_xlator_pump_sink (child_xl))
+                        if (is_pump_start_pending (this)) {
+                                ret = pump_start_synctask (this);
+                                if (ret < 0)
+                                        gf_log (this->name, GF_LOG_DEBUG,
+                                                "Could not start pump "
+                                                "synctask");
+                                else
+                                        pump_remove_start_pending (this);
+                        }
+        }
 
         return ret;
 }
diff --git a/xlators/cluster/afr/src/pump.h b/xlators/cluster/afr/src/pump.h
index 1579900..e786fb0 100644
--- a/xlators/cluster/afr/src/pump.h
+++ b/xlators/cluster/afr/src/pump.h
@@ -22,6 +22,10 @@ 
 
 #include "syncop.h"
 
+/* FIXME: Needs to be defined in a common file */
+#define CLIENT_CMD_CONNECT "trusted.glusterfs.client-connect"
+#define CLIENT_CMD_DISCONNECT "trusted.glusterfs.client-disconnect"
+
 #define PUMP_PID 696969
 #define PUMP_LK_OWNER 696969
 
@@ -43,23 +47,23 @@ 
 #define PUMP_SINK_CHILD(xl) (xl->children->next->xlator)
 
 typedef enum {
-        PUMP_STATE_RUNNING,
-        PUMP_STATE_RESUME,
-        PUMP_STATE_PAUSE,
-        PUMP_STATE_ABORT,
+        PUMP_STATE_RUNNING,             /* Pump is running and migrating files */
+        PUMP_STATE_RESUME,              /* Pump is resuming from a previous pause */
+        PUMP_STATE_PAUSE,               /* Pump is paused */
+        PUMP_STATE_ABORT,               /* Pump is aborted */
 } pump_state_t;
 
 typedef struct _pump_private {
-	struct syncenv *env;
-        const char *resume_path;
-        gf_lock_t resume_path_lock;
-        gf_lock_t pump_state_lock;
-        pump_state_t pump_state;
-        long source_blocks;
-        long sink_blocks;
-        char current_file[PATH_MAX];
-        uint64_t number_files_pumped;
-        gf_boolean_t pump_finished;
+	struct syncenv *env;            /* The env pointer to the pump synctask */
+        const char *resume_path;        /* path to resume from the last pause */
+        gf_lock_t resume_path_lock;     /* Synchronize resume_path changes */
+        gf_lock_t pump_state_lock;      /* Synchronize pump_state changes */
+        pump_state_t pump_state;        /* State of pump */
+        char current_file[PATH_MAX];    /* Current file being pumped */
+        uint64_t number_files_pumped;   /* Number of files pumped */
+        gf_boolean_t pump_finished;     /* Boolean to indicate pump termination */
+        char pump_start_pending;        /* Boolean to mark start pending until
+                                           CHILD_UP */
 } pump_private_t;
 
 void