~ne02ptzero/libfloat

This thread contains a patchset. You're looking at the original emails, but you may wish to use the patch review UI. Review patch

[PATCH] log: Apply deep-sleep state from heartbeats / leaders instead of computing locally

Louis Solofrizzo <lsolofrizzo@scaleway.com>
Details
Message ID
<20240319122330.1029963-1-lsolofrizzo@scaleway.com>
DKIM signature
pass
Download raw message
Patch: +55 -34
The deep-sleep state is now applied from the AEs/heartbeats sent by the
leader, instead of being computed locally by each node. This way, only the
leader computes it and gossips this state to the other nodes via AEs, and
those nodes simply apply it. This solves some issues of desynchronized
deep-sleep timers seen in production.

I've also reworked the deep-sleep routines a bit, in order to have a
single entry-point for it.

Signed-off-by: Louis Solofrizzo <lsolofrizzo@scaleway.com>
---
 CMakeLists.txt |  2 ++
 deep_sleep.c   | 34 ++++++++++++++++++++++++++++++++++
 internal.h     | 10 ++++++++++
 libfloat.c     | 10 +---------
 log.c          |  2 ++
 periodic.c     | 13 ++-----------
 raft.c         | 17 +++--------------
 rpc.h          |  1 +
 8 files changed, 55 insertions(+), 34 deletions(-)
 create mode 100644 deep_sleep.c

diff --git a/CMakeLists.txt b/CMakeLists.txt
index e976f8f..d2f46a6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -20,6 +20,7 @@ if (LIBFLOAT_BUILD_SHARED)
        periodic.c
        snapshot.c
        error.c
        deep_sleep.c
        externals/list.c
    )

@@ -38,6 +39,7 @@ add_library(float-static STATIC
    periodic.c
    snapshot.c
    error.c
    deep_sleep.c
    externals/list.c
)

diff --git a/deep_sleep.c b/deep_sleep.c
new file mode 100644
index 0000000..41d3dca
--- /dev/null
+++ b/deep_sleep.c
@@ -0,0 +1,34 @@
#include "internal.h"

/*
 * Move the context to the requested deep-sleep state.
 *
 * Every timer is first reset to its original value and then doubled once per
 * deep-sleep level, so the result only depends on the requested state and not
 * on the state the context was in before the call.
 */
void libfloat_apply_deep_sleep_state(libfloat_ctx_t *ctx, int deep_sleep_state)
{
    /* Already in that state: leave the timers and the stored state alone */
    if (deep_sleep_state == ctx->deep_sleep_state)
        return;

    /* Start again from the original timers */
    ctx->request_timeout = ctx->conf.original.request_timeout;
    ctx->conf.sanity_timeout = ctx->conf.original.sanity_timeout;
    ctx->conf.election_timeout = ctx->conf.original.election_timeout;
    ctx->conf.deep_sleep_time = ctx->conf.original.deep_sleep_time;

    /* One doubling of every timer per deep-sleep level */
    for (int left = deep_sleep_state; left > 0; left--)
    {
        ctx->request_timeout *= 2;
        ctx->conf.sanity_timeout *= 2;
        ctx->conf.election_timeout *= 2;
        ctx->conf.deep_sleep_time *= 2;
    }

    ctx->deep_sleep_state = deep_sleep_state;

    /* Randomized election timeout: base value plus a random share of it */
    ctx->election_timeout_rand = ctx->conf.election_timeout + (ctx->rand() % ctx->conf.election_timeout);

    /* Write the new state, along with the original (unscaled) deep-sleep time, to persistent storage */
    ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time);
}
diff --git a/internal.h b/internal.h
index 723d37a..484735e 100644
--- a/internal.h
+++ b/internal.h
@@ -144,4 +144,14 @@ void libfloat_update_leader(libfloat_ctx_t *ctx, libfloat_node_t *node);
 */
libfloat_second_t ms_to_s(libfloat_millisecond_t ms);

/*!
 * \brief Apply a deep-sleep state to the local context, from an index
 *
 * The deep-sleep time and the election, sanity and request timeouts are reset
 * to their original values, then doubled once per deep-sleep level. The
 * randomized election timeout is recomputed from the resulting election
 * timeout, and the new state is handed to the write_current_sleep_state
 * callback.
 *
 * \param[in] ctx libfloat context
 * \param[in] deep_sleep_state State to apply (from 0 to LIBFLOAT_DEEP_SLEEP_STATE_MAX)
 *
 * \note This function does nothing if the state has not changed
 * \note The range given above is not checked by the function; callers are
 *       expected to pass a valid state
 */
void libfloat_apply_deep_sleep_state(libfloat_ctx_t *ctx, int deep_sleep_state);

