Louis Solofrizzo: 1 log: Add real optimisic replication on log burst 5 files changed, 22 insertions(+), 5 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/27200/mbox | git am -3Learn more about email & git
This patch adds some code to try not to duplicate log sending when we can avoid it. On log-write burst, the last known sent log is never overrided by the node AE response, in order to avoid replay. When heartbeating, the value _is_ overidded to allow replication on a out-of-date node. The main trade-off is that an out-of-date not will take longer to be up-to-date again (hearbeat time, best case), but less data will be sent on the wire. Signed-off-by: Louis Solofrizzo <lsolofrizzo@scaleway.com> --- internal.h | 3 ++- log.c | 14 ++++++++++++-- node.h | 1 + periodic.c | 2 +- raft.c | 7 ++++++- 5 files changed, 22 insertions(+), 5 deletions(-) diff --git a/internal.h b/internal.h index d01147e..0f59c30 100644 --- a/internal.h +++ b/internal.h @@ -63,8 +63,9 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo * \brief Send AE requests to every node * * \param[in] ctx libfloat context + * \param[in] hearbeating Whether or not this is a hearbeat */ -void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx); +void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx, bool hearbeating); /*! * \brief Start a snapshot diff --git a/log.c b/log.c index d877a54..06c9289 100644 --- a/log.c +++ b/log.c @@ -624,12 +624,22 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e /* Set the next index to sent to the node */ node->last_update = ctx->time(NULL); - if (node->next_log_to_send <= resp->current_index) + + if (node->replicated_log < resp->current_index) { - node->next_log_to_send = resp->current_index + 1; node->replicated_log = resp->current_index; } + if (node->next_log_to_send <= resp->current_index && node->hearbeating) + { + node->next_log_to_send = resp->current_index + 1; + } + + if (node->hearbeating) + { + node->hearbeating = false; + } + if (ctx->persistent.term < resp->term) { libfloat_set_current_term(ctx, resp->term); diff --git a/node.h b/node.h index a91b4ef..f49ce9c 100644 --- a/node.h +++ b/node.h @@ -13,6 +13,7 @@ typedef struct { void *udata; /*!< User data */ time_t last_update; /*!< Time of the last AE response (If I am the leader) */ int snapshot_count; /*!< Count of the times we are supposed to send a snapshot */ + bool hearbeating; } libfloat_node_t; #endif /* LIBFLOAT_NODE_H */ diff --git a/periodic.c b/periodic.c index aceecd2..94bf728 100644 --- a/periodic.c +++ b/periodic.c @@ -34,7 +34,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time) } /* Timeout has expired! Time to send some heartbeats or some entries */ - libfloat_send_append_entries_to_all(ctx); + libfloat_send_append_entries_to_all(ctx, true); ctx->timeout_elapsed = 0; } diff --git a/raft.c b/raft.c index 1b651c4..bde3522 100644 --- a/raft.c +++ b/raft.c @@ -51,13 +51,18 @@ void libfloat_step_down(libfloat_ctx_t *ctx) ctx->state = RAFT_STATE_NONE; } -void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx) +void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx, bool heartbeat) { libfloat_node_t *node; for_every_node(ctx, node, { if (node != ctx->me) + { + if (heartbeat) + node->hearbeating = true; + libfloat_send_append_entries(ctx, node, false); + } }); } -- 2.33.1
LG