本文共 3342 字,大约阅读时间需要 11 分钟。
kafka集群是把状态保存在zookeeper中的,首先要搭建zookeeper集群。
wget http://xxxxx.oss-cn-xxxx.aliyuncs.com/xxxx/jdk-8u171-linux-x64.rpmyum localinstall jdk-8u171-linux-x64.rpm -y
wget http://xxx-xx.oss-cn-xxx.aliyuncs.com/xxx/kafka_2.12-1.1.0.tgz官网下载链接:http://kafka.apache.org/downloads
解压kafka
tar -zxvf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 kafka
直接使用kafka自带的zookeeper建立zk集群
cd /data/kafkavim conf/zookeeper.properties
#tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。#initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒#syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒#dataDir:快照日志的存储路径#dataLogDir:需手动创建事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多#clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
进入dataDir目录,将三台服务器上的myid文件分别写入1、2、3。
myid是zk集群用来发现彼此的标识,必须创建,且不能相同。echo "1" > /data/kafka/zk/myid
echo "2" > /data/kafka/zk/myid echo "3" > /data/kafka/zk/myid
zookeeper不会主动的清除旧的快照和日志文件,需要定期清理。
#!/bin/bash #snapshot file dir dataDir=/data/kafka/zk/version-2#tran log dir dataLogDir=/data/kafka/log/zk/version-2#Leave 66 files count=66 count=$[$count+1] ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f #以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
进入kafka目录,执行zookeeper命令
cd /data/kafkanohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &
没有报错,而且jps查看有zk进程就说明启动成功了。
vim conf/server.properties
部分参数含义:
先每台设置host,listeners里要设置,否则后面消费消息会报错。 broker.id 每台都不能相同num.network.threads 设置为cpu核数num.partitions 分区数设置视情况而定,上面有讲分区数设置default.replication.factor kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
nohup ./bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
执行jps检查
./bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 2 --partitions 1 --topic test1--replication-factor 2 #复制两份--partitions 1 #创建1个分区--topic #主题为test1
#模拟客户端去发送消息,生产者./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test1#模拟客户端去接受消息,消费者./bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --from-beginning --topic test1#然后在生产者处输入任意内容,在消费端查看内容。
./bin/kafka-topics.sh --list --zookeeper xxxx:2181#显示创建的所有topic./bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic test1#Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:# Topic: test1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1#分区为为1 复制因子为2 他的 test1的分区为0 #Replicas: 0,1 复制的为0,1
修改配置文件server.properties添加如下配置:
delete.topic.enable=true 配置完重启kafka、zookeeper。
如果不想修改配置文件可删除topc及相关数据目录
#删除kafka topic./bin/kafka-topics.sh --delete --zookeeper xxxx:2181,xxxx:2181 --topic test1#删除kafka相关数据目录rm -rf /data/kafka/log/kafka/test*#删除zookeeper相关路径rm -rf /data/kafka/zk/test*rm -rf /data/kafka/log/zk/test*
转载于:https://blog.51cto.com/somethingshare/2407063