Kafka(1)—消息队列

Kafka主要作用于三个领域:消息队列、存储和持续处理大型数据流、实时流平台

作为消息队列,Kafka允许发布和订阅数据,这点和其他消息队列类似,但不同的是,Kafka作为一个分布式系统,是以集群的方式运行的,可以自由伸缩。同时还提供了数据传递保证—可复制、持久化等。

Kafka可以存储和持续处理大型数据流,并保持持续性的低延迟。就这点上,可以看成一个实时版的Hadoop。

Kafka的低延迟特点更适合用在核心的业务应用上,当业务事件发生时,Kafka能够及时对这些事件作出响应。

Kafka其实是一个面向实时数据的流平台,也就是它不仅可以将现有的应用程序和数据系统连接起来,它还能用于加强这些触发相同数据流的应用。

Kafka的使用

Kafka的安装就不特别说明了,唯一需要注意的是安装Kafka之前需要先安装它的依赖医用 — zookeeper,它是一个分布式的应用程序协调服务。

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

添加以下配置:

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers={服务器公网IP地址}:9093

#=============== 生产者配置=======================

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#===============消费者配置=======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

这样我们就完成了SpringBoot的装配。

但如何使用Kafka呢?首先我们要先了解Kafka的发布订阅消息系统。

Kafka消息订阅的前提是需要一个主题(topic),这点与之前的RabbitMQ不同。每个消息都有一个明确的topic来筛选消息的订阅者,topic可以在生产时进行设置。除了主题,最重要的就是需要一个消息的内容了。

在Java中Kafka消息用类ProducerRecord<K,V>表示。这里的V指的就是消息的内容,而K不是主题,可以将其当做消息的附加信息,因此,一个消息体的结构大致为:

内容序列化

为了网络传输,我们通常需要将内容进行序列化,Kafka也是如此,需要分别将Key和Value进行序列化。我们在之前的配置内就能看到:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
//#1. key序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
//#2. value序列化
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

可以看出,key和value都用的字符串序列化方式。同样也约定了我们的消息体,应该ProducerRecord<String, String>类型。

当然Kafka还提供了整数字节数组序列化器,甚至还提供了自定义序列化器作为拓展方案。

加入了序列化器,我们的消息流程就变成了:

主题分区

接下来,我们需要考虑,对于消息Kafka应该用什么数据结构存储呢?

消息需要满足先入先出的规则,所以最好使用队列进行存储,因此我们称其消息队列,但Kafka是为了应对大量数据,大批消息而设计的,简单的队列模型显然不支持这么大的并发,我们需要系统支持横向拓展能力。

因此,Kafka提出了分区(Partition)的概念,每个分区都是一个队列,每个消息会按照一定的规则放置在某个分区中。

当消息通过序列化器到达分区器时,系统会先根据Topic寻找对应的主题区域,再通过规则找到对应主题下的分区。

在默认情况下,消息会被随机发送到主题内各个可用的分区上,并且通过算法保证分区消息量均衡

如果消息体里有Key,则会根据Key的哈希值找到某个固定的分区,也就是说如果key相同,分区也相同。

了解这些,我们就能去发送消息了:

@Controller
@RequestMapping("kafka")
public class TestController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //发送消息方法
    @RequestMapping("/send")
    @ResponseBody
    public String send() {
        // key不为null
        kafkaTemplate.send("topic","key", "value");
        // key为null
        kafkaTemplate.send("topic", "value");
        return "success";
    }
}

和Redis类似,Spring也提供了Kafka的客户端来自动连接Kafka,并且约定消息体类型。

需要注意的就是,消息体类型需要和配置的序列化器相对应:

消费消息

正如其他消息队列一样,存在生产者就存在消费者,Kafka也存在自己的消费者 — KafkaConsumer

对于消费者,Kafka也提供了横向扩展的能力,就像多个生产者可以向同一个主题写入消息一样,多个消费者也可以从同一个主题读取消息。

这就存在一个概念—消费者组

一个消费者组里的消费者订阅同一个主题,每个消费者接受主题的一部分分区的消息。

这就存在几个例子:

案例1:单消费者

如果一个消费者组只有一个消费者,它将消费这个主题下所有的分区消息:

案例2:多消费者

如果一个消费者组有多个消费者(但不超过分区数量),它将均衡分流所有分区的消息:

如果消费者数量和分区数量相同,每个消费者接受一个分区的消息:

注意的是,一条消息只会被同组消费一次,不会在同一个消费者组里重复消费,具有排他性。

已经收到消息的消费者重启计算机后,也不会再次接受同一条消息。

案例3:超消费者

如果消费者数量大于分区数量,那么一部分消费者将闲置,不会接受任何消息:

案例4:多消费者组

如果我们存在多个消费者组,订阅了同样的主题,会怎么样呢?

多个消费者组将会分别消费这个消息,即一个消息都会通知每个消费者组。

实现消息消费,与生产类似,首先需要指定反序列化器:

// 配置消费者组
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
// 配置消息体Key反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
// 配置消息体Value反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

实现代码:

@Component
public class KafkaReceiver {

  // #1. 监听主题为topic的消息
  @KafkaListener(topics = {"topic"})
  public void listen(ConsumerRecord<?, ?> record) {
    // #2. 如果消息存在
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
      // #3. 获取消息
      Object message = kafkaMessage.get();
      System.out.println("message =" + message);
    }

  }
}

其中Optional 是 Java8 的工具类,主要用于解决空指针异常的问题。它提供很多有用的方法,这样我们就不用显式进行空值检测。这里主要用到三个常用的方法,以判断消息是否存在,如果存在则取出消息值。

一些注意点:

  • kafkaTemplate.send() 是一个异步的发送方法,大多数情况下应该不会阻塞主线程),但实际上某些情况下仍然会出现阻塞主线程的情况。
    如第一次发送消息时,需要连接Kafka,并且创建topic,这个过程是阻塞的