消息存储
存储路径
kakfa的消息都会持久化到磁盘,并以日志文件的方式存储。日志文件的保存路径配置在config/server.properties中
# A comma seperated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs
每个topic的每个partition的数据都会存储在对应的子目录下,目录的命名规则为
{topic_name}-{partition-id}
假设我们创建一个名为test-topic的topic,它有3个分区,那么可以在kafka的数据目录下看下以下三个子目录
test-topic-0
test-topic-1
test-topic-2
文件管理
kafka是如何来管理日志文件的呢?如果我们进入到上述test-topic-0这个目录下,可以看到如下文件
-rw-r--r-- 1 zj-db0741 staff 0 Dec 22 2018 00000000000000000000.log
-rw-r--r-- 1 zj-db0741 staff 0 Oct 18 20:02 00000000000000000000.index
-rw-r--r-- 1 zj-db0741 staff 0 Oct 18 20:02 00000000000000000000.timeindex
我们知道Producer在不断的生成消息,如果不对日志文件进行分割,那么日志文件将会不断膨胀,这样对维护日志文件以及日志清理都会带来很大的困难。
kafka为了防止上述情况的发生,通过分段的方式将一个Log分割为多个LogSegment。Logsegment是逻辑上的概念,一个LogSegment对应磁盘上的日志文件和一个索引文件,其中日志文件用来记录消息。索引文件是用来保存消息的索引。LogSegment由两部分组成,index file和data file。其中后缀为.index和.timeindex的是索引文件,后缀为.log的是数据文件。
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个 segment文件最后一条消息的offset值进行递增,例如如下三个文件(假设每个log存储5000消息):
00000000000000000000.log
00000000000000005001.log
00000000000000010001.log
在实际中,每个LogSegment的大小相等,但是消息的条数不一定相等。
如果想查看.log文件的内容,可以用下面的命令
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000019.log --print-data-log
为什么还要有索引文件呢?当然是为了提高查找消息的效率了。其中index索引文件存储了offset对应的物理偏移量,timeindex文件映射了时间戳和相对的offset.
消息查找
有了索引文件之后,kafka是如何利用索引来查找消息的呢?kafka可以根据offset来查找消息,也可以根据时间戳来查找。
offset查找
假设我们要查询offset为7为消息
1. 首先确定消息在哪个logSegment中,显而易见是在第一个logSegment中(logSegment以文件中第一条消息的offset来命名)。
2. 打开这个logSegment的index文件,使用二分查找,找到索引文件中小于或等于目标offset的最大offset,在图中为offset=6,同时知道了offset为6的消息在文件中的位置为1407
3. 打开数据文件,从位置1407开始扫描直到找到offset为7的那条消息
在index文件中[6,1407]表示offset为6的消息,它在数据文件中的起始位置为1407。
通过上述查找过程,我们可以知道索引的疏密会影响查询效率,kafka通过以下配置来控制索引的疏密,默认是4K,数据文件的大小每增加4K就增加一个索引
# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096
时间戳查找
timeindex文件存储是时间戳与offset的对应关系,所以首先需要找到对应的offset,然后在按照offset查找的方式查找消息。
假设我们要查询时间戳为1581836400000的消息,
1. 首先确定消息在哪个logSegment中,将时间戳1581836400000与每个logSegment的最大时间戳largestTimestamp比较,直到找到不小于目标时间戳的logSegment
2. 打开这个logSegment的timeindex文件,也是使用二分查找,找到不大于目标时间戳的最大时间戳,拿到这个时间戳所对应的offset
3. 然后按照offset查找的方式进行查找就行
在timeindex文件中[1581836400000,45]表示在1581836400000这个时刻写入的消息的offset为45
文件切分
我们已经知道一个partition的数据会分隔成多个logSegment,那么具体切分规则是怎么样的呢?我们可以通过以下配置项来控制logSegment的切分
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824(1G)
#The maximum time before a new log segment is rolled out (in hours), secondary to log.roll.ms property
log.roll.hours=168(7天)
# The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in log.roll.hours is used
log.roll.ms
# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760
当满足以下几个条件之一时,就会触发文件的切分
- 数据文件(后缀为.log)的大小超过log.segment.bytes配置的值,默认是1G
- 当logSegment的最大时间戳与当前时间的差值大于log.roll.hours或log.roll.ms配置的值。如果同时配置两个参数log.roll.ms的优先级更高。默认情况下,只配置了log.roll.hours,默认是7天
- index文件(后缀为.index)的大小超过log.index.size.max.bytes配置的值,默认是10M
日志清理
kafka提供了两种清理策略
- 日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
- 日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本
可以通过配置log.cleanup.policy来配置,默认是delete,也可以选择compact
日志删除
kafka通过以下配置来控制日志删除
# The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion
# 日志删除检测频率,默认为5分钟,也即每隔5分钟,检测日志是否需要删除
log.retention.check.interval.ms=300000
#The number of hours to keep a log file before deleting it (in hours)
# 日志保存时间,默认是7天,也即7天前的日志需要删除
log.retention.hours=168
log.retention.minutes
log.retention.ms
# The time to wait before deleting a file from the filesystem
# 延迟删除时间,默认为1分钟,也即对于待删除的日志,需要等待一分钟之后在删除日志
file.delete.delay.ms=60000
# The maximum size of the log before deleting it
# 日志最大值,当所有日志文件大小的总和超过该配置就需要被删除
log.retention.bytes
基于时间
- 根据log.retention.hours等配置,找出过期的logSegment
- 在这些logSegment的所有文件添加.delete的后缀
- 交由一个以 “delete-file” 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过file.delete.delay.ms进行设置
基于日志大小
- 计算需要被删除的日志总大小(total - log.retention.bytes)
- 从第一个LogSegment开始查找可删除的日志分段的文件集合
- 执行删除
零拷贝
我们已经知道kafka是将数据以文件的形式进行保存,也即保存在磁盘中。当consumer来消费数据时,server需要将数据从磁盘拷贝到内存中,在把内存中数据通过socket的方式发送给消费者,具体流程如下
1. 操作系统将数据从磁盘读入到内核空间的页缓存
2. 应用程序将数据从内核空间读入到用户空间缓存中
3. 应用程序将数据写回到内核空间到 socket 缓存中
4. 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,但过程中数据没有变化,仅仅是从磁盘复制到网卡缓冲区。
为了提升性能,kafka进行了所谓的”零拷贝”,可以直接内核空间页缓存的数据直接复制到网卡缓冲区,这样就减少了复制次数,也减少了上下文切换的次数。
Read Buffer(Kernel) ---copy---> NIC Buffer(Kernel)
Linux通过sendfile的系统调用来完成上述操作。
消息消费
kafka以comsumerGroup(消费者)的形式来消费消息。一个topic下的所有partition只能被group中的一个consumer消费,不可能出现一个group中的两个consumer消费同一个partition。
假设一个topic分了3个partition,分别是0,1,2,同时有一个消费组,里面分别由3个consumer,那么每个consumer仅会分配一个partition来消费。
如果group只有2个consumer,那么其中一个consumer会消费两个partition,另外一个consumer消费最后一个partition。
如果group有4个consumer,那么有一个consumer会消费不到任何数据,因为3个partition被分配到另外3个consumer了。
那么,为什么一个分区只能由一个consumer消费呢?
Kafka消息在分区内有序,消费者消费消息时也要按照分区内消息顺序进行消费,有序消费就要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。kafka消费者自己可以控制读取消息的offset,如果两个消费者负责同一个分区,就有可能C1读到2,C1还没处理完,C2已经读到3了,因为这就相当于多线程读取同一个消息,造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异
在早前的kafka版本中,consumer的offset信息是保存在zookeeper,但是由于zookeeper并不适合大批量的频繁写入操作,新版本的kafka推荐将consumer的offset保存在kafka内部的topic中,也即__consumer_offsets。同时提供了kafka-consumer-groups.sh来查看consumer的offset信息
__consumer_offsets默认有50个分区。那么如何确定一个group的offset信息是保存在哪个partition中呢?可以使用下面的计算公式
groupID 为消费者的ID,numPartitions为__consumer_offsets的分区数,也即50
Math.abs(groupID.hashCode()) % numPartitions