1.Kafka概述

官方网站:http://kafka.apache.org
kafka的目标是实现一个为处理实时数据提供一个统一、高吞吐、低延迟的平台。是分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务

简单概念

  • Kafka作为一个集群运行在一个或多个可以跨多个数据中心的服务器上
  • Kafka集群以**主题(topic)**为类别进行数据存储
  • 每条数据都是由一个键、一个值和一个时间戳组成

2.环境搭建

2.1 预备环境

2.2 启动

解压缩成功后就可以启动Kafka服务器了。不过在此之前,首先需要启动Zookeeper服务器(Zookeeper为Kafka提供协调服务的工具)。Kafka内置了一个Zookeeper服务器以及一组相关的管理脚本

启动Zookeeper

1
bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafaka

1
bin/kafka-server-start.sh config/server.properties

创建topic

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1

查看topic列表

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看topic状态

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

输出

1
Topic:test	PartitionCount:1	ReplicationFactor:1

topic名为test,分区数为1,副本数为1

2.3 发送消息

Kafka默认提供了脚本工具可以不断接收标准输入,并将它们发送到Kafka的某个topic

用户在控制台终端下启动该命令,输入一行文本数据,该脚本将该行文本封装成一条Kafka消息发送给指定的topic

生产消息

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费消息

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2.4 创建多broker集群

为每个broker创建独立的配置文件

1
2
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改配置

1
2
3
4
5
6
7
8
9
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

启动多个broker

1
2
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties

创建新的topic

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看topic状态

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

生产消息

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

消费消息

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

3.Kafka架构设计

3.1 Kafka设计目标

Kafka设计的初衷是为了解决互联网公司超大量级数据的实时传输。为了实现这个目标,Kafka在设计之初就需要考虑以下4个方面的问题

  • 吞吐量/延时
  • 消息持久化
  • 负载均衡和故障转移
  • 伸缩性

吞吐量/延时

吞吐量是某种处理能力的最大值。对于Kafka而言,它的吞吐量就是每秒能够处理的消息数或每秒能够处理的字节数。很显然,吞吐量越大越好

延时是衡量一段时间间隔,例如一次请求到响应之间的时间。对于Kafka而言,延时表示客户端发起请求与服务器处理请求并发送响应给客户端之间的这一段时间。显而易见,延时间隔越短越好

在实际使用场景下,这两个指标通常是一对矛盾体,即调优其中一个指标通常会使另一个指标变差

如何提高吞吐量,降低延时

Kafka的写入操作很快,这得益于它对磁盘的使用方法。Kafka会持久化所有数据到磁盘,但每次的写入操作实际都是把数据写入到操作系统的页缓存(page cache)中。由操作系统决定何时把页缓存中的数据写到磁盘上。

设计优势:

  • 页缓存是在内存中进行分配,写入操作非常快
  • 无需直接与底层文件系统打交道,由操作系统进行处理
  • 使用追加写入方式进行数据写入,避免磁盘随机写操作

普通的物理磁盘(非固态硬盘)而言,随机读/写确实很慢,但顺序读/写的操作其实非常快,它的速度甚至可以匹敌内存的随机读写速度。鉴于这一事实,Kafka在设计时采用追加写入消息的方式,即只能在日志文件末尾追加写入新的消息,且不允许修改已写入的消息

“零拷贝”

在写一个服务端程序时(Web Server或者文件服务器),文件下载是一个基本功能。这时候服务端的任务是:将服务端主机磁盘中的文件不做修改地从已连接的socket发出去,我们通常用下面的代码完成:

1
2
while((n = read(diskfd, buf, BUF_SIZE)) > 0)
write(sockfd, buf , n);

基本操作就是循环的从磁盘读入文件内容到缓冲区,再将缓冲区的内容发送到socket。但是由于Linux的I/O操作默认是缓冲I/O。这里面主要使用的也就是readwrite两个系统调用,我们并不知道操作系统在其中做了什么。实际上在以上I/O操作中,发生了多次的数据拷贝

当应用程序访问某块数据时,操作系统首先会检查,是不是最近访问过此文件,文件内容是否缓存在内核缓冲区,如果是,操作系统则直接根据read系统调用提供的buf地址,将内核缓冲区的内容拷贝到buf所指定的用户空间缓冲区中去。如果不是,操作系统则首先将磁盘上的数据拷贝的内核缓冲区,这一步目前主要依靠DMA来传输,然后再把内核缓冲区上的内容拷贝到用户缓冲区中。
接下来,write系统调用再把用户缓冲区的内容拷贝到网络堆栈相关的内核缓冲区中,最后socket再把内核缓冲区的内容发送到网卡上

