openmessaging / dledger
A raft-based java library for building high-available, high-durable, strong-consistent commitlog.
License: Apache License 2.0
DLedger mode does not support SSL/TLS yet. Do you have any plans to support this feature in the future?
this.remotingClient = new NettyRemotingClient(new NettyClientConfig(), null); // default value of useTLS in NettyClientConfig is false
Initially, currTerm is -1 and lastParseResult is WAIT_TO_REVOTE.
When there is just one server, after the election the server becomes leader but its term is still -1, which looks odd. So I think the initial value of currTerm should be 0.
This also matches the Raft paper, which says currentTerm is "initialized to 0 on first boot, increases monotonically".
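A minimal sketch of the term rule the Raft paper describes: the term starts at 0, a node bumps it before starting an election, and it only ever moves forward when a larger term is observed. The class and method names (TermSketch, startElection, observeTerm) are hypothetical and are not DLedger's MemberState API.

```java
// Hypothetical sketch of Raft term bookkeeping; not DLedger's MemberState.
class TermSketch {
    // Raft paper: currentTerm is initialized to 0 on first boot.
    private long currTerm = 0;

    long currTerm() {
        return currTerm;
    }

    // In this sketch a node starting an election bumps its term first,
    // so even a single-node cluster leads in term 1 instead of a sentinel like -1.
    long startElection() {
        return ++currTerm;
    }

    // Adopt a larger term seen in any RPC; the term never decreases.
    void observeTerm(long term) {
        if (term > currTerm) {
            currTerm = term;
        }
    }
}
```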
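Hello, I found an issue in QuorumAckChecker#doWork that may delay responses to clients' Append Requests.
Let me first mark a few key steps:
(a) dLedgerStore.appendAsLeader(dLedgerEntry): the leader writes the entry to the store.
(b) dLedgerEntryPusher.waitAck(resEntry): the leader puts the AppendFuture into pendingAppendResponsesByTerm.
(c) The dispatcher threads push the entries to the followers.
These three steps run in parallel.
When the following happens, handling of append futures can be delayed:
1) Suppose a client writes 10 entries, and 10 RPC threads handle these 10 append requests.
2) After the RPC threads finish (a), the resulting indexes are 0~9; (c) has replicated these 10 entries to a majority of nodes and updated peerWaterMarksByTerm.
3) Suppose some RPC threads have not executed (b) yet; for example, pendingAppendResponsesByTerm currently contains only the futures for indexes 1, 3, 5, 7 and 9.
4) The checker thread's quorumIndex is 9. It processes the futures by iterating with Long i = quorumIndex; i >= 0; i--, ends the loop when i is 8 (because 8 is not in the map yet), and records lastQuorumIndex as 9.
5) Suppose there are no new writes so far.
6) When the checker thread runs doWork again, iterating with Long i = quorumIndex; i >= 0; i--, i is 9 and remove returns null.
Since lastQuorumIndex from the previous round is 9, lastQuorumIndex != quorumIndex is false, so needCheck = false and the loop exits.
7) Because needCheck is false, the leak check is not executed immediately.
8) For the next second, steps 6~7 repeat (even after indexes 0, 2, 4, 6 and 8 have been put into the map),
until more than 1000 ms have passed since lastCheckLeakTimeMs, or a new write arrives (so lastQuorumIndex != quorumIndex holds).
As a result, even though the data has already been replicated to a majority of nodes, the futures for 0, 2, 4, 6 and 8 still wait at least 1000 ms before future.complete is called.
My personal idea: could the checker thread be made simpler? In doWork: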
ConcurrentMap<Long, TimeoutFuture> responses = pendingAppendResponsesByTerm.get(currTerm);
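Iterate over responses directly: if index <= quorumIndex, respond SUCCESS to the client; if it is greater and has timed out, respond WAIT_ACK_TIMEOUT.
After the iteration, if ackNum is 0, sleep for 1 ms; otherwise continue with the next doWork.
Even though the whole map is traversed every time, in theory this should not consume much CPU as long as responses does not hold too many entries.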
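The proposed single-pass check can be sketched as follows. This is an illustration of the idea in the issue, not DLedger's code: PendingAppend is a hypothetical stand-in for TimeoutFuture, and the strings "SUCCESS" and "WAIT_ACK_TIMEOUT" stand in for the real DLedgerResponseCode values.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for DLedger's TimeoutFuture: a future plus its creation time and timeout.
class PendingAppend extends CompletableFuture<String> {
    final long createTimeMs;
    final long timeoutMs;

    PendingAppend(long createTimeMs, long timeoutMs) {
        this.createTimeMs = createTimeMs;
        this.timeoutMs = timeoutMs;
    }

    boolean isTimeOut(long nowMs) {
        return nowMs - createTimeMs >= timeoutMs;
    }
}

class SimpleQuorumAckChecker {
    // One pass of the proposed doWork(): scan every pending future of the current term.
    // Returns ackNum, the number of futures completed in this pass.
    static int checkOnce(ConcurrentMap<Long, PendingAppend> responses, long quorumIndex, long nowMs) {
        int ackNum = 0;
        Iterator<Map.Entry<Long, PendingAppend>> it = responses.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, PendingAppend> entry = it.next();
            PendingAppend future = entry.getValue();
            if (entry.getKey() <= quorumIndex) {
                future.complete("SUCCESS");          // replicated to a majority
                it.remove();
                ackNum++;
            } else if (future.isTimeOut(nowMs)) {
                future.complete("WAIT_ACK_TIMEOUT"); // not acked yet and expired
                it.remove();
                ackNum++;
            }
        }
        return ackNum;
    }
}
```

The surrounding loop would sleep for 1 ms whenever checkOnce returns 0. Because the scan no longer walks down from quorumIndex and stops at the first gap, a future registered late (index 8 in the scenario above) is completed on the very next pass. ConcurrentHashMap's iterator supports remove(), so the scan is safe while RPC threads keep inserting.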
The variable DLedgerMmapFileStore#isDiskFull should be declared volatile, because it is written by the CleanSpaceService thread and read by other threads when appending a DLedgerEntry, as shown below:
class CleanSpaceService extends ShutdownAbleThread {
    double storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());
    double dataRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getDataStorePath());
    public CleanSpaceService(String name, Logger logger) {
        super(name, logger);
    }
    @Override public void doWork() {
        try {
            storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());
            dataRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getDataStorePath());
            long hourOfMs = 3600L * 1000L;
            long fileReservedTimeMs = dLedgerConfig.getFileReservedHours() * hourOfMs;
            if (fileReservedTimeMs < hourOfMs) {
                logger.warn("The fileReservedTimeMs={} is smaller than hourOfMs={}", fileReservedTimeMs, hourOfMs);
                fileReservedTimeMs = hourOfMs;
            }
            // If the disk is full, prevent more data from getting in
            DLedgerMmapFileStore.this.isDiskFull = isNeedForbiddenWrite();
            // ....
        }
    }
}
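A minimal, self-contained sketch of why the flag needs volatile: one thread (standing in for CleanSpaceService) sets the flag, and another (standing in for an appender) spins until it sees it. The class and method names are hypothetical. With volatile, the Java memory model guarantees the write becomes visible to the reader; without it, the JIT may hoist the read out of the loop, although that failure is not deterministic, so this demo can only show that the fixed version works.

```java
// Hypothetical demo class; only the volatile flag mirrors DLedgerMmapFileStore#isDiskFull.
class DiskFullVisibilityDemo {
    // Without volatile, the appender loop below may never observe the write.
    private volatile boolean isDiskFull = false;

    // Returns true if the "appender" thread saw the flag flip within timeoutMs.
    static boolean appenderSeesDiskFull(long timeoutMs) {
        DiskFullVisibilityDemo store = new DiskFullVisibilityDemo();
        Thread appender = new Thread(() -> {
            while (!store.isDiskFull) {
                // spin until the cleaner thread reports a full disk
            }
        }, "appender");
        appender.setDaemon(true); // never keep the JVM alive if the flag is missed
        appender.start();
        // Stand-in for CleanSpaceService detecting a full disk.
        Thread cleaner = new Thread(() -> store.isDiskFull = true, "CleanSpaceService");
        cleaner.start();
        try {
            cleaner.join();
            appender.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !appender.isAlive();
    }
}
```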
Hello. We had a RocketMQ cluster deployed with the master-slave architecture, and then we moved it to the new DLedger architecture. The brokers run on small EC2 instances with these Java options:
-Xms128m -Xmx128m -Xmn128m -XX:G1HeapRegionSize=1m -XX:MaxDirectMemorySize=256m
With the master-slave architecture everything worked correctly, but after moving to the DLedger architecture we began to get OutOfMemory errors. We see many threads that allocate a 4 MB buffer. Is this the expected behavior?
The log file with errors is attached.
hs_err_pid_1.log
For leader transfer, there should be a way to get the metadata of a DLedger group.
So add a command to get the metadata.
When RocketMQ uses the DLedger deployment mode, it will rely on DLedger to delete expired commitLog, but this problem apache/rocketmq#3245 also exists in the DLedger library.
As the title says, can we design a set of metrics to monitor and expose the performance and load of DLedger?
Problem Description
Problem Analysis
Code Analysis
As this code has not been donated to the ASF, we cannot use the license header that ASF projects use:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Please use the ALv2 license header with the correct copyright information:
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Hi, @RongtongJin
In the method DLedgerServer#checkPreferredLeader(), the first peer in the list preferredLeaderIds that meets the condition is chosen as the leadership-transfer target (the transferee).
I think choosing the peer with the smallest lag (how far its index falls behind) as the transferee may be better.
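A minimal sketch of the suggested selection rule: among the preferred peers, pick the one whose replicated index is closest to the leader's end index. The map peerMatchIndex (peer id to highest index known to be replicated on that peer) and all names here are hypothetical; in DLedger this information would come from the leader's per-peer water marks, which this sketch does not model.

```java
import java.util.List;
import java.util.Map;

class TransfereeSelector {
    // Among the preferred peers, pick the one with the smallest lag behind the leader.
    // peerMatchIndex: hypothetical map of peerId -> highest index replicated on that peer.
    // Returns null when no preferred peer has a known match index.
    static String pickTransferee(List<String> preferredLeaderIds, Map<String, Long> peerMatchIndex,
                                 long leaderEndIndex) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (String peerId : preferredLeaderIds) {
            Long matchIndex = peerMatchIndex.get(peerId);
            if (matchIndex == null) {
                continue; // no water mark known yet for this peer
            }
            long lag = leaderEndIndex - matchIndex;
            if (lag < bestLag) { // strict '<' keeps list order as the tie-breaker
                bestLag = lag;
                best = peerId;
            }
        }
        return best;
    }
}
```

Keeping list order as the tie-breaker preserves the current "first in preferredLeaderIds" behavior whenever several peers are equally caught up.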
Currently, all reads and writes go to the leader. We tested the performance of a 3-node DLedger cluster (4 cores, 8 GB each) and found that when the leader node's CPU consumption reaches 3.1 cores, each follower node consumes only 0.3 cores (about 1/13 of its 4 cores).
You are expected to do the following:
Hi, does DLedger support sending batch messages?
getFirstEntryIndex, getLastEntryIndex, getCount, getTotalSize:
these method names should not start with "get", because fastjson will include them in the JSON string.
Alternatively, you can annotate them with @JSONField(serialize = false).
When the write pressure is high, a large amount of data may be accumulated in the ChannelOutboundBuffer of the same Netty channel, which may cause the heartbeat request to be ranked behind, thereby triggering a new election.
DLedger uses a lot of CompletableFutures, but there is a problem: if you use thenAccept or thenApply (and the callback is registered before the future completes), the callback is executed by the thread that calls future.complete. These threads are precious in DLedger.
In the following example:
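import java.util.concurrent.CompletableFuture;
public class CallbackThreadExample {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        // Callbacks registered before completion run on the thread that calls complete()
        completableFuture.thenApply(s -> {
            System.out.println(Thread.currentThread());
            System.out.println("thenApply");
            return 123;
        }).thenAccept(num -> {
            System.out.println(Thread.currentThread());
            System.out.println("thenAccept");
        });
        completableFuture.complete("abc"); // both callbacks run here, on main
        Thread.sleep(10000);
    }
}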
The output is:
Thread[main,5,main]
thenApply
Thread[main,5,main]
thenAccept
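Both callbacks are executed by the main thread.
In DLedger, CompletableFutures are often passed down through many layers, while complete() is usually called by single-threaded tasks, and those threads are a very precious resource.
For example, in the message-sending flow, I used Arthas to observe which thread executes the callback: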
watch *.StoreStatsService setPutMessageEntireTimeMax "@java.lang.Thread@currentThread().name" -x 1 -n 10
DefaultMessageStore 437 :
putResultFuture.thenAccept((result) -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});
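The thread executing line 437 of DefaultMessageStore turns out to be QuorumAckChecker-n0.
QuorumAckChecker-n0 is the single-threaded task the master uses to synchronize messages. I think using such an important thread to run this kind of callback hurts performance considerably.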
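One possible mitigation, sketched here and not taken from DLedger, is to register callbacks with thenApplyAsync (or thenAcceptAsync) and an explicit Executor, so the completing thread only hands the work off. The demo completes a future from a thread named after QuorumAckChecker-n0 and reports which thread ran the callback; the class name and the "callback-pool" executor are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadDemo {
    // Registers a callback, completes the future from a thread named like the
    // single-threaded QuorumAckChecker, and returns the name of the thread that ran the callback.
    static String callbackThreadName(boolean useAsync) {
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            CompletableFuture<String> ranOn = useAsync
                    ? future.thenApplyAsync(s -> Thread.currentThread().getName(), callbackPool)
                    : future.thenApply(s -> Thread.currentThread().getName());
            Thread completer = new Thread(() -> future.complete("abc"), "QuorumAckChecker-n0");
            completer.start();
            return ranOn.join();
        } finally {
            callbackPool.shutdown();
        }
    }
}
```

The trade-off is an extra thread hop per callback, so whether a dedicated pool pays off depends on how heavy the callbacks are compared with the cost of the hand-off.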
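Similar to the problem Raft's pre-vote solves: when the number of peers that already have a leader + !validNum > Quorum, could we avoid increasing the term for now? Increasing the term only makes sense when the node has a chance of winning a majority of the votes.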
Mostly, ledgerEndTerm is assigned in two ways:
Consider a cluster with nodes a, b and c.
Node a has complete data and is the leader.
Node b has complete data but has just been restarted, and is a follower.
Node c has incomplete data and is currently synchronizing from a.
The ledgerEndTerm on node c could be a value greater than its log's term, caused by past elections.
The ledgerEndTerm on node b equals its log's term.
If we shut down node a, node c, which has incomplete data, will become leader.
The handleVote method only compares ledgerEndIndex when ledgerEndTerm is equal. In this situation, ledgerEndTerm on node b is less than on node c, so node b will accept the vote request from node c.
What is ledgerEndTerm for?
What's the reason to make ledgerEndTerm = memberState.currTerm()?
Can we let ledgerEndTerm = entry.term?
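For reference, Raft's election restriction (section 5.4.1 of the paper) compares the term of the last log entry first and the index only on a tie. The minimal sketch below assumes the "last term" arguments are exactly what a node reports in its vote request; the class and method names are hypothetical. It shows why node c can win: if c reports an inflated ledgerEndTerm instead of its last entry's term, the comparison never reaches the index.

```java
// Hypothetical helper illustrating Raft's "at least as up-to-date" check.
class VoteComparator {
    // True if the candidate's log is at least as up-to-date as the voter's:
    // a higher last term wins; with equal last terms, the longer (or equal) log wins.
    static boolean candidateAtLeastAsUpToDate(long candLastTerm, long candLastIndex,
                                              long myLastTerm, long myLastIndex) {
        if (candLastTerm != myLastTerm) {
            return candLastTerm > myLastTerm;
        }
        return candLastIndex >= myLastIndex;
    }
}
```

With node b at (term 1, index 100) and node c at (term 1, index 50), b rejects c; if c instead reports an inflated term such as 3, b accepts it even though c's log is shorter, which matches the scenario described above.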
Add bulk copy support for performance
step1: The leader and follower are consistent.
step2: Shut down the follower and delete all its data to simulate data loss.
step3: Restart the follower. At this point, if the leader has no new writes, the existing data will not be pushed.
This is because EntryDispatcher#doAppend always runs the following code:
if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
doCommit();
doCheckAppendResponse();
break;
}
I think a re-compare should be triggered at this point, for example by the doCommit response or by an exception.
Irregular enum name VoteResponse$RESULT.REJECT_ALREADY__HAS_LEADER, which contains a doubled "_".
Are there any performance benchmarks? In my tests, Raft message replication takes at least 1.3 ms.
If I need ultra-low latency, what should I do?
Add the Maven wrapper so the project is easier to use; IntelliJ IDEA supports the Maven wrapper natively. It will help us develop and package more easily.
This issue contains two ideas:
(1) First, DLedger is a Java Raft library, so users should be able to choose their RPC remoting framework, such as the Apache remoting framework, gRPC or SOFABolt.
(2) Second, supporting Protocol Buffers could improve DLedger's performance.
I was reading the Raft implementation recently and learned a lot from it. I ran into some problems, quite possibly because I misunderstood something, and would appreciate it if someone could help me work through them.
Suppose a normal 4-node cluster, with maxHeartBeatLeak: node 0 < 1 < 2 < 3
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 1 | leader | PASS |
node1 | node0 | 1 | follower | WAIT_TO_REVOTE |
node2 | node0 | 1 | follower | WAIT_TO_REVOTE |
node3 | node0 | 1 | follower | WAIT_TO_REVOTE |
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node1 | node0 | 1 | follower | WAIT_TO_REVOTE |
node2 | node0 | 1 | follower | WAIT_TO_REVOTE |
node3 | node0 | 1 | follower | WAIT_TO_REVOTE |
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node1 | node1 | 1 | candidate | WAIT_TO_REVOTE |
node2 | node2 | 1 | candidate | WAIT_TO_REVOTE |
node3 | node0 | 1 | follower | WAIT_TO_REVOTE |
This can be mitigated by giving node 3 a much larger timeout interval, but we might still look for something better.
Some thoughts: in Raft we would usually first increase the term and then request votes. Here node 1 requests votes without increasing its term, and I suppose this implements the pre-vote algorithm mentioned in the paper.
IMHO, pre-vote lets a potential candidate (like node 1) check whether it could win the election (i.e., whether its log is more up-to-date than a majority of nodes) before increasing its term. In our current implementation, node 1 first requests votes with term 1, and the other followers with the same term return REJECT_ALREADY_HAS_LEADER. In this case the followers don't check whether the incoming vote request is more up-to-date than their own logs. If we can somehow subdivide the REJECT_ALREADY_HAS_LEADER response into something like
then once node 1 receives enough prevote_accept responses, it can increase its term and start a revote immediately. Otherwise it just stays in the WAIT_TO_REVOTE process.
P.S. Actually I couldn't find where pre-vote is implemented in DLedger. Is there any pre-vote design documentation? Thanks if anyone can give me some hints on the pre-vote implementation in DLedger.
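A minimal sketch of the log-comparison part of the proposal: followers answer a hypothetical PREVOTE_ACCEPT or PREVOTE_REJECT instead of a blanket REJECT_ALREADY_HAS_LEADER, and the candidate increases its term only after a majority of accepts. These response names do not exist in DLedger, and the sketch leaves out the other half of classic pre-vote (followers also rejecting while they still hear from a live leader).

```java
import java.util.List;

class PreVoteSketch {
    // Hypothetical responses that would subdivide REJECT_ALREADY_HAS_LEADER during a pre-vote.
    enum PreVoteResult { PREVOTE_ACCEPT, PREVOTE_REJECT }

    // Follower side: accept only if the candidate's log is at least as up-to-date as ours.
    static PreVoteResult answer(long candLastTerm, long candLastIndex, long myLastTerm, long myLastIndex) {
        boolean upToDate = candLastTerm > myLastTerm
                || (candLastTerm == myLastTerm && candLastIndex >= myLastIndex);
        return upToDate ? PreVoteResult.PREVOTE_ACCEPT : PreVoteResult.PREVOTE_REJECT;
    }

    // Candidate side: bump the term and start a real vote only with a majority of pre-votes
    // (counting its own); otherwise stay in WAIT_TO_REVOTE.
    static boolean shouldIncreaseTerm(List<PreVoteResult> peerResponses, int clusterSize) {
        int accepts = 1; // the candidate's own pre-vote
        for (PreVoteResult r : peerResponses) {
            if (r == PreVoteResult.PREVOTE_ACCEPT) {
                accepts++;
            }
        }
        return accepts > clusterSize / 2;
    }
}
```

In the 4-node example, node 1 needs accepts from two of the three other nodes (3 > 4 / 2) before it bumps its term.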
For implementing a highly available embedded KV store with DLedger, consider applying HPPC to these local maps, such as HashMap. Has using HPPC's high-performance hash maps been considered here?
1. Background
In the mixed deployment mode of namesrv and broker, we want the machines/nodes where namesrv and broker are co-located never to be selected as leader, while other machines/nodes can be. At the same time, we want to trigger re-elections as rarely as possible, so multiple nodes are set as preferredLeader to reduce how often this happens.
2. Optimization
Send once and take multiple times mmp.
For example, in commit:
DLedgerMmapFileStore#updateCommittedIndex
DLedgerEntry dLedgerEntry = get(newCommittedIndex);
PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);
this.committedIndex = newCommittedIndex;
this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize();
It only needs pos and size, but in get():
indexSbr = indexFileList.getData(index * INDEX_UNIT_SIZE, INDEX_UNIT_SIZE);
PreConditions.check(indexSbr != null && indexSbr.getByteBuffer() != null, DLedgerResponseCode.DISK_ERROR, "Get null index for %d", index);
indexSbr.getByteBuffer().getInt(); //magic
long pos = indexSbr.getByteBuffer().getLong();
int size = indexSbr.getByteBuffer().getInt();
dataSbr = dataFileList.getData(pos, size);
PreConditions.check(dataSbr != null && dataSbr.getByteBuffer() != null, DLedgerResponseCode.DISK_ERROR, "Get null data for %d", index);
DLedgerEntry dLedgerEntry = DLedgerEntryCoder.decode(dataSbr.getByteBuffer());
PreConditions.check(pos == dLedgerEntry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", pos, dLedgerEntry.getPos());
indexFileList already knows pos and size. Is it really necessary to read dataFileList and call DLedgerEntryCoder.decode?
In my tests, DLedger's performance is very poor!
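A minimal sketch of the suggestion: compute committedPos from the index unit alone. It assumes the index layout implied by the get() excerpt above (magic as an int, then pos as a long, then size as an int at the start of each unit) and, for simplicity, a single ByteBuffer instead of DLedger's list of mmapped files; the class name is hypothetical.

```java
import java.nio.ByteBuffer;

class IndexOnlyCommitPos {
    // Assumed index-unit layout, taken from the get() excerpt: magic (int), pos (long), size (int), ...
    // Returns pos + size for the entry at 'index' without reading or decoding the data file.
    static long committedPos(ByteBuffer indexBuf, long index, int indexUnitSize) {
        int base = Math.toIntExact(index * indexUnitSize);
        long pos = indexBuf.getLong(base + Integer.BYTES);               // skip magic
        int size = indexBuf.getInt(base + Integer.BYTES + Long.BYTES);
        return pos + size;
    }
}
```

One thing the shortcut gives up is the consistency check in get() (pos == dLedgerEntry.getPos()), which is only possible after decoding the data entry.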
In the Raft paper, Figure 8 shows a scenario where an entry can still be overwritten even though it has been replicated to a majority of nodes.
To eliminate problems like the one in Figure 8, Raft never commits log entries from previous terms by counting replicas. Only log entries from the leader’s current term are committed by counting replicas; once an entry from the current term has been committed in this way, then all prior entries are committed indirectly because of the Log Matching Property. There are some situations where a leader could safely conclude that an older log entry is committed (for example, if that entry is stored on every server), but Raft takes a more conservative approach for simplicity.
I see that in the DLedger implementation, truncate and append are separate operations. Could this break the Log Matching Property?
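RocketMQ uses DLedger mode, with this pom configuration:
<dependency>
    <groupId>io.openmessaging.storage</groupId>
    <artifactId>dledger</artifactId>
    <version>0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
With the dledger jar downloaded from the repository, RocketMQ starts normally.
I downloaded the release/0.1 version from GitHub, added some log statements, and packaged it with: mvn clean package -DskipTests=true
After replacing the jar downloaded from the repository with the new dledger jar, I get this error: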
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/app/jlxiao/wemq-master/lib/dledger-0.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/app/jlxiao/wemq-master/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
load config properties file OK, ../conf/broker.properties
java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext
How should I package it to avoid this problem?
Hi, are there any enhancement features you have in mind? I would like to contribute to this project.
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MmapFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
    }
    return result;
}
As you can see on line 506, the result is false when data is flushed successfully (flushedWhere advances). I was wondering whether this is by design or a bug.
The cluster reaches 70k TPS, but one slave reaches only 17k TPS.
I see "Quota exhaust" in the master broker's broker_default.log.
Hello, the field needIncreaseTermImmediately in io.openmessaging.storage.dledger.DLedgerLeaderElector
should preferably be declared volatile,
to guarantee memory visibility across threads.
private boolean needIncreaseTermImmediately = false;
Currently, DLedger uses invokeAsync this way:
https://github.com/openmessaging/openmessaging-storage-dledger/blob/2450e39933931afb10d5eea47f55cdaa78973f11/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java#L124-L127
If the invokeAsync call times out, a thread will invoke the callback and responseFuture.getResponseCommand() will return null, so a NullPointerException will be thrown.
We should handle this case when invokeAsync times out.
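A minimal sketch of the null guard, abstracted away from the remoting API: the real callback receives a ResponseFuture, which this sketch reduces to the possibly-null response object. The class and method names are hypothetical; the point is that a null response completes the caller's future exceptionally instead of dereferencing it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class AsyncCallbackGuard {
    // Hypothetical callback body: 'response' is null when the async call timed out or failed.
    static <T> void onResponse(T response, CompletableFuture<T> future) {
        if (response == null) {
            future.completeExceptionally(
                    new TimeoutException("invokeAsync timed out or failed; no response command"));
            return;
        }
        future.complete(response);
    }
}
```

Completing exceptionally lets callers that chain on the future (for example with exceptionally or handle) treat the timeout like any other RPC failure.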
openmessaging-storage-dledger/src/main/java/io/openmessaging/storage/dledger/DLedger.java
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    @Override
    public void run() {
        synchronized (this) {
            logger.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
            if (!this.hasShutdown) {
                this.hasShutdown = true;
                long beginTime = System.currentTimeMillis();
                dLedgerServer.shutdown();
                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                logger.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
            }
        }
    }
}, "ShutdownHook"));
This hook is not invoked concurrently by multiple threads, nor is it invoked more than once, so shutdownTimes seems redundant.
Background
The online service currently uses single push, and we want to upgrade to batch push.
When the batch-push configuration of the leader node and a follower node is inconsistent, data cannot be pushed.
Optimization
Keep the leader and followers compatible, so data is still synchronized when their batch-push configurations differ.
@dongeforever Is this project meant to be a simple example implementation of our protocol, or are there other considerations? Recently I have been studying https://github.com/atomix/atomix and wrapping it with OpenMessaging to implement a simple event-driven service, and at first glance the approach seems to be the same.
Bump junit to 5.8.1.
RocketMQ doesn't implement putMessages in DLedgerCommitLog.
It could be easier to implement putMessages with DLedger supporting Batch Entry Appending.
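Shouldn't lastParseResult be reset to its default value each time an election completes? Otherwise the next election will still use the previous round's lastParseResult. Although this has no effect in the end, it is not appropriate from a logical standpoint.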
The initial state of the broker group:
n0 is master and preferredLeader
n1 and n2 are slaves
The status after the re-election is as follows:
n0 and n2 are slaves
n1 is master
Problem Description
Node n0 cannot synchronize data.
If the cluster restarts, the brokerId is always -1.
Problem Analysis
When the preferredLeader triggers a new election, the vote fails if its ledger entry index lags behind the leader node.
Because its term has been increased by 1, the node's term is always larger than the other nodes' terms, so it cannot step back to follower. Therefore the preferredLeader stays in the candidate state forever, and the node becomes unavailable.
io.openmessaging.storage.dledger.MemberState.Role.UNKNONW is misspelled, isn't it?
If quota exhaust, warn it..
We ran an append performance test on a 3-node DLedger cluster (4 cores, 8 GB each). When the leader node reaches 3.1 cores, each follower node uses only 0.3 cores.
CPU utilization = (3.1 + 0.3 + 0.3) / 12 ≈ 31%
You are expected to do the following task to implement multi-raft on dledger:
If more than half of the nodes' handleVote responses are REJECT_ALREADY_VOTED, should parseResult change from WAIT_TO_VOTE_NEXT to WAIT_TO_REVOTE?
In extreme cases (e.g. network blocking or timing factors), a candidate may start a new election with an incremented term before the leader's heartbeat arrives, which would cause the leader to fail.
peerMap.put(peerInfo.split("-")[0], peerInfo.split("-")[1]);
Code like this doesn't support domain names that contain '-', such as "n1-123.abc.com".
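A minimal sketch of one fix: split each peer on the first '-' only, using String.split with a limit of 2, so the host part keeps its own hyphens. It assumes the peers string uses the "id-host:port;id-host:port" form and that peer ids themselves never contain '-'; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class PeerParser {
    // Parses "n0-host:port;n1-host:port", splitting each item on the FIRST '-' only,
    // so hostnames like "n1-123.abc.com" keep their own hyphens.
    static Map<String, String> parsePeers(String peers) {
        Map<String, String> peerMap = new HashMap<>();
        for (String peerInfo : peers.split(";")) {
            String[] kv = peerInfo.split("-", 2); // limit 2: at most one split
            if (kv.length != 2) {
                throw new IllegalArgumentException("Bad peer format: " + peerInfo);
            }
            peerMap.put(kv[0], kv[1]);
        }
        return peerMap;
    }
}
```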
Java 9 introduces overridden methods with covariant return types for the following methods in java.nio.ByteBuffer:
In Java 9 they all now return ByteBuffer, whereas the methods they override return Buffer,
resulting in exceptions like this when executing on Java 8 and lower:
java.lang.NoSuchMethodError: java.nio.ByteBuffer.limit(I)Ljava/nio/ByteBuffer
This is because the generated bytecode includes the static return type of the method, which is not found on Java 8 and lower because the overridden methods with covariant return types don't exist there (the issue appears even with source and target set to 8 or lower in the compilation parameters).
The workaround is to cast ByteBuffer instances to Buffer before calling these methods. So if we want to support JDK 11, we need to polish this code while staying compatible with Java 8.
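A minimal sketch of the cast workaround. Casting to java.nio.Buffer makes javac emit a call to Buffer.flip(), which exists on Java 8, instead of the covariant ByteBuffer.flip() added in Java 9; at runtime, virtual dispatch still reaches the ByteBuffer implementation. The class and method names are hypothetical.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

class BufferCompat {
    // Writes data and flips the buffer for reading in a Java 8-compatible way.
    static ByteBuffer writeAndFlip(int capacity, byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        buf.put(data);
        // Cast so the bytecode references Buffer.flip()Ljava/nio/Buffer;, present on Java 8.
        ((Buffer) buf).flip();
        return buf;
    }
}
```

An alternative that needs no casts is compiling on JDK 9+ with `javac --release 8`, which compiles against the Java 8 API signatures, unlike plain `-source 8 -target 8`.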