Kafka——消息存储与处理-3

消息存储

存储路径

kakfa的消息都会持久化到磁盘,并以日志文件的方式存储。日志文件的保存路径配置在config/server.properties中

# A comma seperated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs

每个topic的每个partition的数据都会存储在对应的子目录下,目录的命名规则为

{topic_name}-{partition-id}

假设我们创建一个名为test-topic的topic,它有3个分区,那么可以在kafka的数据目录下看下以下三个子目录

test-topic-0
test-topic-1
test-topic-2

文件管理

kafka是如何来管理日志文件的呢?如果我们进入到上述test-topic-0这个目录下,可以看到如下文件

-rw-r--r--  1 zj-db0741  staff  0 Dec 22  2018 00000000000000000000.log
-rw-r--r--  1 zj-db0741  staff  0 Oct 18 20:02 00000000000000000000.index
-rw-r--r--  1 zj-db0741  staff  0 Oct 18 20:02 00000000000000000000.timeindex

我们知道Producer在不断的生成消息,如果不对日志文件进行分割,那么日志文件将会不断膨胀,这样对维护日志文件以及日志清理都会带来很大的困难。

kafka为了防止上述情况的发生,通过分段的方式将一个Log分割为多个LogSegment。Logsegment是逻辑上的概念,一个LogSegment对应磁盘上的日志文件和一个索引文件,其中日志文件用来记录消息。索引文件是用来保存消息的索引。LogSegment由两部分组成,index file和data file。其中后缀为.index和.timeindex的是索引文件,后缀为.log的是数据文件。

segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个 segment文件最后一条消息的offset值进行递增,例如如下三个文件(假设每个log存储5000消息):

00000000000000000000.log

00000000000000005001.log

00000000000000010001.log

在实际中,每个LogSegment的大小相等,但是消息的条数不一定相等。

如果想查看.log文件的内容,可以用下面的命令

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000019.log --print-data-log

为什么还要有索引文件呢?当然是为了提高查找消息的效率了。其中index索引文件存储了offset对应的物理偏移量,timeindex文件映射了时间戳和相对的offset.

消息查找

有了索引文件之后,kafka是如何利用索引来查找消息的呢?kafka可以根据offset来查找消息,也可以根据时间戳来查找。

offset查找


假设我们要查询offset为7为消息

1. 首先确定消息在哪个logSegment中,显而易见是在第一个logSegment中(logSegment以文件中第一条消息的offset来命名)。
2. 打开这个logSegment的index文件,使用二分查找,找到索引文件中小于或等于目标offset的最大offset,在图中为offset=6,同时知道了offset为6的消息在文件中的位置为1407
3. 打开数据文件,从位置1407开始扫描直到找到offset为7的那条消息

在index文件中[6,1407]表示offset为6的消息,它在数据文件中的起始位置为1407。

通过上述查找过程,我们可以知道索引的疏密会影响查询效率,kafka通过以下配置来控制索引的疏密,默认是4K,数据文件的大小每增加4K就增加一个索引

# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096

时间戳查找

timeindex文件存储是时间戳与offset的对应关系,所以首先需要找到对应的offset,然后在按照offset查找的方式查找消息。

假设我们要查询时间戳为1581836400000的消息,

1. 首先确定消息在哪个logSegment中,将时间戳1581836400000与每个logSegment的最大时间戳largestTimestamp比较,直到找到不小于目标时间戳的logSegment
2. 打开这个logSegment的timeindex文件,也是使用二分查找,找到不大于目标时间戳的最大时间戳,拿到这个时间戳所对应的offset
3. 然后按照offset查找的方式进行查找就行

在timeindex文件中[1581836400000,45]表示在1581836400000这个时刻写入的消息的offset为45

文件切分

我们已经知道一个partition的数据会分隔成多个logSegment,那么具体切分规则是怎么样的呢?我们可以通过以下配置项来控制logSegment的切分

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824(1G)

#The maximum time before a new log segment is rolled out (in hours), secondary to log.roll.ms property
log.roll.hours=168(7天)

# The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in log.roll.hours is used
log.roll.ms

# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760

当满足以下几个条件之一时,就会触发文件的切分

  • 数据文件(后缀为.log)的大小超过log.segment.bytes配置的值,默认是1G
  • 当logSegment的最大时间戳与当前时间的差值大于log.roll.hourslog.roll.ms配置的值。如果同时配置两个参数log.roll.ms的优先级更高。默认情况下,只配置了log.roll.hours,默认是7天
  • index文件(后缀为.index)的大小超过log.index.size.max.bytes配置的值,默认是10M

日志清理