从上图中可以看出,共产生了四次数据拷贝,即使使用了DMA来处理了与硬件的通讯,CPU仍然需要处理两次数据拷贝,与此同时,在用户态与内核态也发生了多次上下文切换,无疑也加重了CPU负担

在此过程中,我们没有对文件内容做任何修改,那么在内核空间和用户空间来回拷贝数据无疑就是一种浪费,而零拷贝主要就是为了解决这种低效性

零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,主要就是利用各种零拷贝技术,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务

使用mmap

我们减少拷贝次数的一种方法是调用mmap()来代替read调用

1
2
buf = mmap(diskfd, len);
write(sockfd, buf, len);

用程序调用mmap(),磁盘上的数据会通过DMA被拷贝到内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去

使用sendfile

从2.1版内核开始,Linux引入了sendfile来简化操作

1
2
#include<sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

系统调用sendfile()在代表输入文件的描述符in_fd和代表输出文件的描述符out_fd之间传送文件内容(字节)

使用sendfile不仅减少了数据拷贝的次数,还减少了上下文切换,数据传送始终只发生在kernel space

Kafka的消息消费机制使用的就是sendfile(),严格来说就是通过Java的FileChannel.transferTo方法实现

除了”零拷贝”技术,Kafka大量使用页缓存,因此读取消息时大部分消息很有可能依然保存在也缓存中。因此可以直接命中缓存,不用”穿透”到底层的物理磁盘上获取信息

总结,Kafka依靠下列4点达到高吞吐量、低延时的设计目标

  • 大量使用操作系统页缓存,内存操作速度快且命中率高
  • Kafka不直接参与物理I/O操作,交由操作系统完成
  • 采用追加写入方式,摒弃缓慢的磁盘随机读/写操作
  • 使用以sendfile为代表的零拷贝加强网络间的数据传输效率

消息持久化

Kafka对消息进行了持久化操作,这样做的好处如下

  • 解耦消息发送与消息消费
  • 实现灵活的消息处理

3.2 消息

Kafka的消息是由一个固定长度的消息头+可变长的key字节数组+可变长的vaue字节数组组成

消息头由以下几部分组成

  • 4个字节用于检测消息完整性的CRC校验和
  • 1个字节的版本号
  • 1个字节的消息属性
  • 8个字节的时间戳

完整的消息结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 1. 4 byte CRC32 of the message
* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
* bit 0 ~ 2 : Compression codec.
* 0 : no compression
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : Timestamp type
* 0 : create time
* 1 : log append time
* bit 4 ~ 7 : reserved
* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
* 5. 4 byte key length, containing length K
* 6. K byte key
* 7. 4 byte payload length, containing length V
* 8. V byte payload
*/

3.2 主题与分区

  • 主题(Topic)

    topic只是一个逻辑上的概念,它表示某种同类型消息的集合。也可以认为是消息被发送到的地方。通常我们可以通过topic来区分不同的业务,比如业务A使用一个topic,业务B使用另外一个topic

    Kafka中的topic通常会被多个消费者订阅,出于性能的考量,Kafka并不是topic-message,而是topic-partition-message的三层结构来分散负载。从本质上来说,每个topic都是由多个partition组成

  • 分区(Partition)

    每个分区都是一个有序的、不可变的记录序列,这些记录连续附加到一个结构化提交日志中。每个分区中的记录都被分配一个称为偏移量的顺序ID号,该偏移量唯一地标识分区中的每个记录

    Kafka中的partition实际上并没有太多的业务含义,它的引入只是单纯为了提升系统的吞吐量。因此在创建topic时,可以根据集群实际配置设置具体的partition数,实现整体性能的最大化

3.3 offset

在Kafka的消费端有着偏移量(offset)的概念

显然,每条消息在某个partition上的位移是固定的,但消费该partition的消费者的位移会随着消费进度不断前移,但终究不会超过该分区最新一条消息的偏移量

因此,对于Kafka中的任意一条消息,都可以表示成<topic,partition,offset>三元组,通过该元组来定位到唯一一条消息

3.4 replica

对于partition中的有序日志消息,为了保证的数据的有效性,以及系统的可靠性,数据本身是需要进行冗余操作的。这些冗余备份的日志消息,在Kafka中被称为副本(replica),它们存在的唯一目的就是防止数据丢失。

副本分为两类

  • 领导者副本(leader replica)
  • 跟随者副本(follower replica)

leader

​ leader对外提供服务

follower

​ 被动向leader获取数据,如果leader所在的broker出现宕机,Kafka就会从剩余的follower中选举出新的leader

