aliwaremq / mq-demo
Demo for AliwareMQ
Home Page: https://www.aliyun.com/product/ons
TID: 284 STATE: WAITING
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
As the title says: if nameSrvAddr is configured with an HTTP address, the client reports "no topic".
Subscriber code in the java-springboot-demo version:
// Add the @Configuration annotation in your project so that the consumer starts together with the service
public class ConsumerClient {
    @Autowired
    private MqConfig mqConfig;
    @Autowired
    private DemoMessageListener messageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // Configuration
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        // Fix the number of consumer threads at 20 (20 is the default)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        // Subscriptions
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.setExpression(mqConfig.getTag());
        subscriptionTable.put(subscription, messageListener);
        // To subscribe to more topics, add entries the same way as above
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}
In particular, given this part:
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
subscription.setExpression(mqConfig.getTag());
two subscriptions with the same topic but different tags cannot both be added to subscriptionTable, because Subscription overrides hashCode and equals using only the topic:
@Override
public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
    return result;
}

@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    if (obj == null) {
        return false;
    }
    if (getClass() != obj.getClass()) {
        return false;
    }
    Subscription other = (Subscription) obj;
    if (topic == null) {
        if (other.topic != null) {
            return false;
        }
    } else if (!topic.equals(other.topic)) {
        return false;
    }
    return true;
}
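The effect of a topic-only equals/hashCode on a HashMap can be reproduced without the SDK. The sketch below uses a hypothetical stand-in class, TopicKey, that mimics the quoted behavior in simplified form; the class name, topic, tags, and listener names are all illustrative assumptions, not part of ons-client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TopicOnlyKeyDemo {
    // Hypothetical stand-in for Subscription: equality looks at the topic only.
    static final class TopicKey {
        final String topic;
        final String expression;

        TopicKey(String topic, String expression) {
            this.topic = topic;
            this.expression = expression;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(topic);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TopicKey)) {
                return false;
            }
            return Objects.equals(topic, ((TopicKey) obj).topic);
        }
    }

    // Puts two entries that share a topic but differ in tag expression.
    static Map<TopicKey, String> buildTable() {
        Map<TopicKey, String> table = new HashMap<>();
        table.put(new TopicKey("demo-topic", "TagA"), "listenerA");
        table.put(new TopicKey("demo-topic", "TagB"), "listenerB");
        return table;
    }

    public static void main(String[] args) {
        Map<TopicKey, String> table = buildTable();
        System.out.println("entries: " + table.size());
        table.forEach((key, value) -> System.out.println(key.expression + " -> " + value));
    }
}
```

Only one entry survives. HashMap.put keeps the first key object and replaces the value, so the remaining entry pairs the TagA key with listenerB, which is probably not what either put intended.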
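So how should the same topic with different tags be handled?
I consulted the official documentation.
How, then, can different TAGs be routed to different business logic? For example, by branching inside my own MessageListener:
@Override
public Action consume(Message message, ConsumeContext context) {
    try {
        message.getTag(); // branch on the tag to pick the business logic?
        return Action.CommitMessage;
    } catch (Exception e) {
        return Action.ReconsumeLater;
    }
}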
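One possible answer to the question above is a single listener that looks up a handler by tag. The sketch below is SDK-free so it runs on its own: Result stands in for Action, and consume(String, byte[]) stands in for consume(Message, ConsumeContext) with the tag taken from message.getTag(). The class name, the handler registry, and the choice to commit unknown tags are assumptions made for illustration. It also assumes the single Subscription for the topic uses an expression covering every tag of interest (for example the RocketMQ-style `TagA || TagB` form).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TagDispatchSketch {
    // Stand-in for the SDK's Action enum (CommitMessage / ReconsumeLater).
    enum Result { COMMIT, RECONSUME_LATER }

    // One handler per tag; each handler receives the raw message body.
    private final Map<String, Function<byte[], Result>> handlers = new HashMap<>();

    void register(String tag, Function<byte[], Result> handler) {
        handlers.put(tag, handler);
    }

    // Mirrors a listener's consume method: pick the handler by tag, run it,
    // and ask for redelivery if the handler throws.
    Result consume(String tag, byte[] body) {
        Function<byte[], Result> handler = handlers.get(tag);
        if (handler == null) {
            // Design choice in this sketch: commit messages with unknown tags
            // so they are not redelivered indefinitely.
            return Result.COMMIT;
        }
        try {
            return handler.apply(body);
        } catch (RuntimeException e) {
            return Result.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        TagDispatchSketch dispatcher = new TagDispatchSketch();
        dispatcher.register("TagA", body -> {
            System.out.println("order logic, " + body.length + " bytes");
            return Result.COMMIT;
        });
        dispatcher.register("TagB", body -> {
            System.out.println("refund logic, " + body.length + " bytes");
            return Result.COMMIT;
        });
        System.out.println(dispatcher.consume("TagA", new byte[] {1, 2, 3}));
        System.out.println(dispatcher.consume("TagB", new byte[] {4}));
    }
}
```

Keeping the per-tag logic in separate handlers keeps the listener itself small, and adding a tag becomes one more register call rather than another branch in consume.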
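ONS server version: 4.0
TCP SDK version: 2.0.3.Final
Error message:
error code: FETCH_TOPIC_ROUTE_FAILURE
UNAVAILABLE: Network closed for unknown reason
Investigation shows that after upgrading from the 1.x SDK to the 2.x SDK, the client can no longer connect to the name server. The cause may be a version mismatch between netty-grpc and netty, but I am not sure, and I have no time to debug further at the moment.
The project in use is: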
https://github.com/DR-YangLong/ons-spring-boot-starter
public Action consume(Message message, ConsumeContext context) {
    System.out.println(" Receive: " + message);
    String body = new String(message.getBody());
    System.out.println(" body: " + body);
    this.setMessage(message);
    try {
        // This is the "do something" part from the official Spring example
        String url = "http://114.215.202.178:3002/v1/base2pro/appliance/data/report";
        String result;
        // HttpUtils uses third-party classes such as org.apache.http.client.HttpClient;
        // at runtime this keeps failing underneath with "class not found"!
        result = HttpUtils.API(url);
        System.out.println("send ok:" + result);
        return Action.CommitMessage;
    } catch (Exception e) {
        // Consumption failed
        return Action.ReconsumeLater;
    }
}
The import fails.
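Hello. While using ons-client I found that the ons-client jar bundles Guava code. Some other components in our project use Guava 20.0, which has a few methods that 19.0 lacks. After the application starts, the class loader picks up the Guava classes bundled in the ons-client jar first, so those methods cannot be found. Is there a good way to solve this? Could the ons-client authors be asked to remove the bundled third-party code?
Many thanks!
P.S. We can force the Guava 20.0 jar to be loaded first at JVM startup, but the root cause is that the ons-client jar should not contain code from external dependencies, so I am asking whether there is another way.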
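To confirm which jar actually supplies a conflicting class at runtime, the class's code source can be printed. This is a minimal diagnostic sketch using only the JDK; the class name ClassOriginCheck and the default target com.google.common.collect.ImmutableList are illustrative choices, and the check assumes the bundled Guava classes keep their original package names.

```java
import java.security.CodeSource;

public class ClassOriginCheck {
    // Returns the jar or directory a class was loaded from, or a placeholder
    // when the JVM does not expose one (for example for core JDK classes).
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        // Pass a fully qualified class name, or fall back to a Guava class.
        String name = args.length > 0 ? args[0] : "com.google.common.collect.ImmutableList";
        try {
            System.out.println(name + " loaded from " + originOf(Class.forName(name)));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Run inside the affected application's classpath: if the printed location is the ons-client jar rather than the Guava 20.0 jar, the load-order explanation above is confirmed.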
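As the title says: after the consumer in ConsumerClient is registered, no consumer subscription shows up in the management console. The code below is buggy; the Spring version does not have this problem.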
@AllArgsConstructor
@Slf4j
public class HomeCardCallbackListener implements MessageListener {
    private final HomeCardMapper homeCardMapper;
    private final MsgRecordService msgRecordService;
    private final AliRocketMQProperties aliRocketMQProperties;

    @Override
    @Transactional
    public Action consume(Message message, ConsumeContext context) {
        log.info("callback msg -> {}", message);
        try {
            String callbackContent = new String(message.getBody());
            CardCallbackRequestDTO request = JSON.parseObject(callbackContent, CardCallbackRequestDTO.class);
            // Update the callback receipt on the send record
            MsgRecord msgRecord = msgRecordService.findByMessageId(request.getMessageId());
            msgRecord.setCallback(callbackContent);
            msgRecord.setUpdateTime(new Date());
            msgRecordService.updateById(msgRecord);
            // If the callback succeeded and this was a card creation, update the card's creation status
            if (Objects.equals(0, request.getCode()) && Objects.equals(request.getSendTag(), aliRocketMQProperties.getCreateTag())) {
                Long planId = Long.valueOf(request.getReportId());
                HomeCard homeCard = homeCardMapper.selectById(planId);
                homeCard.setStatus(HomeCardStatusEnum.OK.getValue());
                homeCard.setUpdateTime(new Date());
                homeCardMapper.updateById(homeCard);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume failed ->", e);
            return Action.ReconsumeLater;
        }
    }
}

@PostConstruct
public void startConsumer() {
    Properties properties = mqProperties.getProperties();
    properties.setProperty(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId());
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe(mqProperties.getTopic(), mqProperties.getCallbackTag(),
            new HomeCardCallbackListener(homeCardMapper, msgRecordService, mqProperties));
    consumer.start();
}
Or register the listener as a bean:
@Component
@AllArgsConstructor
@Slf4j
public class HomeCardCallbackListener implements MessageListener {
    private final HomeCardMapper homeCardMapper;
    private final MsgRecordService msgRecordService;
    private final AliRocketMQProperties aliRocketMQProperties;

    @Override
    @Transactional
    public Action consume(Message message, ConsumeContext context) {
        log.info("callback msg -> {}", message);
        try {
            String callbackContent = new String(message.getBody());
            CardCallbackRequestDTO request = JSON.parseObject(callbackContent, CardCallbackRequestDTO.class);
            // Update the callback receipt on the send record
            MsgRecord msgRecord = msgRecordService.findByMessageId(request.getMessageId());
            msgRecord.setCallback(callbackContent);
            msgRecord.setUpdateTime(new Date());
            msgRecordService.updateById(msgRecord);
            // If the callback succeeded and this was a card creation, update the card's creation status
            if (Objects.equals(0, request.getCode()) && Objects.equals(request.getSendTag(), aliRocketMQProperties.getCreateTag())) {
                Long planId = Long.valueOf(request.getReportId());
                HomeCard homeCard = homeCardMapper.selectById(planId);
                homeCard.setStatus(HomeCardStatusEnum.OK.getValue());
                homeCard.setUpdateTime(new Date());
                homeCardMapper.updateById(homeCard);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume failed ->", e);
            return Action.ReconsumeLater;
        }
    }
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer(AliRocketMQProperties mqProperties, HomeCardCallbackListener listener) {
    ConsumerBean consumerBean = new ConsumerBean();
    // Configuration
    Properties properties = mqProperties.getProperties();
    properties.setProperty(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId());
    // Fix the number of consumer threads at 20 (20 is the default)
    properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
    consumerBean.setProperties(properties);
    // Subscriptions
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
    Subscription subscription = new Subscription();
    subscription.setTopic(mqProperties.getTopic());
    subscription.setExpression(mqProperties.getCallbackTag());
    subscriptionTable.put(subscription, listener);
    // To subscribe to more topics, add entries the same way as above
    consumerBean.setSubscriptionTable(subscriptionTable);
    return consumerBean;
}
Neither approach registers the consumer properly, and no error is logged. The producer works fine.