
chengxy-nds / springboot-notebook


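Springboot-Notebook is a comprehensive project built on Spring Boot as its base framework. It integrates mainstream internet technologies such as Redis, MySQL, RabbitMQ, ES, MongoDB, sharding-jdbc (database and table sharding), ZooKeeper, web face recognition, real-time message push, SQL optimization, a registry center, and data masking. Illustrated theory articles are paired with hands-on examples that implement features commonly needed in development. Following a ready-to-use principle, it aims to reduce the learning cost developers face at work.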


springboot-notebook's Issues

RabbitMQ message retry + manual acknowledgment

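I think the "pitfall" described in the article may be wrong. For example, it calls channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
Why not call channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); instead, so that the message is not requeued, and configure a dead-letter exchange and a dead-letter queue for this queue? Once a message is rejected, it is routed to the dead-letter queue, which removes the need for manual compensation and similar work.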

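Of course, Spring Boot also supports a consumer retry mechanism. The annoying part is that you cannot simply handle the exception with try..catch: once the exception is caught and not rethrown, the retry mechanism stops working. Below is a setup that supports retries and, once the retries are exhausted, sends the message to the dead-letter queue.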
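
To make the try..catch point concrete, here is a minimal plain-Java sketch. It is not Spring's actual retry interceptor; `RetryLoopSketch`, `Handler`, and `deliver` are hypothetical names used only for illustration. It assumes that a retry loop can only learn about a failure through an exception that escapes the handler, so a handler that swallows its exception looks successful after a single call.

```java
/** Plain-Java stand-in for a stateless retry loop; not Spring's implementation. */
final class RetryLoopSketch {

    /** Hypothetical listener type that may throw. */
    interface Handler {
        void handle(String message) throws Exception;
    }

    /**
     * Invokes the handler until it returns normally or maxAttempts is reached.
     * Returns the number of attempts that were made.
     */
    static int deliver(Handler handler, String message, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                handler.handle(message);
                return attempts; // normal return: the loop treats this as success
            } catch (Exception e) {
                // An escaping exception is the only failure signal; try again if attempts remain.
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Swallows its own failure, so the loop never sees an exception.
        Handler swallowing = msg -> {
            try {
                throw new IllegalStateException("business failure");
            } catch (IllegalStateException e) {
                // logged and swallowed
            }
        };
        // Lets the failure escape, so the loop keeps retrying.
        Handler rethrowing = msg -> {
            throw new IllegalStateException("business failure");
        };

        System.out.println(deliver(swallowing, "hello", 5)); // prints 1
        System.out.println(deliver(rethrowing, "hello", 5)); // prints 5
    }
}
```

This is why the consumer further down catches the exception only to record the failure and then rethrows it.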

① application.yaml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
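    publisher-confirm-type: correlated # trigger the confirm callback once the message reaches the exchange
    publisher-returns: true # enable publisher returns: messages that cannot be routed to a queue are returned to the producer
    listener:
      simple:
        retry:
          enabled: true # enable consumer retry
          max-attempts: 5 # maximum number of attempts (default: 3)
          initial-interval: 2000 # interval between attempts in ms (default: 1000)
        acknowledge-mode: manual # manual acknowledgment
        default-requeue-rejected: false # whether rejected messages are requeued; false means they are not put back on the queue once retries are exhausted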
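
As a rough illustration of what these retry settings mean in time, the sketch below computes the waits between attempts. It assumes the other retry properties keep their Spring Boot defaults, which to my knowledge are a multiplier of 1 and a max-interval of 10000 ms; `RetryScheduleSketch` and `waits` are hypothetical names, and the formula is a simplified model of exponential backoff rather than Spring Retry's own code.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the waits between retry attempts; not Spring Retry's code. */
final class RetryScheduleSketch {

    /**
     * Returns the wait in milliseconds before each retry.
     * There are maxAttempts - 1 waits because the first attempt happens immediately.
     */
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min((long) next, maxInterval)); // cap each wait at maxInterval
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings from the YAML above, with the assumed default multiplier and max-interval.
        System.out.println(waits(5, 2000, 1.0, 10000)); // prints [2000, 2000, 2000, 2000]
        // Same settings with a hypothetical multiplier of 2.
        System.out.println(waits(5, 2000, 2.0, 10000)); // prints [2000, 4000, 8000, 10000]
    }
}
```

Under these assumptions, a message that keeps failing is held for about 8 seconds in total before the fifth and final attempt.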

② Configuration class: declares the regular exchange and queue plus the dead-letter exchange and dead-letter queue

package com.github.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 10:49:11
 */
@Configuration
public class ConfirmConfig {

    /**
     * Exchange name
     */
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    /**
     * Dead-letter exchange name
     */
    public static final String DEAD_EXCHANGE_NAME = "dead.exchange";

    /**
     * Queue name
     */
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    /**
     * Dead-letter queue name
     */
    public static final String DEAD_QUEUE_NAME = "dead.queue";

    /**
     * routing_key
     */
    public static final String CONFIRM_ROUTING_KEY = "confirm";

    /**
     * dead routing key
     */
    public static final String DEAD_ROUTING_KEY = "dead";

    /**
     * Declares the regular exchange
     */
    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    /**
     * Declares the dead-letter exchange
     */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE_NAME);
    }

    /**
     * Declares the regular queue; rejected messages are forwarded to the dead-letter exchange
     */
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                .build();
    }

    /**
     * Declares the dead-letter queue
     */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME)
                .build();
    }

    /**
     * Binds the regular queue to the regular exchange
     */
    @Bean
    public Binding confirmBinding() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange
     */
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTING_KEY);
    }


}
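
For readers unfamiliar with how the dead-letter link is expressed, the sketch below shows the queue arguments involved. As far as I know, the `deadLetterExchange(...)` and `deadLetterRoutingKey(...)` builder calls set the standard RabbitMQ arguments `x-dead-letter-exchange` and `x-dead-letter-routing-key`; the class `DeadLetterArgsSketch` is a hypothetical helper that uses only the JDK and does not talk to a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds the queue arguments that link a queue to a dead-letter exchange. */
final class DeadLetterArgsSketch {

    /** Returns the arguments a queue is declared with so that rejected messages are dead-lettered. */
    static Map<String, Object> deadLetterArgs(String deadExchange, String deadRoutingKey) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-dead-letter-exchange", deadExchange);      // where rejected messages are republished
        args.put("x-dead-letter-routing-key", deadRoutingKey); // routing key used when republishing
        return args;
    }

    public static void main(String[] args) {
        // Values taken from the constants in ConfirmConfig above.
        System.out.println(deadLetterArgs("dead.exchange", "dead"));
    }
}
```

These arguments belong to the queue declaration, so a queue that already exists without them usually has to be deleted and redeclared before the dead-letter routing takes effect.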

③ Producer code

package com.github.web;

import com.github.config.ConfirmConfig;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 13:37:39
 */
@Slf4j
@Value
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/confirm")
public class ConfirmController {

    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendMsg/{msg}")
    public String sendMsg(@PathVariable String msg) {
        log.info("Sending message: {}", msg);
        String id = "1";
        CorrelationData correlationData = new CorrelationData(id);
        // Send the message
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), correlationData);
        return "Message sent";
    }

}

④ Consumer code

package com.github.listener;

import com.github.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-25 14:20:14
 */
@Slf4j
@Component
public class RabbitmqListener {


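    // Number of failed attempts for the current message.
    // A single shared field assumes one consumer thread handling one message at a time.
    private int retryCount = 0;

    /**
     * For the retry mechanism to work, the exception must not be swallowed by a try-catch;
     * otherwise no retry is triggered.
     * With automatic ack, a message that still fails after all retries is acknowledged automatically and lost.
     * With manual ack, a message that still fails after all retries is never nacked, so it stays
     * in the unacked state and messages pile up.
     *
     * @param message the received message
     * @param channel the channel used for manual ack/nack
     * @param tag     the delivery tag of the message
     * @throws IOException if the ack or nack fails
     */
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            // Main business logic
            log.info("Received message: {}", msg);
            int i = 10 / 0; // simulate a business failure
            // Acknowledge the message
            channel.basicAck(tag, false);
            // Reset so that earlier failures are not counted against the next message
            retryCount = 0;
        } catch (Exception e) {
            retryCount++;
            log.info("Attempt count: {}", retryCount);
            // Business logic failed; do any follow-up work here, such as recording the failure reason
            log.info("Recording failure reason ====>");
            // Rethrow to trigger the retry mechanism
            throw e;
        } finally {
            // Retry limit reached (must match max-attempts in application.yaml)
            if (retryCount == 5) {
                log.info("Message failed, nack it to the dead-letter queue");
                channel.basicNack(tag, false, false);
                // Reset so that the next failing message is counted from zero
                retryCount = 0;
            }
        }
    }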

    @RabbitListener(queues = ConfirmConfig.DEAD_QUEUE_NAME)
    public void receiveDead(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Dead-letter queue received message: {}", msg);
        channel.basicAck(tag, false);
    }

}
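
A single `retryCount` field is shared by every message the listener handles, so counts can get mixed up once more than one message fails or more than one consumer thread is running. One possible alternative, sketched below in plain Java, is to count failures per message. It assumes each message carries a unique key, for example a message id set by the producer, which the code above does not currently do; `AttemptTracker` and its methods are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Counts failed attempts per message key instead of in one shared field. */
final class AttemptTracker {

    private final ConcurrentMap<String, Integer> attempts = new ConcurrentHashMap<>();
    private final int maxAttempts;

    AttemptTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Records one failed attempt for the given key.
     * Returns true when the limit is reached, meaning the caller should nack without requeue.
     * The counter for that key is cleared at that point.
     */
    boolean recordFailure(String messageKey) {
        int count = attempts.merge(messageKey, 1, Integer::sum);
        if (count >= maxAttempts) {
            attempts.remove(messageKey);
            return true;
        }
        return false;
    }

    /** Clears the counter after a successful ack. */
    void recordSuccess(String messageKey) {
        attempts.remove(messageKey);
    }

    public static void main(String[] args) {
        AttemptTracker tracker = new AttemptTracker(5);
        for (int i = 1; i <= 5; i++) {
            System.out.println("attempt " + i + " exhausted=" + tracker.recordFailure("msg-1"));
        }
    }
}
```

In the listener, `recordFailure` would replace `retryCount++` together with the `retryCount == 5` check, and `recordSuccess` would follow `basicAck`.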

⑤ Test

  • Send a request:
curl 'http://127.0.0.1:8080/confirm/sendMsg/哈哈' -X GET
  • Output: check the console log.
