Kafka(1)—消息队列
Kafka主要作用于三个领域:消息队列、存储和持续处理大型数据流、实时流平台
作为消息队列,Kafka允许发布和订阅数据,这点和其他消息队列类似,但不同的是,Kafka作为一个分布式系统,是以集群的方式运行的,可以自由伸缩。同时还提供了数据传递保证—可复制、持久化等。
Kafka可以存储和持续处理大型数据流,并保持持续性的低延迟。就这点上,可以看成一个实时版的Hadoop。
Kafka的低延迟特点更适合用在核心的业务应用上,当业务事件发生时,Kafka能够及时对这些事件作出响应。
Kafka其实是一个面向实时数据的流平台,也就是它不仅可以将现有的应用程序和数据系统连接起来,它还能用于加强这些触发相同数据流的应用。
Kafka的使用
Kafka的安装就不特别说明了,唯一需要注意的是安装Kafka之前需要先安装它的依赖医用 — zookeeper,它是一个分布式的应用程序协调服务。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
添加以下配置:
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers={服务器公网IP地址}:9093
#=============== 生产者配置=======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#===============消费者配置=======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
这样我们就完成了SpringBoot的装配。
但如何使用Kafka呢?首先我们要先了解Kafka的发布订阅消息系统。
Kafka消息订阅的前提是需要一个主题(topic),这点与之前的RabbitMQ不同。每个消息都有一个明确的topic来筛选消息的订阅者,topic可以在生产时进行设置。除了主题,最重要的就是需要一个消息的内容了。
在Java中Kafka消息用类ProducerRecord<K,V>
表示。这里的V
指的就是消息的内容,而K
不是主题,可以将其当做消息的附加信息,因此,一个消息体的结构大致为:
内容序列化
为了网络传输,我们通常需要将内容进行序列化,Kafka也是如此,需要分别将Key和Value进行序列化。我们在之前的配置内就能看到:
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
//#1. key序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
//#2. value序列化
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
可以看出,key和value都用的字符串序列化方式。同样也约定了我们的消息体,应该ProducerRecord<String, String>
类型。
当然Kafka还提供了整数和字节数组序列化器,甚至还提供了自定义序列化器作为拓展方案。
加入了序列化器,我们的消息流程就变成了:
主题分区
接下来,我们需要考虑,对于消息Kafka应该用什么数据结构存储呢?
消息需要满足先入先出的规则,所以最好使用队列进行存储,因此我们称其消息队列,但Kafka是为了应对大量数据,大批消息而设计的,简单的队列模型显然不支持这么大的并发,我们需要系统支持横向拓展能力。
因此,Kafka提出了分区(Partition)的概念,每个分区都是一个队列,每个消息会按照一定的规则放置在某个分区中。
当消息通过序列化器到达分区器时,系统会先根据Topic寻找对应的主题区域,再通过规则找到对应主题下的分区。
在默认情况下,消息会被随机发送到主题内各个可用的分区上,并且通过算法保证分区消息量均衡
如果消息体里有Key,则会根据Key的哈希值找到某个固定的分区,也就是说如果key相同,分区也相同。
了解这些,我们就能去发送消息了:
@Controller
@RequestMapping("kafka")
public class TestController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//发送消息方法
@RequestMapping("/send")
@ResponseBody
public String send() {
// key不为null
kafkaTemplate.send("topic","key", "value");
// key为null
kafkaTemplate.send("topic", "value");
return "success";
}
}
和Redis类似,Spring也提供了Kafka的客户端来自动连接Kafka,并且约定消息体类型。
需要注意的就是,消息体类型需要和配置的序列化器相对应:
消费消息
正如其他消息队列一样,存在生产者就存在消费者,Kafka也存在自己的消费者 — KafkaConsumer
对于消费者,Kafka也提供了横向扩展的能力,就像多个生产者可以向同一个主题写入消息一样,多个消费者也可以从同一个主题读取消息。
这就存在一个概念—消费者组
一个消费者组里的消费者订阅同一个主题,每个消费者接受主题的一部分分区的消息。
这就存在几个例子:
案例1:单消费者
如果一个消费者组只有一个消费者,它将消费这个主题下所有的分区消息:
案例2:多消费者
如果一个消费者组有多个消费者(但不超过分区数量),它将均衡分流所有分区的消息:
如果消费者数量和分区数量相同,每个消费者接受一个分区的消息:
注意的是,一条消息只会被同组消费一次,不会在同一个消费者组里重复消费,具有排他性。
已经收到消息的消费者重启计算机后,也不会再次接受同一条消息。
案例3:超消费者
如果消费者数量大于分区数量,那么一部分消费者将闲置,不会接受任何消息:
案例4:多消费者组
如果我们存在多个消费者组,订阅了同样的主题,会怎么样呢?
多个消费者组将会分别消费这个消息,即一个消息都会通知每个消费者组。
实现消息消费,与生产类似,首先需要指定反序列化器:
// 配置消费者组
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
// 配置消息体Key反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
// 配置消息体Value反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
实现代码:
@Component
public class KafkaReceiver {
// #1. 监听主题为topic的消息
@KafkaListener(topics = {"topic"})
public void listen(ConsumerRecord<?, ?> record) {
// #2. 如果消息存在
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
// #3. 获取消息
Object message = kafkaMessage.get();
System.out.println("message =" + message);
}
}
}
其中Optional
是 Java8 的工具类,主要用于解决空指针异常的问题。它提供很多有用的方法,这样我们就不用显式进行空值检测。这里主要用到三个常用的方法,以判断消息是否存在,如果存在则取出消息值。
一些注意点:
kafkaTemplate.send()
是一个异步的发送方法,大多数情况下应该不会阻塞主线程),但实际上某些情况下仍然会出现阻塞主线程的情况。
如第一次发送消息时,需要连接Kafka,并且创建topic,这个过程是阻塞的