3.5 ISR

ISR(in-sync replica),表示与leader replica保持同步的replica集合

Kafka为parition动态维护一个replica集合,该集合中的所有replica保存的消息日志都与leader replica保持同步状态。只有这个集合中的replica才会被选举为leader,也只有集合中所有replica都接收到同一消息,Kafka才会将该消息置为”已提交”状态,即认为这条消息发送成功

正常情况下,partition中所有的replica都应该与leader replica保持同步,即所有的replica都在ISR中。由于各种各样的原因,一小部分的replica开始落后与leader replica,当滞后到一定程度时,Kafka会将这些replica”踢”出ISR。当这些replica重新”追上”leader的进度时,Kafka会将他们重新加回到ISR中

这一些都是自动维护的,不需要人工干预

4.Kafka集群搭建

4.1 搭建Zookeeper集群

主机配置

主机名 部署模块
master zookeeper、kafka
node1 zookeeper、kafka
node2 zookeeper、kafka

Zookeeper集群搭建

  • 解压

    1
    tar zxf zookeeper-3.4.5.tar.gz -C /opt/modules
  • 复制配置文件

    1
    cp conf/zoo_sample.cfg conf/zoo.cfg
  • 配置数据存储目录

    1
    dataDir=/opt/modules/zookeeper-3.4.5/data
  • 创建数据存储目录

    1
    mkdir /opt/modules/zookeeper-3.4.5/data
  • 配置集群节点

    1
    2
    3
    4
    vi conf/zoo.cfg
    server.1=master:2888:3888
    server.2=slave1:2888:3888
    server.3=slave2:2888:3888
  • 在数据存储目录下创建名为myid的文件,内容为节点对应id

  • 启动每台节点的zookeeper

4.2 搭建Kafka集群

  • 编写配置文件(server.properties)

    1
    2
    3
    broker.id=0   #0/1/2
    listeners=PLAINTEXT://master:9092 #node1:9092/node2:9092
    zookeeper.connect=master:2181,master:2181,master:2181
  • 启动kafka server

    1
    bin/kafka-server-start.sh config/server.properties

5.Producer API

producer的主要功能就是向某个topic的某个分区发送消息,所以它首先需要确认想topic的哪个分区写入消息

分区的确认是由分区器(partitioner)完成

5.1 producer实现

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Properties props = new Properties();
props.put("bootstrap.servers", "10.211.55.12:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));

producer.close();

生产者包含一个缓冲区,其中保存尚未传输到服务器的记录,以及一个后台I/O线程,该线程负责将这些记录转换为请求并将它们传输到集群。使用后不关闭生产商将泄漏这些资源

send()方法是异步的。调用该方法时,它将记录发送到缓冲区,并立即返回。通过这种方式来以批处理的方式发送数据,从而提高效率

生产者为每个分区维护未发送记录的缓冲区。这些缓冲区的大小由batch.size配置指定。增大这个值可能会导致更多的批处理,但需要更多的内存

默认情况下,即使缓冲区中有额外的未使用空间,也可以立即发送缓冲区。但是,如果要减少请求数,可以将linger.ms设置为大于0的值。这将指示生产者在发送请求之前等待最长的毫秒数,以希望有更多的记录到达以填充同一批

获取返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Properties props = new Properties();
props.put("bootstrap.servers", "10.211.55.12:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null) {
System.out.println("send success");
}else {
System.out.println("send failure");
}
}
});

producer.close();

5.2 参数说明

参数名 描述
bootstrap.servers 用于建立与Kafka群集的初始连接的主机/端口对列表。此列表的格式应为host1:port1,host1:port2,…
key.serializer 设置key的序列化实现类
value.serializer 设置value的序列化实现类
acks 指定响应producer请求前,写入该消息的副本数。0表示producer不用关心leader端的处理结果,all表示leader以及ISR中所有副本写入完成后返回响应结果,1为0和all的折中方案,也是默认参数值,表示leader broker仅将消息写入本地日志后,就会producer进行相应
buffer.memory 指定producer端用于缓存消息的缓冲区大小,默认32M
compression.type 默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩 1.snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法 2.gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法
retries 指定在写入消息出现异常时,进行写入重试的次数,默认为0
batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销
linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,就算批次里只有一个消息,生产者也会把消息发送出去。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)
max.request.size 控制producer发送请求的大小,实际是控制producer端能够发送的最大消息大小,默认为1048576字节,通常无法满足实际需求
request.timeout.ms 控制producer发送后的等待响应时间,默认为30s

5.3 消息分区策略

