消息队列(MQ),平时经常遇到的中间件技术。个人工作中已经使用的RabbitMQ,最近开始使用Kafka,因此也打算把ActiveMQ实验一下。
所以本文章将围绕这三个MQ产品进行相应的实验。
创建实验工程
三个MQ实验工程地址
- 工程使用spring boot创建,所有实现都将基于spring全家桶,已经很完毕了,拿来直接用吧。
- 为了后续实验方便,分别多个mq的producer和consumer工程
- 使用gradle管理项目结构
- 使用docker搭建测试所需要的服务器,文件可以参考,实际部署环境有所调整
Kafka
首先实验了Kafka的生产和消费基本代码
procuder:
1 | this.kafkaTemplate.send(topic, message); |
consumer:
1 | "${app.topic}") (topics = |
运行说明
使用三个kafka broker 节点,使用docker-compose对应的service实现
![](/images/post/20181016/kafka brokers.png)创建topic,使用三个分区和两个副本
![](/images/post/20181016/kafka topic.png)使用idea,运行工程,配置多个运行实例
![](/images/post/20181016/kafka run dashboard.png)
特性总结
- 一个分区对应一个消费者
- 一个分区内消息顺序消费
- 分区和和服务器应该成倍数关系,保证分区均匀分布
- 副本数量应该小于服务器数量,当可用分区失效时,从副本中选出leader,成为新的可用分区
- 节点高可用性,消费服务瞬间切换
- 保存近期所有数据,通过offset可以获得任意位置的消息
- 是否允许自动创建topic需要在kafka中配置
ACK MODE 和 Commit
需要注意ack mode和 commit,此项影响数据的刷盘机制。根据实际情况选择
- RECORD 逐笔ack并提交
- BATCH 一个poll周期进行ack提交
- TIME 通过设置ackTime定时提交
- COUNT 通过设置ackCount累计数量提交
- COUNT_TIME 同上组合,哪个复合执行哪个
- MANUAL 手动方式生成ack,批量提交
- MANUAL_IMMEDIATE 手动方式生成ack并立即提交
广播与点对点
广播有时也叫pub和sub,就是一个topic,多个订阅者。每个订阅者都会收到相同的消息,消息被多次消费。
点对点,一个topic中的消息对应一个消费者,消息只会被消费一次
在Kafka的系统中,如何区分以上两种方式,是通过consumer group实现的。
多个消费者在一个consumer group中,那么他们就是一个消费整体,消息只会被一个具体的消费者消费一次
如果想多个同时消费,那么需要多个consumer group。
这样来看consumer group需要与具体服务对应,一般一个独立的服务需要消费一次消息。
总的来说,体会到了大家喜欢的它原因,具体一些实验后续将深入。
建议参考阅读
ActiveMQ
每个MQ中的术语大同小异,但又十分不统一,随着遵循的协议不一样,让我们看看ActiveMQ,它使用JMS协议
Queue 和 Topic
Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
![](/images/post/20181016/jms topic.png)
Queues
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
![](/images/post/20181016/jms queue.png)
这里的 topics 类似kafka中的广播,pub/sub 模式,queue是队列是点对点模式。
他的主要区别是,消费方式的确定不是在consumer设置,而是在MQ Server中设置
消费性质上同上:
- topic的消息会多个消费者同时消费,但并不做消息堆积,没有之前的消息
- queue消息只有一个消费者消费,并做消息缓存堆积,直到消费为止
- 同时queue的消费者是负责均衡的会分摊消息队列的中的数据,但是不能保证按顺序执行
![](/images/post/20181016/activemq consumer.png)
关键代码
消费端
1 |
|
通过spring boot 配置文件可以指明消费方式,是topic还是queue,但是只能只一种。
为了同时支持两种方式,我么需要声明JmsListenerContainerFactory。是否开启pub sub domain,决定是queue还是topic
1 | factory.setPubSubDomain(true); |
同时配合JmsListener注解,就可以指明消费方式了。如果指明的消费方式与ActiveMQ所创建的queue或topic不一致,则消息不能正常消费。
1 | "${app.queue}", containerFactory = "queueListenerFactory") (destination = |
生产端
1 |
|
通过不通类型指明是topic还是queue
运行说明
同样创建多个运行实例,来观察消费情况
![](/images/post/20181016/activemq run dashboard.png)
观察ActiveMQ管理后台,你可以看到相关的配置信息
![](/images/post/20181016/activemq queue.png)
![](/images/post/20181016/activemq topic.png)
特性总结
- 简单,两种消费模式非常清晰,没有复杂的系统结构
- 自带管理后台还算好用
还需要深入体会其一些参数设置,初步感觉中规中矩没有什么好说的。😜
RabbitMQ
最后说熟悉的RabbitMQ吧。
我很喜欢RabbitMQ,功能上丰富,性能不错,相比ActiveMQ,我更喜欢RabbitMQ吧。当然具体使用,还是要分应用场景。
基本概念
RabbitMQ中有几个主要概念,和其它MQ的定义有些区别,其为AMQP协议
- Exchange 交换机,用于消息的分发,如果不指定则会使用默认的
- Queue 队列,实际缓存数据的消息队列,可对应多个消费者
- Topic 主题,消息可以有主题,用于exchange分配消息是的判断依据
- virtual host 虚拟路径,可以创建虚拟路径将相关配置分开并设定特定用户用于访问控制
![](/images/post/20181016/exchanges topic fanout direct.png)
消费方式
RabiitMQ的消费方式和灵活度上应该是这三者中最好的,并且官方给出了详细的实例说明,超有爱❤️
当然由于模式多,它也就是最复杂的。官方给出的六个实例,这里简单说一下,具体代码看原址
- Hello World 最普通的生产消费代码
- Work Queue 默认exchange,工作队列,一个队列被多个消费者消费,且一个消息只消费一次
- Publish/Subscribe 发布订阅,通过exchange Fanout模式进行队列分发,实现一条消息可以被多个消费者消费
- Routing 路由方式,通过exchange和routing key指定消息路由到具体的队列别特定的消费者消费
- Topics 主题方式,通过exchange和topic比配规则指定消息路由到具体的队列别特定的消费者消费
- RPC 通过消息队列实现RPC调用过程
Exchange 方式:
- direct exchange 直接点对点
- fanout exchange 全体广播
- topic exchange 主题广播
- headers exchange 还没有用过
- system exchange 还没有用过
关键代码
生产端
1 | // 声明queue |
消费端
1 | "${app.broadone}") (queues = |
运行说明
这里我只做消息在多消费者下顺序消费观察和广播观察,其它实例参考官网代码吧,没有必要重复了。
一个生产者、一个queue、多个消费者,和ActiveMQ queue方式,观察效果一致:
- 多个消费者的消息分摊,如果每条消费执行时间差异较大,消费顺序不保证一致
![](/images/post/20181016/rabbitmq consumer.png) - 消费退出,另一个消费者会接替未完成的消息工作
- 广播消息默认堆积,其本质还是消费队列,不能多个消费者消费一个queue实现pub/sub
同样创建多个运行实例,来观察消费情况
![](/images/post/20181016/rabbitmq run dashboard.png)
管理后台可以看到的信息
![](/images/post/20181016/rabbitmq queue.png)
![](/images/post/20181016/rabbitmq exchange.png)
特性总结
- 功能多,使用灵活
- AMQP,性能不错
- 管理后台方便
实验总结
- 如果你需要消息顺序消费、分布式存储的高可用性,指定消息位置再次消费,较高的吞吐 —— Kafka
- 如果你要消息正常消费,无顺序要求、无特殊路由要求。广播消费时,多个消费者可以监听一个queue的方式完成 —— ActiveMQ
- 如果你要适合多种消息根据自定义规则(通配符等)路由道不同的消息队列,或兼容不通场景,希望广播消息有堆积不丢失 —— RabbitMQ
以上MQ都还有很多配置参数应对不通的应用需求,可能通过配置实现默认不支持的功能,具体使用情况,如果以后遇到我再单独说明。