~ne02ptzero/libfloat

raft: add soft snapshot feature v3 SUPERSEDED

Patrik Cyvoct: 1
 raft: add soft snapshot feature

 4 files changed, 58 insertions(+), 45 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/42782/mbox | git am -3
Learn more about email & git

[PATCH v3] raft: add soft snapshot feature Export this patch

This patch add soft snapshot feature: if no logs are received for the
soft_compact_time seconds, a snapshot will be made based on the
soft_compact_after_n value.

Signed-off-by: Patrik Cyvoct <patrik@ptrk.io>
---
 libfloat.c |  1 +
 libfloat.h | 88 ++++++++++++++++++++++++++++--------------------------
 periodic.c | 13 ++++++--
 raft.c     |  1 +
 4 files changed, 58 insertions(+), 45 deletions(-)

diff --git a/libfloat.c b/libfloat.c
index c075d5c..9b3be69 100644
--- a/libfloat.c
+++ b/libfloat.c
@@ -166,6 +166,7 @@ void libfloat_wake_up(libfloat_ctx_t *ctx)

        ctx->deep_sleep_state = 0;
        ctx->last_log_time = ms_to_s(ctx->global_timer);
        ctx->last_activity_time = ms_to_s(ctx->global_timer);
        ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time);
    }
}
diff --git a/libfloat.h b/libfloat.h
index eaef087..06608df 100644
--- a/libfloat.h
+++ b/libfloat.h
@@ -33,37 +33,38 @@ typedef enum {

struct libfloat_ctx_s {
    struct {
        libfloat_term_t                 term;           /*!< Current term */
        libfloat_node_id_t              voted_for;      /*!< Last Node ID we voted for (0 if no current vote) */
        libfloat_entry_id_t             commit_index;   /*!< Last log we applied */
        khash_t(libfloat_entry_id_t)    *log;           /*!< Hashtable of current log */
        libfloat_term_t                 term;               /*!< Current term */
        libfloat_node_id_t              voted_for;          /*!< Last Node ID we voted for (0 if no current vote) */
        libfloat_entry_id_t             commit_index;       /*!< Last log we applied */
        khash_t(libfloat_entry_id_t)    *log;               /*!< Hashtable of current log */

        struct {
            libfloat_entry_id_t         index;          /*!< Last index of the snapshot */
            libfloat_term_t             term;           /*!< Last term of the snapshot */
            libfloat_entry_id_t         index;              /*!< Last index of the snapshot */
            libfloat_term_t             term;               /*!< Last term of the snapshot */
        } snapshot;
    } persistent;


    struct {
        uint64_t          leader_election;                /*!< Count of leader elections for this cluster */
        uint64_t          orphans_logs;                   /*!< Count of logs that are applied on the leader only */
        libfloat_second_t leader_election_time;           /*!< Timestamp of the last leader election */
        const char        *last_election_reason;          /*!< Last reason we had an election */
        libfloat_second_t last_election_duration;         /*!< Time it took to elect a leader for the last election */
        uint64_t          leader_election;                  /*!< Count of leader elections for this cluster */
        uint64_t          orphans_logs;                     /*!< Count of logs that are applied on the leader only */
        libfloat_second_t leader_election_time;             /*!< Timestamp of the last leader election */
        const char        *last_election_reason;            /*!< Last reason we had an election */
        libfloat_second_t last_election_duration;           /*!< Time it took to elect a leader for the last election */
    } stat;

    struct {
        libfloat_millisecond_t  election_timeout;       /*!< Timeout for RAFT election (ms) */
        libfloat_second_t       log_commit_timeout;     /*!< Timeout for log application (s) */
        uint32_t                compact_every_n_log;    /*!< Snapshot every N logs */
        bool                    avoid_congestion;       /*!< Avoid spamming AE to nodes already behind */
        libfloat_second_t       sanity_timeout;         /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */
        bool                    optimistic_replication; /*!< Do we assume that nodes are gonna replicate the log or not */
        uint32_t                max_logs_per_ae;        /*!< Max number of logs to be sent per AE request */
        bool                    do_revert;              /*!< Whether or not to revert logs */
        libfloat_second_t       deep_sleep_time;        /*!< Time with no activity before a cluster goes in deep-sleep mode */

        libfloat_millisecond_t  election_timeout;           /*!< Timeout for RAFT election (ms) */
        libfloat_second_t       log_commit_timeout;         /*!< Timeout for log application (s) */
        uint32_t                compact_every_n_log;        /*!< Snapshot every N logs */
        uint32_t                soft_compact_every_n_log;   /*!< Soft snapshot every N logs (if no log since soft_compact_time) */
        libfloat_second_t       soft_compact_time;          /*!< Duration in seconds for soft compact */
        bool                    avoid_congestion;           /*!< Avoid spamming AE to nodes already behind */
        libfloat_second_t       sanity_timeout;             /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */
        bool                    optimistic_replication;     /*!< Do we assume that nodes are gonna replicate the log or not */
        uint32_t                max_logs_per_ae;            /*!< Max number of logs to be sent per AE request */
        bool                    do_revert;                  /*!< Whether or not to revert logs */
        libfloat_second_t       deep_sleep_time;            /*!< Time with no activity before a cluster goes in deep-sleep mode */
        struct {
            libfloat_millisecond_t election_timeout;
            libfloat_second_t      sanity_timeout;
@@ -72,36 +73,37 @@ struct libfloat_ctx_s {
        } original;
    } conf;

    libfloat_raft_state_t       state;                  /*!< Current state of the node */
    size_t                      n_nodes;                /*!< Number of nodes in the cluster */
    khash_t(libfloat_node_id_t) *nodes;                 /*!< Hashtable of nodes */
    libfloat_node_t             *leader;                /*!< Current Leader (can be NULL) */
    libfloat_node_t             *me;                    /*!< My node in the cluster */

    libfloat_millisecond_t      global_timer;           /*!< Timer used for synchronisation, in millisconds */
    libfloat_millisecond_t      timeout_elapsed;        /*!< Current time elasped between two heartbeats */
    libfloat_millisecond_t      election_timeout_rand;  /*!< Randomized election time */
    libfloat_millisecond_t      request_timeout;        /*!< Timeout for AE */
    uint32_t                    logs_check;             /*!< Last check counter of log stuck */
    bool                        stepping_down;          /*!< Is the node stepping down from leadership */
    libfloat_second_t           lost_leader_time;       /*!< Timestamp that we lost our leader */
    libfloat_second_t           last_log_time;          /*!< Last time we applied a log */
    libfloat_raft_state_t       state;                      /*!< Current state of the node */
    size_t                      n_nodes;                    /*!< Number of nodes in the cluster */
    khash_t(libfloat_node_id_t) *nodes;                     /*!< Hashtable of nodes */
    libfloat_node_t             *leader;                    /*!< Current Leader (can be NULL) */
    libfloat_node_t             *me;                        /*!< My node in the cluster */

    libfloat_millisecond_t      global_timer;               /*!< Timer used for synchronisation, in millisconds */
    libfloat_millisecond_t      timeout_elapsed;            /*!< Current time elasped between two heartbeats */
    libfloat_millisecond_t      election_timeout_rand;      /*!< Randomized election time */
    libfloat_millisecond_t      request_timeout;            /*!< Timeout for AE */
    uint32_t                    logs_check;                 /*!< Last check counter of log stuck */
    bool                        stepping_down;              /*!< Is the node stepping down from leadership */
    libfloat_second_t           lost_leader_time;           /*!< Timestamp that we lost our leader */
    libfloat_second_t           last_log_time;              /*!< Last time we applied a log */
    libfloat_second_t           last_activity_time;         /*!< Last time we had an activity (deep sleep state change, log, ...) */
#define LIBFLOAT_DEEP_SLEEP_STATE_MAX 4
    uint32_t                    deep_sleep_state;       /*!< Deep sleep state of the cluster */
    uint32_t                    deep_sleep_state;           /*!< Deep sleep state of the cluster */

    struct {
        bool                    checking;               /*!< Are we checking that every node has lost the leader? */
        bool                    recovering;             /*!< We've just recoverd a leader from a gray-failure check */
        libfloat_second_t       check_time;             /*!< Time that we launched the check */
        bool                    checking;                   /*!< Are we checking that every node has lost the leader? */
        bool                    recovering;                 /*!< We've just recoverd a leader from a gray-failure check */
        libfloat_second_t       check_time;                 /*!< Time that we launched the check */
    } gray_failures;


    bool                        is_snapshotting;        /*!< Is the cluster currently snapshotting */
    libfloat_entry_id_t         snapshot_to_commit;     /*!< Last log ID that is to be written after the snapshot is done */
    bool                        is_snapshotting;            /*!< Is the cluster currently snapshotting */
    libfloat_entry_id_t         snapshot_to_commit;         /*!< Last log ID that is to be written after the snapshot is done */

    libfloat_list_t             logs;                   /*<! Logs with commit still not acked */
    libfloat_list_t             logs;                       /*<! Logs with commit still not acked */

    void                        *udata;                 /*!< User data */
    void                        *udata;                     /*!< User data */

    /*!
     * \brief Send a Vote Request to a node
diff --git a/periodic.c b/periodic.c
index 32ed0d4..14f5c2e 100644
--- a/periodic.c
+++ b/periodic.c
@@ -12,11 +12,11 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)

    if (ctx->conf.deep_sleep_time != 0)
    {
        if (ctx->last_log_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer))
        if (ctx->last_activity_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer))
        {
            if (ctx->deep_sleep_state < LIBFLOAT_DEEP_SLEEP_STATE_MAX)
            {
                ctx->last_log_time = ms_to_s(ctx->global_timer);
                ctx->last_activity_time = ms_to_s(ctx->global_timer);

                ctx->conf.deep_sleep_time *= 2;
                ctx->conf.election_timeout *= 2;
@@ -109,6 +109,15 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
            /* check logs still waiting */
            ctx->logs_check = 0;
        }

        if (ctx->conf.soft_compact_every_n_log != 0)
        {
            /* If soft compact is enabled, let's check if the timing is right */
            if (ctx->last_log_time + ctx->conf.soft_compact_time < ms_to_s(ctx->global_timer) && ctx->persistent.commit_index - ctx->persistent.snapshot.index >= ctx->conf.soft_compact_every_n_log)
            {
                ctx->append_snapshot_log(ctx, ctx->persistent.commit_index, ctx->persistent.term);
            }
        }
    }
    else if (ctx->election_timeout_rand <= ctx->timeout_elapsed)
    {
diff --git a/raft.c b/raft.c
index 2a425b1..517f98e 100644
--- a/raft.c
+++ b/raft.c
@@ -91,6 +91,7 @@ void libfloat_set_current_commit_index(libfloat_ctx_t *ctx, libfloat_entry_id_t
        return;

    ctx->last_log_time = ms_to_s(ctx->global_timer);
    ctx->last_activity_time = ms_to_s(ctx->global_timer);
    libfloat_wake_up(ctx);

    ctx->persistent.commit_index = id;
-- 
2.41.0