~ne02ptzero/libfloat

Few improvmements v1 APPLIED

Michael Bonfils: 1
 Few improvmements

 8 files changed, 197 insertions(+), 41 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~ne02ptzero/libfloat/patches/25207/mbox | git am -3
Learn more about email & git

[PATCH] Few improvmements Export this patch

- ACK manage logs
- Cleanup logs memory in reload_state
- Avoid to reload all logs on startup
---
 election.c |  16 +++++--
 internal.h |  30 +++++++++++-
 libfloat.c |   2 +-
 libfloat.h |  11 +++++
 log.c      | 136 +++++++++++++++++++++++++++++++++++++++++++++++------
 log.h      |  10 +++-
 raft.c     |  28 +++++------
 snapshot.c |   5 +-
 8 files changed, 197 insertions(+), 41 deletions(-)

diff --git a/election.c b/election.c
index e648421..663ee99 100644
--- a/election.c
+++ b/election.c
@@ -21,6 +21,8 @@ static int libfloat_get_number_of_votes_for_me(libfloat_ctx_t *ctx)
void libfloat_election_start(libfloat_ctx_t *ctx)
{
    libfloat_node_t     *node;
    libfloat_term_t     last_term = 0;
    libfloat_entry_id_t last_id = 0;

    DEBUG(ctx, "Election starting!");
    /* First, reset the vote of everyone */
@@ -43,6 +45,8 @@ void libfloat_election_start(libfloat_ctx_t *ctx)
    ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
    ctx->timeout_elapsed = 0;

    libfloat_get_last_term(ctx, &last_id, &last_term);

    /* Send a vote request to each node of the cluster */
    for_every_node(ctx, node, {
        libfloat_rpc_request_vote_t     vote = { 0 };
@@ -56,8 +60,8 @@ void libfloat_election_start(libfloat_ctx_t *ctx)
        /* Set informations and send the request to the node */
        vote.term = ctx->persistent.term;
        vote.candidate_id = ctx->me->id;
        vote.last_log_index = ctx->last_log->id;
        vote.last_log_term = ctx->last_log->term;
        vote.last_log_index = last_id;
        vote.last_log_term = last_term;

        ctx->request_vote(ctx, node, &vote);
    });
@@ -65,6 +69,8 @@ void libfloat_election_start(libfloat_ctx_t *ctx)

static bool libfloat_can_i_grant_vote(libfloat_ctx_t *ctx, libfloat_rpc_request_vote_t *req)
{
    libfloat_term_t     last_term;

    /* Let's check if I have already voted for someone */
    if (ctx->persistent.voted_for != 0)
    {
@@ -73,14 +79,14 @@ static bool libfloat_can_i_grant_vote(libfloat_ctx_t *ctx, libfloat_rpc_request_
        return false;
    }

    if (ctx->last_log == NULL)
    if (libfloat_get_last_term(ctx, NULL, &last_term) == false)
    {
        /* Our log is empty, so let's assume we are _not_ up to date */
        /* We have failed to retrieve last_ferm from log */
        return false;
    }

    /* Check term match */
    if (ctx->last_log->term > req->last_log_term && req->last_log_index <= ctx->persistent.commit_index)
    if (last_term > req->last_log_term && req->last_log_index <= ctx->persistent.commit_index)
    {
        /* We have a superior term and a superior log, we can't grant our vote */
        return false;
diff --git a/internal.h b/internal.h
index ad41067..4815bef 100644
--- a/internal.h
+++ b/internal.h
@@ -63,7 +63,7 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
 * \brief Send AE requests to every node
 *
 * \param[in] ctx libfloat context
 */ 
 */
void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx);

void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx);
@@ -87,4 +87,32 @@ void libfloat_error(libfloat_ctx_t *ctx, const char *fmt, ...) __attribute__((fo
 */
void libfloat_set_error_str(const char *str);

/*!
 * \brief Count a node towards an acknowledge-replication list
 *
 * \param[in] ctx libfloat context
 * \param[in,out] log Entry to update
 * \param[in] id ID of the node that acked this log
 *
 * \note This function is replay-safe, eg; If a node tells us twice that a log
 * X is replicated, we only count it once.
 */
void libfloat_log_add_node_ack(libfloat_ctx_t *ctx, libfloat_log_entry_t *log, libfloat_node_id_t id);

/*!
 * \brief Free the list of acknowledges of a log
 *
 * \param[in] ctx libfloat context
 * \param[in,out] log Log to free
 */
void libfloat_log_free_acks(libfloat_ctx_t *ctx, libfloat_log_entry_t *log);

/*!
 * \brief Free all log from memory
 *
 * \param[in] ctx libfloat context
 */
void libfloat_log_memory_cleanup(libfloat_ctx_t *ctx);


#endif /* LIBFLOAT_INTERNAL_H */
diff --git a/libfloat.c b/libfloat.c
index 338a0eb..6763c5e 100644
--- a/libfloat.c
+++ b/libfloat.c
@@ -50,8 +50,8 @@ void libfloat_ctx_del(libfloat_ctx_t *ctx)
            ctx->free(log->data);
        }

        libfloat_log_free_acks(ctx, log);
        ctx->free(log);

    });

    kh_foreach_value(ctx->nodes, node, {
diff --git a/libfloat.h b/libfloat.h
index dcce50f..0ca3641 100644
--- a/libfloat.h
+++ b/libfloat.h
@@ -485,4 +485,15 @@ void libfloat_election_resign_receive(libfloat_ctx_t *ctx);
 */
const char *libfloat_get_error_str(void);

/*!
 * \brief Retrieve term of last log
 *
 * \param[in] ctx libfloat context
 * \param[out] id last commit from log
 * \param[out@ term last term from log
 *
 * \note return false if log is missing
 */
bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libfloat_term_t *term);

#endif /* LIBFLOAT_H */
diff --git a/log.c b/log.c
index ad95bc7..55c2851 100644
--- a/log.c
+++ b/log.c
@@ -1,5 +1,41 @@
#include "internal.h"

void libfloat_log_add_node_ack(libfloat_ctx_t *ctx, libfloat_log_entry_t *log, libfloat_node_id_t id)
{
    bool                found = false;
    libfloat_node_ack_t *ptr, *tmp;

    libfloat_list_for_each_entry_safe(ptr, tmp, &log->node_acks, next)
    {
        if (ptr->id == id)
        {
            found = true;
            break;
        }
    }

    if (found == false)
    {
        ptr = ctx->calloc(1, sizeof(*ptr));
        ptr->id = id;
        libfloat_list_add_tail(&ptr->next, &log->node_acks);
    }
}

void libfloat_log_free_acks(libfloat_ctx_t *ctx, libfloat_log_entry_t *log)
{
    libfloat_node_ack_t *ptr, *tmp;

    libfloat_list_for_each_entry_safe(ptr, tmp, &log->node_acks, next)
    {
        ctx->free(ptr);
    }

    /* Reset the list head */
    log->node_acks.prev = &log->node_acks;
    log->node_acks.next = &log->node_acks;
}

bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, libfloat_log_data_t *log,
    void (*commit)(void *, libfloat_commit_status_t), void *udata)
{
@@ -50,6 +86,9 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l
    entry->commit_type = commit_type;
    entry->started = ctx->time(NULL);

    entry->node_acks.prev = &entry->node_acks;
    entry->node_acks.next = &entry->node_acks;

    iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, entry->id, &absent);
    kh_value(ctx->persistent.log, iterator) = entry;

@@ -83,7 +122,7 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l

    DEBUG(ctx, "libfloat_add_log: Successfully written log %d", entry->id);

    entry->node_count++;
    libfloat_log_add_node_ack(ctx, entry, ctx->me->id);
    ctx->timeout_elapsed = 0;
    ctx->last_log = entry;
    libfloat_set_current_commit_index(ctx, entry->id);
@@ -183,6 +222,10 @@ static bool libfloat_get_log_from_db(libfloat_ctx_t *ctx, size_t id, libfloat_lo
        khint_t                     iterator;
        int                         absent;

        /* Reset the list head */
        entry->node_acks.prev = &entry->node_acks;
        entry->node_acks.next = &entry->node_acks;

        iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, entry->id, &absent);
        kh_value(ctx->persistent.log, iterator) = entry;
        *out = entry;
@@ -229,6 +272,9 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
                ctx->persistent.snapshot.index
            );
            node->snapshot_count = 0;
            ctx->send_snapshot(ctx, node);
            /* check that there is not already a snapshot in progress */
            return;
        }
        else
            node->snapshot_count++;
