Message Queue 初探

消息队列(MQ),平时经常遇到的中间件技术。个人工作中已经使用的RabbitMQ,最近开始使用Kafka,因此也打算把ActiveMQ实验一下。
所以本文章将围绕这三个MQ产品进行相应的实验。

创建实验工程

三个MQ实验工程地址

  • 工程使用spring boot创建,所有实现都将基于spring全家桶,已经很完毕了,拿来直接用吧。
  • 为了后续实验方便,分别多个mq的producer和consumer工程
  • 使用gradle管理项目结构
  • 使用docker搭建测试所需要的服务器,文件可以参考,实际部署环境有所调整

Kafka

首先实验了Kafka的生产和消费基本代码
procuder:

1
this.kafkaTemplate.send(topic, message);

consumer:

1
@KafkaListener(topics = "${app.topic}")

运行说明

  1. 使用三个kafka broker 节点,使用docker-compose对应的service实现
    ![](/images/post/20181016/kafka brokers.png)

  2. 创建topic,使用三个分区和两个副本
    ![](/images/post/20181016/kafka topic.png)

  3. 使用idea,运行工程,配置多个运行实例
    ![](/images/post/20181016/kafka run dashboard.png)

特性总结

  • 一个分区对应一个消费者
  • 一个分区内消息顺序消费
  • 分区和和服务器应该成倍数关系,保证分区均匀分布
  • 副本数量应该小于服务器数量,当可用分区失效时,从副本中选出leader,成为新的可用分区
  • 节点高可用性,消费服务瞬间切换
  • 保存近期所有数据,通过offset可以获得任意位置的消息
  • 是否允许自动创建topic需要在kafka中配置

ACK MODE 和 Commit

需要注意ack mode和 commit,此项影响数据的刷盘机制。根据实际情况选择

  • RECORD 逐笔ack并提交
  • BATCH 一个poll周期进行ack提交
  • TIME 通过设置ackTime定时提交
  • COUNT 通过设置ackCount累计数量提交
  • COUNT_TIME 同上组合,哪个复合执行哪个
  • MANUAL 手动方式生成ack,批量提交
  • MANUAL_IMMEDIATE 手动方式生成ack并立即提交

广播与点对点

广播有时也叫pub和sub,就是一个topic,多个订阅者。每个订阅者都会收到相同的消息,消息被多次消费。
点对点,一个topic中的消息对应一个消费者,消息只会被消费一次

在Kafka的系统中,如何区分以上两种方式,是通过consumer group实现的。

多个消费者在一个consumer group中,那么他们就是一个消费整体,消息只会被一个具体的消费者消费一次
如果想多个同时消费,那么需要多个consumer group。
这样来看consumer group需要与具体服务对应,一般一个独立的服务需要消费一次消息。

总的来说,体会到了大家喜欢的它原因,具体一些实验后续将深入。
建议参考阅读

ActiveMQ

每个MQ中的术语大同小异,但又十分不统一,随着遵循的协议不一样,让我们看看ActiveMQ,它使用JMS协议

Queue 和 Topic

Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
![](/images/post/20181016/jms topic.png)

Queues
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
![](/images/post/20181016/jms queue.png)

这里的 topics 类似kafka中的广播,pub/sub 模式,queue是队列是点对点模式。
他的主要区别是,消费方式的确定不是在consumer设置,而是在MQ Server中设置
消费性质上同上:

  • topic的消息会多个消费者同时消费,但并不做消息堆积,没有之前的消息
  • queue消息只有一个消费者消费,并做消息缓存堆积,直到消费为止
  • 同时queue的消费者是负责均衡的会分摊消息队列的中的数据,但是不能保证按顺序执行
    ![](/images/post/20181016/activemq consumer.png)

关键代码

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}

@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}

通过spring boot 配置文件可以指明消费方式,是topic还是queue,但是只能只一种。
为了同时支持两种方式,我么需要声明JmsListenerContainerFactory。是否开启pub sub domain,决定是queue还是topic

1
factory.setPubSubDomain(true);

同时配合JmsListener注解,就可以指明消费方式了。如果指明的消费方式与ActiveMQ所创建的queue或topic不一致,则消息不能正常消费。

1
@JmsListener(destination = "${app.queue}", containerFactory = "queueListenerFactory")

生产端

1
2
3
4
5
6
7
8
9
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}

@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}

通过不通类型指明是topic还是queue

运行说明

