~ne02ptzero/libfloat

Add callback to abort snapshot synchronization v1 APPLIED

Michael Bonfils: 1
 Add callback to abort snapshot synchronization

 5 files changed, 60 insertions(+), 0 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/25458/mbox | git am -3
Learn more about email & git

[PATCH] Add callback to abort snapshot synchronization Export this patch

It will be done when a leader become a follower or when
a new snapshot should be done.

Signed-off-by: Michael Bonfils <mbonfils@scaleway.com>
---
 libfloat.h | 17 +++++++++++++++++
 log.c      |  4 ++++
 node.h     |  1 +
 raft.c     | 12 ++++++++++++
 snapshot.c | 26 ++++++++++++++++++++++++++
 5 files changed, 60 insertions(+)

diff --git a/libfloat.h b/libfloat.h
index 9319876..1b0ecf0 100644
--- a/libfloat.h
+++ b/libfloat.h
@@ -111,6 +111,14 @@ struct libfloat_ctx_s {
     */
    bool (*send_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);

    /*!
     * \brief Abort a Snapshot currently send to a specific node
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_t * - Node to send the abort snapshot request to
     */
    bool (*abort_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);

    /*!
     * \brief Write (and apply) a log to persistent storage
     *
@@ -372,6 +380,15 @@ void libfloat_request_vote_response(libfloat_ctx_t *ctx, libfloat_rpc_response_v
 */
void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success);

/*!
 * \brief Callback to update sending snapshot state
 *
 * \param[in] ctx libfloat context
 * \param[in] node node ID
 * \param[in] status true or false
 */
void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status);

/*!
 * \brief Set user data for a node
 *
diff --git a/log.c b/log.c
index bed02da..1eeb566 100644
--- a/log.c
+++ b/log.c
@@ -272,6 +272,7 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
                ctx->persistent.snapshot.index
            );
            node->snapshot_count = 0;
            node->snapshot_in_progress = true;
            ctx->send_snapshot(ctx, node);
            /* check that there is not already a snapshot in progress */
            return;
@@ -282,6 +283,9 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
        goto end;
    }

    /* reset snapshot flag */
    node->snapshot_in_progress = false;

    if (node->next_log_to_send <= ctx->persistent.commit_index)
    {

diff --git a/node.h b/node.h
index a91b4ef..cd6f666 100644
--- a/node.h
+++ b/node.h
@@ -9,6 +9,7 @@ typedef struct {

    uint8_t             has_voted_for_me        : 1;
    uint8_t             is_up_to_date           : 1;
    uint8_t             snapshot_in_progress    : 1;

    void                *udata;                 /*!< User data */
    time_t              last_update;            /*!< Time of the last AE response (If I am the leader) */
diff --git a/raft.c b/raft.c
index 9bb83f7..4dba9e1 100644
--- a/raft.c
+++ b/raft.c
@@ -34,7 +34,19 @@ void libfloat_become_candidate(libfloat_ctx_t *ctx)

void libfloat_become_follower(libfloat_ctx_t *ctx)
{
    libfloat_node_t             *node = NULL;

    DEBUG(ctx, "Becoming follower");
    for_every_node(ctx, node, {
        if (ctx->me == node)
            continue;

        if (node->snapshot_in_progress)
        {
            ctx->abort_snapshot(ctx, node);
            node->snapshot_in_progress = false;
        }
    });
    ctx->state = RAFT_STATE_FOLLOWER;
    ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
    libfloat_vote_for(ctx, 0);
diff --git a/snapshot.c b/snapshot.c
index 4034018..34db6ff 100644
--- a/snapshot.c
+++ b/snapshot.c
@@ -110,6 +110,8 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)

void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t id, libfloat_term_t term)
{
    libfloat_node_t             *node = NULL;

    if (ctx->snapshot == NULL)
    {
        /* Implementation is missing a snapshot logic, let's quit here */
@@ -118,6 +120,17 @@ void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t i
        return;
    }

    for_every_node(ctx, node, {
        if (ctx->me == node)
            continue;

        if (node->snapshot_in_progress)
        {
            ctx->abort_snapshot(ctx, node);
            node->snapshot_in_progress = false;
        }
    });

    if (id == ctx->persistent.snapshot.index + 1 && term == ctx->persistent.snapshot.term)
    {
        DEBUG(ctx, "libfloat_internal_snapshot_begin: A snapshot already exists with previous commit id, skip it");
@@ -184,3 +197,16 @@ void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)
    libfloat_internal_snapshot_apply_log(ctx);
    ctx->is_snapshotting = false;
}

void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status)
{
    libfloat_node_t     *node = NULL;
    khint_t             iterator;

    iterator = kh_get(libfloat_node_id_t, ctx->nodes, id);
    if (iterator != kh_end(ctx->nodes))
    {
        node = kh_val(ctx->nodes, iterator);
        node->snapshot_in_progress = status;
    }
}
-- 
2.25.1
LGTM

---
Louis Solofrizzo
> If Patrick Henry thought that taxation without representation was bad,
> he should see how bad it is with representation.