@@ -404,15 +450,6 @@ void libfloat_append_entries_receive(libfloat_ctx_t *ctx, libfloat_rpc_append_en
                    /* We don't have a log anymore apparently, let's inform the leader */
                    resp->current_index = 0;
                }
                else
                {
                    if (req->prev_log_index >= ctx->persistent.commit_index)
                    {
                        /* Decrease our log by one */
                        ctx->persistent.commit_index = ctx->persistent.commit_index - 1;
                        ctx->write_commit_index(ctx, ctx->persistent.commit_index);
                    }
                }
                goto end;
            }
        }
@@ -479,6 +516,8 @@ response:
        log = ctx->calloc(1, sizeof(*log));
        log->id = req->entries[i]->id;
        log->term = req->entries[i]->term;
        log->node_acks.prev = &log->node_acks;
        log->node_acks.next = &log->node_acks;
        /* Don't allocate memory for data, we don't need to keep it in RAM for now */

        iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, log->id, &absent);
@@ -503,6 +542,7 @@ response:
            if (!ctx->write_log(ctx, req->entries[i]))
            {
                kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
                libfloat_log_free_acks(ctx, log);
                ctx->free(log);
                ERROR(ctx, "libfloat_append_entries_receive: Cannot write log id=%u", req->entries[i]->id);
                goto end;
@@ -535,7 +575,7 @@ end:
            resp->current_index = ctx->persistent.commit_index;
        else
        {
            if (!ctx->is_snapshotting)
            if (!ctx->is_snapshotting && ctx->last_log)
                libfloat_set_current_commit_index(ctx, ctx->last_log->id);
        }
    }
