
mq-demo's People

Contributors

chenzlalvin, duhenglucky, fuyou001, lollipopjin, panzhi33, zhouxinyu

mq-demo's Issues

Some message content is not printed when consuming messages, and the Alibaba Cloud console shows an error

TID: 284 STATE: WAITING
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
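Read on its own, this trace matches what an idle `ThreadPoolExecutor` worker looks like: it is parked in `LinkedBlockingQueue.take()` waiting for the next task, which the JVM reports as `WAITING`. The sketch below reproduces that state with a plain JDK thread pool. It is only an illustration of the thread state; the class name `IdleWorkerDemo` is hypothetical, and it does not explain why some messages are not printed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class IdleWorkerDemo {
    /** Runs one trivial task, then reports the worker thread's state once it has gone idle. */
    static Thread.State idleWorkerState() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            AtomicReference<Thread> worker = new AtomicReference<>();
            // Capture the pool's worker thread and wait for the task to finish.
            pool.submit(() -> worker.set(Thread.currentThread())).get();
            Thread thread = worker.get();
            // Give the worker a moment to return to the queue and park in take().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (thread.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return thread.getState();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

If the consumer threads all look like this, they are idle rather than stuck, so the missing output would have to be investigated elsewhere (for example in the listener's own logging).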

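How do I maintain a subscription for one TOPIC with multiple TAGs?

Subscriber code from the java-springboot-demo version:

// Add the @Configuration annotation in your project so that the consumer starts when the service starts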
public class ConsumerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoMessageListener messageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // Configuration properties
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        // Fix the number of consumer threads at 20 (20 is the default)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);

        // Subscription table
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.setExpression(mqConfig.getTag());
        subscriptionTable.put(subscription, messageListener);
        // To subscribe to multiple topics, add more entries in the same way as above

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

}

Specifically, in this part:

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.setExpression(mqConfig.getTag());

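If the same topic is used with different tags, the second entry cannot be added to subscriptionTable, because Subscription overrides hashCode and equals so that only the topic is compared: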

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        Subscription other = (Subscription) obj;
        if (topic == null) {
            if (other.topic != null) {
                return false;
            }
        } else if (!topic.equals(other.topic)) {
            return false;
        }
        return true;
    }
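The effect can be reproduced without the SDK. The sketch below uses a hypothetical stand-in class, `TopicOnlyKey`, whose `equals` and `hashCode` look only at the topic (as in the code above), and plain strings in place of listeners. All names and values are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Subscription: equality is based on the topic only.
class TopicOnlyKey {
    final String topic;
    final String expression;

    TopicOnlyKey(String topic, String expression) {
        this.topic = topic;
        this.expression = expression;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(topic);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(topic, ((TopicOnlyKey) obj).topic);
    }
}

class SubscriptionTableDemo {
    /** Puts two entries for the same topic with different tags into a HashMap. */
    static Map<TopicOnlyKey, String> build() {
        Map<TopicOnlyKey, String> table = new HashMap<>();
        table.put(new TopicOnlyKey("TopicX", "TagA"), "listenerA");
        // Same topic, so HashMap treats this as the same key: the value is
        // replaced, but the key object stored first (with TagA) is kept.
        table.put(new TopicOnlyKey("TopicX", "TagB"), "listenerB");
        return table;
    }
}
```

The map ends up with a single entry that pairs the first key's expression (`TagA`) with the second value (`listenerB`), which is why two separate entries for one topic do not work.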

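So how should the same topic with different tags be handled?

I checked the official documentation:

[image]

Then how do I route different TAGs to different business logic? For example, by branching inside my own MessageListener: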

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {

            message.getTag(); // branch on the tag to pick the business logic?

            return Action.CommitMessage;
        } catch (Exception e) {
            return Action.ReconsumeLater;
        }
    }
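One common pattern is to keep a single subscription per topic, set its expression to several tags joined with `||` (for example `TagA||TagB`), and branch on `message.getTag()` inside the listener. The sketch below shows only the dispatch part and is deliberately independent of the SDK; `TagDispatcher`, the tag names, and the handler bodies are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper: maps each tag to the business handler for that tag.
class TagDispatcher {
    private final Map<String, Function<String, String>> handlers = new LinkedHashMap<>();

    /** Registers the handler that processes message bodies carrying the given tag. */
    TagDispatcher register(String tag, Function<String, String> handler) {
        handlers.put(tag, handler);
        return this;
    }

    /** Builds the subscription expression covering every registered tag, e.g. "TagA||TagB". */
    String subscriptionExpression() {
        return String.join("||", handlers.keySet());
    }

    /** Runs the handler for the tag; returns empty when no handler is registered for it. */
    Optional<String> dispatch(String tag, String body) {
        Function<String, String> handler = handlers.get(tag);
        return handler == null ? Optional.empty() : Optional.ofNullable(handler.apply(body));
    }
}
```

In a listener this could be used as `dispatcher.dispatch(message.getTag(), new String(message.getBody()))`, with `dispatcher.subscriptionExpression()` passed to `subscription.setExpression(...)`. How an unknown tag should be treated (commit or retry) is left to the application.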

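Unable to use the latest SDK version

ONS server version: 4.0
TCP SDK version: 2.0.3.Final
Error message:
error code: FETCH_TOPIC_ROUTE_FAILURE

UNAVAILABLE: Network closed for unknown reason

Investigation shows that after upgrading from the 1.x SDK to the 2.x SDK, the client can no longer connect to the name server. The cause may be a version mismatch between netty-grpc and netty, but I am not sure, and I have no time to debug it in depth right now.

Project in use: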
https://github.com/DR-YangLong/ons-spring-boot-starter

How can I POST the message content to my server from within the listener?

public Action consume(Message message, ConsumeContext context) {
        System.out.println(" Receive: " + message);
        String body = new String(message.getBody());
        System.out.println(" body: " + body);
        this.setMessage(message);

        try {
            // This is the "do something" placeholder from the official Spring example
            String url = "http://114.215.202.178:3002/v1/base2pro/appliance/data/report";
            String result;
            // HttpUtils uses third-party classes such as org.apache.http.client.HttpClient;
            // the underlying call keeps failing with a "class not found" error
            result = HttpUtils.API(url);
            System.out.println("send ok:" + result);

            return Action.CommitMessage;
        } catch (Exception e) {
            // Consumption failed
            return Action.ReconsumeLater;
        }
    }
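One way to sidestep the third-party dependency is to send the POST with the JDK's own `HttpURLConnection`. The sketch below is a minimal version under stated assumptions: the class name `MessageForwarder`, the timeouts, and the `text/plain` content type are illustrative choices, not something the demo prescribes, and it does not diagnose the original "class not found" error (which usually means the HttpClient jar is missing from the runtime classpath).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper that forwards a message body to an HTTP endpoint.
class MessageForwarder {
    /** POSTs the raw bytes to the endpoint and returns the HTTP status code. */
    static int postBody(String endpoint, byte[] body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setConnectTimeout(3000); // assumed timeout, in milliseconds
            conn.setReadTimeout(5000);    // assumed timeout, in milliseconds
            // Assumed content type; adjust to whatever the server expects.
            conn.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
            conn.setFixedLengthStreamingMode(body.length);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body);
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

Inside `consume`, this could be called as `MessageForwarder.postBody(url, message.getBody())`, returning `Action.CommitMessage` only for a 2xx status and `Action.ReconsumeLater` otherwise.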

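guava 19.0 code bundled inside the ons-client jar conflicts with an externally imported guava 20.0 jar

Hello, while using ons-client I found that the ons-client jar contains guava's code, but some other components in our project use guava 20.0, which has a few more methods than 19.0. After the application starts, the class loader loads the guava classes bundled inside the ons-client jar first, so those methods cannot be found. Is there a good way to solve this? Could you pass on feedback to the ons-client authors that it would be best to remove the bundled third-party code?
Thanks a lot!
P.S. Although the guava 20.0 jar can be given loading priority at JVM startup, the root cause is that the ons-client jar should not contain the code of other external dependencies, so I am asking whether there is another way.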
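When chasing this kind of conflict, it can help to confirm which jar a class was actually loaded from. The sketch below uses only standard JDK calls; the class name `ClassOrigin` and the placeholder text are hypothetical, and it is a diagnostic aid rather than a fix for the bundled classes.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class was loaded from.
class ClassOrigin {
    static final String JDK_RUNTIME = "<JDK runtime / bootstrap class loader>";

    /** Returns the jar or directory URL the class came from, or a placeholder for JDK classes. */
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return JDK_RUNTIME;
        }
        return source.getLocation().toString();
    }
}
```

Calling it at startup with a guava class, for example `ClassOrigin.describe(com.google.common.base.Preconditions.class)`, shows whether that class came from the ons-client jar or from the guava 20.0 jar.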

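Is JDK 1.6 not supported?

[image]

The pom declares Java version 1.6, but the project does not run at all on 1.6.

Consumer registration has no effect in versions 1.8.4.Final through 2.0.2.Final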

@AllArgsConstructor
@Slf4j
public class HomeCardCallbackListener implements MessageListener {

    private final HomeCardMapper homeCardMapper;

    private final MsgRecordService msgRecordService;

    private final AliRocketMQProperties aliRocketMQProperties;

    @Override
    @Transactional
    public Action consume(Message message, ConsumeContext context) {

        log.info("callback msg -> {}", message);

        try {
            String callbackContent = new String(message.getBody());
            CardCallbackRequestDTO request = JSON.parseObject(callbackContent, CardCallbackRequestDTO.class);

            // Update the callback receipt on the send record
            MsgRecord msgRecord = msgRecordService.findByMessageId(request.getMessageId());
            msgRecord.setCallback(callbackContent);
            msgRecord.setUpdateTime(new Date());
            msgRecordService.updateById(msgRecord);

            // If the callback succeeded and this was a card creation, update the card's creation status
            if (Objects.equals(0, request.getCode()) && Objects.equals(request.getSendTag(), aliRocketMQProperties.getCreateTag())) {
                Long planId = Long.valueOf(request.getReportId());
                HomeCard homeCard = homeCardMapper.selectById(planId);
                homeCard.setStatus(HomeCardStatusEnum.OK.getValue());
                homeCard.setUpdateTime(new Date());
                homeCardMapper.updateById(homeCard);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume failed ->", e);
            return Action.ReconsumeLater;
        }
    }
}

    @PostConstruct
    public void startConsumer() {
        Properties properties = mqProperties.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId());
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(mqProperties.getTopic(), mqProperties.getCallbackTag(),
                new HomeCardCallbackListener(homeCardMapper, msgRecordService, mqProperties));
        consumer.start();
    }

Or define the listener as a bean:

@Component
@AllArgsConstructor
@Slf4j
public class HomeCardCallbackListener implements MessageListener {

    private final HomeCardMapper homeCardMapper;

    private final MsgRecordService msgRecordService;

    private final AliRocketMQProperties aliRocketMQProperties;

    @Override
    @Transactional
    public Action consume(Message message, ConsumeContext context) {

        log.info("callback msg -> {}", message);

        try {
            String callbackContent = new String(message.getBody());
            CardCallbackRequestDTO request = JSON.parseObject(callbackContent, CardCallbackRequestDTO.class);

            // Update the callback receipt on the send record
            MsgRecord msgRecord = msgRecordService.findByMessageId(request.getMessageId());
            msgRecord.setCallback(callbackContent);
            msgRecord.setUpdateTime(new Date());
            msgRecordService.updateById(msgRecord);

            // If the callback succeeded and this was a card creation, update the card's creation status
            if (Objects.equals(0, request.getCode()) && Objects.equals(request.getSendTag(), aliRocketMQProperties.getCreateTag())) {
                Long planId = Long.valueOf(request.getReportId());
                HomeCard homeCard = homeCardMapper.selectById(planId);
                homeCard.setStatus(HomeCardStatusEnum.OK.getValue());
                homeCard.setUpdateTime(new Date());
                homeCardMapper.updateById(homeCard);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume failed ->", e);
            return Action.ReconsumeLater;
        }
    }
}

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer(AliRocketMQProperties mqProperties, HomeCardCallbackListener listener) {
        ConsumerBean consumerBean = new ConsumerBean();
        // Configuration properties
        Properties properties = mqProperties.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId());
        // Fix the number of consumer threads at 20 (20 is the default)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        // Subscription table
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqProperties.getTopic());
        subscription.setExpression(mqProperties.getCallbackTag());
        subscriptionTable.put(subscription, listener);
        // To subscribe to multiple topics, add more entries in the same way as above
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

Neither approach registers the consumer successfully, and no error is logged. The producer works normally.
