~ne02ptzero/libfloat

This thread contains a patchset. You're looking at the original emails, but you may wish to use the patch review UI. Review patch

[PATCH] Ensure libfloat is resistant to clock drifting --V3

Julien Egloff <jegloff@scaleway.com>
Details
Message ID
<20230711130334.92885-2-jegloff@scaleway.com>
DKIM signature
missing
Download raw message
Patch: +58 -43
Signed-off-by: Julien Egloff <jegloff@scaleway.com>
---
 election.c |  4 ++--
 libfloat.c |  4 ++--
 libfloat.h | 53 +++++++++++++++++++++++++++++++----------------------
 log.c      |  4 ++--
 log.h      |  2 +-
 node.h     |  2 +-
 periodic.c | 28 +++++++++++++++++-----------
 raft.c     |  4 ++--
 8 files changed, 58 insertions(+), 43 deletions(-)

diff --git a/election.c b/election.c
index 4239e0c..ecc184b 100644
--- a/election.c
+++ b/election.c
@@ -278,7 +278,7 @@ void libfloat_update_leader(libfloat_ctx_t *ctx, libfloat_node_t *node)
    if (ctx->leader != NULL && node == LIBFLOAT_NO_LEADER)
    {
        /* We've just lost our leader! */
        ctx->lost_leader_time = ctx->time(NULL);
        ctx->lost_leader_time = ms_to_s(ctx->global_timer);

        ctx->write_current_leader(ctx, 0);
    }
@@ -286,7 +286,7 @@ void libfloat_update_leader(libfloat_ctx_t *ctx, libfloat_node_t *node)
    if (ctx->leader == LIBFLOAT_NO_LEADER && node != NULL)
    {
        /* We have a new leader! */
        ctx->stat.last_election_duration = ctx->time(NULL) - ctx->lost_leader_time;
        ctx->stat.last_election_duration = ms_to_s(ctx->global_timer) - ctx->lost_leader_time;
        ctx->stat.leader_election++;

        ctx->write_current_leader(ctx, node->id);
diff --git a/libfloat.c b/libfloat.c
index 14d7a4a..c075d5c 100644
--- a/libfloat.c
+++ b/libfloat.c
@@ -136,7 +136,7 @@ void libfloat_update_last_update(libfloat_ctx_t *ctx, libfloat_node_id_t id)
        libfloat_node_t         *node = libfloat_get_node(ctx, id);

        if (node != NULL)
            node->last_update = ctx->time(NULL);
            node->last_update = ms_to_s(ctx->global_timer);
    }
}

@@ -165,7 +165,7 @@ void libfloat_wake_up(libfloat_ctx_t *ctx)
            ctx->rand() % ctx->conf.election_timeout;

        ctx->deep_sleep_state = 0;
        ctx->last_log_time = ctx->time(NULL);
        ctx->last_log_time = ms_to_s(ctx->global_timer);
        ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time);
    }
}
diff --git a/libfloat.h b/libfloat.h
index 9a529b4..b77d0ac 100644
--- a/libfloat.h
+++ b/libfloat.h
@@ -7,12 +7,13 @@
#include <stdbool.h>
#include <stdint.h>
#include <stdarg.h>
#include <time.h>

typedef uint32_t libfloat_term_t;
typedef uint32_t libfloat_entry_id_t;
typedef uint32_t libfloat_node_id_t;
typedef struct libfloat_ctx_s libfloat_ctx_t;
typedef uint64_t libfloat_second_t;
typedef uint64_t libfloat_millisecond_t;

