FC-Redirect's control plane needs distributed consensus to coordinate flow policies across multiple nodes. For two years, we used a homegrown consensus protocol. It worked, but it was complex, difficult to reason about, and had subtle bugs. This year, I reimplemented it using Raft, a consensus algorithm designed to be understandable. The experience was enlightening.
Why Consensus Matters
In a distributed system, nodes must agree on shared state:
A three-node cluster needs to agree on flow policies:
Node 1: Flow X should redirect to Port 5
Node 2: Flow X should redirect to Port 5   ← All agree
Node 3: Flow X should redirect to Port 5
Without consensus:
Node 1: Flow X → Port 5
Node 2: Flow X → Port 7   ← Disagreement!
Node 3: Flow X → Port 5
Result: Inconsistent behavior, split-brain
Consensus ensures all nodes agree, even in the presence of failures.
Why Raft?
I evaluated several consensus algorithms:
Paxos:
- Pros: Proven correct, widely used
- Cons: Notoriously difficult to understand and implement correctly
ZooKeeper's ZAB:
- Pros: Production-proven, good tooling
- Cons: Requires separate ZooKeeper cluster
Raft:
- Pros: Designed for understandability, great paper
- Cons: Relatively new (2014)
Raft's understandability won. If I can't understand the algorithm, I can't implement it correctly.
Raft Basics
Raft breaks consensus into three subproblems:
- Leader Election: One node is elected leader
- Log Replication: Leader replicates log entries to followers
- Safety: Committed entries are never lost
State Machine
Each node is a state machine:
┌───────────┐
│ Follower  │  ← All nodes start as followers
└─────┬─────┘
      │ Election timeout
      ▼
┌───────────┐
│ Candidate │  ← Request votes from other nodes
└─────┬─────┘
      │ Receive majority votes
      ▼
┌───────────┐
│  Leader   │  ← Send heartbeats to maintain leadership
└───────────┘
Terms
Time is divided into terms:
   Term 1      Term 2      Term 3      Term 4
|-----------|-----------|-----------|-----------|
   Leader      Leader    No Leader      Leader
     A           B       (election)       B

Each term has at most one leader.
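In code, the term rules collapse into one check that every RPC handler applies: a message from an older term is rejected, and a message from a newer term forces the node to adopt that term and step down. Here is a minimal sketch of that rule (the helper name check_term is mine, not part of the implementation; it uses the raft_node_t struct defined in the next section):

// Sketch: the term rule applied to every incoming RPC.
// Returns false if the message is stale and should be rejected.
bool check_term(raft_node_t *node, uint64_t msg_term) {
    if (msg_term < node->current_term) {
        return false;                       // Stale message from an old term
    }
    if (msg_term > node->current_term) {
        node->current_term = msg_term;      // Adopt the newer term
        node->state = RAFT_STATE_FOLLOWER;  // Step down if leader or candidate
        node->voted_for = -1;
    }
    return true;
}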
Implementation
Node State
typedef enum {
    RAFT_STATE_FOLLOWER,
    RAFT_STATE_CANDIDATE,
    RAFT_STATE_LEADER
} raft_state_t;
typedef struct raft_node {
    node_id_t self_id;            // This node's own ID

    // Persistent state (survives crashes)
    uint64_t current_term;
    node_id_t voted_for;
    log_entry_t *log;
    uint64_t log_size;

    // Volatile state
    raft_state_t state;
    uint64_t commit_index;
    uint64_t last_applied;

    // Leader state
    uint64_t *next_index;         // Per-follower next log index
    uint64_t *match_index;        // Per-follower highest replicated index

    // Configuration
    node_id_t *peers;             // Other nodes (does not include self)
    int num_peers;
    int election_timeout_ms;
    int heartbeat_interval_ms;

    // Callbacks
    apply_fn apply_callback;
} raft_node_t;
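The log entry and RPC message types used by the handlers below aren't spelled out in the post. Here is a minimal sketch of what they might contain; the field names follow how they are used later, while node_id_t and the exact layout are assumptions:

// Sketch of the log entry and RPC message types referenced below.
typedef struct log_entry {
    uint64_t term;              // Term in which the entry was created
    void *command;              // Opaque command applied to the state machine
    size_t command_size;
} log_entry_t;

typedef struct vote_request {
    uint64_t term;
    node_id_t candidate_id;
    uint64_t last_log_index;
    uint64_t last_log_term;
} vote_request_t;

typedef struct vote_response {
    uint64_t term;
    bool vote_granted;
} vote_response_t;

typedef struct append_entries_request {
    uint64_t term;
    node_id_t leader_id;
    uint64_t prev_log_index;
    uint64_t prev_log_term;
    log_entry_t *entries;
    uint64_t num_entries;
    uint64_t leader_commit;
} append_entries_request_t;

typedef struct append_entries_response {
    uint64_t term;
    bool success;
} append_entries_response_t;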
Leader Election
When a follower times out, it becomes a candidate:
void start_election(raft_node_t *node) {
    // Increment term and vote for self (persistent state: persist before sending RPCs)
    node->current_term++;
    node->state = RAFT_STATE_CANDIDATE;
    node->voted_for = node->self_id;
    persist_raft_state(node);

    // Reset election timeout
    reset_election_timeout(node);

    int votes_received = 1; // Vote for self

    // Request votes from all peers (num_peers excludes this node)
    for (int i = 0; i < node->num_peers; i++) {
        vote_request_t req = {
            .term = node->current_term,
            .candidate_id = node->self_id,
            .last_log_index = node->log_size - 1,   // assumes a sentinel entry at index 0
            .last_log_term = node->log[node->log_size - 1].term
        };
        vote_response_t resp = send_vote_request(&node->peers[i], &req);
        if (resp.vote_granted) {
            votes_received++;
            // Majority of the full cluster (num_peers + 1 nodes)?
            if (votes_received > (node->num_peers + 1) / 2) {
                become_leader(node);
                return;
            }
        } else if (resp.term > node->current_term) {
            // Discovered higher term, step down
            node->current_term = resp.term;
            node->state = RAFT_STATE_FOLLOWER;
            node->voted_for = -1;
            persist_raft_state(node);
            return;
        }
    }
}
void become_leader(raft_node_t *node) {
    node->state = RAFT_STATE_LEADER;

    // Initialize leader state
    for (int i = 0; i < node->num_peers; i++) {
        node->next_index[i] = node->log_size;
        node->match_index[i] = 0;
    }

    // Send initial heartbeat
    send_heartbeats(node);
}
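become_leader calls send_heartbeats, which isn't shown. A heartbeat is simply an AppendEntries RPC with no entries; here is a minimal sketch, assuming the request type and send_append_entries_request helper that appear in the log replication code below:

// Sketch: a heartbeat is an empty AppendEntries to every follower.
void send_heartbeats(raft_node_t *node) {
    for (int i = 0; i < node->num_peers; i++) {
        uint64_t next_idx = node->next_index[i];
        append_entries_request_t req = {
            .term = node->current_term,
            .leader_id = node->self_id,
            .prev_log_index = next_idx - 1,
            .prev_log_term = node->log[next_idx - 1].term,
            .entries = NULL,
            .num_entries = 0,
            .leader_commit = node->commit_index
        };
        // (A fuller version would check resp.term and step down if it is higher)
        (void)send_append_entries_request(&node->peers[i], &req);
    }
}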
Handling Vote Requests
vote_response_t handle_vote_request(raft_node_t *node, vote_request_t *req) {
    vote_response_t resp = {
        .term = node->current_term,
        .vote_granted = false
    };

    // Reject if request term is old
    if (req->term < node->current_term) {
        return resp;
    }

    // Update term if request term is newer
    if (req->term > node->current_term) {
        node->current_term = req->term;
        node->state = RAFT_STATE_FOLLOWER;
        node->voted_for = -1;
    }

    // Grant vote if:
    // 1. Haven't voted yet OR already voted for this candidate
    // 2. Candidate's log is at least as up-to-date as ours
    if ((node->voted_for == -1 || node->voted_for == req->candidate_id) &&
        is_log_up_to_date(node, req)) {
        node->voted_for = req->candidate_id;
        reset_election_timeout(node);
        resp.vote_granted = true;
    }

    // Persist current_term and voted_for before replying
    persist_raft_state(node);
    resp.term = node->current_term;
    return resp;
}

bool is_log_up_to_date(raft_node_t *node, vote_request_t *req) {
    uint64_t last_log_index = node->log_size - 1;
    uint64_t last_log_term = node->log[last_log_index].term;

    // Compare terms first
    if (req->last_log_term != last_log_term) {
        return req->last_log_term > last_log_term;
    }
    // If terms equal, compare indices
    return req->last_log_index >= last_log_index;
}
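The election logic above leans on reset_election_timeout, which isn't shown. Here is a minimal sketch; now_ms() (a monotonic millisecond clock), the deadline variable, and the periodic raft_tick() are assumptions rather than part of the implementation. The key point is the randomized timeout, which keeps two followers from timing out in lockstep and repeatedly splitting the vote:

// Sketch: randomized election timeout, as recommended by the Raft paper.
// In a real implementation the deadline would live inside raft_node_t.
static uint64_t election_deadline_ms;

void reset_election_timeout(raft_node_t *node) {
    int base = node->election_timeout_ms;               // e.g. 150 ms
    int jitter = rand() % base;                          // uniform in [0, base)
    election_deadline_ms = now_ms() + base + jitter;     // fires in [base, 2*base)
}

// Driven periodically by the node's event loop (also an assumption)
void raft_tick(raft_node_t *node) {
    if (node->state != RAFT_STATE_LEADER && now_ms() >= election_deadline_ms) {
        start_election(node);
    }
}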
Log Replication
Leader replicates log entries:
void replicate_log(raft_node_t *node, log_entry_t *entry) {
    assert(node->state == RAFT_STATE_LEADER);

    // Append to local log, stamped with the current term (persistent state)
    entry->term = node->current_term;
    node->log[node->log_size++] = *entry;
    persist_raft_state(node);

    // Replicate to followers
    for (int i = 0; i < node->num_peers; i++) {
        send_append_entries(node, &node->peers[i]);
    }
}

void send_append_entries(raft_node_t *node, node_id_t *peer) {
    uint64_t next_idx = node->next_index[peer->index];

    append_entries_request_t req = {
        .term = node->current_term,
        .leader_id = node->self_id,
        .prev_log_index = next_idx - 1,
        .prev_log_term = node->log[next_idx - 1].term,
        .entries = &node->log[next_idx],
        .num_entries = node->log_size - next_idx,
        .leader_commit = node->commit_index
    };

    append_entries_response_t resp = send_append_entries_request(peer, &req);

    if (resp.success) {
        // Update indices
        node->next_index[peer->index] = next_idx + req.num_entries;
        node->match_index[peer->index] = next_idx + req.num_entries - 1;
        // Check if we can advance commit index
        update_commit_index(node);
    } else if (resp.term > node->current_term) {
        // A newer term exists: step down instead of retrying
        node->current_term = resp.term;
        node->state = RAFT_STATE_FOLLOWER;
        node->voted_for = -1;
        persist_raft_state(node);
    } else {
        // Follower's log inconsistent, decrement nextIndex and retry
        if (node->next_index[peer->index] > 1) {   // never below 1 (index 0 is the sentinel)
            node->next_index[peer->index]--;
        }
        send_append_entries(node, peer);
    }
}
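send_append_entries calls update_commit_index, which isn't shown above. A minimal sketch: the leader advances commit_index to the highest index replicated on a majority, and, per the Raft safety rules, only commits entries from its own term directly:

// Sketch: advance commit_index to the highest index N such that a majority
// of the cluster stores log[N] and log[N].term == current_term.
void update_commit_index(raft_node_t *node) {
    for (uint64_t n = node->log_size - 1; n > node->commit_index; n--) {
        if (node->log[n].term != node->current_term) {
            continue;   // Only commit entries from the current term directly
        }
        int count = 1;  // The leader itself has the entry
        for (int i = 0; i < node->num_peers; i++) {
            if (node->match_index[i] >= n) {
                count++;
            }
        }
        if (count > (node->num_peers + 1) / 2) {
            node->commit_index = n;
            apply_committed_entries(node);
            break;
        }
    }
}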
Handling Append Entries
append_entries_response_t handle_append_entries(raft_node_t *node,
                                                append_entries_request_t *req) {
    append_entries_response_t resp = {
        .term = node->current_term,
        .success = false
    };

    // Reject if request term is old
    if (req->term < node->current_term) {
        return resp;
    }

    // Reset election timeout (heard from a legitimate leader)
    reset_election_timeout(node);

    // Update term if necessary
    if (req->term > node->current_term) {
        node->current_term = req->term;
        node->voted_for = -1;
        persist_raft_state(node);
    }
    // A leader exists for this term, so a candidate steps down
    node->state = RAFT_STATE_FOLLOWER;
    resp.term = node->current_term;

    // Check log consistency
    if (req->prev_log_index >= node->log_size ||
        node->log[req->prev_log_index].term != req->prev_log_term) {
        // Log inconsistent at prev_log_index
        return resp;
    }

    // Append new entries, truncating any conflicting suffix
    uint64_t idx = req->prev_log_index + 1;
    for (uint64_t i = 0; i < req->num_entries; i++) {
        if (idx < node->log_size && node->log[idx].term != req->entries[i].term) {
            // Conflict: delete this entry and everything that follows it
            node->log_size = idx;
        }
        if (idx >= node->log_size) {
            // Append new entry
            node->log[node->log_size++] = req->entries[i];
        }
        idx++;
    }
    persist_raft_state(node);

    // Update commit index, capped at the last entry we now hold
    if (req->leader_commit > node->commit_index) {
        uint64_t last_index = node->log_size - 1;
        node->commit_index = (req->leader_commit < last_index)
                                 ? req->leader_commit : last_index;
        apply_committed_entries(node);
    }

    resp.success = true;
    return resp;
}
Applying Committed Entries
void apply_committed_entries(raft_node_t *node) {
    while (node->last_applied < node->commit_index) {
        node->last_applied++;
        log_entry_t *entry = &node->log[node->last_applied];
        // Apply to state machine
        node->apply_callback(entry->command, entry->command_size);
    }
}
Integration with FC-Redirect
FC-Redirect uses Raft for flow policy consensus:
// State machine: Flow policy database
typedef struct flow_policy_db {
    flow_policy_t *policies;
    int num_policies;
} flow_policy_db_t;

// Apply function: Add/update/delete policy
void apply_policy_command(const void *command, size_t size) {
    (void)size;
    const policy_command_t *cmd = (const policy_command_t*)command;
    switch (cmd->type) {
    case POLICY_ADD:
        add_policy(&policy_db, &cmd->policy);
        break;
    case POLICY_UPDATE:
        update_policy(&policy_db, cmd->policy_id, &cmd->policy);
        break;
    case POLICY_DELETE:
        delete_policy(&policy_db, cmd->policy_id);
        break;
    }
}

// Client API: Create policy
policy_id_t create_flow_policy(flow_policy_t *policy) {
    // Only leader can handle writes
    if (raft_node.state != RAFT_STATE_LEADER) {
        // Redirect to leader
        return forward_to_leader(policy);
    }

    // Create command
    policy_command_t cmd = {
        .type = POLICY_ADD,
        .policy = *policy
    };

    // Append to Raft log; the new entry lands at index log_size
    uint64_t entry_index = raft_node.log_size;
    log_entry_t entry = {
        .command = &cmd,
        .command_size = sizeof(cmd)
    };
    replicate_log(&raft_node, &entry);

    // Wait for commit (simple polling; a condition variable would be nicer)
    while (raft_node.commit_index < entry_index) {
        usleep(1000); // 1ms
    }

    // Policy committed, return ID
    return cmd.policy.id;
}
Persistence
Raft state must survive crashes:
void persist_raft_state(raft_node_t *node) {
    // Write to disk (error handling omitted for brevity)
    int fd = open(RAFT_STATE_FILE, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    // Write term
    write(fd, &node->current_term, sizeof(node->current_term));
    // Write voted_for
    write(fd, &node->voted_for, sizeof(node->voted_for));
    // Write log
    write(fd, &node->log_size, sizeof(node->log_size));
    write(fd, node->log, node->log_size * sizeof(log_entry_t));
    // Ensure durability
    fsync(fd);
    close(fd);
}

void restore_raft_state(raft_node_t *node) {
    int fd = open(RAFT_STATE_FILE, O_RDONLY);
    if (fd < 0) {
        // No saved state, start fresh
        return;
    }
    // Read term
    read(fd, &node->current_term, sizeof(node->current_term));
    // Read voted_for
    read(fd, &node->voted_for, sizeof(node->voted_for));
    // Read log
    read(fd, &node->log_size, sizeof(node->log_size));
    node->log = malloc(node->log_size * sizeof(log_entry_t));
    read(fd, node->log, node->log_size * sizeof(log_entry_t));
    close(fd);
}
Log Compaction (Snapshots)
Logs grow unbounded. Snapshots compact them:
void create_snapshot(raft_node_t *node) {
    // Capture state machine state
    snapshot_t snapshot = {
        .last_included_index = node->last_applied,
        .last_included_term = node->log[node->last_applied].term,
        .data = serialize_state_machine(&policy_db),
        .data_size = get_state_machine_size(&policy_db)
    };

    // Write snapshot to disk
    write_snapshot(&snapshot);

    // Discard log entries before last_applied (the entry at last_applied is kept
    // so its term remains available for consistency checks)
    uint64_t new_log_size = node->log_size - node->last_applied;
    memmove(node->log, &node->log[node->last_applied],
            new_log_size * sizeof(log_entry_t));
    node->log_size = new_log_size;
    // Note: after compaction, absolute log indices used elsewhere must be
    // translated by last_included_index (omitted here for brevity)

    // Persist
    persist_raft_state(node);
}

void restore_from_snapshot(raft_node_t *node) {
    snapshot_t snapshot;
    read_snapshot(&snapshot);

    // Restore state machine
    deserialize_state_machine(&policy_db, snapshot.data, snapshot.data_size);

    // Update Raft state
    node->last_applied = snapshot.last_included_index;
    node->commit_index = snapshot.last_included_index;

    free(snapshot.data);
}
Testing Raft
Raft is complex. Testing is critical:
// Partition network to test split-brain handling
void test_network_partition() {
    // Start 5-node cluster
    raft_node_t nodes[5];
    start_cluster(nodes, 5);

    // Elect leader
    wait_for_leader_election(nodes, 5);
    raft_node_t *leader = find_leader(nodes, 5);

    // Partition: {Leader, Node1} vs {Node2, Node3, Node4}
    partition_network(nodes, (int[]){0, 1}, 2, (int[]){2, 3, 4}, 3);

    // Majority partition should elect a new leader
    sleep(2 * ELECTION_TIMEOUT_MS / 1000);
    raft_node_t *new_leader = find_leader(&nodes[2], 3);
    assert(new_leader != NULL);
    assert(new_leader != leader); // Different leader

    // Old leader, cut off from a quorum, should step down
    assert(leader->state == RAFT_STATE_FOLLOWER);

    // Heal partition
    heal_network(nodes, 5);

    // System should converge
    sleep(1);
    assert(count_leaders(nodes, 5) == 1);
}

// Test log replication
void test_log_replication() {
    raft_node_t nodes[3];
    start_cluster(nodes, 3);
    raft_node_t *leader = find_leader(nodes, 3);

    // Submit command
    log_entry_t entry = {.command = "test", .command_size = 5};
    replicate_log(leader, &entry);

    // Wait for replication
    sleep(1);

    // Verify all nodes have the entry at the tail of their logs
    for (int i = 0; i < 3; i++) {
        assert(nodes[i].log_size > 0);
        assert(memcmp(nodes[i].log[nodes[i].log_size - 1].command, "test", 5) == 0);
    }
}
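Targeted scenarios like these catch specific regressions; the randomized chaos runs mentioned in the results below go further. Here is a sketch of such a loop; the fault-injection and invariant helpers (drop_random_messages, delay_random_messages, crash_node, restart_node, check_election_safety, check_log_consistency) are hypothetical, in the same style as the harness above:

// Sketch: randomized chaos testing of the cluster.
void test_chaos(int iterations) {
    raft_node_t nodes[5];
    start_cluster(nodes, 5);

    for (int i = 0; i < iterations; i++) {
        switch (rand() % 4) {
        case 0: drop_random_messages(nodes, 5);   break;
        case 1: delay_random_messages(nodes, 5);  break;
        case 2: crash_node(&nodes[rand() % 5]);   break;
        case 3: restart_node(&nodes[rand() % 5]); break;
        }

        // Invariants that must hold after every injected fault:
        check_election_safety(nodes, 5);   // At most one leader per term
        check_log_consistency(nodes, 5);   // Committed entries never diverge
    }
}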
Results
Raft implementation delivered:
Correctness:
- 0 consensus bugs in 6 months (vs 5 bugs in homegrown protocol)
- Passed 10,000 iterations of chaos tests
- Survived all partition scenarios
Performance:
- Commit latency: 2ms (3-node cluster)
- Throughput: 5000 ops/sec
- Leader election: <500ms
Simplicity:
- Code size: 2000 lines (vs 5000 for homegrown)
- Understandable by team members
- Easier to debug
Lessons Learned
Implementing Raft taught me:
- Understandability matters: Raft's clarity made implementation faster and more correct.
- Persist everything: Any state that affects consensus must survive crashes.
- Test network failures: Partitions, delays, and message loss expose subtle bugs.
- Leader election is tricky: Randomized timeouts are crucial for liveness.
- Log compaction is essential: Without snapshots, logs grow unbounded.
Conclusion
Distributed consensus is hard, but Raft makes it manageable. The algorithm is well-specified, and implementations exist in many languages.
For FC-Redirect, Raft provides:
- Strong consistency across nodes
- Automatic leader election
- Fault tolerance (handles minority node failures)
- Understandable behavior
If you need distributed consensus, use Raft. Don't build your own protocol unless you have a PhD in distributed systems and months to spare.
Raft: consensus for mere mortals.