基于Spring的RabbitMQ消息异步处理实践
前言:在当今的分布式系统架构中,消息中间件扮演着至关重要的角色。RabbitMQ 作为一款成熟、稳定且广泛使用的消息队列系统,凭借其出色的可靠性、灵活性和易集成性,成为众多企业构建异步通信和解耦服务的首选方案。本文将带你快速上手 RabbitMQ,助你掌握这一关键技术,为系统带来更高的可扩展性与健壮性。
1.添加Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2.application.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated # 开启发布确认
publisher-return: true # 开启发布返回
listener:
simple:
acknowledge-mode: manual # 手动确认模式
prefetch: 1 # 每次预取一个消息
3.配置类RabbitConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
// 定义队列名称
public static final String DEMO_QUEUE = "demo.queue";
public static final String DEMO_EXCHANGE = "demo.exchange";
public static final String DEMO_ROUTING_KEY = "demo.routingkey";
// 创建队列
@Bean
public Queue demoQueue() {
return new Queue(DEMO_QUEUE, true, false, false);
}
// 创建直连交换机
@Bean
public DirectExchange demoExchange() {
return new DirectExchange(DEMO_EXCHANGE, true, false);
}
// 绑定队列到交换机
@Bean
public Binding bindingDemo() {
return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTING_KEY).noargs();
}
// 配置消息序列化方式
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
4.生产者DemoProducer.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class DemoProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.DEMO_EXCHANGE,
RabbitConfig.DEMO_ROUTING_KEY,
message);
System.out.println("Sent message: " + message);
}
}
5.消费者DemoConsumer.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AcknowledgeMode;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
@Component
public class DemoConsumer {
@RabbitListener(queues = RabbitConfig.DEMO_QUEUE, ackMode = "MANUAL")
public void processMessage(Message message, Channel channel) throws Exception {
try {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("Received message: " + msg);
// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理异常,拒绝消息或重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 重新入队
throw e;
}
}
}
6.监控与维护
使用 RabbitMQ 管理界面监控队列状态
配置 Prometheus + Grafana 实现可视化监控
设置警报规则,及时发现消息堆积等问题
定期检查 RabbitMQ 日志(默认位于 /var/log/rabbitmq/)
评论区