producer在发送消息过程中,最重要的一步就是确定将消息发送到topic的哪个partition中。Kafka提供的分区器(partitioner)会尽力确保相同key的所有消息都发送到相同的分区上,若没有指定key,则会使用轮询的方式来确保消息在topic上是均匀分配的

5.4 自定义分区

实现Partitioner接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class MyPartitioner implements Partitioner {

private Random random;

@Override
public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
String key = (String) o1;
//获取topic的所有分区信息
List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
//随机获取分区
int partition = random.nextInt(partitionInfos.size());
return partition;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {
//该方法用于实现资源的初始化
random = new Random();
}
}

添加自定义分区类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ProducerTest {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.211.55.12:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//设置定义分区
props.put("partitioner.class", "MyPartitioner");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null) {
System.out.println("send success");
}else {
System.out.println("send failure");
}
}
});

producer.close();
}
}

查看分区消息数

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test

5.5 消息序列化

序列化类型 描述
ByteArraySerializer 本质上什么都不用做,因为己经是字节数组
ByteBufferSerializer 序列化 ByteBuffer
BytesSerializer 序列化 Kafka 自定义的 Bytes 类
DoubleSerializer 序列化 Double类型
IntegerSerializer 序列化 Integer类型
LongSerializer 序列化 Long类型
StringSerializer 序列化 String类型

5.6 producer拦截器

拦截器 (interceptor)是在Kafka 0.10.0.0版本中被引入,主要用于实现producer和consumer端的定制化控制逻辑

对于 producer 而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链( interceptor chain ) 。 intercetpor 的实现接口是org.apache.kafka.clients.producer.Producerlnterceptor

6.Consumer API

Kafka消费者(consumer)是从 Kafka读取数据的应用,若干个 consumer订阅 Kafka集群中的若干个topic并从 Kafka接收属于这些topic的消息

我们可以讲consumer分为两类

  • 独立消费者(standalone consumer)
  • 消费者组(consumer group)

6.1 消费者组

消费者使用一个消费者组名(即 group.id)来标记自己,topic 的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上

通过消费者组,可以实现对两种模型的支持

  • 消息队列

    所有的consumer都属于同一个group,这样接收到的消息只能被其中一个consumer实例接收

  • 发布/订阅

    每个consumer都属于不同的group,这些Kafka消息就会被广播到所有的consumer实例上

6.2 consumer实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Properties props = new Properties();
props.put("bootstrap.servers", "10.211.55.12:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}finally {
consumer.close();
}

6.3 偏移量操作

Kafka为分区中的每个记录维护一个数字偏移量。此偏移量充当该分区中记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,接下来将接收偏移量为5的记录

实际上,与消费者的用户相关的位置有两个概念:

  • 消费者的位置给出了将给出的下一个记录的偏移量,每当消费者调用poll(long)时自动前进
  • 已提交的位置是安全存储的最后一个偏移量,如果进程失败并重新启动,这就是使用者将恢复到的偏移量。使用者可以定期自动提交偏移;也可以选择手动控制提交位置

自动提交偏移量

设置enable.auto.commit意味着自动提交偏移量,其频率由 auto.commit.interval.ms控制

手动提交偏移量

用户还可以控制何时应将记录视为已使用记录,从而提交其偏移量,而不是依赖使用者定期提交已使用的偏移量。当消息的消耗与一些处理逻辑相耦合时,这很有用,因此在完成处理之前,不应将消息视为消耗消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//同步到数据库后,手动提交
consumer.commitSync();
buffer.clear();
}
}

为了避免这种情况,只有在相应的记录插入数据库之后,我们才会手动提交偏移量。这使我们能够精确地控制记录何时被视为已消耗。这就产生了相反的可能性:进程可能在插入到数据库之后但在提交之前的时间间隔内失败(尽管这可能只有几毫秒,但还是有可能的)。在这种情况下,接管消耗的进程将从上次提交的偏移量消耗,并重复插入最后一批数据。通过这种方式,Kafka提供了通常称为“至少一次”的交付保证,因为每个记录可能会交付一次,但在失败的情况下可能会出现重复数据

提交分区偏移量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}

6.4 偏移量存储外部化

消费程序可以不适用Kafka内置的偏移量存储,它可以在自己选择的存储中存储偏移量。这种方法主要用于应用程序在同一个系统中同时存储消耗的偏移量和结果,这样结果和偏移量就可以原子地存储

如果是这样,它将使消费完全原子化,并提供“恰好一次”语义,这比使用Kafka的偏移提交功能获得的默认“至少一次”语义更强

