Kafka_安装使用

By timebusker on January 31, 2018

image

Single Node-Single Broker

单机环境部署——在一台主机上安装kafka集群:

# 安装目录
cd /root/kafka_2.11-2.0.0

# 启动kafka自带的zookeeper服务器
bin/zookeeper-server-start.sh config/zookeeper.properties
# 使用zookeeper客户端连接测试
sh zkCli.sh -server hdp-cluster-21:2181

# 启动kafka服务
bin/kafka-server-start.sh config/server.properties
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

# 创建消息主题(主题名称不能包含非法字符)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic timebusker
# 查看主题列表
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 向主题推送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic timebusker

# 消费主题消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic timebusker --from-beginning

Single Node-Multiple Broker

伪分布式部署——在单个节点上安装多个broker:

# 同Single Node-Single Broker启动zookeeper

# 编辑服务端配置文件server.properties
# 一个Broker对应一个配置文件,在启动Broker时指定配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
	
nohup bin/kafka-server-start.sh config/server-1.properties 1>/dev/null 2>&1 &
nohup bin/kafka-server-start.sh config/server-2.properties 1>/dev/null 2>&1 &

# 创建有两个副本的topic 
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic timebusker 

# 查看topic在Broker上的运行状况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic timebusker

# 指定broker和topic发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic timebusker

# 从指定的broker中消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic timebusker --from-beginning

Multiple Node-Multiple Broke

集群环境(完全分布式)部署——在多个节点上配置多个Broker组件:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic timebusker 

bin/kafka-console-producer.sh --broker-list hdp-cluster-21:9092,hdp-cluster-22:9092,hdp-cluster-23:9092 --topic timebusker
  
bin/kafka-console-consumer.sh --bootstrap-server hdp-cluster-21:9092,hdp-cluster-22:9092,hdp-cluster-23:9092 --topic timebusker --from-beginning
# 集群配置
# server.properties 

# kafka集群服务唯一编号
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# 处理IO读写的线程数,不得小于log.dirs配置的目录数
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# 配置消息日志目录,多个可以逗号分隔,kafka会根据目录下的分区数做均衡
log.dirs=/tmp/kafka-logs

分区数\副本数\.....
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# 优化配置方案
# broker处理消息的最大线程数(默认为3)
num.network.threads=cpu核数+1
# broker处理磁盘IO的线程数 
num.io.threads=cpu核数*2

# 每当producer写入10000条消息时,刷数据到磁盘 
log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000

# 保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
# 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件
log.segment.bytes=1073741824

default.replication.factor:3
# 这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。

JAVA API 
zookeeper.session.timeout.ms 
解释:配置的超时时间太短,Zookeeper没有读完Consumer的数据,连接就被Consumer断开了!
参数:5000 

zookeeper.sync.time.ms
解释:ZooKeeper集群中leader和follower之间的同步的时间
参数:2000

auto.commit.enable=true 
解释:注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交

auto.commit.interval.ms
解释:自动提交offset到zookeeper的时间间隔
参数:1000

zookeeper.connection.timeout.ms 
解释:确认zookeeper连接建立操作客户端能等待的最长时间
参数:10000

rebalance.backoff.ms
解释:消费均衡两次重试之间的时间间隔
参数:2000

rebalance.max.retries
解释:消费均衡的重试次数
参数:10
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
--------------------------------------
# 属于客户端程序配置项
# consumer.properties
# producer.properties
--------------------------------------
//当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0
//当前kafka对外提供服务的端口默认是9092
port=9092
//这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=hadoop1
//这个是borker进行网络处理的线程数
num.network.threads=3
//这个是borker进行I/O处理的线程数
num.io.threads=8
//发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
//kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
//这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
//消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
//如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/home/hadoop/log/kafka-logs
//默认的分区数,一个topic默认1个分区数
num.partitions=1
//每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
//默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
//这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
//每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
//是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
//设置zookeeper的连接端口
zookeeper.connect=192.168.123.102:2181,192.168.123.103:2181,192.168.123.104:2181
//设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000

常用指令

# 后台进程启动运行kafka
./kafka-server-start.sh  -daemon ../config/server.properties

# 创建topic  创建副本数为1,分区数为1的 消息主题  test
./kafka-topics.sh --create --zookeeper test-hdp-21:2181 --replication-factor 1 --partitions 1 --topic timebusker

# 查看kafka消息主题
./kafka-topics.sh --list --zookeeper test-hdp-22:2181

# 开启发送者并发送消息
./kafka-console-producer.sh --broker-list test-hdp-22:9092 --topic timebusker

# 开启消费者并接收消息
./kafka-console-consumer.sh --topic timebusker --from-beginning --bootstrap-server test-hdp-22:9092

# 增加消息主题副本数

# 增加消息主题分区数

# 删除消息主题
/kafka-topics.sh --zookeeper test-hdp-22:2181 --delete --topic timebusker

# 查看消费组消费消息偏移量 offset