Kafka——QuickStart(2)

安装

下载源码安装

下载地址

我用的kafka的版本是0.10.2.0,后续的例子都是使用这个版本

tar -zxvf kafka-0.10.2.0-src.tgz
cd kafka-0.10.2.0

Mac Homebrew安装

brew install kafka

kafka的启动需要依赖zookeeper,用homebrew安装时,会自动安装zookeeper。安装完成之后,可以用以下命令查看安装信息

brew info kafka

kafka的安装路径,可以用以下命令查看

brew list kakfa

一般情况下,brew安装的项目路径为

/usr/local/Cellar

启动

1. 启动zookeeper

cd /usr/local/Cellar/kafka/0.10.2.0/libexec/bin
sh zookeeper-server-start.sh ../config/zookeeper.properties

2. 启动Kafka Server

sh kafka-server-start.sh ../config/server.properties

3. 创建Topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4. 启动Producer

sh kafka-console-producer.sh --broker-list localhost:9092 --topic test

5. 启动Consumer

sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

对于新的kafka版本,可以使用如下的命令

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

kafka配置

broker 配置

基本配置

配置名称 配置说明
broker.id broker在集群中的唯一标识
listeners kafka监听的地址,如果没有配置,则使用java.net.InetAddress.getCanonicalHostName()获取的值
num.network.threads 处理网络请求的线程数
num.io.threads 处理I/O的线程数
socket.send.buffer.bytes 发送缓存区的大小
socket.receive.buffer.bytes 接收缓冲区的大小
socket.request.max.bytes kafka允许接收或发送消息的最大字节数

zookeeper 配置

配置名称 配置说明
zookeeper.connect zookeeper的连接地址,多个Server间以逗号分隔
zookeeper.connection.timeout.ms 连接zookeeper的超时时间

日志刷新策略

配置名称 配置说明
log.flush.interval.messages 每次刷新至磁盘的消息数
log.flush.interval.ms 在数据被写入到硬盘前的最大时间

日志持久化策略

配置名称 配置说明
log.retention.hours 日志保留的最长时间
log.retention.bytes 日志最大字节数
log.segment.bytes 单个log segment文件的大小
log.retention.check.interval.ms 检查log失效的间隔

producer 配置

配置名称 配置说明
bootstrap.servers broker地址
compression.type 数据压缩策略,none,gzip,snappy,lz4
partitioner.class 处理分区的类,默认根据key的hash分发到对应的分区
request.timeout.ms 请求的超时时间

consumer 配置

配置名称 配置说明
zookeeper.connect zookeeper连接地址
zookeeper.connection.timeout.ms zookeeper连接超时时间
group.id 消费组id
consumer.timeout.ms 消费者超时时间

kafka脚本参数说明

kafka-config

用于查看并修改kafka的配置,–describe 查看配置, –alter 修改配置

参数名称 参数说明
entity-type 配置类型,有topics/clients/users/brokers
entiey-name 配置名称,对于topics就是topic的名称

可以通过以下命令查看可管理的配置

sh kafka-config.sh --help

describe

sh kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics

alter

sh kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=600000

这个时候在看topic test的配置,发现配置已修改

kafka-console-consumer

启动一个consumer

参数名称 参数说明
bootstrap-server broker地址,localhost:9092
zookeeper zookeeper地址,localhost:2181
topic topic名称
formatter 格式化消息的类的名称
from-beginning 如果consumer没有设置offset,则从最开始的消息开始消费,而不是最新的数据
offset 指定offset的位置,可以是正整数,也可以是earliest/latest,默认是latest
partition 指定从哪个partition开始消费数据

kafka-topics

创建,删除,修改topic

参数名称 参数说明
config topic配置
delete-config 删除配置
create 创建topic
delete 删除topic
partitions topic的分区数
replication-factor topic备份的数
topic topic名称

create topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic

delete topic

sh kafka-topics.sh --delete -zookeeper localhost:2181 --topic test-topic

describe topic

sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic

alter topic

修改partitions和replica的个数,只能增加

sh kafka-topics.sh --alter -zookeeper localhost:2181 --topic test-topic --partitions 3                

参考文献


Reprint please specify: wbl Kafka——QuickStart(2)

Previous
Kafka源码分析——Producer Kafka源码分析——Producer
Producer 使用示例kafka-console-producersh kafka-console-producer.sh --broker-list localhost:9092 --topic test producer clien
2019-01-05
Next
Kafka—-入门介绍(1) Kafka—-入门介绍(1)
Kafka介绍kafka是一个分布式的,基于发布/订阅的消息系统。简单的可以理解kafka是一个消息队列,可以往队列里面写入数据,也可以从队列里面取出数据进行处理。 kafka关键概念我以自来水厂的例子来解释kafka的相关概念,可能不够严
2018-12-16