使用场景

  • 如果消费的结果存储在关系数据库中,那么可以在数据库中存储偏移量,并将存储结果与偏移量放在同一个事务中
  • offset也可存储在Zookeeper、HBase、Redis、HDFS、Spark Streaming checkpoint中

6.5 控制消费位置

在大多数用例中,使用者只需从头到尾使用记录,定期提交其位置(自动或手动)。然而,Kafka允许用户手动控制其位置,在分区中随意向前或向后移动。这意味着使用者可以重新使用旧记录,或者跳过最新的记录而不实际使用中间记录

有几种情况下,手动控制消费者的位置是有用的

  • 对于时间敏感的记录处理,对于远远落后于处理所有记录的消费者来说,这可能是有意义的,而不仅仅是跳到最近的记录
  • 维护本地状态的系统,在这样的系统中,消费者将希望在启动时将其位置初始化为本地存储中包含的任何内容

Kafka允许使用seek(TopicPartition, long)指定位置来指定新位置

还提供了寻找服务器维护的最早和最新偏移量的特殊方法,seekToBeginning(Collection)seekToEnd(Collection)

6.6 参数说明

参数名 描述
bootstrap.servers 用于建立与Kafka群集的初始连接的主机/端口对列表。此列表的格式应为host1:port1,host1:port2,…
key.deserializer 设置key的序列化实现类
value.deserialize 设置value的序列化实现类
group.id 指定consumer group的名称,它能够唯一标识一个 consumer group。默认值为空字符串
enable.auto.commit 指定consumer是否自动提交位移
session.timeout.ms 第一层含义:指定consumer group检测组内成员发送崩溃的时间。第二层含义consumer消息处理逻辑的最大时间,若 consumer 两次poll之间的间隔超过了该参数所设置的阈值,那么coordinator就会认为这个consumer己经追不上组内其他成员的消费进度了,因此会将该consumer实例“踢出”组,该consumer负责的分区也会被分配给其他consumer,默认值为10s
max.poll.interval.ms 单独设置consumer处理逻辑最大时间
auto.offset.reset 指定无位移信息或位移越界时Kafka的应对策略。earliest表示从最早的位移开始消费,但未必是从0开始。latest表示从最新处位移开始消费。none表示若未发现位移信息或位移越界,则抛出异常
fetch.max.bytes 指定consumer端单次获取数据的最大字节数
connections.max.idle.ms 设置定时关闭consumer与broker的socket连接时间,默认为9分钟,推荐设置为-1

7.Kafka Eagle

Kafka Eagle是一个监控系统,用于监控你的Kafka集群、可视化消费者线程、offset等

下载地址:http://download.kafka-eagle.org

  • 关闭防火墙

  • ```shell
    systemctl stop firewalld

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64

    - 解压kafka-eagle-bin-1.2.3.tar.gz

    - 解压kafka-eagle-web-1.2.3-bin.tar.gz

    - 编辑conf/system-config.properties

    - ```properties
    ######################################
    # multi zookeeper&kafka cluster list
    ######################################
    kafka.eagle.zk.cluster.alias=cluster1,cluster2
    cluster1.zk.list=localhost:2181
    cluster2.zk.list=localhost:2181

    ######################################
    # zk client thread limit
    ######################################
    kafka.zk.limit.size=25

    ######################################
    # kafka eagle webui port
    ######################################
    kafka.eagle.webui.port=8048

    ######################################
    # kafka offset storage
    ######################################
    cluster1.kafka.eagle.offset.storage=kafka
    cluster2.kafka.eagle.offset.storage=zookeeper
    ######################################
    # enable kafka metrics
    ######################################
    kafka.eagle.metrics.charts=true

    ######################################
    # alarm email configure
    ######################################
    kafka.eagle.mail.enable=true
    kafka.eagle.mail.sa=alert_sa
    kafka.eagle.mail.username=alert_sa@163.com
    kafka.eagle.mail.password=mqslimczkdqabbbg
    kafka.eagle.mail.server.host=smtp.163.com
    kafka.eagle.mail.server.port=25

    ######################################
    # delete kafka topic token
    ######################################
    kafka.eagle.topic.token=keadmin

    ######################################
    # kafka sasl authenticate
    ######################################
    kafka.eagle.sasl.enable=false
    kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    kafka.eagle.sasl.mechanism=PLAIN

    ######################################
    # kafka jdbc driver address
    ######################################
    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/opt/modules/kafka-eagle-bin-1.2.3/kafka-eagle-web-1.2.3/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=smartloli
  • 启动

  • bin/ke.sh start
    
  • 访问:http://ip:port/ke