19年毕业,本科是软件工程,喜欢写作,小时候曾幻想以后写一本爆火的小说。大三开源 JavaGuide 并一直完善至今。
关于作者:
联系我:
A custom RPC framework implemented by Netty+Kyro+Zookeeper.(一款基于 Netty+Kyro+Zookeeper 实现的自定义 RPC 框架-附详细实现过程和相关教程。)
Home Page: https://gitee.com/SnailClimb/guide-rpc-framework
License: Other
19年毕业,本科是软件工程,喜欢写作,小时候曾幻想以后写一本爆火的小说。大三开源 JavaGuide 并一直完善至今。
关于作者:
联系我:
如果一个服务提供者下线,和已经建立连接的服务消费者无法应答心跳,抛出异常断开连接。服务重新上线后,因为服务提供者信息被放在了缓存,缓存内容没有更新,导致客户端无法感知到服务重新上线,而不会去向该服务提供者请求服务
如题:一致性哈希负载均衡器中,可以使用服务名的哈希值作为选择服务端地址的 key 吗?
Dubbo 中使用调用参数进行散列作为 key,保证相同参数的请求总是发到同一提供者。
如果使用服务名称散列后的值做为 key 的话,会不会导致不同客户端对同服务的所有请求都落在一个服务器上?
看代码服务注册是创建的永久节点,那zk怎么感知服务宕机呢
当前master的SingletonFactory代码是这样的:
public final class SingletonFactory {
private static final Map<String, Object> OBJECT_MAP = new HashMap<>();
private SingletonFactory() {
}
public static <T> T getInstance(Class<T> c) {
String key = c.toString();
Object instance;
synchronized (SingletonFactory.class) {
instance = OBJECT_MAP.get(key);
if (instance == null) {
try {
instance = c.getDeclaredConstructor().newInstance();
OBJECT_MAP.put(key, instance);
} catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
return c.cast(instance);
}
}
我很疑惑为什么所有单例的对象需要共享同一把锁?他们之间也互不冲突。
其次,代码中似乎没有体现出double check,目前master中代码的逻辑中,无论OBJECT_MAP中是否已经存在目标单例对象,都需要先与其他线程竞争锁,这一点是不是不太合理?
基于这些存在的问题, 是否改成下面的代码会更好?
public class SingletonFactory {
private static final Map<String, Object> SINGLETON_MAP = new ConcurrentHashMap<>();
private static final Map<Class<?>, Object> LOCK_MAP = new ConcurrentHashMap<>();
private SingletonFactory() {
}
/**
* Class must have no-arguments constructor.
*/
public static <T> T getInstance(Class<T> clazz) {
if (clazz == null) {
throw new IllegalArgumentException();
}
String key = clazz.toString();
Object instance = SINGLETON_MAP.get(key);
if (instance == null) {
synchronized (LOCK_MAP.computeIfAbsent(clazz, k -> new Object())) {
instance = SINGLETON_MAP.get(key);
if (instance == null) {
try {
instance = clazz.getDeclaredConstructor().newInstance();
SINGLETON_MAP.put(key, instance);
} catch (InstantiationException | InvocationTargetException | NoSuchMethodException
| IllegalAccessException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
}
return clazz.cast(instance);
}
}
Zookeeper已启动
其它配置都和教程一致
服务端端运行状态:
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established, initiating session, client: /127.0.0.1:59179, server: 127.0.0.1/127.0.0.1:2181
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x18318f179ba0006, negotiated timeout = 40000
[main-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
客户端运行状态:
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x18318f179ba0004, negotiated timeout = 40000
[main-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
[main] ERROR github.javaguide.registry.zk.util.CuratorUtils - get children nodes for path [/my-rpc/github.javaguide.HelloServicetest1version1] fail
Exception in thread "main" github.javaguide.exception.RpcException: 没有找到指定的服务:github.javaguide.HelloServicetest1version1
我看这个 EXTENSION_INSTANCES 只是用来实例化,那么能不能改成这样?
难道只是因为 putIfAbsent 是原子性的?在进入方法之前,不也用 synchronized 加锁了?
private T createExtension(String name) {
// 从加载类缓存中获取 class 对象
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw new RuntimeException("No such extension of name " + name);
}
T instance = null;
try {
instance = (T) clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
log.error(e.getMessage());
}
return instance;
}
如果是的话,怎么实现远程服务调用方想调用远程服务提供方任何类任何方法都行呢?
项目和readme中有些地方写成了kyro,实际应该是kryo。
/**
* Avoid re applying buffer space every time serialization
*/
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
@Override
public byte[] serialize(Object obj) {
Class<?> clazz = obj.getClass();
Schema schema = RuntimeSchema.getSchema(clazz);
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
} finally {
BUFFER.clear();
}
return bytes;
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错
2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好
下面是我修改的代码:
private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
if (schemaMap.containsKey(clazz)) {
return (Schema<T>)schemaMap.get(clazz);
}
Schema schema = RuntimeSchema.getSchema(clazz);
// 双重检查
if (schema != null) {
schemaMap.put(clazz, schema);
}
return schema;
}
@Override
public byte[] serialize(Object object) {
Class<?> clazz = object.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(object, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
Will the second handler overwrite the first handler?
[main] WARN org.springframework.context.annotation.AnnotationConfigApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'helloServiceImpl' defined in file [/Users/liuyangos8888/20200808/guide-rpc-framework/example-server/target/classes/github/javaguide/serviceimpl/HelloServiceImpl.class]: Initialization of bean failed; nested exception is github.javaguide.exception.RpcException: KeeperErrorCode = Unimplemented for /my-rpc/github.javaguide.HelloServicetest1version1/127.0.0.1:9998
Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'helloServiceImpl' defined in file [/Users/liuyangos8888/20200808/guide-rpc-framework/example-server/target/classes/github/javaguide/serviceimpl/HelloServiceImpl.class]: Initialization of bean failed; nested exception is github.javaguide.exception.RpcException: KeeperErrorCode = Unimplemented for /my-rpc/github.javaguide.HelloServicetest1version1/127.0.0.1:9998
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:603)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:226)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:893)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.(AnnotationConfigApplicationContext.java:89)
at NettyServerMain.main(NettyServerMain.java:18)
Caused by: github.javaguide.exception.RpcException: KeeperErrorCode = Unimplemented for /my-rpc/github.javaguide.HelloServicetest1version1/127.0.0.1:9998
at github.javaguide.registry.zk.util.CuratorUtils.createPersistentNode(CuratorUtils.java:58)
at github.javaguide.registry.zk.ZkServiceRegistry.registerService(ZkServiceRegistry.java:23)
at github.javaguide.provider.ServiceProviderImpl.publishService(ServiceProviderImpl.java:73)
at github.javaguide.spring.SpringBeanPostProcessor.postProcessBeforeInitialization(SpringBeanPostProcessor.java:48)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:416)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
... 10 more
想问下在RpcRequestHandler里对获取到的Method加个缓存会不会有性能上的提升/会不会有什么副作用?反射调用的invoke性能到底如何呢?
codec包中的编码器解码器是根据这个自定义协议设计的,那这个自定义协议如果让我自己设计,思路应该是怎样的?字段、长度等方面的考量
指的是以下部分
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
+-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
| magic code |version | full length | messageType| codec|compress| RequestId |
+-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
| |
| body |
| |
| ... ... |
+-------------------------------------------------------------------------------------------------------+
这个部分 这里没有对齐请谅解
本地调用时需要把具体调用哪个实现类传递过去吗
CollectionUtils StringUtils
不建议为了使用一个小的 API 就引入两个库哈,自己提取一个工具类或许更好。
Originally posted by @Snailclimb in #60 (comment)
项目里这儿没有@component注解,但在markdown的介绍里是有的,分析了下也该有
很多地方使用了github.javaguide.factory.SingletonFactory获取单例bean
为什么不用@Autowired,Spring不是天然支持单例吗
public static <T> T getInstance(Class<T> c) {
String key = c.toString();
Object instance = null;
if (instance == null) {
synchronized (SingletonFactory.class) {
instance = OBJECT_MAP.get(key);
if (instance == null) {
try {
instance = c.getDeclaredConstructor().newInstance();
OBJECT_MAP.put(key, instance);
} catch (IllegalAccessException | InstantiationException e) {
throw new RuntimeException(e.getMessage(), e);
} catch (NoSuchMethodException | InvocationTargetException e) {
e.printStackTrace();
}
}
}
}
return c.cast(instance);
}
这一段感觉在模仿标准的 dcl 写法,但是
Object instance = null;
if (instance == null) {
synchronized (SingletonFactory.class) {
明显 instance 肯定为空,判断多余,且synchronized (SingletonFactory.class) { 使用 类锁,锁的力度很大,在多线程情况下并发度很低阻塞会很严重
我自己 有一个Rest风格的分布式中间件,主要是用来补充Mars-Java的开源生态的。但是RPC的还没有,我先了解一下,后续或许会专门为这个项目开发一个starter。
看到作者说最开始使用BIO(Socket),那么如果能够提供一个最简单的版本,没有使用Zookeeper、Spring、负载均衡这些,初学者能够看得更明白,也更能够明白RPC的本质
guide 哥我在复习 zookeeper 的时候发现它的 watcher 机制会在触发一次后就删除掉监听器,也就是说现在只能对节点的一次改变进行监听,第二次就不生效了。
目前有两种解决方案:
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildCacheEvent) -> {
// List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
// SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
// 删除缓存以便下次从 zookeeper 拉取最新数据
SERVICE_ADDRESS_MAP.remove(rpcServiceName);
};
你好,有个疑问RPC调用服务端方法参数传递的是一个java对象,那这个java对象参数是不是必须要在客户端和服务端必须存在并且包路径都必须相同
请问github.javaguide.provider.impl.ZkServiceProviderImpl类中为什么有serviceMap,还需要registeredService这个set去保存接口名?
/**
* key: rpc service name(interface name + version + group)
* value: service object
*/
private final Map<String, Object> serviceMap;
private final Set<String> registeredService;
// 在get方法中:
String rpcServiceName = rpcServiceConfig.getRpcServiceName();
if (registeredService.contains(rpcServiceName)) {
return;
}
registeredService.add(rpcServiceName);
serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
直接用serviceMap.containsKey(rpcServiceName)不行吗?对此有些疑惑 :-)
我个人很喜欢这个项目,因为它能帮助我更好的理解rpc的细节。前段时间我看了李林峰的 《Netty权威指南》,也自己实现过简单的rpc,但是却不如你的全面和细节。在阅读源码的过程中,我发现了一下小问题,也可能是我没有get到你的思路,请指教
个人的一些理解,如有不足,还望指教
没看到对应的相关清除对应服务地址的方法,可能是我没发现吧,希望指出
This method may get the wrong IP address, it may get virtual IP when system is Win and VMware installed.
guide 教程可以开源吗。。。我就问问。。
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.