安装
下载源码安装
我用的kafka的版本是0.10.2.0,后续的例子都是使用这个版本
tar -zxvf kafka-0.10.2.0-src.tgz
cd kafka-0.10.2.0
Mac Homebrew安装
brew install kafka
kafka的启动需要依赖zookeeper,用homebrew安装时,会自动安装zookeeper。安装完成之后,可以用以下命令查看安装信息
brew info kafka
kafka的安装路径,可以用以下命令查看
brew list kakfa
一般情况下,brew安装的项目路径为
/usr/local/Cellar
启动
1. 启动zookeeper
cd /usr/local/Cellar/kafka/0.10.2.0/libexec/bin
sh zookeeper-server-start.sh ../config/zookeeper.properties

2. 启动Kafka Server
sh kafka-server-start.sh ../config/server.properties
3. 创建Topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4. 启动Producer
sh kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
5. 启动Consumer
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

对于新的kafka版本,可以使用如下的命令
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
kafka配置
broker 配置
基本配置
| 配置名称 | 配置说明 | 
|---|---|
| broker.id | broker在集群中的唯一标识 | 
| listeners | kafka监听的地址,如果没有配置,则使用java.net.InetAddress.getCanonicalHostName()获取的值 | 
| num.network.threads | 处理网络请求的线程数 | 
| num.io.threads | 处理I/O的线程数 | 
| socket.send.buffer.bytes | 发送缓存区的大小 | 
| socket.receive.buffer.bytes | 接收缓冲区的大小 | 
| socket.request.max.bytes | kafka允许接收或发送消息的最大字节数 | 
zookeeper 配置
| 配置名称 | 配置说明 | 
|---|---|
| zookeeper.connect | zookeeper的连接地址,多个Server间以逗号分隔 | 
| zookeeper.connection.timeout.ms | 连接zookeeper的超时时间 | 
日志刷新策略
| 配置名称 | 配置说明 | 
|---|---|
| log.flush.interval.messages | 每次刷新至磁盘的消息数 | 
| log.flush.interval.ms | 在数据被写入到硬盘前的最大时间 | 
日志持久化策略
| 配置名称 | 配置说明 | 
|---|---|
| log.retention.hours | 日志保留的最长时间 | 
| log.retention.bytes | 日志最大字节数 | 
| log.segment.bytes | 单个log segment文件的大小 | 
| log.retention.check.interval.ms | 检查log失效的间隔 | 
producer 配置
| 配置名称 | 配置说明 | 
|---|---|
| bootstrap.servers | broker地址 | 
| compression.type | 数据压缩策略,none,gzip,snappy,lz4 | 
| partitioner.class | 处理分区的类,默认根据key的hash分发到对应的分区 | 
| request.timeout.ms | 请求的超时时间 | 
consumer 配置
| 配置名称 | 配置说明 | 
|---|---|
| zookeeper.connect | zookeeper连接地址 | 
| zookeeper.connection.timeout.ms | zookeeper连接超时时间 | 
| group.id | 消费组id | 
| consumer.timeout.ms | 消费者超时时间 | 
kafka脚本参数说明
kafka-config
用于查看并修改kafka的配置,–describe 查看配置, –alter 修改配置
| 参数名称 | 参数说明 | 
|---|---|
| entity-type | 配置类型,有topics/clients/users/brokers | 
| entiey-name | 配置名称,对于topics就是topic的名称 | 
可以通过以下命令查看可管理的配置
sh kafka-config.sh --help
    
describe
sh kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics

alter
sh kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=600000
这个时候在看topic test的配置,发现配置已修改
    
kafka-console-consumer
启动一个consumer
| 参数名称 | 参数说明 | 
|---|---|
| bootstrap-server | broker地址,localhost:9092 | 
| zookeeper | zookeeper地址,localhost:2181 | 
| topic | topic名称 | 
| formatter | 格式化消息的类的名称 | 
| from-beginning | 如果consumer没有设置offset,则从最开始的消息开始消费,而不是最新的数据 | 
| offset | 指定offset的位置,可以是正整数,也可以是earliest/latest,默认是latest | 
| partition | 指定从哪个partition开始消费数据 | 
kafka-topics
创建,删除,修改topic
| 参数名称 | 参数说明 | 
|---|---|
| config | topic配置 | 
| delete-config | 删除配置 | 
| create | 创建topic | 
| delete | 删除topic | 
| partitions | topic的分区数 | 
| replication-factor | topic备份的数 | 
| topic | topic名称 | 
create topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic
delete topic
sh kafka-topics.sh --delete -zookeeper localhost:2181 --topic test-topic
describe topic
sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic
alter topic
修改partitions和replica的个数,只能增加
sh kafka-topics.sh --alter -zookeeper localhost:2181 --topic test-topic --partitions 3