This thread contains a patchset. You're looking at the original emails,
but you may wish to use the patch review UI.
Review patch
2
2
[PATCH] Snapshot: remove internal state
This commit remove internal state for sending snapshot
It also allow to use snapshot term if commit index is equal to snapshot index
And also avoid to retrieve last term from log if current log is 0
Signed-off-by: Michael Bonfils <mbonfils@scaleway.com>
---
election.c | 5 ++++-
libfloat.h | 14 ++------------
log.c | 17 +++++++++++------
node.h | 1 -
raft.c | 13 +------------
snapshot.c | 26 +-------------------------
6 files changed, 19 insertions(+), 57 deletions(-)
diff --git a/election.c b/election.c
index ef9bff9..d9db208 100644
--- a/election.c
+++ b/election.c
@@ -45,7 +45,10 @@ void libfloat_election_start(libfloat_ctx_t *ctx)
ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
ctx->timeout_elapsed = 0;
- libfloat_get_last_term(ctx, &last_id, &last_term);
+ if (ctx->persistent.commit_index > 0)
+ {
+ libfloat_get_last_term(ctx, &last_id, &last_term);
+ }
/* Send a vote request to each node of the cluster */
for_every_node(ctx, node, {
diff --git a/libfloat.h b/libfloat.h
index 1b0ecf0..b4aaf1a 100644
--- a/libfloat.h
+++ b/libfloat.h
@@ -112,12 +112,11 @@ struct libfloat_ctx_s {
bool (*send_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);
/*!
- * \brief Abort a Snapshot currently send to a specific node
+ * \brief Abort every Snapshot currently send
*
* \param[in] struct libfloat_ctx_s * - Cluster context
- * \param[in] libfloat_node_t * - Node to send the abort snapshot request to
*/
- bool (*abort_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);
+ bool (*abort_send_snapshot)(struct libfloat_ctx_s *);
/*!
* \brief Write (and apply) a log to persistent storage
@@ -380,15 +379,6 @@ void libfloat_request_vote_response(libfloat_ctx_t *ctx, libfloat_rpc_response_v
*/
void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success);
-/*!
- * \brief Callback to update sending snapshot state
- *
- * \param[in] ctx libfloat context
- * \param[in] node node ID
- * \param[in] status true or false
- */
-void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status);
-
/*!
* \brief Set user data for a node
*
diff --git a/log.c b/log.c
index 1eeb566..d877a54 100644
--- a/log.c
+++ b/log.c
@@ -272,7 +272,6 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
ctx->persistent.snapshot.index
);
node->snapshot_count = 0;
- node->snapshot_in_progress = true;
ctx->send_snapshot(ctx, node);
/* check that there is not already a snapshot in progress */
return;
@@ -283,9 +282,6 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
goto end;
}
- /* reset snapshot flag */
- node->snapshot_in_progress = false;
-
if (node->next_log_to_send <= ctx->persistent.commit_index)
{
@@ -760,10 +756,19 @@ bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libflo
iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, i);
if (iterator == kh_end(ctx->persistent.log))
{
- /* NOT FOUND, NEED TO LOAD LOG */
if (!libfloat_get_log_from_db(ctx, i, &log))
{
- ERROR(ctx, "Log %u not found while retrieving last term", i);
+ if (ctx->persistent.commit_index == ctx->persistent.snapshot.index)
+ {
+ DEBUG(ctx, "Retrieve last term from snapshot");
+ if (term != NULL)
+ *term = ctx->persistent.snapshot.term;
+ if (id != NULL)
+ *id = i;
+ return true;
+ }
+
+ ERROR(ctx, "Log %u not found while retrieving last term (snapshot log %u)", i, ctx->persistent.snapshot.index);
return false;
}
}
diff --git a/node.h b/node.h
index cd6f666..a91b4ef 100644
--- a/node.h
+++ b/node.h
@@ -9,7 +9,6 @@ typedef struct {
uint8_t has_voted_for_me : 1;
uint8_t is_up_to_date : 1;
- uint8_t snapshot_in_progress : 1;
void *udata; /*!< User data */
time_t last_update; /*!< Time of the last AE response (If I am the leader) */
diff --git a/raft.c b/raft.c
index 4dba9e1..1b651c4 100644
--- a/raft.c
+++ b/raft.c
@@ -34,19 +34,8 @@ void libfloat_become_candidate(libfloat_ctx_t *ctx)
void libfloat_become_follower(libfloat_ctx_t *ctx)
{
- libfloat_node_t *node = NULL;
-
DEBUG(ctx, "Becoming follower");
- for_every_node(ctx, node, {
- if (ctx->me == node)
- continue;
-
- if (node->snapshot_in_progress)
- {
- ctx->abort_snapshot(ctx, node);
- node->snapshot_in_progress = false;
- }
- });
+ ctx->abort_send_snapshot(ctx);
ctx->state = RAFT_STATE_FOLLOWER;
ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
libfloat_vote_for(ctx, 0);
diff --git a/snapshot.c b/snapshot.c
index 2ca1ab2..d9025d3 100644
--- a/snapshot.c
+++ b/snapshot.c
@@ -110,8 +110,6 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)
void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t id, libfloat_term_t term)
{
- libfloat_node_t *node = NULL;
-
if (ctx->snapshot == NULL)
{
/* Implementation is missing a snapshot logic, let's quit here */
@@ -120,16 +118,7 @@ void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t i
return;
}
- for_every_node(ctx, node, {
- if (ctx->me == node)
- continue;
-
- if (node->snapshot_in_progress)
- {
- ctx->abort_snapshot(ctx, node);
- node->snapshot_in_progress = false;
- }
- });
+ ctx->abort_send_snapshot(ctx);
if (id == ctx->persistent.snapshot.index + 1 && term == ctx->persistent.snapshot.term)
{
@@ -197,16 +186,3 @@ void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)
libfloat_internal_snapshot_apply_log(ctx);
ctx->is_snapshotting = false;
}
-
-void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status)
-{
- libfloat_node_t *node = NULL;
- khint_t iterator;
-
- iterator = kh_get(libfloat_node_id_t, ctx->nodes, id);
- if (iterator != kh_end(ctx->nodes))
- {
- node = kh_val(ctx->nodes, iterator);
- node->snapshot_in_progress = status;
- }
-}
--
2.25.1
LG
On 21/11/29 01:36PM, Michael Bonfils wrote:
> This commit remove internal state for sending snapshot
> It also allow to use snapshot term if commit index is equal to snapshot index
> And also avoid to retrieve last term from log if current log is 0
>
> Signed-off-by: Michael Bonfils <mbonfils@scaleway.com>
> ---
> election.c | 5 ++++-
> libfloat.h | 14 ++------------
> log.c | 17 +++++++++++------
> node.h | 1 -
> raft.c | 13 +------------
> snapshot.c | 26 +-------------------------
> 6 files changed, 19 insertions(+), 57 deletions(-)
>
> diff --git a/election.c b/election.c
> index ef9bff9..d9db208 100644
> --- a/election.c
> +++ b/election.c
> @@ -45,7 +45,10 @@ void libfloat_election_start(libfloat_ctx_t *ctx)
> ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
> ctx->timeout_elapsed = 0;
>
> - libfloat_get_last_term(ctx, &last_id, &last_term);
> + if (ctx->persistent.commit_index > 0)
> + {
> + libfloat_get_last_term(ctx, &last_id, &last_term);
> + }
>
> /* Send a vote request to each node of the cluster */
> for_every_node(ctx, node, {
> diff --git a/libfloat.h b/libfloat.h
> index 1b0ecf0..b4aaf1a 100644
> --- a/libfloat.h
> +++ b/libfloat.h
> @@ -112,12 +112,11 @@ struct libfloat_ctx_s {
> bool (*send_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);
>
> /*!
> - * \brief Abort a Snapshot currently send to a specific node
> + * \brief Abort every Snapshot currently send
> *
> * \param[in] struct libfloat_ctx_s * - Cluster context
> - * \param[in] libfloat_node_t * - Node to send the abort snapshot request to
> */
> - bool (*abort_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);
> + bool (*abort_send_snapshot)(struct libfloat_ctx_s *);
>
> /*!
> * \brief Write (and apply) a log to persistent storage
> @@ -380,15 +379,6 @@ void libfloat_request_vote_response(libfloat_ctx_t *ctx, libfloat_rpc_response_v
> */
> void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success);
>
> -/*!
> - * \brief Callback to update sending snapshot state
> - *
> - * \param[in] ctx libfloat context
> - * \param[in] node node ID
> - * \param[in] status true or false
> - */
> -void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status);
> -
> /*!
> * \brief Set user data for a node
> *
> diff --git a/log.c b/log.c
> index 1eeb566..d877a54 100644
> --- a/log.c
> +++ b/log.c
> @@ -272,7 +272,6 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
> ctx->persistent.snapshot.index
> );
> node->snapshot_count = 0;
> - node->snapshot_in_progress = true;
> ctx->send_snapshot(ctx, node);
> /* check that there is not already a snapshot in progress */
> return;
> @@ -283,9 +282,6 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
> goto end;
> }
>
> - /* reset snapshot flag */
> - node->snapshot_in_progress = false;
> -
> if (node->next_log_to_send <= ctx->persistent.commit_index)
> {
>
> @@ -760,10 +756,19 @@ bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libflo
> iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, i);
> if (iterator == kh_end(ctx->persistent.log))
> {
> - /* NOT FOUND, NEED TO LOAD LOG */
> if (!libfloat_get_log_from_db(ctx, i, &log))
> {
> - ERROR(ctx, "Log %u not found while retrieving last term", i);
> + if (ctx->persistent.commit_index == ctx->persistent.snapshot.index)
> + {
> + DEBUG(ctx, "Retrieve last term from snapshot");
> + if (term != NULL)
> + *term = ctx->persistent.snapshot.term;
> + if (id != NULL)
> + *id = i;
> + return true;
> + }
> +
> + ERROR(ctx, "Log %u not found while retrieving last term (snapshot log %u)", i, ctx->persistent.snapshot.index);
> return false;
> }
> }
> diff --git a/node.h b/node.h
> index cd6f666..a91b4ef 100644
> --- a/node.h
> +++ b/node.h
> @@ -9,7 +9,6 @@ typedef struct {
>
> uint8_t has_voted_for_me : 1;
> uint8_t is_up_to_date : 1;
> - uint8_t snapshot_in_progress : 1;
>
> void *udata; /*!< User data */
> time_t last_update; /*!< Time of the last AE response (If I am the leader) */
> diff --git a/raft.c b/raft.c
> index 4dba9e1..1b651c4 100644
> --- a/raft.c
> +++ b/raft.c
> @@ -34,19 +34,8 @@ void libfloat_become_candidate(libfloat_ctx_t *ctx)
>
> void libfloat_become_follower(libfloat_ctx_t *ctx)
> {
> - libfloat_node_t *node = NULL;
> -
> DEBUG(ctx, "Becoming follower");
> - for_every_node(ctx, node, {
> - if (ctx->me == node)
> - continue;
> -
> - if (node->snapshot_in_progress)
> - {
> - ctx->abort_snapshot(ctx, node);
> - node->snapshot_in_progress = false;
> - }
> - });
> + ctx->abort_send_snapshot(ctx);
> ctx->state = RAFT_STATE_FOLLOWER;
> ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
> libfloat_vote_for(ctx, 0);
> diff --git a/snapshot.c b/snapshot.c
> index 2ca1ab2..d9025d3 100644
> --- a/snapshot.c
> +++ b/snapshot.c
> @@ -110,8 +110,6 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)
>
> void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t id, libfloat_term_t term)
> {
> - libfloat_node_t *node = NULL;
> -
> if (ctx->snapshot == NULL)
> {
> /* Implementation is missing a snapshot logic, let's quit here */
> @@ -120,16 +118,7 @@ void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t i
> return;
> }
>
> - for_every_node(ctx, node, {
> - if (ctx->me == node)
> - continue;
> -
> - if (node->snapshot_in_progress)
> - {
> - ctx->abort_snapshot(ctx, node);
> - node->snapshot_in_progress = false;
> - }
> - });
> + ctx->abort_send_snapshot(ctx);
>
> if (id == ctx->persistent.snapshot.index + 1 && term == ctx->persistent.snapshot.term)
> {
> @@ -197,16 +186,3 @@ void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)
> libfloat_internal_snapshot_apply_log(ctx);
> ctx->is_snapshotting = false;
> }
> -
> -void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status)
> -{
> - libfloat_node_t *node = NULL;
> - khint_t iterator;
> -
> - iterator = kh_get(libfloat_node_id_t, ctx->nodes, id);
> - if (iterator != kh_end(ctx->nodes))
> - {
> - node = kh_val(ctx->nodes, iterator);
> - node->snapshot_in_progress = status;
> - }
> -}
> --
> 2.25.1
>
LGTM
--
Louis Solofrizzo
> <woot> Put *that* in you .sig and smoke it, Knghtbrd.
> <Culus> You know he will read this :>
> <woot> heheheheh.