Julien Egloff: 1 Ensure libfloat is resistant to clock drifting 10 files changed, 66 insertions(+), 50 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/42580/mbox | git am -3Learn more about email & git
Signed-off-by: Julien Egloff <jegloff@scaleway.com> --- election.c | 10 +++++----- internal.h | 8 ++++++++ libfloat.c | 4 ++-- libfloat.h | 44 +++++++++++++++++++++++--------------------- log.c | 4 ++-- log.h | 2 +- node.h | 2 +- periodic.c | 30 ++++++++++++++++++------------ raft.c | 10 +++++----- snapshot.c | 2 +- 10 files changed, 66 insertions(+), 50 deletions(-) diff --git a/election.c b/election.c index 4239e0c..c6dfccb 100644 --- a/election.c +++ b/election.c @@ -47,7 +47,7 @@ void __libfloat_election_start(libfloat_ctx_t *ctx, libfloat_elections_args_t *a /* Randomize election timeout, and reset counter */ ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout; - ctx->timeout_elapsed = 0; + ctx->timeout_elapsed = reset_timer(); if (ctx->persistent.commit_index > 0) { @@ -156,7 +156,7 @@ void libfloat_request_vote_receive(libfloat_ctx_t *ctx, libfloat_rpc_request_vot /* Vote reset */ libfloat_vote_for(ctx, 0); - ctx->timeout_elapsed = 0; + ctx->timeout_elapsed = reset_timer(); } if (libfloat_can_i_grant_vote(ctx, req)) @@ -166,7 +166,7 @@ void libfloat_request_vote_receive(libfloat_ctx_t *ctx, libfloat_rpc_request_vot resp->vote_granted = true; libfloat_update_leader(ctx, LIBFLOAT_NO_LEADER); - ctx->timeout_elapsed = 0; + ctx->timeout_elapsed = reset_timer(); } else { @@ -278,7 +278,7 @@ void libfloat_update_leader(libfloat_ctx_t *ctx, libfloat_node_t *node) if (ctx->leader != NULL && node == LIBFLOAT_NO_LEADER) { /* We've just lost our leader! */ - ctx->lost_leader_time = ctx->time(NULL); + ctx->lost_leader_time = ms_to_s(ctx->global_timer); ctx->write_current_leader(ctx, 0); } @@ -286,7 +286,7 @@ void libfloat_update_leader(libfloat_ctx_t *ctx, libfloat_node_t *node) if (ctx->leader == LIBFLOAT_NO_LEADER && node != NULL) { /* We have a new leader! */ - ctx->stat.last_election_duration = ctx->time(NULL) - ctx->lost_leader_time; + ctx->stat.last_election_duration = ms_to_s(ctx->global_timer) - ctx->lost_leader_time; ctx->stat.leader_election++; ctx->write_current_leader(ctx, node->id); diff --git a/internal.h b/internal.h index 14d95d1..723d37a 100644 --- a/internal.h +++ b/internal.h @@ -11,6 +11,7 @@ # define min(x, y) ((x) < (y) ? (x) : (y)) # endif +#define reset_timer() 0 /*! * \brief Start an election * @@ -136,4 +137,11 @@ void libfloat_log_memory_cleanup(libfloat_ctx_t *ctx); #define LIBFLOAT_NO_LEADER NULL void libfloat_update_leader(libfloat_ctx_t *ctx, libfloat_node_t *node); +/*! + * \brief Convert libfloat_millisecond_t in libfloat_second_t + * + * \param[in] s libfloat_millisecond_t to convert + */ +libfloat_second_t ms_to_s(libfloat_millisecond_t ms); + #endif /* LIBFLOAT_INTERNAL_H */ diff --git a/libfloat.c b/libfloat.c index 14d7a4a..c075d5c 100644 --- a/libfloat.c +++ b/libfloat.c @@ -136,7 +136,7 @@ void libfloat_update_last_update(libfloat_ctx_t *ctx, libfloat_node_id_t id) libfloat_node_t *node = libfloat_get_node(ctx, id); if (node != NULL) - node->last_update = ctx->time(NULL); + node->last_update = ms_to_s(ctx->global_timer); } } @@ -165,7 +165,7 @@ void libfloat_wake_up(libfloat_ctx_t *ctx) ctx->rand() % ctx->conf.election_timeout; ctx->deep_sleep_state = 0; - ctx->last_log_time = ctx->time(NULL); + ctx->last_log_time = ms_to_s(ctx->global_timer); ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time); } } diff --git a/libfloat.h b/libfloat.h index 9a529b4..eaef087 100644 --- a/libfloat.h +++ b/libfloat.h @@ -7,12 +7,13 @@ #include <stdbool.h> #include <stdint.h> #include <stdarg.h> -#include <time.h> typedef uint32_t libfloat_term_t; typedef uint32_t libfloat_entry_id_t; typedef uint32_t libfloat_node_id_t; typedef struct libfloat_ctx_s libfloat_ctx_t; +typedef uint64_t libfloat_second_t; +typedef uint64_t libfloat_millisecond_t; #include "log.h" #include "rpc.h" @@ -43,30 +44,31 @@ struct libfloat_ctx_s { } snapshot; } persistent; + struct { - uint64_t leader_election; /*!< Count of leader elections for this cluster */ - uint64_t orphans_logs; /*!< Count of logs that are applied on the leader only */ - time_t leader_election_time; /*!< Timestamp of the last leader election */ - const char *last_election_reason; /*!< Last reason we had an election */ - time_t last_election_duration; /*!< Time it took to elect a leader for the last election */ + uint64_t leader_election; /*!< Count of leader elections for this cluster */ + uint64_t orphans_logs; /*!< Count of logs that are applied on the leader only */ + libfloat_second_t leader_election_time; /*!< Timestamp of the last leader election */ + const char *last_election_reason; /*!< Last reason we had an election */ + libfloat_second_t last_election_duration; /*!< Time it took to elect a leader for the last election */ } stat; struct { - uint32_t election_timeout; /*!< Timeout for RAFT election (ms) */ - uint32_t log_commit_timeout; /*!< Timeout for log application (s) */ + libfloat_millisecond_t election_timeout; /*!< Timeout for RAFT election (ms) */ + libfloat_second_t log_commit_timeout; /*!< Timeout for log application (s) */ uint32_t compact_every_n_log; /*!< Snapshot every N logs */ bool avoid_congestion; /*!< Avoid spamming AE to nodes already behind */ - uint32_t sanity_timeout; /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */ + libfloat_second_t sanity_timeout; /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */ bool optimistic_replication; /*!< Do we assume that nodes are gonna replicate the log or not */ uint32_t max_logs_per_ae; /*!< Max number of logs to be sent per AE request */ bool do_revert; /*!< Whether or not to revert logs */ - uint32_t deep_sleep_time; /*!< Time with no activity before a cluster goes in deep-sleep mode */ + libfloat_second_t deep_sleep_time; /*!< Time with no activity before a cluster goes in deep-sleep mode */ struct { - uint32_t election_timeout; - uint32_t sanity_timeout; - uint32_t request_timeout; - uint32_t deep_sleep_time; + libfloat_millisecond_t election_timeout; + libfloat_second_t sanity_timeout; + libfloat_millisecond_t request_timeout; + libfloat_second_t deep_sleep_time; } original; } conf; @@ -76,20 +78,21 @@ struct libfloat_ctx_s { libfloat_node_t *leader; /*!< Current Leader (can be NULL) */ libfloat_node_t *me; /*!< My node in the cluster */ - uint32_t timeout_elapsed; /*!< Current time elasped between two heartbeats */ - uint32_t election_timeout_rand; /*!< Randomized election time */ - uint32_t request_timeout; /*!< Timeout for AE */ + libfloat_millisecond_t global_timer; /*!< Timer used for synchronisation, in millisconds */ + libfloat_millisecond_t timeout_elapsed; /*!< Current time elasped between two heartbeats */ + libfloat_millisecond_t election_timeout_rand; /*!< Randomized election time */ + libfloat_millisecond_t request_timeout; /*!< Timeout for AE */ uint32_t logs_check; /*!< Last check counter of log stuck */ bool stepping_down; /*!< Is the node stepping down from leadership */ - time_t lost_leader_time; /*!< Timestamp that we lost our leader */ - time_t last_log_time; /*!< Last time we applied a log */ + libfloat_second_t lost_leader_time; /*!< Timestamp that we lost our leader */ + libfloat_second_t last_log_time; /*!< Last time we applied a log */ #define LIBFLOAT_DEEP_SLEEP_STATE_MAX 4 uint32_t deep_sleep_state; /*!< Deep sleep state of the cluster */ struct { bool checking; /*!< Are we checking that every node has lost the leader? */ bool recovering; /*!< We've just recoverd a leader from a gray-failure check */ - time_t check_time; /*!< Time that we launched the check */ + libfloat_second_t check_time; /*!< Time that we launched the check */ } gray_failures; @@ -332,7 +335,6 @@ struct libfloat_ctx_s { void *(*calloc)(size_t, size_t); void (*free)(void *); int (*rand)(void); - long int (*time)(long int *); int (*vsnprintf)(char *, size_t, const char *, va_list); }; diff --git a/log.c b/log.c index 53bf93d..5ed9ece 100644 --- a/log.c +++ b/log.c @@ -86,7 +86,7 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l entry->commit = commit; entry->udata = udata; entry->commit_type = commit_type; - entry->started = ctx->time(NULL); + entry->started = ms_to_s(ctx->global_timer); entry->node_acks.prev = &entry->node_acks; entry->node_acks.next = &entry->node_acks; @@ -628,7 +628,7 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e } /* Set the next index to sent to the node */ - node->last_update = ctx->time(NULL); + node->last_update = ms_to_s(ctx->global_timer); if (node->replicated_log < resp->current_index) { diff --git a/log.h b/log.h index 1ff2b66..2e00e1d 100644 --- a/log.h +++ b/log.h @@ -41,7 +41,7 @@ typedef struct { void (*commit)(void *, libfloat_commit_status_t); /*!< Commit callback */ libfloat_commit_type_t commit_type; /*!< Commit type */ void *udata; /*!< User data for callback */ - time_t started; /*!< Age of the log */ + libfloat_second_t started; /*!< Age of the log */ libfloat_list_t node_acks; /*!< List of nodes that have replicated this log */ libfloat_list_t next; diff --git a/node.h b/node.h index 654f275..cf70566 100644 --- a/node.h +++ b/node.h @@ -17,7 +17,7 @@ typedef struct { uint8_t has_responded_to_leader_check : 1; void *udata; /*!< User data */ - time_t last_update; /*!< Time of the last AE response (If I am the leader) */ + libfloat_second_t last_update; /*!< Time of the last AE response (If I am the leader) */ int snapshot_count; /*!< Count of the times we are supposed to send a snapshot */ bool hearbeating; } libfloat_node_t; diff --git a/periodic.c b/periodic.c index 4e55316..32ed0d4 100644 --- a/periodic.c +++ b/periodic.c @@ -1,16 +1,22 @@ #include "internal.h" +libfloat_second_t ms_to_s(libfloat_millisecond_t ms) +{ + return ms / 1000; +} + void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) { ctx->timeout_elapsed += time; + ctx->global_timer += time; if (ctx->conf.deep_sleep_time != 0) { - if (ctx->last_log_time + ctx->conf.deep_sleep_time < ctx->time(NULL)) + if (ctx->last_log_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer)) { if (ctx->deep_sleep_state < LIBFLOAT_DEEP_SLEEP_STATE_MAX) { - ctx->last_log_time = ctx->time(NULL); + ctx->last_log_time = ms_to_s(ctx->global_timer); ctx->conf.deep_sleep_time *= 2; ctx->conf.election_timeout *= 2; @@ -39,7 +45,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) { nodes_reachable++; } - else if (node->last_update + ctx->conf.sanity_timeout > ctx->time(NULL)) + else if (node->last_update + ctx->conf.sanity_timeout > ms_to_s(ctx->global_timer)) { nodes_reachable++; } @@ -52,7 +58,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) * Time to step down! */ libfloat_become_follower(ctx, .reason = "not enough nodes reachable"); - ERROR(ctx, "Sanity timeout has been reached (%d seconds), stepping down from leader position: reachable %lu / quorum %lu (%lu)", + ERROR(ctx, "Sanity timeout has been reached (%ld seconds), stepping down from leader position: reachable %lu / quorum %lu (%lu)", ctx->conf.sanity_timeout, nodes_reachable, ctx->n_nodes / 2 + 1, ctx->n_nodes); return; } @@ -60,7 +66,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) /* Timeout has expired! Time to send some heartbeats or some entries */ libfloat_send_append_entries_to_all(ctx, true); - ctx->timeout_elapsed = 0; + ctx->timeout_elapsed = reset_timer(); } /* every second, check logs with commit */ @@ -68,7 +74,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) if (ctx->logs_check >= 5) { libfloat_log_entry_t *entry, *n = NULL; - time_t now = ctx->time(NULL); + uint64_t now = ms_to_s(ctx->global_timer); libfloat_list_for_each_entry_safe(entry, n, &ctx->logs, next) { @@ -113,9 +119,9 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) if (ctx->is_leader_healthy == NULL) { /* DynamoDB-like elections are not implemented, let's simply start an election */ - DEBUG(ctx, "New election: timeout elapsed %d", ctx->timeout_elapsed); + DEBUG(ctx, "New election: timeout elapsed %ld", ctx->timeout_elapsed); libfloat_election_start(ctx, .reason = "Election timeout, start a new one", .force = true); - ctx->lost_leader_time -= (ctx->conf.election_timeout / 1000); + ctx->lost_leader_time -= ms_to_s(ctx->conf.election_timeout); } else { @@ -134,11 +140,11 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) { /* Discard leadership */ libfloat_update_leader(ctx, LIBFLOAT_NO_LEADER); - ctx->lost_leader_time -= (ctx->conf.election_timeout / 1000); + ctx->lost_leader_time -= ms_to_s(ctx->conf.election_timeout); /* We did not fire the requests yet, let's init stuff and do it */ ctx->gray_failures.checking = true; - ctx->gray_failures.check_time = ctx->time(NULL); + ctx->gray_failures.check_time = ms_to_s(ctx->global_timer); ERROR(ctx, "Launching leader-check because timeout has been reached"); for_every_node(ctx, node, { @@ -158,7 +164,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) uint32_t node_with_leaders = 0; uint32_t node_responses = 0; - if (ctx->gray_failures.check_time + (ctx->conf.election_timeout / 1000) < ctx->time(NULL)) + if (ctx->gray_failures.check_time + ms_to_s(ctx->conf.election_timeout) < ms_to_s(ctx->global_timer)) { /* We did not receive any reponses, and the election timeout has expired twice, let's launch an election */ ctx->gray_failures.checking = false; @@ -185,7 +191,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) if (node_with_leaders > (ctx->n_nodes / 2)) { /* Some nodes in the cluster still have a leader, let's postpone the election for now */ - ctx->timeout_elapsed = 0; + ctx->timeout_elapsed = reset_timer(); ctx->gray_failures.checking = false; } else if (node_responses >= (ctx->n_nodes / 2) && node_with_leaders <= (ctx->n_nodes / 2)) diff --git a/raft.c b/raft.c index 38c1f93..2a425b1 100644 --- a/raft.c +++ b/raft.c @@ -23,7 +23,7 @@ void libfloat_become_leader(libfloat_ctx_t *ctx) }); ctx->me->is_up_to_date = 1; - ctx->stat.leader_election_time = ctx->time(NULL); + ctx->stat.leader_election_time = ms_to_s(ctx->global_timer); } void __libfloat_become_candidate(libfloat_ctx_t *ctx, libfloat_elections_args_t *args) @@ -31,13 +31,13 @@ void __libfloat_become_candidate(libfloat_ctx_t *ctx, libfloat_elections_args_t DEBUG(ctx, "Becoming candidate: reason=%s", args->reason); ctx->state = RAFT_STATE_CANDIDATE; ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout; - ctx->stat.leader_election_time = 0; + ctx->stat.leader_election_time = reset_timer(); } void __libfloat_become_follower(libfloat_ctx_t *ctx, libfloat_elections_args_t *args) { ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout; - ctx->stat.leader_election_time = 0; + ctx->stat.leader_election_time = reset_timer(); libfloat_vote_for(ctx, 0); @@ -61,7 +61,7 @@ void libfloat_step_down(libfloat_ctx_t *ctx) libfloat_update_leader(ctx, LIBFLOAT_NO_LEADER); ctx->state = RAFT_STATE_NONE; - ctx->stat.leader_election_time = 0; + ctx->stat.leader_election_time = reset_timer(); } void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx, bool heartbeat) @@ -90,7 +90,7 @@ void libfloat_set_current_commit_index(libfloat_ctx_t *ctx, libfloat_entry_id_t if (id <= ctx->persistent.commit_index) return; - ctx->last_log_time = ctx->time(NULL); + ctx->last_log_time = ms_to_s(ctx->global_timer); libfloat_wake_up(ctx); ctx->persistent.commit_index = id; diff --git a/snapshot.c b/snapshot.c index e602ed3..1aa5cd7 100644 --- a/snapshot.c +++ b/snapshot.c @@ -36,7 +36,7 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx) if (ctx->state == RAFT_STATE_LEADER) libfloat_log_add_node_ack(ctx, entry, ctx->me->id); - ctx->timeout_elapsed = 0; + ctx->timeout_elapsed = reset_timer(); libfloat_set_current_commit_index(ctx, entry->id); if (entry->commit_type == LIBFLOAT_EVENTUALLY_CONSISTENT) -- 2.41.0
LGTM -- Louis Solofrizzo Team Storage
LG