@@ -654,10 +694,10 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e

            log = kh_value(ctx->persistent.log, iterator);

            /* XXX: Can be replays, need to have a unique way to count acks */
            log->node_count++;

            if (log->node_count >= ctx->n_nodes)
            libfloat_log_add_node_ack(ctx, log, node->id);

            if (libfloat_list_count(&log->node_acks) >= ctx->n_nodes)
            {
                if (log->commit_type == LIBFLOAT_ABSOLUTELY_CONSISTENT)
                {
@@ -674,10 +714,11 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e
                {
                    ctx->free(log->data->buf);
                    ctx->free(log->data);
                    libfloat_log_free_acks(ctx, log);
                    log->data = NULL;
                }
            }
            else if (log->node_count >= ctx->n_nodes / 2 + 1)
            else if (libfloat_list_count(&log->node_acks) >= ctx->n_nodes / 2 + 1)
            {
                if (log->commit_type <= LIBFLOAT_STRONGLY_CONSISTENT)
                {
@@ -705,3 +746,68 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e
    else
        node->is_up_to_date = true;
}

bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libfloat_term_t *term)
{
    libfloat_entry_id_t                 i = ctx->persistent.commit_index;
    khint_t                             iterator;
    libfloat_log_entry_t                *log = NULL;

    iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, i);
    if (iterator == kh_end(ctx->persistent.log))
    {
        /* NOT FOUND, NEED TO LOAD LOG */
        if (!libfloat_get_log_from_db(ctx, i, &log))
        {
            ERROR(ctx, "Log %ld not found while retrieving last term", i);
            return false;
        }
    }
    else
    {
        log = kh_value(ctx->persistent.log, iterator);
    }

    if (log->data == NULL)
    {
        /* Log is not in memory, let's get it from db */
        log->data = ctx->malloc(sizeof(*log->data));
        ctx->get_log(ctx, log->id, &log->term, log->data);
    }

    if (term != NULL)
        *term = log->data->term;
    if (id != NULL)
        *id = log->id;

    return true;
}

