Patrik Cyvoct: 1 raft: add soft snapshot feature 4 files changed, 58 insertions(+), 45 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/42781/mbox | git am -3
Learn more about email & git
This patch add soft snapshot feature: if no logs are received for the soft_compact_time seconds, a snapshot will be made based on the soft_compact_after_n value. Signed-off-by: Patrik Cyvoct <patrik@ptrk.io> --- libfloat.c | 1 + libfloat.h | 88 ++++++++++++++++++++++++++++-------------------------- periodic.c | 13 ++++++-- raft.c | 1 + 4 files changed, 58 insertions(+), 45 deletions(-) diff --git a/libfloat.c b/libfloat.c index c075d5c..9b3be69 100644 --- a/libfloat.c +++ b/libfloat.c @@ -166,6 +166,7 @@ void libfloat_wake_up(libfloat_ctx_t *ctx) ctx->deep_sleep_state = 0; ctx->last_log_time = ms_to_s(ctx->global_timer); + ctx->last_activity_time = ms_to_s(ctx->global_timer); ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time); } } diff --git a/libfloat.h b/libfloat.h index eaef087..06608df 100644 --- a/libfloat.h +++ b/libfloat.h @@ -33,37 +33,38 @@ typedef enum { struct libfloat_ctx_s { struct { - libfloat_term_t term; /*!< Current term */ - libfloat_node_id_t voted_for; /*!< Last Node ID we voted for (0 if no current vote) */ - libfloat_entry_id_t commit_index; /*!< Last log we applied */ - khash_t(libfloat_entry_id_t) *log; /*!< Hashtable of current log */ + libfloat_term_t term; /*!< Current term */ + libfloat_node_id_t voted_for; /*!< Last Node ID we voted for (0 if no current vote) */ + libfloat_entry_id_t commit_index; /*!< Last log we applied */ + khash_t(libfloat_entry_id_t) *log; /*!< Hashtable of current log */ struct { - libfloat_entry_id_t index; /*!< Last index of the snapshot */ - libfloat_term_t term; /*!< Last term of the snapshot */ + libfloat_entry_id_t index; /*!< Last index of the snapshot */ + libfloat_term_t term; /*!< Last term of the snapshot */ } snapshot; } persistent; struct { - uint64_t leader_election; /*!< Count of leader elections for this cluster */ - uint64_t orphans_logs; /*!< Count of logs that are applied on the leader only */ - libfloat_second_t leader_election_time; /*!< Timestamp of 
the last leader election */ - const char *last_election_reason; /*!< Last reason we had an election */ - libfloat_second_t last_election_duration; /*!< Time it took to elect a leader for the last election */ + uint64_t leader_election; /*!< Count of leader elections for this cluster */ + uint64_t orphans_logs; /*!< Count of logs that are applied on the leader only */ + libfloat_second_t leader_election_time; /*!< Timestamp of the last leader election */ + const char *last_election_reason; /*!< Last reason we had an election */ + libfloat_second_t last_election_duration; /*!< Time it took to elect a leader for the last election */ } stat; struct { - libfloat_millisecond_t election_timeout; /*!< Timeout for RAFT election (ms) */ - libfloat_second_t log_commit_timeout; /*!< Timeout for log application (s) */ - uint32_t compact_every_n_log; /*!< Snapshot every N logs */ - bool avoid_congestion; /*!< Avoid spamming AE to nodes already behind */ - libfloat_second_t sanity_timeout; /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */ - bool optimistic_replication; /*!< Do we assume that nodes are gonna replicate the log or not */ - uint32_t max_logs_per_ae; /*!< Max number of logs to be sent per AE request */ - bool do_revert; /*!< Whether or not to revert logs */ - libfloat_second_t deep_sleep_time; /*!< Time with no activity before a cluster goes in deep-sleep mode */ - + libfloat_millisecond_t election_timeout; /*!< Timeout for RAFT election (ms) */ + libfloat_second_t log_commit_timeout; /*!< Timeout for log application (s) */ + uint32_t compact_every_n_log; /*!< Snapshot every N logs */ + uint32_t soft_compact_every_n_log; /*!< Soft snapshot every N logs (if no log since soft_compact_time) */ + libfloat_second_t soft_compact_time; /*!< Duration in seconds for soft compact */ + bool avoid_congestion; /*!< Avoid spamming AE to nodes already behind */ + libfloat_second_t sanity_timeout; /*!< Timeout before a leader steps-down if it can't 
reach any other nodes (sec) */ + bool optimistic_replication; /*!< Do we assume that nodes are gonna replicate the log or not */ + uint32_t max_logs_per_ae; /*!< Max number of logs to be sent per AE request */ + bool do_revert; /*!< Whether or not to revert logs */ + libfloat_second_t deep_sleep_time; /*!< Time with no activity before a cluster goes in deep-sleep mode */ struct { libfloat_millisecond_t election_timeout; libfloat_second_t sanity_timeout; @@ -72,36 +73,37 @@ struct libfloat_ctx_s { } original; } conf; - libfloat_raft_state_t state; /*!< Current state of the node */ - size_t n_nodes; /*!< Number of nodes in the cluster */ - khash_t(libfloat_node_id_t) *nodes; /*!< Hashtable of nodes */ - libfloat_node_t *leader; /*!< Current Leader (can be NULL) */ - libfloat_node_t *me; /*!< My node in the cluster */ - - libfloat_millisecond_t global_timer; /*!< Timer used for synchronisation, in millisconds */ - libfloat_millisecond_t timeout_elapsed; /*!< Current time elasped between two heartbeats */ - libfloat_millisecond_t election_timeout_rand; /*!< Randomized election time */ - libfloat_millisecond_t request_timeout; /*!< Timeout for AE */ - uint32_t logs_check; /*!< Last check counter of log stuck */ - bool stepping_down; /*!< Is the node stepping down from leadership */ - libfloat_second_t lost_leader_time; /*!< Timestamp that we lost our leader */ - libfloat_second_t last_log_time; /*!< Last time we applied a log */ + libfloat_raft_state_t state; /*!< Current state of the node */ + size_t n_nodes; /*!< Number of nodes in the cluster */ + khash_t(libfloat_node_id_t) *nodes; /*!< Hashtable of nodes */ + libfloat_node_t *leader; /*!< Current Leader (can be NULL) */ + libfloat_node_t *me; /*!< My node in the cluster */ + + libfloat_millisecond_t global_timer; /*!< Timer used for synchronisation, in millisconds */ + libfloat_millisecond_t timeout_elapsed; /*!< Current time elasped between two heartbeats */ + libfloat_millisecond_t election_timeout_rand; /*!< 
Randomized election time */ + libfloat_millisecond_t request_timeout; /*!< Timeout for AE */ + uint32_t logs_check; /*!< Last check counter of log stuck */ + bool stepping_down; /*!< Is the node stepping down from leadership */ + libfloat_second_t lost_leader_time; /*!< Timestamp that we lost our leader */ + libfloat_second_t last_log_time; /*!< Last time we applied a log */ + libfloat_second_t last_activity_time; /*!< Last time we had an activity (deep sleep state change, log, ...) */ #define LIBFLOAT_DEEP_SLEEP_STATE_MAX 4 - uint32_t deep_sleep_state; /*!< Deep sleep state of the cluster */ + uint32_t deep_sleep_state; /*!< Deep sleep state of the cluster */ struct { - bool checking; /*!< Are we checking that every node has lost the leader? */ - bool recovering; /*!< We've just recoverd a leader from a gray-failure check */ - libfloat_second_t check_time; /*!< Time that we launched the check */ + bool checking; /*!< Are we checking that every node has lost the leader? */ + bool recovering; /*!< We've just recoverd a leader from a gray-failure check */ + libfloat_second_t check_time; /*!< Time that we launched the check */ } gray_failures; - bool is_snapshotting; /*!< Is the cluster currently snapshotting */ - libfloat_entry_id_t snapshot_to_commit; /*!< Last log ID that is to be written after the snapshot is done */ + bool is_snapshotting; /*!< Is the cluster currently snapshotting */ + libfloat_entry_id_t snapshot_to_commit; /*!< Last log ID that is to be written after the snapshot is done */ - libfloat_list_t logs; /*<! Logs with commit still not acked */ + libfloat_list_t logs; /*<! Logs with commit still not acked */ - void *udata; /*!< User data */ + void *udata; /*!< User data */ /*! 
* \brief Send a Vote Request to a node diff --git a/periodic.c b/periodic.c index 32ed0d4..ac4cedd 100644 --- a/periodic.c +++ b/periodic.c @@ -12,11 +12,11 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) if (ctx->conf.deep_sleep_time != 0) { - if (ctx->last_log_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer)) + if (ctx->last_activity_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer)) { if (ctx->deep_sleep_state < LIBFLOAT_DEEP_SLEEP_STATE_MAX) { - ctx->last_log_time = ms_to_s(ctx->global_timer); + ctx->last_activity_time = ms_to_s(ctx->global_timer); ctx->conf.deep_sleep_time *= 2; ctx->conf.election_timeout *= 2; @@ -109,6 +109,15 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) /* check logs still waiting */ ctx->logs_check = 0; } + + if (ctx->conf.soft_compact_every_n_log != 0) + { + /* If soft compact is enabled, let's check if the timing is right */ + if (ctx->last_log_time + ctx->conf.soft_compact_time < ctx->time(NULL) && ctx->persistent.commit_index - ctx->persistent.snapshot.index >= ctx->conf.soft_compact_every_n_log) + { + ctx->append_snapshot_log(ctx, ctx->persistent.commit_index, ctx->persistent.term); + } + }
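Please split the main condition into two separate conditions for readability: one for the time elapsed since the last log, and one for the number of logs committed since the last snapshot.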
} else if (ctx->election_timeout_rand <= ctx->timeout_elapsed) { diff --git a/raft.c b/raft.c index 2a425b1..517f98e 100644 --- a/raft.c +++ b/raft.c @@ -91,6 +91,7 @@ void libfloat_set_current_commit_index(libfloat_ctx_t *ctx, libfloat_entry_id_t return; ctx->last_log_time = ms_to_s(ctx->global_timer); + ctx->last_activity_time = ms_to_s(ctx->global_timer); libfloat_wake_up(ctx); ctx->persistent.commit_index = id; -- 2.41.0
Also, double-check that we're not already snapshotting (!ctx->is_snapshotting) before appending the snapshot log. Otherwise, looks good. -- Louis Solofrizzo