~ne02ptzero/libfloat

raft: add a should_snapshot method v1 SUPERSEDED

Patrik Cyvoct: 1
 raft: add a should_snapshot method

 2 files changed, 61 insertions(+), 42 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/42771/mbox | git am -3
Learn more about email & git

[PATCH] raft: add a should_snapshot method Export this patch

This patch adds the ability to register a should_snapshot callback,
that can be used in order to avoid snapshotting on certain cases
decided by the library client.

Signed-off-by: Patrik Cyvoct <patrik@ptrk.io>
---
 libfloat.h | 92 ++++++++++++++++++++++++++++++------------------------
 log.c      | 11 ++++++-
 2 files changed, 61 insertions(+), 42 deletions(-)

diff --git a/libfloat.h b/libfloat.h
index 9a529b4..06d51e2 100644
--- a/libfloat.h
+++ b/libfloat.h
@@ -32,35 +32,36 @@ typedef enum {

struct libfloat_ctx_s {
    struct {
        libfloat_term_t                 term;           /*!< Current term */
        libfloat_node_id_t              voted_for;      /*!< Last Node ID we voted for (0 if no current vote) */
        libfloat_entry_id_t             commit_index;   /*!< Last log we applied */
        khash_t(libfloat_entry_id_t)    *log;           /*!< Hashtable of current log */
        libfloat_term_t                 term;               /*!< Current term */
        libfloat_node_id_t              voted_for;          /*!< Last Node ID we voted for (0 if no current vote) */
        libfloat_entry_id_t             commit_index;       /*!< Last log we applied */
        khash_t(libfloat_entry_id_t)    *log;               /*!< Hashtable of current log */

        struct {
            libfloat_entry_id_t         index;          /*!< Last index of the snapshot */
            libfloat_term_t             term;           /*!< Last term of the snapshot */
            libfloat_entry_id_t         index;              /*!< Last index of the snapshot */
            libfloat_term_t             term;               /*!< Last term of the snapshot */
            libfloat_entry_id_t         last_checked_index; /*!< Last checked index of the snapshot, used if should_snapshot is defined */
        } snapshot;
    } persistent;

    struct {
        uint64_t        leader_election;                /*!< Count of leader elections for this cluster */
        uint64_t        orphans_logs;                   /*!< Count of logs that are applied on the leader only */
        time_t          leader_election_time;           /*!< Timestamp of the last leader election */
        const char      *last_election_reason;          /*!< Last reason we had an election */
        time_t          last_election_duration;         /*!< Time it took to elect a leader for the last election */
        uint64_t        leader_election;                    /*!< Count of leader elections for this cluster */
        uint64_t        orphans_logs;                       /*!< Count of logs that are applied on the leader only */
        time_t          leader_election_time;               /*!< Timestamp of the last leader election */
        const char      *last_election_reason;              /*!< Last reason we had an election */
        time_t          last_election_duration;             /*!< Time it took to elect a leader for the last election */
    } stat;

    struct {
        uint32_t                election_timeout;       /*!< Timeout for RAFT election (ms) */
        uint32_t                log_commit_timeout;     /*!< Timeout for log application (s) */
        uint32_t                compact_every_n_log;    /*!< Snapshot every N logs */
        bool                    avoid_congestion;       /*!< Avoid spamming AE to nodes already behind */
        uint32_t                sanity_timeout;         /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */
        bool                    optimistic_replication; /*!< Do we assume that nodes are gonna replicate the log or not */
        uint32_t                max_logs_per_ae;        /*!< Max number of logs to be sent per AE request */
        bool                    do_revert;              /*!< Whether or not to revert logs */
        uint32_t                deep_sleep_time;        /*!< Time with no activity before a cluster goes in deep-sleep mode */
        uint32_t                election_timeout;           /*!< Timeout for RAFT election (ms) */
        uint32_t                log_commit_timeout;         /*!< Timeout for log application (s) */
        uint32_t                compact_every_n_log;        /*!< Snapshot every N logs */
        bool                    avoid_congestion;           /*!< Avoid spamming AE to nodes already behind */
        uint32_t                sanity_timeout;             /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */
        bool                    optimistic_replication;     /*!< Do we assume that nodes are gonna replicate the log or not */
        uint32_t                max_logs_per_ae;            /*!< Max number of logs to be sent per AE request */
        bool                    do_revert;                  /*!< Whether or not to revert logs */
        uint32_t                deep_sleep_time;            /*!< Time with no activity before a cluster goes in deep-sleep mode */

        struct {
            uint32_t            election_timeout;
@@ -70,35 +71,35 @@ struct libfloat_ctx_s {
        } original;
    } conf;

    libfloat_raft_state_t       state;                  /*!< Current state of the node */
    size_t                      n_nodes;                /*!< Number of nodes in the cluster */
    khash_t(libfloat_node_id_t) *nodes;                 /*!< Hashtable of nodes */
    libfloat_node_t             *leader;                /*!< Current Leader (can be NULL) */
    libfloat_node_t             *me;                    /*!< My node in the cluster */

    uint32_t                    timeout_elapsed;        /*!< Current time elasped between two heartbeats */
    uint32_t                    election_timeout_rand;  /*!< Randomized election time */
    uint32_t                    request_timeout;        /*!< Timeout for AE */
    uint32_t                    logs_check;             /*!< Last check counter of log stuck */
    bool                        stepping_down;          /*!< Is the node stepping down from leadership */
    time_t                      lost_leader_time;       /*!< Timestamp that we lost our leader */
    time_t                      last_log_time;          /*!< Last time we applied a log */
    libfloat_raft_state_t       state;                      /*!< Current state of the node */
    size_t                      n_nodes;                    /*!< Number of nodes in the cluster */
    khash_t(libfloat_node_id_t) *nodes;                     /*!< Hashtable of nodes */
    libfloat_node_t             *leader;                    /*!< Current Leader (can be NULL) */
    libfloat_node_t             *me;                        /*!< My node in the cluster */

    uint32_t                    timeout_elapsed;            /*!< Current time elasped between two heartbeats */
    uint32_t                    election_timeout_rand;      /*!< Randomized election time */
    uint32_t                    request_timeout;            /*!< Timeout for AE */
    uint32_t                    logs_check;                 /*!< Last check counter of log stuck */
    bool                        stepping_down;              /*!< Is the node stepping down from leadership */
    time_t                      lost_leader_time;           /*!< Timestamp that we lost our leader */
    time_t                      last_log_time;              /*!< Last time we applied a log */
#define LIBFLOAT_DEEP_SLEEP_STATE_MAX 4
    uint32_t                    deep_sleep_state;       /*!< Deep sleep state of the cluster */
    uint32_t                    deep_sleep_state;           /*!< Deep sleep state of the cluster */

    struct {
        bool                    checking;               /*!< Are we checking that every node has lost the leader? */
        bool                    recovering;             /*!< We've just recoverd a leader from a gray-failure check */
        time_t                  check_time;             /*!< Time that we launched the check */
        bool                    checking;                   /*!< Are we checking that every node has lost the leader? */
        bool                    recovering;                 /*!< We've just recoverd a leader from a gray-failure check */
        time_t                  check_time;                 /*!< Time that we launched the check */
    } gray_failures;


    bool                        is_snapshotting;        /*!< Is the cluster currently snapshotting */
    libfloat_entry_id_t         snapshot_to_commit;     /*!< Last log ID that is to be written after the snapshot is done */
    bool                        is_snapshotting;            /*!< Is the cluster currently snapshotting */
    libfloat_entry_id_t         snapshot_to_commit;         /*!< Last log ID that is to be written after the snapshot is done */

    libfloat_list_t             logs;                   /*<! Logs with commit still not acked */
    libfloat_list_t             logs;                       /*<! Logs with commit still not acked */

    void                        *udata;                 /*!< User data */
    void                        *udata;                     /*!< User data */

    /*!
     * \brief Send a Vote Request to a node
@@ -325,6 +326,15 @@ struct libfloat_ctx_s {
     */
    void (*debug)(struct libfloat_ctx_s *, const char *);

    /*!
     * \brief Returns wether a snapshot is needed, with the implementation left to the client
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     *
     * \note This function will be called each compact_every_n_log
     */
    bool (*should_snapshot)(struct libfloat_ctx_s *);

    /**
     * The following functions are expected to behave like their counterparts in man(3)
     */
diff --git a/log.c b/log.c
index 53bf93d..d61ce49 100644
--- a/log.c
+++ b/log.c
@@ -179,7 +179,16 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l
    if (ctx->conf.compact_every_n_log != 0)
    {
        /* User asked for snapshots, let's check it */
        if (ctx->persistent.commit_index - ctx->persistent.snapshot.index >= ctx->conf.compact_every_n_log)
        if (ctx->should_snapshot != NULL)
        {
            /* If we have a should_snapshot method registered, let's check if this snapshot is needed */
            if (ctx->persistent.commit_index - ctx->persistent.snapshot.last_checked_index >= ctx->conf.compact_every_n_log && ctx->should_snapshot(ctx))
            {
                ctx->append_snapshot_log(ctx, ctx->persistent.commit_index, ctx->persistent.term);
                ctx->persistent.snapshot.last_checked_index = ctx->persistent.commit_index;
            }
        }
        else if (ctx->persistent.commit_index - ctx->persistent.snapshot.index >= ctx->conf.compact_every_n_log)
        {
            ctx->append_snapshot_log(ctx, ctx->persistent.commit_index, ctx->persistent.term);
        }
-- 
2.41.0