#endif /* LIBFLOAT_INTERNAL_H */
diff --git a/libfloat.c b/libfloat.c
index 9b3be69..fba908b 100644
--- a/libfloat.c
+++ b/libfloat.c
@@ -156,17 +156,9 @@ void libfloat_wake_up(libfloat_ctx_t *ctx)
    if (ctx->deep_sleep_state != 0)
    {
        /* Wake-up from deep-sleep, let's set the timers back to their original values */
        ctx->conf.election_timeout = ctx->conf.original.election_timeout;
        ctx->request_timeout = ctx->conf.original.request_timeout;
        ctx->conf.sanity_timeout = ctx->conf.original.sanity_timeout;
        ctx->conf.deep_sleep_time = ctx->conf.original.deep_sleep_time;
        libfloat_apply_deep_sleep_state(ctx, 0);

        ctx->election_timeout_rand = ctx->conf.election_timeout +
            ctx->rand() % ctx->conf.election_timeout;

        ctx->deep_sleep_state = 0;
        ctx->last_log_time = ms_to_s(ctx->global_timer);
        ctx->last_activity_time = ms_to_s(ctx->global_timer);
        ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time);
    }
}
diff --git a/log.c b/log.c
index 43c06c4..9f43c18 100644
--- a/log.c
+++ b/log.c
@@ -251,6 +251,7 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
    req.term = ctx->persistent.term;
    req.leader_id = ctx->me->id;
    req.leader_commit = ctx->persistent.commit_index;
    req.deep_sleep_state = ctx->deep_sleep_state;
    req.n_entries = 0;

    if (heartbeat)
@@ -432,6 +433,7 @@ void libfloat_append_entries_receive(libfloat_ctx_t *ctx, libfloat_rpc_append_en

    /* Reset timer */
    ctx->timeout_elapsed = 0;
    libfloat_apply_deep_sleep_state(ctx, req->deep_sleep_state);

    if (req->prev_log_index > ctx->persistent.commit_index)
    {
diff --git a/periodic.c b/periodic.c
index 8275800..4520770 100644
--- a/periodic.c
+++ b/periodic.c
@@ -10,23 +10,14 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
    ctx->timeout_elapsed += time;
    ctx->global_timer += time;

    if (ctx->conf.deep_sleep_time != 0)
    if (ctx->conf.deep_sleep_time != 0 && libfloat_am_i_leader(ctx))
    {
        if (ctx->last_activity_time + ctx->conf.deep_sleep_time < ms_to_s(ctx->global_timer))
        {
            if (ctx->deep_sleep_state < LIBFLOAT_DEEP_SLEEP_STATE_MAX)
            {
                ctx->last_activity_time = ms_to_s(ctx->global_timer);

                ctx->conf.deep_sleep_time *= 2;
                ctx->conf.election_timeout *= 2;
                ctx->request_timeout *= 2;
                ctx->conf.sanity_timeout *= 2;

                ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
                ctx->deep_sleep_state++;

                ctx->write_current_sleep_state(ctx, ctx->deep_sleep_state, ctx->conf.original.deep_sleep_time);
                libfloat_apply_deep_sleep_state(ctx, ctx->deep_sleep_state + 1);
            }
        }
    }
diff --git a/raft.c b/raft.c
index 517f98e..c103841 100644
--- a/raft.c
+++ b/raft.c
@@ -128,11 +128,6 @@ void libfloat_reload_state(libfloat_ctx_t *ctx)
    ctx->get_current_sleep_state(ctx, &deep_sleep_state, &deep_sleep_timer);
    current_leader = ctx->get_current_leader(ctx);

    if (deep_sleep_timer != 0)
    {
        ctx->conf.deep_sleep_time = ctx->conf.original.deep_sleep_time = deep_sleep_timer;
    }

    if (!ctx->get_last_snapshot(ctx, &ctx->persistent.snapshot.index, &ctx->persistent.snapshot.term))
    {
        /* No snapshot to load, let's set the defaults */
@@ -169,16 +164,10 @@ void libfloat_reload_state(libfloat_ctx_t *ctx)
        libfloat_update_leader(ctx, leader);
    }

    for (; ctx->deep_sleep_state < deep_sleep_state; ctx->deep_sleep_state++)
    {
        /* Restore the deep-sleep-state timers */
        ctx->conf.deep_sleep_time *= 2;
        ctx->conf.election_timeout *= 2;
        ctx->request_timeout *= 2;
        ctx->conf.sanity_timeout *= 2;
    if (deep_sleep_timer != 0)
        ctx->conf.deep_sleep_time = ctx->conf.original.deep_sleep_time = deep_sleep_timer;

        ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
    }
    libfloat_apply_deep_sleep_state(ctx, deep_sleep_state);
}

bool libfloat_am_i_up_to_date(libfloat_ctx_t *ctx)
diff --git a/rpc.h b/rpc.h
index 934ca12..2842ad1 100644
--- a/rpc.h
+++ b/rpc.h
@@ -20,6 +20,7 @@ typedef struct {
    libfloat_entry_id_t         prev_log_index;
    libfloat_term_t             prev_log_term;
    libfloat_entry_id_t         leader_commit;
    int                         deep_sleep_state;

    size_t                      n_entries;
    libfloat_log_entry_t        **entries;
-- 
2.44.0
Reply to thread Export thread (mbox)