
lolraft's Introduction

lolraft

Crates.io API doc CI MIT licensed

A Multi-Raft implementation in Rust.

Documentation


Features


  • Implements all core Raft features for production use.
  • Supports Multi-Raft. Multiple Raft processes can coexist in a single OS process so they can share resources efficiently.
    • Tested with 1000 shards.
    • Batched heartbeat optimization is implemented.
  • Built on Tonic. Efficient gRPC streaming is used for log replication and snapshot transfer.
  • A Phi Accrual Failure Detector is used for leader failure detection. Because the algorithm is adaptive, you do not have to choose a fixed timeout before deployment, which makes it possible to deploy Raft nodes even in geo-distributed environments.

Author

Akira Hayakawa
EMail: [email protected]

lolraft's People

Contributors

akiradeveloper, fteychene, nerdondon


lolraft's Issues

Fix: try_insert_entry

try_insert_entry:

                if sender_id != core.id && snapshot_index > 1 {
                    let res = core.fetch_snapshot(snapshot_index, sender_id.clone()).await;
                    if res.is_err() {
                        log::error!(
                            "could not fetch app snapshot (idx={}) from sender {}",
                            snapshot_index,
                            sender_id
                        );
                        return Ok(TryInsertResult::Rejected);
                    }
                }

I think this code is wrong: returning Rejected here causes the leader to decrement its replication pointer, which is not what we want. Instead, it should return Err so that replication is retried from the current point.
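The effect of the proposed change can be illustrated with a small model. This is a sketch, not the lol code: `on_fetch_result` and `next_replication_index` are hypothetical helpers, and the leader's reaction to each reply is an assumption taken from the description above (Rejected moves the pointer back, Err leaves it in place so the same point is retried).

```rust
/// Reply of the follower-side insert, using the names from the snippet above.
#[derive(Debug, PartialEq)]
pub enum TryInsertResult {
    Inserted,
    Rejected,
}

/// Hypothetical follower-side step: a failed snapshot fetch is reported as an
/// error instead of `Rejected`.
pub fn on_fetch_result(fetch: Result<(), String>) -> Result<TryInsertResult, String> {
    fetch.map_err(|e| format!("could not fetch app snapshot: {}", e))?;
    Ok(TryInsertResult::Inserted)
}

/// Hypothetical leader-side step (assumed behaviour, not taken from lol):
/// - Inserted: advance the replication pointer
/// - Rejected: step the pointer back to find a matching prefix
/// - Err:      keep the pointer so the same point is retried
pub fn next_replication_index(current: u64, reply: &Result<TryInsertResult, String>) -> u64 {
    match reply {
        Ok(TryInsertResult::Inserted) => current + 1,
        Ok(TryInsertResult::Rejected) => current.saturating_sub(1),
        Err(_) => current,
    }
}
```

With this model, a failed fetch at index 5 leaves the leader at index 5, whereas a Rejected reply would move it back to 4.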

Change Id format

A Raft node can be identified by its address because the address is unique. This is why lol defines Id = Address.

My idea is to change this to Id = (Label, Address). This change will make it easier to

  1. Read the log: If the address is used as the Id, the log is hard to read. A shorter label like "ND1" makes nodes easy to identify.
  2. Match different Ids: You may build a Raft cluster on k8s. How do you match a Pod's label to lol's node? If the same label is used in both layers, matching becomes easy.

Also, since the label is shorter than the Address, comparing two labels is more efficient than comparing two addresses. This is a small optimization.
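A minimal sketch of what Id = (Label, Address) could look like. The struct layout, the `new` constructor, and the choice to print only the label are assumptions made for illustration; the address string in the usage note is made up.

```rust
use std::fmt;

/// Sketch of the proposed Id = (Label, Address).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Id {
    /// Short human-readable name, e.g. "ND1" (could match a k8s Pod label).
    pub label: String,
    /// Network address of the node.
    pub address: String,
}

impl Id {
    pub fn new(label: &str, address: &str) -> Self {
        Id {
            label: label.to_string(),
            address: address.to_string(),
        }
    }
}

/// Log output shows only the short label, which keeps log lines readable.
impl fmt::Display for Id {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.label)
    }
}
```

For example, `Id::new("ND1", "http://10.0.0.1:3000").to_string()` yields `"ND1"`.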

Immediate self-vote after init_cluster

In init_cluster, the initial log is created but the server then waits for the election timeout before voting for itself. This is OK when the election timeout is short but not OK when it is long.

Sending the TimeoutNow API to itself will trigger an election immediately.

Persistent States

In theory,

  • recent vote (id, term)
  • log (including snapshot)

should be persisted before responding to any inter-node communication involving the data. Otherwise, the cluster can't survive a catastrophic failure that mixes a network split and node crashes, for example. Also, we can't shut down the entire cluster if everything is held in memory.

To address this, this data should be abstracted so that both the in-memory and the persistent implementations can perform fast.

Release 0.7

Version 0.7 will include at least:

  1. Dependency updates: Tonic 0.4, Tokio 1.0, Bytes 1.0
  2. Introduction of pre-vote phase
  3. Change the internal serializer

Arbitrary streamable type for snapshot

In the current version, the snapshot is just a Vec<u8> embedded in an entry called Snapshot. With this design, a snapshot can be handled as a normal entry, which simplifies the implementation.

However, this design choice causes two problems:

  1. The buffer is passed to RaftApp, and RaftApp needs to deserialize it before use and serialize it again to return it to the core.
  2. The snapshot size is limited by system memory.

If we solve these two problems, lol's performance and versatility will improve dramatically.

My idea is

  1. Don't embed the Vec<u8> in the snapshot entry; the buffer part of the entry is left empty.
  2. When a follower receives the entry, it requests the buffer part from the leader as a stream (in try_insert_entry). The follower acks the replication RPC iff the stream completes, and saves it to the data structure explained below.
  3. The stream type is given statically by RaftApp as an associated type.
  4. The streamable value is maintained outside of the log, keyed by (term, index). The data structure is like (term, index) -> S.
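The (term, index) -> S structure from the last item could be sketched as an ordered map. `SnapshotInventory` and its methods are hypothetical names, and `S` stands for whatever streamable type the app provides; the sketch only shows the keying scheme, not the streaming itself.

```rust
use std::collections::BTreeMap;

pub type Term = u64;
pub type Index = u64;

/// Snapshots kept outside of the log, keyed by (term, index).
pub struct SnapshotInventory<S> {
    inner: BTreeMap<(Term, Index), S>,
}

impl<S> SnapshotInventory<S> {
    pub fn new() -> Self {
        SnapshotInventory { inner: BTreeMap::new() }
    }

    /// Called once the stream for the snapshot entry has been fully received.
    pub fn put(&mut self, term: Term, index: Index, snapshot: S) {
        self.inner.insert((term, index), snapshot);
    }

    pub fn get(&self, term: Term, index: Index) -> Option<&S> {
        self.inner.get(&(term, index))
    }

    /// Drop snapshots whose log index is older than `index`.
    pub fn delete_before(&mut self, index: Index) {
        self.inner.retain(|&(_, i), _| i >= index);
    }
}
```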

Remove type parameter A from RaftCore

As of 0.7.1, RaftCore has A: RaftApp as a type parameter, but we don't use it. I added it so that we could define an associated type A::Snapshot and let the app implementer specify the type statically. However, with the snapshot tag abstraction, this plan will not be carried out.

pub struct RaftCore<A: RaftApp>

Another drawback of this design, besides being needless, will become clear when we implement in-library multi-raft (#131). There we could have multiple RaftCores whose apps are of different types. With the type parameter, we can't implement this.

Adaptive election timeout

This is an idea I want to share prior to implementation.

Motivation

A fixed heartbeat interval and election timeout usually work well with on-premises servers, but not with cloud servers where RTT isn't guaranteed to be stable. Also, consider a cluster with one server in Tokyo, one in China, and one in the US. The latency between Tokyo and China is smaller than the latency to the US, which makes it difficult to fix a single value.

Algorithm

Followers decide the next election timeout based on the time between the previous two heartbeats. The interval between Tokyo and China will obviously be smaller, so the election timeout may fire sooner there (in case the leader is in Tokyo).

For example, if the server in China receives two heartbeats 100ms apart, then the next election timeout will fire 10 x 100ms later. I don't know yet what the exact calculation should be.
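As a rough illustration of the example above, the follower only needs to remember the arrival times of the last two heartbeats. `AdaptiveTimeout` is a hypothetical name, the multiplier is a free parameter (10 in the example), and timestamps are modelled as `Duration`s since some arbitrary start so the sketch stays deterministic.

```rust
use std::time::Duration;

/// Tracks the last two heartbeat arrival times (measured from an arbitrary start).
pub struct AdaptiveTimeout {
    prev: Option<Duration>,
    last: Option<Duration>,
    factor: u32,
}

impl AdaptiveTimeout {
    pub fn new(factor: u32) -> Self {
        AdaptiveTimeout { prev: None, last: None, factor }
    }

    /// Record the arrival time of a heartbeat.
    pub fn on_heartbeat(&mut self, at: Duration) {
        self.prev = self.last;
        self.last = Some(at);
    }

    /// Next election timeout = factor x (interval between the last two heartbeats).
    /// Returns None until two heartbeats have been seen.
    pub fn next_timeout(&self) -> Option<Duration> {
        match (self.prev, self.last) {
            (Some(prev), Some(last)) => last.checked_sub(prev).map(|d| d * self.factor),
            _ => None,
        }
    }
}
```

With a factor of 10 and heartbeats arriving 100ms apart, `next_timeout` returns one second, matching the 10 x 100ms example.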

Cache connectivity

Endpoint doesn't cache connectivity using Keep-Alive (TCP or HTTP/2) because, firstly, I don't want to trust the keep-alive mechanism (I don't think it's 100% trustworthy) and, secondly, it will be less meaningful when HTTP/3 arrives, as it reduces connection overhead. However, since we need to stay with HTTP/2 for a while, caching still has some value.

Optimize commit_index calculation

The commit_index is calculated by the leader and later propagated to followers. Simply put, the algorithm lists the replicated index of every server and takes the median.

Let n be the number of servers in the cluster. Naively, the computation is O(n log n), which is trivial when n is small. It can be O(n) optimally, but that will be slower in practice because the code is more complicated and the constant factors are larger.

In the worst case, this calculation is done n times to increment commit_index once. So in total we spend O(n^2 log n) computation per log entry. If n is big (say 1000) this is non-trivial. (I don't know anyone who manages an n=1000 Raft cluster, though.)

Suppose we keep a bitmask in which a 1 bit means "this server's replication index has been updated since the last commit_index update". Then the check reduces to a popcnt, which maps to a single CPU instruction: if the number of 1s in the bitmask is smaller than the majority, the computation can be skipped.

Disadvantage: n will be limited to at most 64 if we insist on calling popcnt only once. I believe this is okay in practice.
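A sketch of the sort-based calculation with a popcount pre-check. `CommitTracker` is a hypothetical type, not lol code. One deliberate deviation from the wording above: a bit is set when a server's replicated index is greater than the current commit_index (a stricter reading of "updated since the last commit_index update"), which is what makes skipping safe in this sketch. The Raft rule that only entries of the current term are committed by counting replicas is left out.

```rust
/// Leader-side tracker for up to 64 servers.
pub struct CommitTracker {
    match_indices: Vec<u64>,
    /// Bit i is set iff match_indices[i] > commit_index.
    ahead_mask: u64,
    commit_index: u64,
}

impl CommitTracker {
    pub fn new(n: usize) -> Self {
        assert!((1..=64).contains(&n), "one u64 mask covers at most 64 servers");
        CommitTracker { match_indices: vec![0; n], ahead_mask: 0, commit_index: 0 }
    }

    /// Record that `server` has replicated up to `index`; returns the commit_index.
    pub fn on_replicated(&mut self, server: usize, index: u64) -> u64 {
        let majority = self.match_indices.len() / 2 + 1;
        if index > self.match_indices[server] {
            self.match_indices[server] = index;
        }
        if self.match_indices[server] > self.commit_index {
            self.ahead_mask |= 1u64 << server;
        }
        // Cheap pre-check: fewer than a majority are ahead, so nothing can advance.
        if (self.ahead_mask.count_ones() as usize) < majority {
            return self.commit_index;
        }
        // Naive O(n log n) step: the majority-th largest index is on a majority.
        let mut sorted = self.match_indices.clone();
        sorted.sort_unstable_by(|a, b| b.cmp(a));
        self.commit_index = sorted[majority - 1];
        // Rebuild the mask relative to the new commit_index.
        self.ahead_mask = 0;
        for (i, &m) in self.match_indices.iter().enumerate() {
            if m > self.commit_index {
                self.ahead_mask |= 1u64 << i;
            }
        }
        self.commit_index
    }
}
```

In a three-node cluster, reporting index 7 for server 0 leaves commit_index at 0 because only one bit is set; reporting index 5 for server 1 then sets a second bit and the sort runs, giving 5.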

Scheduling fold snapshotting from RaftApp

As of 0.5.0, snapshotting has two paths: one is returning a snapshot from RaftApp and the other is periodic compaction (calling fold_snapshot). The problem is that RaftApp can't trigger fold snapshotting by itself.

This suggestion changes the return type of apply_message to an enum Snapshotting, which is None, CopySnapshot(SnapshotTag), or FoldSnapshot. If FoldSnapshot is returned, RaftCore starts compaction at the apply_index.

enum Snapshotting {
  None,
  CopySnapshot(SnapshotTag),
  FoldSnapshot
}

Make server APIs protocol agnostic

Suppose we build a REST server that uses the Raft consensus algorithm to share state. As of 0.5.0, this is only possible by building a REST server that proxies HTTP requests to a lol server listening for gRPC requests.

RPCs like append-entries and request-votes are inter-node and need optimizations like gRPC streaming, whereas request-apply and others are simple client interactions. The latter can be protocol agnostic, so that we can statically link another protocol layer on top of the APIs.

Doc as mdBook

Use Rust! We have docs in the Wiki, but Rust software should publish its docs with mdBook, which is written in Rust.

Use UDP in heartbeat

In v0.6.0, the heartbeat is sent over gRPC which is HTTP/2 over TCP.

We don't need a connection to send a heartbeat. Even the response is not necessary. It is just the leader sending heartbeats to followers periodically (e.g. every 100ms).

If we can send the heartbeat over UDP, there is no connection, which is ideal.

Separate the core of lol-test

The core of lol-test is emulating a server with a process and managing ports on localhost.

This functionality is versatile, and it may be worth separating out.

Two ways of making snapshot

RaftApp currently requires fold_snapshot to be implemented. This function, as the name implies, compacts the log by folding the messages. Let's call this "compaction snapshot". In this way of snapshotting, the baseline is the identity function (just append the messages to the old snapshot), but the app implementor can choose a more complicated function that reruns a copy of the state machine from the old snapshot.

Another type of snapshotting is more obvious and widely accepted: the state machine copies its state and uses the copy as a new snapshot. Let's call this type of snapshotting "copy snapshot".

There are pros and cons to these two methods. Of course, copy snapshot is cheaper because there is no re-computation involved; however, the copy is typically made between two apply_message calls, and this may block later apply_message calls for a while. In contrast, compaction snapshot can run completely independently from applying, at the cost of re-computation.

I have a great idea to support these two methods at the same time in a single library.

Part ways with message-pack

lol 0.6.2 uses MessagePack. The reason is zero-copy decoding: when the backing memory lives long enough, the Bin field of the decoded object can simply reference it.

MessagePack is a nicely designed serialization framework. However, my concerns are, firstly, that msgpack-rust is not going to be actively maintained (3Hren/msgpack-rust#255) and, secondly, that mixing two frameworks isn't straightforward.

Regarding the zero-copy decoding, prost 0.7 started to support zero-copy decoding:

@rolftimmermans added support for generating Rust bytes::Bytes fields from protobuf bytes fields. When deserializing from a Bytes instance, this enables zero-copy deserialization for bytes fields!

https://github.com/danburkert/prost/releases/tag/v0.7.0

We can use this instead of MessagePack, and since Bytes is reference-counted, this is more convenient than referencing because we are free from lifetime issues.

Related: hyperium/tonic#517

Use term "bootstrap"

To add a new node, there must be an existing cluster to accept the join. But what about the first node?

The first node is treated as a special node that forms a single-node cluster consisting only of itself.

Other software like Elasticsearch and Consul seems to do the same thing, and both call it bootstrapping.

lol should use the same term.

Run benchmark in one process to use performance analyzer

In 0.7.0, the benchmark tests use the same infrastructure as the logic tests: the kvs servers are launched as processes. I do it this way to emulate a server going down with process STOP/CONT.

However, this makes it hard to use performance analyzers (like cargo flamegraph) because they usually assume there is only one running binary.

If the kvs servers and the client run in one Tokio runtime, we can make a deeper analysis of the performance.

I am working on this repository: https://github.com/akiradeveloper/lol-perf

Efficient log replication

As of 0.5.0, the log replication is suboptimal.

It uses gRPC streaming for inter-node communication, but internally it temporarily gathers the entries into a buffer on the heap. This is the case on both the sender side and the receiver side.

struct AppendEntryBuffer {
    sender_id: String,
    prev_log_term: Term,
    prev_log_index: Index,
    entries: Vec<AppendEntryElem>,
}

With a persistent backend, this results in allocating all of the entries in the buffer, which makes it vulnerable to OOM when the replication is gigantic (say, sending 10000 entries at once where the command in each entry is 1MB: this consumes 10GB on the heap. Amazing!)

We can remove the temporary buffer: to send a log region [l,r), we can first create an internal stream from this region and map it to the stream for inter-node communication, and vice versa on the receiver side.

One possible implementation is to first build a struct like the following and then chain the header part and the mapped stream.

struct AppendEntryStream {
    sender_id: String,
    prev_log_term: Term,
    prev_log_index: Index,
    entries: Stream<AppendEntryElem>,
}
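The lazy variant can be sketched with the standard library alone by letting an `Iterator` stand in for the async stream. That substitution is an assumption for illustration (a real implementation would use an async stream type), and `open_region` with its `load` callback is a hypothetical storage lookup. The point is that entries in [l, r) are loaded one at a time as the consumer pulls them, so no `Vec` of all entries is ever built.

```rust
pub type Term = u64;
pub type Index = u64;

pub struct AppendEntryElem {
    pub term: Term,
    pub index: Index,
    pub command: Vec<u8>,
}

/// Header plus a lazily produced sequence of entries.
/// `I` is an Iterator here only as a std-only stand-in for an async stream.
pub struct AppendEntryStream<I: Iterator<Item = AppendEntryElem>> {
    pub sender_id: String,
    pub prev_log_term: Term,
    pub prev_log_index: Index,
    pub entries: I,
}

/// Build a lazy view over the log region [l, r).
/// `load` is a hypothetical lookup that reads one entry from storage.
pub fn open_region<F>(l: Index, r: Index, load: F) -> impl Iterator<Item = AppendEntryElem>
where
    F: Fn(Index) -> AppendEntryElem,
{
    // Nothing is read here; each entry is loaded when the consumer asks for it.
    (l..r).map(load)
}
```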

We don't need DNS resolve on the client side

In lol-admin for instance, we resolve the socket addr (here as id) before sending a request

        Sub::AddServer { id } => {
            let id = lol_core::connection::resolve(&id).unwrap();

but this is not necessary because hyper is doing this.

In tonic, Endpoint's connect calls HttpConnector::new

    pub async fn connect(&self) -> Result<Channel, Error> {
        let mut http = hyper::client::connect::HttpConnector::new();

And the connector uses GaiResolver

impl HttpConnector {
    /// Construct a new HttpConnector.
    pub fn new() -> HttpConnector {
        HttpConnector::new_with_resolver(GaiResolver::new())
    }
}

Here GAI means getaddrinfo which is thread-pooled

A resolver using blocking getaddrinfo calls in a threadpool.

https://docs.rs/hyper/0.13.9/hyper/client/connect/dns/struct.GaiResolver.html

    fn call(&mut self, name: Name) -> Self::Future {
        let blocking = tokio::task::spawn_blocking(move || {
            debug!("resolving host={:?}", name.host);
            (&*name.host, 0)
                .to_socket_addrs()
                .map(|i| IpAddrs { iter: i })
        });

and it is actually doing the same thing as our resolve function.

pub fn resolve(id: &str) -> Option<String> {
    use std::net::ToSocketAddrs;
    let res = id.to_socket_addrs();

Support Mutual TLS

To support mTLS, RaftCore should have a client certificate for client authentication. Add client_tls to Config and use the TLS config in all inter-node communication.

Related:

Separate gateway as independent crate

The gateway doesn't necessarily have to use core_message to get the current cluster. Rather, it can define its own proto file that the server should implement.

This change makes the gateway more versatile and easier to reuse.

Tiered Log

The in-memory log consumes a lot of memory. New entries are likely to be read again when they are applied or replicated to other nodes, but older entries are just waiting to be compacted. As most of the newer entries are consumed very quickly, the log is usually filled with useless entries.

A tiered log can address this problem by putting old entries on disk and newer ones in memory.

Remove InitCluster command

It is better to manage cluster membership with only two commands, add and remove. Treating initialization specially is hard for users to understand and also makes it hard to implement software that builds a cluster automatically.

We can remove InitCluster in this way: if a server that doesn't belong to any membership receives an AddServer, it interprets it as an InitCluster to itself.
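The dispatch rule could look like the following sketch. `AddServerHandling`, `classify_add_server`, and `apply_add_server` are hypothetical names, and the membership is modelled as a plain set of ids; in the real system the change would go through the Raft log rather than being applied directly.

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq)]
pub enum AddServerHandling {
    /// The server has no membership yet: form a single-node cluster with itself.
    Bootstrap,
    /// The server already belongs to a cluster: ordinary membership change.
    Propose,
}

/// Decide how an incoming AddServer should be treated.
pub fn classify_add_server(membership: &BTreeSet<String>) -> AddServerHandling {
    if membership.is_empty() {
        AddServerHandling::Bootstrap
    } else {
        AddServerHandling::Propose
    }
}

/// Apply the decision to a simplified in-memory membership.
pub fn apply_add_server(
    membership: &mut BTreeSet<String>,
    self_id: &str,
    new_id: &str,
) -> AddServerHandling {
    let handling = classify_add_server(membership);
    match handling {
        AddServerHandling::Bootstrap => membership.insert(self_id.to_string()),
        AddServerHandling::Propose => membership.insert(new_id.to_string()),
    };
    handling
}
```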

Use Bytes as message in the log

Messages in the log are intrinsically immutable, but they are deep-copied when they are passed to RaftApp. This is clearly suboptimal because a reference-counted shallow copy is enough.

If we use Bytes instead of a raw Vec<u8>, cloning will be shallow.
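The difference between a deep and a shallow clone can be shown with the standard library alone. In the sketch below `Arc<[u8]>` is used as a stand-in for `bytes::Bytes` (an assumption made to avoid an external dependency); both are reference-counted, so cloning copies a pointer and bumps a counter instead of copying the payload. `Message`, `to_message`, and `shares_buffer` are hypothetical names.

```rust
use std::sync::Arc;

/// Stand-in for `bytes::Bytes`: an immutable, reference-counted byte buffer.
pub type Message = Arc<[u8]>;

/// Move a Vec<u8> into a shared buffer (one copy here, none on later clones).
pub fn to_message(raw: Vec<u8>) -> Message {
    Arc::from(raw)
}

/// True if both handles point at the same underlying buffer.
pub fn shares_buffer(a: &Message, b: &Message) -> bool {
    Arc::ptr_eq(a, b)
}
```

Cloning a `Message` before handing it to the app leaves both handles pointing at the same allocation, whereas cloning a `Vec<u8>` would duplicate every byte.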

Add connect_timeout to EndpointConfig

Currently EndpointConfig lets you set a timeout, but it applies to the entire request, not to the connection. For some requests, what is needed is a connection timeout rather than a request timeout.

The current tonic doesn't support this feature, but it is technically possible and would become available once hyperium/tonic#498 is done.

Implement Tune message

RaftCore is initialized with TunableConfig and these settings should be changeable at runtime, but as of 0.5.0 the message isn't implemented.

Define Snapshot as an associated type of RaftApp

Snapshot is defined as Vec<u8>. This means that every time it is passed to RaftApp, deserialization is needed, and serialization is needed in fold_snapshot to return the new snapshot. This is wasteful. Especially when the snapshot is large (100MB or more), the cost becomes non-negligible with a shorter compaction interval.

If we define RaftApp::Snapshot as an associated type, we could avoid these overheads. The type should be cloneable and convertible to and from Vec<u8>.
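A sketch of the shape such a trait could take. The trait below is a simplified stand-in, not the actual RaftApp definition: the `fold_snapshot` signature is an assumption, and `CounterApp` / `CounterSnapshot` are made-up examples. The bounds express "cloneable and convertible to and from Vec<u8>".

```rust
/// Simplified stand-in for RaftApp with the snapshot as an associated type.
pub trait RaftApp {
    type Snapshot: Clone + From<Vec<u8>> + Into<Vec<u8>>;

    /// Fold messages into the old snapshot without any (de)serialization.
    fn fold_snapshot(&self, old: Option<&Self::Snapshot>, messages: &[Vec<u8>]) -> Self::Snapshot;
}

/// Example snapshot type: the number of messages applied so far.
#[derive(Clone, Debug, PartialEq)]
pub struct CounterSnapshot(pub u64);

impl From<Vec<u8>> for CounterSnapshot {
    fn from(v: Vec<u8>) -> Self {
        // Little-endian u64; missing bytes are treated as zero.
        let mut buf = [0u8; 8];
        let n = v.len().min(8);
        buf[..n].copy_from_slice(&v[..n]);
        CounterSnapshot(u64::from_le_bytes(buf))
    }
}

impl From<CounterSnapshot> for Vec<u8> {
    fn from(s: CounterSnapshot) -> Vec<u8> {
        s.0.to_le_bytes().to_vec()
    }
}

pub struct CounterApp;

impl RaftApp for CounterApp {
    type Snapshot = CounterSnapshot;

    fn fold_snapshot(&self, old: Option<&CounterSnapshot>, messages: &[Vec<u8>]) -> CounterSnapshot {
        let base = old.map(|s| s.0).unwrap_or(0);
        CounterSnapshot(base + messages.len() as u64)
    }
}
```

The conversion to bytes is then needed only when the snapshot actually leaves the process, not on every call into the app.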

Stress test

To make sure that lol will not slow down or leak memory over a long run, a stress test should be done.

Support HTTPS

Currently, lol uses ip:socket as the identity and the scheme is fixed to http. This restriction doesn't allow servers to be placed on AWS or other cloud infrastructure.

By setting ServerTlsConfig and ClientTlsConfig, tonic server can become secure.

Explicit lightweight clone

lol uses Arc everywhere to share objects at low cost. Bytes is also reference-counted, and lol uses it for zero-copy buffers. Some other types are cheap to clone too.

This is documented but should be made clear in the code.

The idea is like discussed here

rust-lang/rfcs#2588 (comment)

We define a FastClone trait for these types to make it clear that cloning is cheap.
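One possible shape for such a trait is a marker with a default method. The method name `fast_clone` and the blanket implementation for `Arc<T>` are assumptions for illustration; other cheap-to-clone types would opt in the same way.

```rust
use std::sync::Arc;

/// Marker for types whose clone is cheap (e.g. a reference-count increment).
/// Implementing it is a promise by the type's author; the compiler cannot check it.
pub trait FastClone: Clone {
    /// Same result as `clone`, but the call site documents that it is cheap.
    fn fast_clone(&self) -> Self {
        self.clone()
    }
}

/// Cloning an Arc only bumps the reference count.
impl<T: ?Sized> FastClone for Arc<T> {}
```

A call such as `shared.fast_clone()` then reads differently from a plain `clone()` on a large buffer, which is the intent of the proposal.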

Generalize failure detection algorithm

We use adaptive failure detection to detect leader failure. In 0.6.2, we use the basic phi accrual algorithm, but there are improved algorithms in research papers. It would be nice to generalize failure detection so users can choose the best one.
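A generalized detector could be expressed as a trait with interchangeable implementations. The sketch below is not the detector used in lol: `FailureDetector` and `ExpPhiDetector` are hypothetical names, and the implementation swaps in the simpler exponential-distribution approximation, where phi = elapsed / (mean_interval x ln 10), instead of the normal-distribution model of the basic algorithm. Timestamps are `Duration`s since an arbitrary start.

```rust
use std::time::Duration;

/// Pluggable failure detector: a higher phi means a stronger suspicion.
pub trait FailureDetector {
    fn record_heartbeat(&mut self, at: Duration);
    fn phi(&self, now: Duration) -> f64;
}

/// Phi accrual with an exponential model of heartbeat inter-arrival times.
pub struct ExpPhiDetector {
    last: Option<Duration>,
    sum: Duration,
    count: u32,
}

impl ExpPhiDetector {
    pub fn new() -> Self {
        ExpPhiDetector { last: None, sum: Duration::from_secs(0), count: 0 }
    }
}

impl FailureDetector for ExpPhiDetector {
    fn record_heartbeat(&mut self, at: Duration) {
        if let Some(last) = self.last {
            if let Some(interval) = at.checked_sub(last) {
                self.sum += interval;
                self.count += 1;
            }
        }
        self.last = Some(at);
    }

    fn phi(&self, now: Duration) -> f64 {
        // Without at least one observed interval there is nothing to suspect.
        let last = match self.last {
            Some(l) if self.count > 0 => l,
            _ => return 0.0,
        };
        let mean = self.sum.as_secs_f64() / f64::from(self.count);
        if mean <= 0.0 {
            return 0.0;
        }
        let elapsed = now.checked_sub(last).unwrap_or(Duration::from_secs(0)).as_secs_f64();
        // P(no heartbeat for `elapsed`) = exp(-elapsed / mean); phi = -log10 of that.
        elapsed / (mean * std::f64::consts::LN_10)
    }
}
```

A caller would compare `phi(now)` against a threshold of its choice; other algorithms would implement the same two methods.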

Faster transport for in-process communications

In certain applications, RaftApp sends requests to RaftCore. For example, a distributed storage application may want to send new writes to rebuild a missing part of the data.

In the current lol 0.7.0, it is possible to do this by sending requests through gRPC as a usual client, but this is inefficient because it goes through the network stack.

Ideally, the transport could be a simple channel.

My idea here is to expose such a server from RaftCore alongside tonic::Server.

My first choice is tarpc, which is not an official Google product but is well maintained. It seems to be transport-agnostic:

Pluggable transport: any type impling Stream<Item = Request> + Sink can be used as a transport to connect the client and server.

and it supports both channels and TCP.

Sync constraint in SnapshotStream should be removed

pub type SnapshotStream = std::pin::Pin<Box<dyn futures::stream::Stream<Item = anyhow::Result<Bytes>> + Send + Sync>>

SnapshotStream is a public API and it has a Sync constraint. This should be removed because adding Sync to a stream brings no benefit and is only harmful for implementors.

We could remove this from the public API and add it back internally with sync_wrapper.

Support in-library Multi-Raft

Multi-Raft is a data-sharding technique for Raft where the key space is split into N groups and each group has its own Raft log. This technique is used in KV stores like TiKV.

Multi-Raft in TiKV

The current lol 0.6.2 can achieve multi-raft by using the socket port as the group ID; however, this isn't efficient for the following reasons:

  1. Wasted heartbeats: A node can be the leader of both group A and group B. In this case, we only need to send heartbeats for one of the groups. In other words, the two groups can share the liveness of nodes.
  2. State machines can't share common data such as configuration.

Also, keeping connections to 1000 nodes in a cluster may make the system unstable.

Since the implementation will be very hard and there is no demand for this feature at the moment, the priority isn't high, but it is at least worth noting.

Emit membership changes from RaftCore

Membership changes are hidden inside RaftCore and not visible from RaftApp or anywhere else outside RaftCore.

It is possible to get the current membership by polling, but some software may want to receive the changes. For example, some RaftApps may use only the dynamic membership change functionality, while other operations are done outside the Raft log.

My idea is to open a channel to notify membership changes.
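A sketch of such a notification channel, using the standard library's mpsc channel as a stand-in for whatever channel type the library would actually use. `MembershipNotifier` and `MembershipChange` are hypothetical names, and the fields of the change event are assumptions.

```rust
use std::collections::BTreeSet;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Event describing the membership after a change (fields are assumptions).
#[derive(Clone, Debug, PartialEq)]
pub struct MembershipChange {
    /// Log index at which the new membership took effect.
    pub index: u64,
    pub members: BTreeSet<String>,
}

/// Fan-out of membership changes to any number of subscribers.
pub struct MembershipNotifier {
    subscribers: Vec<Sender<MembershipChange>>,
}

impl MembershipNotifier {
    pub fn new() -> Self {
        MembershipNotifier { subscribers: Vec::new() }
    }

    /// Register a subscriber and hand back its receiving end.
    pub fn subscribe(&mut self) -> Receiver<MembershipChange> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send the change to every subscriber; drop those whose receiver is gone.
    pub fn notify(&mut self, change: MembershipChange) {
        self.subscribers.retain(|tx| tx.send(change.clone()).is_ok());
    }

    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
```

Subscribers receive every change in order instead of having to poll and diff the membership themselves.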

Zero-copy to/from streaming

Prost 0.7 starts to support zero-copy decoding based on the reference-counted bytes::Bytes. This means we can share the same memory region from RaftStorage to inter-node streaming and vice versa! (at least I hope so)

Related: hyperium/tonic#517