#include "log.h"
#include "rpc.h"
@@ -43,30 +44,31 @@ struct libfloat_ctx_s {
        } snapshot;
    } persistent;


    struct {
        uint64_t        leader_election;                /*!< Count of leader elections for this cluster */
        uint64_t        orphans_logs;                   /*!< Count of logs that are applied on the leader only */
        time_t          leader_election_time;           /*!< Timestamp of the last leader election */
        const char      *last_election_reason;          /*!< Last reason we had an election */
        time_t          last_election_duration;         /*!< Time it took to elect a leader for the last election */
        uint64_t          leader_election;                /*!< Count of leader elections for this cluster */
        uint64_t          orphans_logs;                   /*!< Count of logs that are applied on the leader only */
        libfloat_second_t leader_election_time;           /*!< Timestamp of the last leader election */
        const char        *last_election_reason;          /*!< Last reason we had an election */
        libfloat_second_t last_election_duration;         /*!< Time it took to elect a leader for the last election */
    } stat;

    struct {
        uint32_t                election_timeout;       /*!< Timeout for RAFT election (ms) */
        uint32_t                log_commit_timeout;     /*!< Timeout for log application (s) */
        libfloat_millisecond_t  election_timeout;       /*!< Timeout for RAFT election (ms) */
        libfloat_second_t       log_commit_timeout;     /*!< Timeout for log application (s) */
        uint32_t                compact_every_n_log;    /*!< Snapshot every N logs */
        bool                    avoid_congestion;       /*!< Avoid spamming AE to nodes already behind */
        uint32_t                sanity_timeout;         /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */
        libfloat_second_t       sanity_timeout;         /*!< Timeout before a leader steps-down if it can't reach any other nodes (sec) */
        bool                    optimistic_replication; /*!< Do we assume that nodes are gonna replicate the log or not */
        uint32_t                max_logs_per_ae;        /*!< Max number of logs to be sent per AE request */
        bool                    do_revert;              /*!< Whether or not to revert logs */
        uint32_t                deep_sleep_time;        /*!< Time with no activity before a cluster goes in deep-sleep mode */
        libfloat_second_t       deep_sleep_time;        /*!< Time with no activity before a cluster goes in deep-sleep mode */

        struct {
            uint32_t            election_timeout;
            uint32_t            sanity_timeout;
            uint32_t            request_timeout;
            uint32_t            deep_sleep_time;
            libfloat_millisecond_t election_timeout;
            libfloat_second_t      sanity_timeout;
            libfloat_millisecond_t request_timeout;
            libfloat_second_t      deep_sleep_time;
        } original;
    } conf;

@@ -76,20 +78,21 @@ struct libfloat_ctx_s {
    libfloat_node_t             *leader;                /*!< Current Leader (can be NULL) */
    libfloat_node_t             *me;                    /*!< My node in the cluster */

    uint32_t                    timeout_elapsed;        /*!< Current time elasped between two heartbeats */
    uint32_t                    election_timeout_rand;  /*!< Randomized election time */
    uint32_t                    request_timeout;        /*!< Timeout for AE */
    libfloat_millisecond_t      global_timer;           /*!< Timer used for synchronisation, in milliseconds */
    libfloat_millisecond_t      timeout_elapsed;        /*!< Current time elapsed between two heartbeats */
    libfloat_millisecond_t      election_timeout_rand;  /*!< Randomized election time */
    libfloat_millisecond_t      request_timeout;        /*!< Timeout for AE */
    uint32_t                    logs_check;             /*!< Last check counter of log stuck */
    bool                        stepping_down;          /*!< Is the node stepping down from leadership */
    time_t                      lost_leader_time;       /*!< Timestamp that we lost our leader */
    time_t                      last_log_time;          /*!< Last time we applied a log */
    libfloat_second_t           lost_leader_time;       /*!< Timestamp that we lost our leader */
    libfloat_second_t           last_log_time;          /*!< Last time we applied a log */
#define LIBFLOAT_DEEP_SLEEP_STATE_MAX 4
    uint32_t                    deep_sleep_state;       /*!< Deep sleep state of the cluster */

    struct {
        bool                    checking;               /*!< Are we checking that every node has lost the leader? */
        bool                    recovering;             /*!< We've just recoverd a leader from a gray-failure check */
        time_t                  check_time;             /*!< Time that we launched the check */
        libfloat_second_t       check_time;             /*!< Time that we launched the check */
    } gray_failures;


@@ -332,7 +335,6 @@ struct libfloat_ctx_s {
    void        *(*calloc)(size_t, size_t);
    void        (*free)(void *);
    int         (*rand)(void);
    long int    (*time)(long int *);
    int         (*vsnprintf)(char *, size_t, const char *, va_list);
};

@@ -393,7 +395,7 @@ void libfloat_del_node(libfloat_ctx_t *ctx, libfloat_node_id_t id);
 * \param[in] ctx Libfloat context
 * \param[in] time Time elapsed between this call and the previous, in milliseconds
 */
void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time);
void libfloat_periodic(libfloat_ctx_t *ctx, libfloat_millisecond_t time);

