博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka集群部署及命令行操作
阅读量:3965 次
发布时间:2019-05-24

本文共 4226 字,大约阅读时间需要 14 分钟。

前期准备操作

关闭防火墙、普通的用户及文件权限

防火墙的相关操作

关闭防火墙#第一步: 停止firewallsystemctl stop firewalld.service#第二步:禁止firewall开机启动systemctl disable firewalld.service#第三步:查看防火墙状态systemctl status firewalld.servic

普通的用户及文件权限

创建用户groupadd realtime && useradd realtime && usermod -a -G realtime realtime创建工作目录并赋权mkdir /exportmkdir /export/serverschmod 755 -R /export切换到realtime用户下su realtime

安装kafka

下载安装包

http://kafka.apache.org/downloads.html

在linux中使用wget命令下载安装包
wget http://mirrors.hust.edu.cn/apache/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz
解压安装包

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

修改解压后的文件名称

[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

在/opt/module/kafka目录下创建logs文件夹

[atguigu@hadoop102 kafka]$ mkdir logs

修改配置文件

[atguigu@hadoop102 kafka]$ cd config/[atguigu@hadoop102 config]$ vi server.properties

修改配置文件

cp /home/hadoop/apps/kafka/config/server.properties
/home/hadoop/apps/kafka/config/server.properties.bak
vi /home/hadoop/apps/kafka/config/server.properties
输入以下内容:

输入以下内容:#broker的全局唯一编号,不能重复broker.id=0#删除topic功能使能delete.topic.enable=true#处理网络请求的线程数量num.network.threads=3#用来处理磁盘IO的现成数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka运行日志存放的路径	log.dirs=/opt/module/kafka/logs#topic在当前broker上的分区个数num.partitions=1#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,超时将被删除log.retention.hours=168#配置连接Zookeeper集群地址zookeeper.connect=bqy01:2181,bqy02:2181,bqy03:2181

配置环境变量(可不用操作)

[atguigu@hadoop102 module]$ sudo vi /etc/profile#KAFKA_HOMEexport KAFKA_HOME=/opt/module/kafkaexport PATH=$PATH:$KAFKA_HOME/bin[atguigu@hadoop102 module]$ source /etc/profile

分发到其他节点

[root@bqy01 kafka]# scp -r kafka bqy02:/data/module/[root@bqy01 kafka]# scp -r kafka bqy03:/data/module/

注:修改配置文件中的broker.id,broker.id不得重复

启动集群

依次在bqy01 、bqy02 、bqy03 节点上启动kafka

0.11版本

[root@bqy01 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties[root@bqy02 kafka]$ bin/kafka-server-start.sh -daemon  config/server.properties[root@bqy03 kafka]$ bin/kafka-server-start.sh -daemon  config/server.properties

2.11版本

[root@bqy01 kafka]$./bin/kafka-server-start.sh  ./config/server.properties

后台启动

后台启动://第一种./bin/kafka-server-start.sh  ./config/server.properties &//第二种nohup ./bin/kafka-server-start.sh  ./config/server.properties &

前台和后台进程有什么区别?

前台相当于一个会话:程序退出就没了

关闭集群

0.11版本

[root@bqy01 kafka]$ bin/kafka-server-stop.sh stop[root@bqy02 kafka]$ bin/kafka-server-stop.sh stop[root@bqy03 kafka]$ bin/kafka-server-stop.sh stop

2.11版本

[root@bqy01 kafka]$ ./bin/kafka-server-stop.sh  ./config/server.properties

命令行操作

进入zkClick:

在这里插入图片描述
查看当前服务器所有的topic
在这里插入图片描述
上面两种都可以
创建topic

[root@bqy02 kafka]# bin/kafka-topics.sh --zookeeper bqy02:2181 --create --replication-factor 3 --partitions 1 --topic first
[root@bqy02 kafka]#./bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 1 --partitions 1 --topic test01

两种都可以

选项说明:

  • topic 定义topic名
  • replication-factor 定义副本数
  • partitions 定义分区数

当副本数大于1时,记得要打开其他的节点,一个副本对应1个节点

删除topic

[root@bqy02 kafka]# bin/kafka-topics.sh --zookeeper bqy02:2181 \> --delete --topic firstTopic first is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

通过shell命令发送消息 开启 生产

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test01

通过shell消费消息 消费

./bin/kafka-console-consumer.sh --zookeeper bqy02:2181 --from-beginning --topic test01

Kafka 2.2.1版本启动消费者:

开三个节点

./bin/kafka-console-consumer.sh --bootstrap-server bqy01:9092,bqy02:9092,bqy03:9092  --from-beginning --topic test01

开一个节点

./bin/kafka-console-consumer.sh --bootstrap-server bqy02:9092  --from-beginning --topic test01

查看某个Topic的详情

例如test01

./bin/kafka-topics.sh --topic test01 --describe --zookeeper bqy02:2181

新增配置

./bin/kafka-topics.sh --zookeeper bqy02:2181 --create --topic test --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

修改配置

./bin/kafka-topics.sh --zookeeper bqy02:2181 --alter --topic test --config max.message.bytes=128000

删除配置:

./bin/kafka-topics.sh --zookeeper bqy02:2181 --alter --topic test --delete-config max.message.bytes

转载地址:http://pwgzi.baihongyu.com/

你可能感兴趣的文章
RapidIO协议(1)
查看>>
RapidIO协议(2)
查看>>
DM8168 EMAC/MDIO模块(2)
查看>>
DM8168 EMAC/MDIO模块(3)
查看>>
DM8168 EMAC/MDIO模块(4)
查看>>
DM8168 EMAC/MDIO模块(5)
查看>>
DM8168 EMAC/MDIO模块(6)
查看>>
DM8168 EMAC/MDIO模块(7)
查看>>
DM8168 EMAC/MDIO模块(8)
查看>>
TVP5158的多路复用技术
查看>>
DM8168 HDVPSS的VIP Parser模块(1)
查看>>
DM8168 HDVPSS的VIP Parser模块(2)
查看>>
DM8168 HDVPSS的VIP Parser模块(5)
查看>>
ADIS16400/ADIS16405带磁力计的三轴惯性传感器(1)
查看>>
ADIS16400/ADIS16405带磁力计的三轴惯性传感器(2)
查看>>
ADIS16400/ADIS16405带磁力计的三轴惯性传感器(3)
查看>>
ADIS16400/ADIS16405带磁力计的三轴惯性传感器(4)
查看>>
DM8168的系统MMU(1)
查看>>
U-Boot SD (Secured Digital card) Support
查看>>
编译uboot-TI DM8186<<TI81XX PSP U-Boot>>
查看>>