同样创建多个运行实例,来观察消费情况
![](/images/post/20181016/activemq run dashboard.png)

观察ActiveMQ管理后台,你可以看到相关的配置信息
![](/images/post/20181016/activemq queue.png)
![](/images/post/20181016/activemq topic.png)

特性总结

  • 简单,两种消费模式非常清晰,没有复杂的系统结构
  • 自带管理后台还算好用

还需要深入体会其一些参数设置,初步感觉中规中矩没有什么好说的。😜

RabbitMQ

最后说熟悉的RabbitMQ吧。
我很喜欢RabbitMQ,功能上丰富,性能不错,相比ActiveMQ,我更喜欢RabbitMQ吧。当然具体使用,还是要分应用场景。

基本概念

RabbitMQ中有几个主要概念,和其它MQ的定义有些区别,其为AMQP协议

  • Exchange 交换机,用于消息的分发,如果不指定则会使用默认的
  • Queue 队列,实际缓存数据的消息队列,可对应多个消费者
  • Topic 主题,消息可以有主题,用于exchange分配消息是的判断依据
  • virtual host 虚拟路径,可以创建虚拟路径将相关配置分开并设定特定用户用于访问控制

![](/images/post/20181016/exchanges topic fanout direct.png)

消费方式

RabiitMQ的消费方式和灵活度上应该是这三者中最好的,并且官方给出了详细的实例说明,超有爱❤️
当然由于模式多,它也就是最复杂的。官方给出的六个实例,这里简单说一下,具体代码看原址

  • Hello World 最普通的生产消费代码
  • Work Queue 默认exchange,工作队列,一个队列被多个消费者消费,且一个消息只消费一次
  • Publish/Subscribe 发布订阅,通过exchange Fanout模式进行队列分发,实现一条消息可以被多个消费者消费
  • Routing 路由方式,通过exchange和routing key指定消息路由到具体的队列别特定的消费者消费
  • Topics 主题方式,通过exchange和topic比配规则指定消息路由到具体的队列别特定的消费者消费
  • RPC 通过消息队列实现RPC调用过程

Exchange 方式:

  1. direct exchange 直接点对点
  2. fanout exchange 全体广播
  3. topic exchange 主题广播
  4. headers exchange 还没有用过
  5. system exchange 还没有用过

关键代码

生产端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 声明queue
@Bean
public Queue queue() {
return new Queue(queueName);
}
// 声明fanout exchange
@Bean
public FanoutExchange exchange() {
return new FanoutExchange(broadcast, false, true);
}
// 声明 queue与exchange的绑定关系
@Bean
Binding binding1(Queue queueBroadone, FanoutExchange exchange) {
return BindingBuilder.bind(queueBroadone).to(exchange);
}

消费端

1
2
3
4
@RabbitListener(queues = "${app.broadone}")
public void recevieOne(String message) {
log.info("consumer receive broadcast one: {}", message);
}

运行说明

这里我只做消息在多消费者下顺序消费观察和广播观察,其它实例参考官网代码吧,没有必要重复了。

一个生产者、一个queue、多个消费者,和ActiveMQ queue方式,观察效果一致:

  • 多个消费者的消息分摊,如果每条消费执行时间差异较大,消费顺序不保证一致
    ![](/images/post/20181016/rabbitmq consumer.png)
  • 消费退出,另一个消费者会接替未完成的消息工作
  • 广播消息默认堆积,其本质还是消费队列,不能多个消费者消费一个queue实现pub/sub

同样创建多个运行实例,来观察消费情况
![](/images/post/20181016/rabbitmq run dashboard.png)

管理后台可以看到的信息
![](/images/post/20181016/rabbitmq queue.png)
![](/images/post/20181016/rabbitmq exchange.png)

特性总结

  • 功能多,使用灵活
  • AMQP,性能不错
  • 管理后台方便

实验总结

  • 如果你需要消息顺序消费、分布式存储的高可用性,指定消息位置再次消费,较高的吞吐 —— Kafka
  • 如果你要消息正常消费,无顺序要求、无特殊路由要求。广播消费时,多个消费者可以监听一个queue的方式完成 —— ActiveMQ
  • 如果你要适合多种消息根据自定义规则(通配符等)路由道不同的消息队列,或兼容不通场景,希望广播消息有堆积不丢失 —— RabbitMQ

以上MQ都还有很多配置参数应对不通的应用需求,可能通过配置实现默认不支持的功能,具体使用情况,如果以后遇到我再单独说明。