/*!
 * \brief Append a log to the cluster
@@ -673,4 +675,11 @@ void libfloat_update_leader_check(libfloat_ctx_t *ctx, libfloat_node_id_t id, bo
 */
void libfloat_wake_up(libfloat_ctx_t *ctx);

/*!
 * \brief Convert a libfloat_millisecond_t value to libfloat_second_t
 *
 * The result is truncated to whole seconds (integer division by 1000).
 *
 * \param[in] ms libfloat_millisecond_t to convert
 * \return The value expressed in seconds
 */
libfloat_second_t ms_to_s(libfloat_millisecond_t ms);

#endif /* LIBFLOAT_H */
diff --git a/log.c b/log.c
index 53bf93d..5ed9ece 100644
--- a/log.c
+++ b/log.c
@@ -86,7 +86,7 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l
    entry->commit = commit;
    entry->udata = udata;
    entry->commit_type = commit_type;
    entry->started = ctx->time(NULL);
    entry->started = ms_to_s(ctx->global_timer);

    entry->node_acks.prev = &entry->node_acks;
    entry->node_acks.next = &entry->node_acks;
@@ -628,7 +628,7 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e
    }

    /* Set the next index to sent to the node */
    node->last_update = ctx->time(NULL);
    node->last_update = ms_to_s(ctx->global_timer);

    if (node->replicated_log < resp->current_index)
    {
diff --git a/log.h b/log.h
index 1ff2b66..bb20ba0 100644
--- a/log.h
+++ b/log.h
@@ -41,7 +41,7 @@ typedef struct {
    void                        (*commit)(void *, libfloat_commit_status_t);    /*!< Commit callback */
    libfloat_commit_type_t      commit_type;                                    /*!< Commit type */
    void                        *udata;                                         /*!< User data for callback */
    time_t                      started;                                        /*!< Age of the log */
    uint64_t                    started;                                        /*!< Age of the log */
    libfloat_list_t             node_acks;                                      /*!< List of nodes that have replicated this log */

    libfloat_list_t             next;
diff --git a/node.h b/node.h
index 654f275..c614d61 100644
--- a/node.h
+++ b/node.h
@@ -17,7 +17,7 @@ typedef struct {
    uint8_t             has_responded_to_leader_check   : 1;

    void                *udata;                 /*!< User data */
    time_t              last_update;            /*!< Time of the last AE response (If I am the leader) */
    uint64_t            last_update;            /*!< Time of the last AE response (If I am the leader) */
    int                 snapshot_count;         /*!< Count of the times we are supposed to send a snapshot */
    bool                hearbeating;
} libfloat_node_t;
diff --git a/periodic.c b/periodic.c
index 4e55316..9908e72 100644
--- a/periodic.c
+++ b/periodic.c
@@ -1,16 +1,22 @@
#include "internal.h"

void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
/*!
 * \brief Convert a millisecond count to whole seconds
 *
 * The parameter is typed libfloat_millisecond_t so that the definition
 * agrees with the prototype declared in libfloat.h.
 *
 * \param[in] ms Value in milliseconds
 * \return ms divided by 1000; the fractional part is discarded
 */
libfloat_second_t ms_to_s(libfloat_millisecond_t ms)
{
    /* Integer division: sub-second remainders are truncated, not rounded */
    return ms / 1000;
}

void libfloat_periodic(libfloat_ctx_t *ctx, libfloat_millisecond_t time)
{
    ctx->timeout_elapsed += time;
    ctx->global_timer += time;

    if (ctx->conf.deep_sleep_time != 0)
    {
        if (ctx->last_log_time + ctx->conf.deep_sleep_time < ctx->time(NULL))
        if (ctx->last_log_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer))
        {
            if (ctx->deep_sleep_state < LIBFLOAT_DEEP_SLEEP_STATE_MAX)
            {
                ctx->last_log_time = ctx->time(NULL);
                ctx->last_log_time = ms_to_s(ctx->global_timer);

                ctx->conf.deep_sleep_time *= 2;
                ctx->conf.election_timeout *= 2;
@@ -39,7 +45,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
                    {
                        nodes_reachable++;
                    }
                    else if (node->last_update + ctx->conf.sanity_timeout > ctx->time(NULL))
                    else if (node->last_update + ctx->conf.sanity_timeout > ms_to_s(ctx->global_timer))
                    {
                        nodes_reachable++;
                    }
@@ -52,7 +58,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
                    * Time to step down!
                    */
                    libfloat_become_follower(ctx, .reason = "not enough nodes reachable");
                    ERROR(ctx, "Sanity timeout has been reached (%d seconds), stepping down from leader position: reachable %lu / quorum %lu (%lu)",
                    ERROR(ctx, "Sanity timeout has been reached (%ld seconds), stepping down from leader position: reachable %lu / quorum %lu (%lu)",
                          ctx->conf.sanity_timeout, nodes_reachable, ctx->n_nodes / 2 + 1, ctx->n_nodes);
                    return;
                }
@@ -68,7 +74,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
        if (ctx->logs_check >= 5)
        {
            libfloat_log_entry_t        *entry, *n = NULL;
            time_t                      now = ctx->time(NULL);
            uint64_t                    now = ms_to_s(ctx->global_timer);

            libfloat_list_for_each_entry_safe(entry, n, &ctx->logs, next)
            {
@@ -113,9 +119,9 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
            if (ctx->is_leader_healthy == NULL)
            {
                /* DynamoDB-like elections are not implemented, let's simply start an election */
                DEBUG(ctx, "New election: timeout elapsed %d", ctx->timeout_elapsed);
                DEBUG(ctx, "New election: timeout elapsed %ld", ctx->timeout_elapsed);
                libfloat_election_start(ctx, .reason = "Election timeout, start a new one", .force = true);
                ctx->lost_leader_time -= (ctx->conf.election_timeout / 1000);
                ctx->lost_leader_time -= ms_to_s(ctx->conf.election_timeout);
            }
            else
            {
@@ -134,11 +140,11 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
                {
                    /* Discard leadership */
                    libfloat_update_leader(ctx, LIBFLOAT_NO_LEADER);
                    ctx->lost_leader_time -= (ctx->conf.election_timeout / 1000);
                    ctx->lost_leader_time -= ms_to_s(ctx->conf.election_timeout);

                    /* We did not fire the requests yet, let's init stuff and do it */
                    ctx->gray_failures.checking = true;
                    ctx->gray_failures.check_time = ctx->time(NULL);
                    ctx->gray_failures.check_time = ms_to_s(ctx->global_timer);

                    ERROR(ctx, "Launching leader-check because timeout has been reached");
                    for_every_node(ctx, node, {
@@ -158,7 +164,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
                    uint32_t    node_with_leaders = 0;
                    uint32_t    node_responses = 0;

                    if (ctx->gray_failures.check_time + (ctx->conf.election_timeout / 1000) < ctx->time(NULL))
                    if (ctx->gray_failures.check_time + ms_to_s(ctx->conf.election_timeout) < ms_to_s(ctx->global_timer))
                    {
                        /* We did not receive any reponses, and the election timeout has expired twice, let's launch an election */
                        ctx->gray_failures.checking = false;
diff --git a/raft.c b/raft.c
index 38c1f93..b0a8b27 100644
--- a/raft.c
+++ b/raft.c
@@ -23,7 +23,7 @@ void libfloat_become_leader(libfloat_ctx_t *ctx)
    });

    ctx->me->is_up_to_date = 1;
    ctx->stat.leader_election_time = ctx->time(NULL);
    ctx->stat.leader_election_time = ms_to_s(ctx->global_timer); 
}

void __libfloat_become_candidate(libfloat_ctx_t *ctx, libfloat_elections_args_t *args)
@@ -90,7 +90,7 @@ void libfloat_set_current_commit_index(libfloat_ctx_t *ctx, libfloat_entry_id_t
    if (id <= ctx->persistent.commit_index)
        return;

    ctx->last_log_time = ctx->time(NULL);
    ctx->last_log_time = ms_to_s(ctx->global_timer);
    libfloat_wake_up(ctx);

    ctx->persistent.commit_index = id;
-- 
2.41.0
Reply to thread Export thread (mbox)