kafka提供了两种清理策略

  • 日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
  • 日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本

可以通过配置log.cleanup.policy来配置,默认是delete,也可以选择compact

日志删除

kafka通过以下配置来控制日志删除

# The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion
# 日志删除检测频率,默认为5分钟,也即每隔5分钟,检测日志是否需要删除
log.retention.check.interval.ms=300000

#The number of hours to keep a log file before deleting it (in hours)
# 日志保存时间,默认是7天,也即7天前的日志需要删除
log.retention.hours=168
log.retention.minutes
log.retention.ms

# The time to wait before deleting a file from the filesystem
# 延迟删除时间,默认为1分钟,也即对于待删除的日志,需要等待一分钟之后在删除日志
file.delete.delay.ms=60000

# The maximum size of the log before deleting it
# 日志最大值,当所有日志文件大小的总和超过该配置就需要被删除
log.retention.bytes

基于时间

  1. 根据log.retention.hours等配置,找出过期的logSegment
  2. 在这些logSegment的所有文件添加.delete的后缀
  3. 交由一个以 “delete-file” 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过file.delete.delay.ms进行设置

基于日志大小

  1. 计算需要被删除的日志总大小(total - log.retention.bytes)
  2. 从第一个LogSegment开始查找可删除的日志分段的文件集合
  3. 执行删除

零拷贝

我们已经知道kafka是将数据以文件的形式进行保存,也即保存在磁盘中。当consumer来消费数据时,server需要将数据从磁盘拷贝到内存中,在把内存中数据通过socket的方式发送给消费者,具体流程如下

1. 操作系统将数据从磁盘读入到内核空间的页缓存
2. 应用程序将数据从内核空间读入到用户空间缓存中
3. 应用程序将数据写回到内核空间到 socket 缓存中
4. 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出

这个过程涉及到 4 次上下文切换以及 4 次数据复制,但过程中数据没有变化,仅仅是从磁盘复制到网卡缓冲区。

为了提升性能,kafka进行了所谓的”零拷贝”,可以直接内核空间页缓存的数据直接复制到网卡缓冲区,这样就减少了复制次数,也减少了上下文切换的次数。

Read Buffer(Kernel)  ---copy---> NIC Buffer(Kernel)

Linux通过sendfile的系统调用来完成上述操作。

消息消费

kafka以comsumerGroup(消费者)的形式来消费消息。一个topic下的所有partition只能被group中的一个consumer消费,不可能出现一个group中的两个consumer消费同一个partition。

假设一个topic分了3个partition,分别是0,1,2,同时有一个消费组,里面分别由3个consumer,那么每个consumer仅会分配一个partition来消费。

如果group只有2个consumer,那么其中一个consumer会消费两个partition,另外一个consumer消费最后一个partition。

如果group有4个consumer,那么有一个consumer会消费不到任何数据,因为3个partition被分配到另外3个consumer了。

那么,为什么一个分区只能由一个consumer消费呢?

Kafka消息在分区内有序,消费者消费消息时也要按照分区内消息顺序进行消费,有序消费就要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。kafka消费者自己可以控制读取消息的offset,如果两个消费者负责同一个分区,就有可能C1读到2,C1还没处理完,C2已经读到3了,因为这就相当于多线程读取同一个消息,造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异

在早前的kafka版本中,consumer的offset信息是保存在zookeeper,但是由于zookeeper并不适合大批量的频繁写入操作,新版本的kafka推荐将consumer的offset保存在kafka内部的topic中,也即__consumer_offsets。同时提供了kafka-consumer-groups.sh来查看consumer的offset信息

__consumer_offsets默认有50个分区。那么如何确定一个group的offset信息是保存在哪个partition中呢?可以使用下面的计算公式

groupID 为消费者的ID,numPartitions为__consumer_offsets的分区数,也即50

Math.abs(groupID.hashCode()) % numPartitions

Reprint please specify: wbl Kafka——消息存储与处理-3

Previous
Java Reference--1 Java Reference--1
GC与Reference在JVM进行GC的时候,JVM首先需要判断一个对象是否可以被回收。JVM从GC ROOT的对象开始向下搜索,搜索所走过的路径称为引用链(reference chain),当一个对象没有任何引用时,就认为这个对象不可达
2020-02-22
Next
Java线程池-2 Java线程池-2
上一篇Java线程池-1中,我们了解了线程池的相关参数以及内置的几种线程池,现在我们来研究下如何自己实现一个线程池。 在实现之前,我们先思考下,一个线程池有哪些要素呢? 线程复用 线程管理 拒绝策略 线程复用如何实现线程的复用呢?还记得
2020-02-06 wbl