void libfloat_log_memory_cleanup(libfloat_ctx_t *ctx)
{
    khiter_t                            iterator;
    libfloat_log_entry_t                *log = NULL;

    for(iterator = kh_begin(ctx->persistent.log); iterator != kh_end(ctx->persistent.log); iterator++)
    {
        if (kh_exist(ctx->persistent.log, iterator))
        {
            log = kh_value(ctx->persistent.log, iterator);

            if (log->data != NULL)
            {
                ctx->free(log->data->buf);
                ctx->free(log->data);
            }

            if (log->commit != NULL)
            {
                log->commit(log->udata, LIBFLOAT_ENTRY_REFUSED);
                libfloat_list_del(&log->next);
            }

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            ctx->free(log);
        }
    }
}
diff --git a/log.h b/log.h
index 051d37f..ad2a148 100644
--- a/log.h
+++ b/log.h
@@ -21,8 +21,16 @@ typedef struct {
    uint32_t            type;   /*!< Type of log. Implementation defined. 0 is reserved */
    size_t              len;    /*!< Length of the buffer */
    uint8_t             *buf;   /*!< Log data */

    /* */
    libfloat_term_t     term;   /*!< Log Term */
} libfloat_log_data_t;

typedef struct {
    libfloat_node_id_t  id;
    libfloat_list_t     next;
} libfloat_node_ack_t;

typedef struct {
    libfloat_entry_id_t         id;     /*!< Log ID */
    libfloat_term_t             term;   /*!< Log Term */
@@ -33,7 +41,7 @@ typedef struct {
    libfloat_commit_type_t      commit_type;                                    /*!< Commit type */
    void                        *udata;                                         /*!< User data for callback */
    time_t                      started;                                        /*!< Age of the log */
    uint32_t                    node_count;                                     /*!< Number of nodes that have replicated this log */
    libfloat_list_t             node_acks;                                      /*!< List of nodes that have replicated this log */

    libfloat_list_t             next;
} libfloat_log_entry_t;
diff --git a/raft.c b/raft.c
index 9070b2f..9bb83f7 100644
--- a/raft.c
+++ b/raft.c
@@ -104,6 +104,13 @@ void libfloat_reload_state(libfloat_ctx_t *ctx)
        ctx->persistent.snapshot.term = 0;
    }

    /* when reloading a snapshot, ctx may have older logs and we want to drop them */
    if (kh_size(ctx->persistent.log) > 0)
    {
        libfloat_log_memory_cleanup(ctx);
    }

    /* XXX some part of code try to load missing log but other not */
    for (size_t i = ctx->persistent.snapshot.index + 1; i <= ctx->persistent.commit_index; i++)
    {
        libfloat_log_entry_t    *log;
@@ -112,25 +119,12 @@ void libfloat_reload_state(libfloat_ctx_t *ctx)

        log = ctx->calloc(1, sizeof(*log));
        log->id = i;
        log->data = NULL;

        if (ctx->get_log(ctx, i, &log->term, NULL))
        {
            iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, log->id, &absent);
            kh_value(ctx->persistent.log, iterator) = log;

            ctx->last_log = log;
        }
        else
        {
            ERROR(ctx, "Consistent log is not the same as the expected commit index (%u vs %u)",
                ctx->last_log->id, ctx->persistent.commit_index);

            ctx->free(log);
        log->node_acks.prev = &log->node_acks;
        log->node_acks.next = &log->node_acks;

            ctx->persistent.commit_index = ctx->last_log->id;
            ctx->write_commit_index(ctx, ctx->persistent.commit_index);
        }
        iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, log->id, &absent);
        kh_value(ctx->persistent.log, iterator) = log;
    }
}

diff --git a/snapshot.c b/snapshot.c
index acf67a4..e487d4a 100644
--- a/snapshot.c
+++ b/snapshot.c
@@ -33,7 +33,9 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)
        }

        ERROR(ctx, "libfloat_internal_snapshot_apply_log: Successfully written log %d (type=%d)", entry->id, entry->data->type);
        entry->node_count++;
        if (ctx->state == RAFT_STATE_LEADER)
            libfloat_log_add_node_ack(ctx, entry, ctx->me->id);

        ctx->timeout_elapsed = 0;
        ctx->last_log = entry;
        libfloat_set_current_commit_index(ctx, entry->id);
@@ -161,6 +163,7 @@ void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            libfloat_list_del(&log->next);
            libfloat_log_free_acks(ctx, log);
            ctx->free(log);
        }
    }
-- 
2.25.1