SpringBoot使用RabbitMQ RabbitMQ 是
消息队列
实现AMQP(高级消息队列协议Advanced Message Queuing Protocol)的消息中间件的一种
作用:主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
流程:
一般消息队列都是生产者将消息发送到队列,消费者监听队列进行消费。
rabbitmq中一个虚拟主机(vhost默认 /)持有一个或者多个交换机(Exchange)。 用户只能在虚拟主机的粒度进行权限控制,交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上, 这样生产者和队列就没有直接联系,而是将消息发送的交换机,交换机再把消息转发到对应绑定的队列上。
交换机(Exchange)为rabbitmq
独特的概念,用到的最常见的是4中类型:
Direct: 先匹配, 再投送。即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去. 交换机跟队列必须是精确的对应关系,这种最为简单。
Topic: 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 这种可以认为是Direct 的灵活版
Headers: 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers则是一个自定义匹配规则的类型, 在队列与交换器绑定时会设定一组键值对规则,消息中也包括一组键值对( headers属性),当这些键值对有一对或全部匹配时,消息被投送到对应队列
Fanout : 消息广播模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routingkey会被忽略
举例说明 创建 2 个交换机directExchange
、fanoutExchange
,3个 队列 queueA
、 queueB
、 queueC
。
队列directExchange
作为定点发送,包含队列 A B
队列fanoutExchange
作为广播发送,包含队列 A B C
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 @Configuration public class RabbitConfig { @Bean("directExchange") public Exchange directExchange () { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE" ).durable(true ).build(); } @Bean("fanoutExchange") public FanoutExchange fanoutExchange () { return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE" ).durable(true ).build(); } @Bean("queueA") public Queue directQueue () { return QueueBuilder.durable("QUEUE_A" ).build(); } @Bean("queueB") public Queue directQueue2 () { return QueueBuilder.durable("QUEUE_B" ).build(); } @Bean("queueC") public Queue directQueue3 () { return QueueBuilder.durable("QUEUE_C" ).build(); } @Bean public Binding bindingA (@Qualifier("queueA") Queue queue, @Qualifier("directExchange") Exchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("DIRECT_ROUTING_KEY_A" ).noargs(); } @Bean public Binding bindingB (@Qualifier("queueB") Queue queue, @Qualifier("directExchange") Exchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("DIRECT_ROUTING_KEY_B" ).noargs(); } @Bean public Binding bindingA1 (@Qualifier("queueA") Queue queue, @Qualifier("fanoutExchange") Exchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange).with("FANOUT_ROUTING_KEY_A" ).noargs(); } @Bean public Binding bindingA2 (@Qualifier("queueB") Queue queue, @Qualifier("fanoutExchange") Exchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange).with("FANOUT_ROUTING_KEY_B" ).noargs(); } @Bean public Binding bindingA3 (@Qualifier("queueC") Queue queue, @Qualifier("fanoutExchange") Exchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange).with("FANOUT_ROUTING_KEY_C" ).noargs(); } }
消息发送类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Service public class SenderService { private Logger logger = LoggerFactory.getLogger(this .getClass()); @Resource private RabbitTemplate rabbitTemplate; public void broadcast (String p) { CorrelationData correlationData = new CorrelationData (UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("FANOUT_EXCHANGE" , "" , p, correlationData); } public void directA (String p) { CorrelationData correlationData = new CorrelationData (UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE" , "DIRECT_ROUTING_KEY_A" , p, correlationData); } public void directB (String p) { CorrelationData correlationData = new CorrelationData (UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE" , "DIRECT_ROUTING_KEY_B" , p, correlationData); } public void directNull (String p) { CorrelationData correlationData = new CorrelationData (UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE" , "" , p, correlationData); } }
消息接收类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Component public class Receiver { private static final Logger log = LoggerFactory.getLogger(Receiver.class); @RabbitListener(queues = {"QUEUE_A"}) public void on (Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); log.debug("FANOUT_QUEUE_A " + new String (message.getBody())); } @RabbitListener(queues = {"QUEUE_B"}) public void t (Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); log.debug("FANOUT_QUEUE_B " + new String (message.getBody())); } @RabbitListener(queues = {"QUEUE_C"}) public void t (Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); log.debug("FANOUT_QUEUE_C " + new String (message.getBody())); } @RabbitListener(queues = {"DIRECT_QUEUE"}) public void message (Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); log.debug("DIRECT_QUEUE " + new String (message.getBody())); } }
测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class SenderServiceTest { @Autowired private SenderService senderService; @Test public void testCache () throws InterruptedException { senderService.broadcast(" Test 同学们集合啦!" ); senderService.directA(" Test 定点消息 A " ); senderService.directB(" Test 定点消息 B " ); senderService.directNull(" Test 定点消息 null key " ); Thread.sleep(5000L ); } }
结果 1 2 3 4 5 DIRECT_QUEUE_A " Test 同学们集合啦!" FANOUT_QUEUE_B " Test 同学们集合啦!" FANOUT_QUEUE_C " Test 同学们集合啦!" DIRECT_QUEUE_A" Test 定点消息 A " DIRECT_QUEUE_B" Test 定点消息 B "
null key的并没有出现,所以在 directExchange
中没有可以广播的队列(都绑定了routingkey)。
Maven 依赖 1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 spring: rabbitmq: host: 127.0 .0 .1 port: 5672 username: maxzhao password: maxzhao publisher-confirms: true publisher-returns: true virtual-host: maxzhao_vhost listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 1 retry: enabled: true
JMS介绍和使用场景 简介: 讲解什么是消息队列,JMS的基础知识和使用场景
什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
使用场景:
跨平台
多语言
多项目
解耦
分布式事务
流量控制
最终一致性
RPC调用
上下游对接,数据源变动->通知下属
概念
JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
JMS生产者(Message Producer)
JMS消费者(Message Consumer)
JMS消息
JMS队列
JMS主题
JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe
术语
Broker - 简单来说就是消息队列服务器的实体。
Exchange - 消息路由器,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
Queue - 消息队列,用来存储消息,每个消息都会被投入到一个或多个队列。
Binding - 绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
RoutingKey - 路由关键字,Exchange 根据这个关键字进行消息投递。
Producter - 消息生产者,产生消息的程序。
Consumer - 消息消费者,接收消息的程序。
Channel - 消息通道,在客户端的每个连接里可建立多个Channel,每个channel代表一个会话。
编程模型
ConnectionFactory :连接工厂,JMS 用它创建连接
Connection :JMS 客户端到JMS Provider 的连接
Session: 一个发送或接收消息的线程
Destination :消息的目的地;消息发送给谁.
MessageConsumer / MessageProducer: 消息接收者,消费者
RabbitMQ RabbitMQ是一个出色的消息代理中间件(Message Broker):接受和转发消息。你可以将它看作是一个邮局, 你把自己的信件写上收件人地址,然后放到邮筒里面就不用管了,由邮局负责将这个信件送到目的地。来自
一、安装 我这里使用的是 ArchLInux
1 2 sudo pacman -S rabbitmq
RPM 安装 1 2 3 4 5 6 7 8 9 10 11 12 13 rpm -Uvh https://mirrors.ustc.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm yum install epel-release yum install erlang rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm ll yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm
安装web管理界面 1 2 3 sudo rabbitmq-plugins enable rabbitmq_management sudo rabbitmq-plugins list
启动 1 2 3 4 5 6 systemctl enable rabbitmq-server systemctl start rabbitmq-server systemctl restart rabbitmq-server
配置 1 vim /etc/rabbitmq/rabbitmq.config
配置端口,备注:消息端口5672
,则web访问端口为 15672
1 2 3 4 5 6 7 8 [ { rabbit, [ { loopback_users, [ ] } , { tcp_listeners, [ 5672 ] } ] } ]
用户管理 1 2 3 4 5 6 7 8 9 10 11 12 sudo rabbitmqctl list_users sudo rabbitmqctl change_password guest guest sudo rabbitmqctl add_user maxzhao maxzhao sudo rabbitmqctl set_user_tags maxzhao administrator sudo rabbitmqctl set_permissions -p / maxzhao ".*" ".*" ".*"
添加vhost 1 2 3 4 5 6 7 8 9 10 11 sudo rabbitmqctl --help sudo rabbitmqctl add_vhost --help sudo rabbitmqctl add_vhost maxzhao_vhost sudo rabbitmqctl list_vhosts sudo rabbitmqctl set_permissions -p /maxzhao_vhost maxzhao ".*" ".*" ".*"
删除 vhost 1 2 sudo rabbitmqctl add_vhost maxzhaoTest sudo rabbitmqctl delete_vhost maxzhaoTest
二、任务队列等引用
本文地址:SpringBoot使用RabbitMQ及RabbitMQ介绍
推荐
官方示例
下面2017年写的,但是比较全面的新手教材。
RabbitMQ简易教程 - 任务队列
RabbitMQ简易教程 - 发布订阅
RabbitMQ简易教程 - 路由
RabbitMQ简易教程 - 主题
RabbitMQ简易教程 - RPC
RabbitMQ简易教程 - WebSocket
RabbitMQ简易教程 - 并发调度
本文地址: https://github.com/maxzhao-it/blog/post/2922/