It is claimed that "I/O corruptions are allowed across all replicas but only for a minority per op". The problem in this case is `quorum_replication_max`: because replication can complete with fewer than a majority of replicas, corruptions affecting only a minority of replicas can still lose data.
Below we have a cluster of 9 replicas with one client. I/O operations are not failing across a majority of replicas — only on replicas 0, 3, and 6. With `quorum_replication_max`
set to 3, this can lead to the loss of op 6, as shown below.
diff --git a/src/config.zig b/src/config.zig
index f4c3781..32ecc92 100644
--- a/src/config.zig
+++ b/src/config.zig
@@ -6,14 +6,14 @@ pub const log_level = 6;
/// The maximum number of replicas allowed in a cluster.
/// This has been limited to 5 just to decrease the amount of memory required by the VOPR simulator.
-pub const replicas_max = 5;
+pub const replicas_max = 9;
/// The maximum number of clients allowed per cluster, where each client has a unique 128-bit ID.
/// This impacts the amount of memory allocated at initialization by the server.
/// This determines the size of the VR client table used to cache replies to clients by client ID.
/// Each client has one entry in the VR client table to store the latest `message_size_max` reply.
/// This has been limited to 3 just to decrease the amount of memory required by the VOPR simulator.
-pub const clients_max = 3;
+pub const clients_max = 1;
/// The minimum number of nodes required to form a quorum for replication:
/// Majority quorums are only required across view change and replication phases (not within).
diff --git a/src/simulator.zig b/src/simulator.zig
index f89a899..5c6eae2 100644
--- a/src/simulator.zig
+++ b/src/simulator.zig
@@ -10,6 +10,7 @@ const Header = @import("vsr.zig").Header;
const Replica = @import("test/cluster.zig").Replica;
const StateChecker = @import("test/state_checker.zig").StateChecker;
const StateMachine = @import("test/cluster.zig").StateMachine;
+const PartitionMode = @import("test/packet_simulator.zig").PartitionMode;
/// The `log` namespace in this root file is required to implement our custom `log` function.
const output = std.log.scoped(.state_checker);
@@ -57,14 +58,14 @@ pub fn main() !void {
var prng = std.rand.DefaultPrng.init(seed);
const random = &prng.random;
- const replica_count = 1 + prng.random.uintLessThan(u8, config.replicas_max);
- const client_count = 1 + prng.random.uintLessThan(u8, config.clients_max);
+ const replica_count = config.replicas_max; //1 + prng.random.uintLessThan(u8, config.replicas_max);
+ const client_count = config.clients_max; //1 + prng.random.uintLessThan(u8, config.clients_max);
const node_count = replica_count + client_count;
const ticks_max = 100_000_000;
const transitions_max = config.journal_size_max / config.message_size_max;
- const request_probability = 1 + prng.random.uintLessThan(u8, 99);
- const idle_on_probability = prng.random.uintLessThan(u8, 20);
+ const request_probability = 1; //1 + prng.random.uintLessThan(u8, 99);
+ const idle_on_probability = 0; //prng.random.uintLessThan(u8, 20);
const idle_off_probability = 10 + prng.random.uintLessThan(u8, 10);
cluster = try Cluster.create(allocator, &prng.random, .{
@@ -74,25 +75,36 @@ pub fn main() !void {
.seed = prng.random.int(u64),
.network_options = .{
.packet_simulator_options = .{
+ .replica_count = replica_count,
+ .client_count = client_count,
.node_count = node_count,
.seed = prng.random.int(u64),
- .one_way_delay_mean = 3 + prng.random.uintLessThan(u16, 10),
- .one_way_delay_min = prng.random.uintLessThan(u16, 3),
- .packet_loss_probability = prng.random.uintLessThan(u8, 30),
- .path_maximum_capacity = 20 + prng.random.uintLessThan(u8, 20),
+
+ .one_way_delay_mean = 5, //3 + prng.random.uintLessThan(u16, 100),
+ .one_way_delay_min = 1, //prng.random.uintLessThan(u16, 3),
+ .packet_loss_probability = 0, //prng.random.uintLessThan(u8, 30),
+ .path_maximum_capacity = 250, //20 + prng.random.uintLessThan(u8, 20),
+
+ .partition_mode = .fixed, //random_enum(PartitionMode, random),
+ .partition_probability = 100, //prng.random.uintLessThan(u8, 3),
+ .unpartition_probability = 100, //1 + prng.random.uintLessThan(u8, 10),
+ .partition_stability = 10_000, //100 + prng.random.uintLessThan(u32, 100),
+ .unpartition_stability = 800, //prng.random.uintLessThan(u32, 20),
+
.path_clog_duration_mean = prng.random.uintLessThan(u16, 500),
- .path_clog_probability = prng.random.uintLessThan(u8, 2),
- .packet_replay_probability = prng.random.uintLessThan(u8, 50),
+ .path_clog_probability = 0, //prng.random.uintLessThan(u8, 2),
+ .packet_replay_probability = 0, //prng.random.uintLessThan(u8, 50),
+ .packet_misdeliver_probability = 0, //prng.random.uintLessThan(u8, 10),
},
},
.storage_options = .{
.seed = prng.random.int(u64),
- .read_latency_min = prng.random.uintLessThan(u16, 3),
- .read_latency_mean = 3 + prng.random.uintLessThan(u16, 10),
- .write_latency_min = prng.random.uintLessThan(u16, 3),
- .write_latency_mean = 3 + prng.random.uintLessThan(u16, 10),
- .read_fault_probability = prng.random.uintLessThan(u8, 10),
- .write_fault_probability = prng.random.uintLessThan(u8, 10),
+ .read_latency_min = 0, //prng.random.uintLessThan(u16, 3),
+ .read_latency_mean = 0, //3 + prng.random.uintLessThan(u16, 10),
+ .write_latency_min = 0, //prng.random.uintLessThan(u16, 3),
+ .write_latency_mean = 0, //3 + prng.random.uintLessThan(u16, 10),
+ .read_fault_probability = 0, //prng.random.uintLessThan(u8, 10),
+ .write_fault_probability = 80, //prng.random.uintLessThan(u8, 10),
},
});
defer cluster.destroy();
@@ -121,6 +133,12 @@ pub fn main() !void {
\\ path_clog_duration_mean={} ticks
\\ path_clog_probability={}%
\\ packet_replay_probability={}%
+ \\ packet_misdeliver_probability={}%
+ \\ partition_mode = {},
+ \\ partition_probability = {}%,
+ \\ unpartition_probability = {}%,
+ \\ partition_stability = {} ticks,
+ \\ unpartition_stability = {} ticks,
\\ read_latency_min={}
\\ read_latency_mean={}
\\ write_latency_min={}
@@ -142,6 +160,14 @@ pub fn main() !void {
cluster.options.network_options.packet_simulator_options.path_clog_duration_mean,
cluster.options.network_options.packet_simulator_options.path_clog_probability,
cluster.options.network_options.packet_simulator_options.packet_replay_probability,
+ cluster.options.network_options.packet_simulator_options.packet_misdeliver_probability,
+
+ cluster.options.network_options.packet_simulator_options.partition_mode,
+ cluster.options.network_options.packet_simulator_options.partition_probability,
+ cluster.options.network_options.packet_simulator_options.unpartition_probability,
+ cluster.options.network_options.packet_simulator_options.partition_stability,
+ cluster.options.network_options.packet_simulator_options.unpartition_stability,
+
cluster.options.storage_options.read_latency_min,
cluster.options.storage_options.read_latency_mean,
cluster.options.storage_options.write_latency_min,
@@ -263,6 +289,12 @@ fn client_callback(
assert(user_data == 0);
}
+fn random_enum(comptime EnumType: type, random: *std.rand.Random) EnumType {
+ const typeInfo = @typeInfo(EnumType).Enum;
+ const enumAsInt = random.uintAtMost(typeInfo.tag_type, typeInfo.fields.len - 1);
+ return @intToEnum(EnumType, enumAsInt);
+}
+
fn parse_seed(bytes: []const u8) u64 {
return std.fmt.parseUnsigned(u64, bytes, 10) catch |err| switch (err) {
error.Overflow => @panic("seed exceeds a 64-bit unsigned integer"),
diff --git a/src/test/packet_simulator.zig b/src/test/packet_simulator.zig
index adb4942..d277f97 100644
--- a/src/test/packet_simulator.zig
+++ b/src/test/packet_simulator.zig
@@ -11,9 +11,19 @@ pub const PacketSimulatorOptions = struct {
packet_loss_probability: u8,
packet_replay_probability: u8,
+ packet_misdeliver_probability: u8,
seed: u64,
+
+ replica_count: u8,
+ client_count: u8,
node_count: u8,
+ partition_mode: PartitionMode,
+ partition_stability: u32,
+ unpartition_stability: u32,
+ partition_probability: u8,
+ unpartition_probability: u8,
+
/// The maximum number of in-flight packets a path can have before packets are randomly dropped.
path_maximum_capacity: u8,
@@ -27,11 +37,19 @@ pub const Path = struct {
target: u8,
};
+pub const PartitionMode = enum {
+ uniform_size,
+ uniform_partition,
+ isolate_single,
+ fixed,
+};
+
/// A fully connected network of nodes used for testing. Simulates the fault model:
/// Packets may be dropped.
/// Packets may be delayed.
/// Packets may be replayed.
pub const PacketStatistics = enum(u8) {
+ dropped_due_to_partition,
dropped_due_to_congestion,
dropped,
replay,
@@ -58,6 +76,11 @@ pub fn PacketSimulator(comptime Packet: type) type {
stats: [@typeInfo(PacketStatistics).Enum.fields.len]u32 = [_]u32{0} **
@typeInfo(PacketStatistics).Enum.fields.len,
+ is_partitioned: bool,
+ partition: []bool,
+ replicas: []u8,
+ stability: u32,
+
pub fn init(allocator: *std.mem.Allocator, options: PacketSimulatorOptions) !Self {
assert(options.one_way_delay_mean >= options.one_way_delay_min);
var self = Self{
@@ -71,8 +94,17 @@ pub fn PacketSimulator(comptime Packet: type) type {
),
.options = options,
.prng = std.rand.DefaultPrng.init(options.seed),
+
+ .is_partitioned = false,
+ .stability = options.unpartition_stability,
+ .partition = try allocator.alloc(bool, @as(usize, options.replica_count)),
+ .replicas = try allocator.alloc(u8, @as(usize, options.replica_count)),
};
+ for (self.replicas) |_, i| {
+ self.replicas[i] = @intCast(u8, i);
+ }
+
for (self.paths) |*queue| {
queue.* = std.PriorityQueue(Data).init(allocator, Self.order_packets);
try queue.ensureCapacity(options.path_maximum_capacity);
@@ -134,6 +166,10 @@ pub fn PacketSimulator(comptime Packet: type) type {
return self.prng.random.uintAtMost(u8, 100) < self.options.packet_replay_probability;
}
+ fn should_misdeliver(self: *Self) bool {
+ return self.prng.random.uintAtMost(u8, 100) < self.options.packet_misdeliver_probability;
+ }
+
/// Return a value produced using an exponential distribution with
/// the minimum and mean specified in self.options
fn one_way_delay(self: *Self) u64 {
@@ -142,14 +178,83 @@ pub fn PacketSimulator(comptime Packet: type) type {
return min + @floatToInt(u64, @intToFloat(f64, mean - min) * self.prng.random.floatExp(f64));
}
+ fn random_choice(self: *Self, probability: u8) bool {
+ return self.prng.random.uintLessThan(u8, 100) < probability;
+ }
+
+ fn partition_network(
+ self: *Self,
+ ) void {
+ self.is_partitioned = true;
+ self.stability = self.options.partition_stability;
+
+ switch (self.options.partition_mode) {
+ .uniform_size => {
+ const sz = self.prng.random.uintAtMost(u8, self.options.replica_count);
+ self.prng.random.shuffle(u8, self.replicas);
+ for (self.replicas) |r, i| {
+ self.partition[r] = i < sz;
+ }
+ },
+ .uniform_partition => {
+ for (self.replicas) |_, i| {
+ self.partition[i] = self.random_choice(50);
+ }
+ },
+ .isolate_single => {
+ for (self.replicas) |_, i| {
+ self.partition[i] = false;
+ }
+ const n = self.prng.random.uintLessThan(u8, self.options.replica_count);
+ self.partition[n] = true;
+ },
+ .fixed => {
+ // XXX: You can make this do anything you want
+ var i: usize = 0;
+ while (i < self.replicas.len) : (i += 3) {
+ self.partition[i] = false;
+ if (i + 1 < self.replicas.len) self.partition[i + 1] = true;
+ if (i + 2 < self.replicas.len) self.partition[i + 2] = true;
+ }
+ },
+ }
+ }
+
+ fn unpartition_network(
+ self: *Self,
+ ) void {
+ self.is_partitioned = false;
+ self.stability = self.options.unpartition_stability;
+
+ for (self.replicas) |_, i| {
+ self.partition[i] = false;
+ }
+ }
+
pub fn tick(self: *Self) void {
self.ticks += 1;
+ if (self.stability > 0) {
+ self.stability -= 1;
+ } else {
+ if (self.is_partitioned) {
+ if (self.random_choice(self.options.unpartition_probability)) {
+ self.unpartition_network();
+ log.alert("unpartitioned network: partition={d}", .{self.partition});
+ }
+ } else {
+ if (self.random_choice(self.options.partition_probability)) {
+ self.partition_network();
+ log.alert("partitioned network: partition={d}", .{self.partition});
+ }
+ }
+ }
+
var from: u8 = 0;
while (from < self.options.node_count) : (from += 1) {
var to: u8 = 0;
while (to < self.options.node_count) : (to += 1) {
- const path = .{ .source = from, .target = to };
+ var path = .{ .source = from, .target = to };
if (self.is_clogged(path)) continue;
const queue = self.path_queue(path);
@@ -157,14 +262,33 @@ pub fn PacketSimulator(comptime Packet: type) type {
if (data.expiry > self.ticks) break;
_ = queue.remove();
+ const is_misdeliver = self.options.node_count > 1 and self.should_misdeliver();
+ var orig_to: u8 = self.options.node_count;
+ if (is_misdeliver) {
+ orig_to = to;
+ to = self.prng.random.uintLessThan(u8, self.options.node_count - 1);
+ if (to >= orig_to) to += 1;
+ assert(to < self.options.node_count and to != orig_to);
+
+ path.target = to;
+ log.debug("misrouting packet from={} orig_to={} new_to={}", .{ from, orig_to, to });
+ }
+
+ if (self.is_partitioned) {
+ if (from < self.options.replica_count and to < self.options.replica_count and self.partition[from] != self.partition[to]) {
+ self.stats[@enumToInt(PacketStatistics.dropped_due_to_partition)] += 1;
+ log.alert("dropped packet (different partitions): from={} to={}", .{ from, to });
+ data.packet.deinit(path);
+ continue;
+ }
+ }
+
if (self.should_drop()) {
self.stats[@enumToInt(PacketStatistics.dropped)] += 1;
log.alert("dropped packet from={} to={}.", .{ from, to });
data.packet.deinit(path);
continue;
- }
-
- if (self.should_replay()) {
+ } else if (self.should_replay()) {
self.submit_packet(data.packet, data.callback, path);
log.debug("replayed packet from={} to={}", .{ from, to });
@@ -176,6 +300,12 @@ pub fn PacketSimulator(comptime Packet: type) type {
data.callback(data.packet, path);
data.packet.deinit(path);
}
+
+ if (is_misdeliver) {
+ assert(orig_to < self.options.node_count);
+ to = orig_to;
+ path.target = to;
+ }
}
const reverse_path: Path = .{ .source = to, .target = from };
The problem here is that replicas do not verify that their writes are successful before committing.
One possibility would be to require each replica to read back and verify its write before committing. This rules out this specific error case, where the write itself corrupts all copies of an operation; however, it does not protect against disk corruptions occurring after the write.
I don't think that this is something the protocol can deal with, instead I think that the guarantee has to be reduced from
I found the claim "I/O corruptions are allowed across all replicas but only for a minority per op" to sound too strong given that commits can be done with fewer than the majority of replicas. I came up with the scenario depicted in my drawing first, and then specifically set the system parameters to reproduce it.