安装
下载源码安装
我用的kafka的版本是0.10.2.0,后续的例子都是使用这个版本
tar -zxvf kafka-0.10.2.0-src.tgz
cd kafka-0.10.2.0
Mac Homebrew安装
brew install kafka
kafka的启动需要依赖zookeeper,用homebrew安装时,会自动安装zookeeper。安装完成之后,可以用以下命令查看安装信息
brew info kafka
kafka的安装路径,可以用以下命令查看
brew list kakfa
一般情况下,brew安装的项目路径为
/usr/local/Cellar
启动
1. 启动zookeeper
cd /usr/local/Cellar/kafka/0.10.2.0/libexec/bin
sh zookeeper-server-start.sh ../config/zookeeper.properties
2. 启动Kafka Server
sh kafka-server-start.sh ../config/server.properties
3. 创建Topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4. 启动Producer
sh kafka-console-producer.sh --broker-list localhost:9092 --topic test
5. 启动Consumer
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
对于新的kafka版本,可以使用如下的命令
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
kafka配置
broker 配置
基本配置
配置名称 | 配置说明 |
---|---|
broker.id | broker在集群中的唯一标识 |
listeners | kafka监听的地址,如果没有配置,则使用java.net.InetAddress.getCanonicalHostName()获取的值 |
num.network.threads | 处理网络请求的线程数 |
num.io.threads | 处理I/O的线程数 |
socket.send.buffer.bytes | 发送缓存区的大小 |
socket.receive.buffer.bytes | 接收缓冲区的大小 |
socket.request.max.bytes | kafka允许接收或发送消息的最大字节数 |
zookeeper 配置
配置名称 | 配置说明 |
---|---|
zookeeper.connect | zookeeper的连接地址,多个Server间以逗号分隔 |
zookeeper.connection.timeout.ms | 连接zookeeper的超时时间 |
日志刷新策略
配置名称 | 配置说明 |
---|---|
log.flush.interval.messages | 每次刷新至磁盘的消息数 |
log.flush.interval.ms | 在数据被写入到硬盘前的最大时间 |
日志持久化策略
配置名称 | 配置说明 |
---|---|
log.retention.hours | 日志保留的最长时间 |
log.retention.bytes | 日志最大字节数 |
log.segment.bytes | 单个log segment文件的大小 |
log.retention.check.interval.ms | 检查log失效的间隔 |
producer 配置
配置名称 | 配置说明 |
---|---|
bootstrap.servers | broker地址 |
compression.type | 数据压缩策略,none,gzip,snappy,lz4 |
partitioner.class | 处理分区的类,默认根据key的hash分发到对应的分区 |
request.timeout.ms | 请求的超时时间 |
consumer 配置
配置名称 | 配置说明 |
---|---|
zookeeper.connect | zookeeper连接地址 |
zookeeper.connection.timeout.ms | zookeeper连接超时时间 |
group.id | 消费组id |
consumer.timeout.ms | 消费者超时时间 |
kafka脚本参数说明
kafka-config
用于查看并修改kafka的配置,–describe 查看配置, –alter 修改配置
参数名称 | 参数说明 |
---|---|
entity-type | 配置类型,有topics/clients/users/brokers |
entiey-name | 配置名称,对于topics就是topic的名称 |
可以通过以下命令查看可管理的配置
sh kafka-config.sh --help
describe
sh kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics
alter
sh kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=600000
这个时候在看topic test的配置,发现配置已修改
kafka-console-consumer
启动一个consumer
参数名称 | 参数说明 |
---|---|
bootstrap-server | broker地址,localhost:9092 |
zookeeper | zookeeper地址,localhost:2181 |
topic | topic名称 |
formatter | 格式化消息的类的名称 |
from-beginning | 如果consumer没有设置offset,则从最开始的消息开始消费,而不是最新的数据 |
offset | 指定offset的位置,可以是正整数,也可以是earliest/latest,默认是latest |
partition | 指定从哪个partition开始消费数据 |
kafka-topics
创建,删除,修改topic
参数名称 | 参数说明 |
---|---|
config | topic配置 |
delete-config | 删除配置 |
create | 创建topic |
delete | 删除topic |
partitions | topic的分区数 |
replication-factor | topic备份的数 |
topic | topic名称 |
create topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic
delete topic
sh kafka-topics.sh --delete -zookeeper localhost:2181 --topic test-topic
describe topic
sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic
alter topic
修改partitions和replica的个数,只能增加
sh kafka-topics.sh --alter -zookeeper localhost:2181 --topic test-topic --partitions 3