2022-03-11
linuxea: A typical Kafka cluster deployment in a private-cloud DNAT environment
What we are setting up today is a Kafka 2.5.0 cluster. Newer releases are moving away from ZooKeeper, but the new versions are not the topic here.

The scenario is a private cloud. The business side wants to send data to Kafka over the public internet, authenticated with SCRAM-SHA-256, while intranet access must keep working as before. The internal ports are exposed to the internet through DNAT, and a DNAT mapping is necessarily one external port to one internal port. The rough topology looks as follows (diagram omitted). If your situation is different, the approach in "Kafka 实现内外网访问流量分离" (separating internal and external access traffic) may solve the problem instead.

In practice, intranet clients connecting to the internal IPs negotiate with Kafka and have their requests handled normally. Access from the public network through 6.78.5.32 on ports 9092, 9093 and 9094, however, hits a problem: a client request sent to 6.78.5.32:9092 passes the firewall's DNAT layer and reaches the backend broker, but when the broker replies, it identifies itself as 172.16.100.7:9092. The client has no idea who 172.16.100.7 is, so the send fails. If you only produce messages and never check the response, the code reports success, yet the data is never actually written.

Hence the other approach: since responses matter, servers and clients are both configured with hostnames rather than a fixed IP. The names are resolved through each machine's local hosts file, so the same advertised name resolves to the proxy for public clients and to the broker itself inside the intranet. Either way the reply points at an address the client can actually reach, and the round trip completes. In some places the official documentation describes this arrangement as a way of "preventing man-in-the-middle attacks".

Versions:

```
version: kafka_2.12-2.5.0
jdk: 1.8.0_211
```
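To see the failure mode for yourself, ask the cluster for metadata: the broker list a client receives contains exactly the advertised addresses, which is what a public client will then try, and fail, to connect to. Below is a minimal sketch with confluent_kafka; the bootstrap address is the DNAT-exposed IP from this article and the credentials are the ones created later on, so treat it as illustrative rather than canonical.

```python
# Print the broker addresses the cluster advertises to clients.
# Assumes confluent_kafka is installed; the address and credentials are
# the examples used in this article.
from confluent_kafka.admin import AdminClient

admin = AdminClient({
    'bootstrap.servers': '6.78.5.32:9092',  # the DNAT-exposed address
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'marksugar',
    'sasl.password': 'linuxea.com',
})

md = admin.list_topics(timeout=10)
for broker in md.brokers.values():
    # Without the hostname trick these come back as 172.16.100.x,
    # addresses a public client cannot reach.
    print(f"broker {broker.id}: {broker.host}:{broker.port}")
```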
### Prerequisites

Synchronize time on all nodes:

```
10 * * * * ntpdate ntp.aliyun.com
```

Set the hostnames and the local hosts file:

```bash
#172.16.100.7
hostnamectl set-hostname kafka1
#172.16.100.8
hostnamectl set-hostname kafka2
#172.16.100.9
hostnamectl set-hostname kafka3
```

```
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
```

### Preparation

Install Java from the binary tarball (an RPM install works just as well):

```bash
tar xf jdk-8u211-linux-x64.tar.gz -C /usr/local/
cd /usr/local && ln -s jdk1.8.0_211 java
cat > /etc/profile.d/java.sh <<EOF
export JAVA_HOME=/usr/local/java
export PATH=\$JAVA_HOME/bin:\$PATH
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile.d/java.sh
```

Create the directories and the user on every node:

```bash
DPATH=/data
mkdir ${DPATH}/zookeeper/logs ${DPATH}/kafka/ ${DPATH}/logs/ -p
groupadd -r -g 699 kafka
useradd -u 699 -s /sbin/nologin -c 'kafka server' -g kafka kafka -M
chown -R kafka.kafka ${DPATH}
```

Download kafka_2.12-2.5.0, unpack it under /usr/local/, and symlink it to kafka:

```bash
tar xf kafka_2.12-2.5.0.tgz -C /usr/local/
cd /usr/local/
ln -s kafka_2.12-2.5.0 kafka
```

In /usr/local/kafka/config, prepare the two JAAS files used for authentication.

kafka_client_jaas.conf holds the client connection settings, i.e. the user and password producers and consumers use to reach the cluster:

```bash
cat > /usr/local/kafka/config/kafka_client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="marksugar"
password="linuxea.com";
};
EOF
```

kafka_server_jaas.conf makes markadmin the superuser; this user and password are needed later when the brokers start:

```bash
cat > /usr/local/kafka/config/kafka_server_jaas.conf << EOF
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="markadmin"
password="MwMzA0MGZGFmOG"
user_markadmin="markadmin";
};
EOF
```

### zookeeper

Back up the stock configuration:

```bash
mv /usr/local/kafka/config/zookeeper.properties /usr/local/kafka/config/zookeeper.properties.bak
```

1. On 172.16.100.7, write the configuration file:

```bash
cat > /usr/local/kafka/config/zookeeper.properties << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.0=172.16.100.7:2888:3888
server.1=172.16.100.8:2888:3888
server.2=172.16.100.9:2888:3888
EOF
```

2. Create the id; each node gets a different value:

```bash
echo "0" > /data/zookeeper/myid
```

3. Systemd unit:

```bash
cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=ZooKeeper Service
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Environment=ZOO_LOG_DIR=/data/zookeeper/logs
PIDFile=/data/zookeeper/zookeeper_server.pid
User=kafka
Group=kafka
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
#RestartSec=15
#LimitNOFILE=65536
#OOMScoreAdjust=-999
Type=simple
Restart=on-failure

[Install]
WantedBy=default.target
EOF
```

### kafka

A few sizing notes:

- default.replication.factor=2: 1 means no replica, 2 means one replica
- num.network.threads=3: keep it above CPU cores + 1
- num.io.threads=8: about twice the CPU cores
1. On 172.16.100.7, write the broker configuration. We create a new file rather than editing the stock one, and we set advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092. kafka.linuxea.com goes into the local hosts file, and the IP written there is the proxy's address. If you do not need the proxy and only want direct cluster access, set it to the node's own IP instead.

```bash
cat > /usr/local/kafka/config/server-scram.properties << EOF
broker.id=1
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.7:9092
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092
######### Log Basics ##########
# log path
log.dirs=/data/kafka/
#num.partitions=16
######## Zookeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
# three partitions and two replicas per topic
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
EOF
```

Systemd unit:

```bash
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=kafka Service
After=network.target syslog.target

[Service]
Environment=ZOO_LOG_DIR=/data/kafka/logs
SyslogIdentifier=kafka
# resource limits
LimitFSIZE=infinity
LimitCPU=infinity
LimitAS=infinity
LimitMEMLOCK=infinity
LimitNOFILE=64000
LimitNPROC=64000
User=kafka
Group=kafka
Type=simple
Restart=on-failure
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
Environment="PATH=${PATH}:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=default.target
EOF
```

Edit bin/kafka-server-start.sh to set the heap size, and export port 9999 for JMX (used by eagle later):

```bash
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #export KAFKA_HEAP_OPTS="-server -Xms4G -Xmx4G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=70"
    export KAFKA_HEAP_OPTS="-server -Xms4G -Xmx4G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
    export JMX_PORT="9999"
fi
```
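The three brokers differ only in broker.id, the listener address, and the advertised port, so it can be less error-prone to render server-scram.properties from one template than to hand-edit three copies. A stdlib-only sketch follows; the template is abridged to the varying fields plus a few shared ones, so merge it with the full file above before using it:

```python
# Render per-broker server-scram.properties files from one template.
# Pure stdlib; the node list mirrors the addresses used in this article.
NODES = [
    # (broker.id, listener ip, advertised port)
    (1, "172.16.100.7", 9092),
    (2, "172.16.100.8", 9093),
    (3, "172.16.100.9", 9094),
]

TEMPLATE = """broker.id={bid}
listeners=SASL_PLAINTEXT://{ip}:{port}
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:{port}
log.dirs=/data/kafka/
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.partitions=3
default.replication.factor=2
"""

for bid, ip, port in NODES:
    path = f"server-scram-{ip}.properties"
    with open(path, "w") as f:
        f.write(TEMPLATE.format(bid=bid, ip=ip, port=port))
    print(f"wrote {path}")
```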
### Configuration on the remaining nodes

Copy the Kafka program directory and the unit files to /usr/local on 172.16.100.8 and 172.16.100.9:

```bash
scp -r kafka_2.12-2.5.0 172.16.100.8:/usr/local/
scp -r kafka_2.12-2.5.0 172.16.100.9:/usr/local/
scp /etc/systemd/system/zookeeper.service 172.16.100.8:/etc/systemd/system/
scp /etc/systemd/system/zookeeper.service 172.16.100.9:/etc/systemd/system/
scp /etc/systemd/system/kafka.service 172.16.100.8:/etc/systemd/system/
scp /etc/systemd/system/kafka.service 172.16.100.9:/etc/systemd/system/
```

Log in to 172.16.100.8 and 172.16.100.9 and create the directories, the symlink and the user:

```bash
cd /usr/local && ln -s kafka_2.12-2.5.0 kafka
mkdir /data/zookeeper/logs -p
mkdir -p /data/kafka/
groupadd -r -g 699 kafka
useradd -u 699 -s /sbin/nologin -c 'kafka server' -g kafka kafka -M
mkdir /data/logs/ -p
chown -R kafka.kafka /data/
chown -R kafka.kafka /usr/local/kafka_2.12-2.5.0/
```

Set myid to match the server.N entries (server.0=172.16.100.7:2888:3888, server.1=172.16.100.8:2888:3888, server.2=172.16.100.9:2888:3888).

172.16.100.8:

```bash
echo "1" > /data/zookeeper/myid
```

172.16.100.9:

```bash
echo "2" > /data/zookeeper/myid
```

### kafka

In server-scram.properties, three settings change per node; the rest of the file is identical to the one on 172.16.100.7.

172.16.100.8:

```
broker.id=2
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
```

172.16.100.9:

```
broker.id=3
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
```

### ZooKeeper authorization

Start the whole ZooKeeper cluster first, then grant the two users. If the environment variables give you trouble, you can add JAVA_HOME=/usr/local/java inside /usr/local/kafka/bin/kafka-run-class.sh.

```bash
systemctl start zookeeper
systemctl enable zookeeper
systemctl status zookeeper
```
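Before creating users it is worth confirming the ensemble actually formed a quorum. The srvr four-letter command is on the default 4lw whitelist of the ZooKeeper 3.5.x bundled with Kafka 2.5, so a stdlib probe of each node should report one leader and two followers; a small sketch:

```python
# Probe each ZooKeeper node with the 'srvr' four-letter command and
# print its Mode (leader/follower). 'srvr' is on the default
# 4lw.commands.whitelist, so no extra server configuration is needed.
import socket

for host in ("172.16.100.7", "172.16.100.8", "172.16.100.9"):
    with socket.create_connection((host, 2181), timeout=5) as s:
        s.sendall(b"srvr")
        data = b""
        while True:
            chunk = s.recv(4096)
            if not chunk:  # server closes the connection after replying
                break
            data += chunk
    mode = next((line for line in data.decode().splitlines()
                 if line.startswith("Mode:")), "Mode: unknown")
    print(f"{host}: {mode}")
```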
### Create the users

The creation statements:

```bash
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
```

Execution:

```bash
# create markadmin
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
    Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'markadmin'

# create marksugar
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
    Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'marksugar'.
```

List all SCRAM credentials:

```bash
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
```

Output:

```bash
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
    Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
```

View a single user's credentials:

```bash
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
```

Output:

```bash
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
    Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
    Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
```

### Start kafka

Authorization is done, so start the first broker. Fix ownership first:

```bash
chown -R kafka.kafka /usr/local/kafka*
```

Start manually once to verify everything works:

```bash
sudo -u kafka KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
```

Watch /usr/local/kafka/logs/server.log; on a healthy start you will see something like:

```
[2021-05-21 17:12:03,524] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)
```

At this point the Kafka nodes need hosts entries covering all the broker hostnames plus the proxied name:

```
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com
```

If all is well, enable it at boot and start it:

```bash
systemctl enable kafka
systemctl start kafka
systemctl status kafka
```

Then bring up the other two nodes in the same way.

### Verify the users

Create a topic:

```bash
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.16.100.7:2181 --create --topic test --partitions 12 --replication-factor 3
```
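The same can be done programmatically over the authenticated listener; a hedged sketch with confluent_kafka's AdminClient, using the addresses and the marksugar credentials from this article:

```python
# Create the 'test' topic through the SASL listener instead of ZooKeeper.
# Assumes confluent_kafka is installed; addresses and credentials are the
# examples used in this article.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({
    'bootstrap.servers': '172.16.100.7:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'marksugar',
    'sasl.password': 'linuxea.com',
})

futures = admin.create_topics(
    [NewTopic('test', num_partitions=12, replication_factor=3)])
for topic, future in futures.items():
    try:
        future.result()  # block until the broker confirms
        print(f"topic {topic} created")
    except Exception as e:
        print(f"failed to create {topic}: {e}")
```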
### Send messages

Export the client JAAS file:

```bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
```

producer.conf, using the user created above:

```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="marksugar" password="linuxea.com";
```

Intranet access:

```bash
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test --producer.config producer.conf
> hello
```

Remote access, through the advertised name:

```bash
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094 --topic test --producer.config producer.conf
```

### Consume messages

consumer.conf:

```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="marksugar" password="linuxea.com";
```

Intranet access:

```bash
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test --from-beginning --consumer.config consumer.conf
hello
```

Remote access:

```bash
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094 --topic test --from-beginning --consumer.config consumer.conf
```
### Build the proxy layer

nginx stream configuration:

```nginx
stream {
    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';

    upstream kafka1 {
        server kafka1:9092 weight=1;
    }
    server {
        listen 9092;
        proxy_pass kafka1;
        access_log /data/logs/9092.log proxy;
    }

    upstream kafka2 {
        server kafka2:9093 weight=1;
    }
    server {
        listen 9093;
        proxy_pass kafka2;
        access_log /data/logs/9093.log proxy;
    }

    upstream kafka3 {
        server kafka3:9094 weight=1;
    }
    server {
        listen 9094;
        proxy_pass kafka3;
        access_log /data/logs/9094.log proxy;
    }
}
```

Add the hosts entries on the proxy; the Kafka nodes need the same entries:

```
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com
```

### Test Kafka connectivity

The test node also needs a hosts entry pointing the advertised name at the proxy:

```
172.16.100.10 kafka.linuxea.com
```

Install Python 3.8, then install confluent_kafka:

```bash
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple confluent_kafka
# or
pip install -i https://mirrors.aliyun.com/pypi/simple confluent_kafka
```
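Before running the producer it can save time to confirm the three proxied ports are reachable at all. A stdlib-only sketch; it assumes kafka.linuxea.com resolves to the proxy (172.16.100.10) via the hosts entry above:

```python
# Quick TCP reachability check of the three proxied Kafka ports.
# Pure stdlib; assumes kafka.linuxea.com resolves to the proxy.
import socket

for port in (9092, 9093, 9094):
    try:
        with socket.create_connection(("kafka.linuxea.com", port), timeout=3):
            print(f"port {port}: reachable")
    except OSError as e:
        print(f"port {port}: unreachable ({e})")
```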
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}复制{"version":1,"partitions":[ {"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]}, {"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}, {"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]}, {"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]}, {"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}, {"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}执行./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --reassignment-json-file linuxea_position_shaanxi_1.json -execute查看[root@linuxea06 bin]# ./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --reassignment-json-file linuxea_position_shaanxi_1.json --verify Status of partition reassignment: Reassignment of partition linuxea_position_shaanxi_1-3 completed successfully Reassignment of partition linuxea_position_shaanxi_1-0 completed successfully Reassignment of partition linuxea_position_shaanxi_1-5 completed successfully Reassignment of partition linuxea_position_shaanxi_1-4 completed successfully Reassignment of partition linuxea_position_shaanxi_1-1 completed successfully Reassignment of partition linuxea_position_shaanxi_1-2 completed successfully [root@linuxea06 bin]#kafka删除topicsleder列出/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181删除/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 17.168.0.174:2181 --topic test1 Topic test1 is marked for deletion. 
### kafka-eagle

kafka-eagle does not work entirely smoothly against a cluster with user authentication enabled; there are always a few rough edges. The kafka-eagle host also needs hosts entries:

```
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
```

Download 2.0.5:

```bash
tar xf kafka-eagle-bin-2.0.5.tar.gz -C /usr/local/
cd /usr/local/kafka-eagle-bin-2.0.5
tar xf kafka-eagle-web-2.0.5-bin.tar.gz
ln -s /usr/local/kafka-eagle-bin-2.0.5/ /usr/local/kafka-eagle
cp /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties.bak
mkdir /data/kafka-eagle/db/ -p
```

For convenience, change the kafka-eagle host's hostname to its IP address:

```bash
hostnamectl set-hostname 192.168.3.6
```

Environment variables:

```bash
cat > /etc/profile.d/kafka-eagle.sh <<EOF
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-web-2.0.5
export PATH=\$PATH:\$JAVA_HOME/bin:\$KE_HOME/bin
EOF
source /etc/profile.d/kafka-eagle.sh
```

Java:

```bash
export JAVA_HOME=/usr/local/jdk1.8.0_211
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
```

Create ke.db:

```bash
cd /data/kafka-eagle/db/ && sqlite3 ke.db
```

In system-config.properties, delete the cluster2 block, point the zk list at our ensemble, and enable SASL:

```properties
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.kafka.eagle.jmx.acl=false
cluster1.kafka.eagle.jmx.user=keadmin
cluster1.kafka.eagle.jmx.password=keadmin123
cluster1.kafka.eagle.jmx.ssl=false
cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
cluster1.kafka.eagle.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# kafka jmx uri
######################################
cluster1.kafka.eagle.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=true
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="markadmin" password="markadmin.com";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.blacklist.topics=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/data/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
```

Start it:

```bash
/usr/local/kafka-eagle/kafka-eagle-web-2.0.5/bin/ke.sh start
```

### Increasing topic replicas

linuxea_position_shaanxi_1.json:

```json
{
  "topics": [
    {"topic": "linuxea_position_shaanxi_1"}
  ],
  "version": 1
}
```

Generate a proposal:

```bash
[root@linuxea06 bin]# ./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --topics-to-move-json-file linuxea_position_shaanxi_1.json --broker-list "0,1,2,3,4,5" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}
```

Copy the proposed configuration back into linuxea_position_shaanxi_1.json:

```json
{"version":1,"partitions":[
{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}
```

Execute:

```bash
./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --reassignment-json-file linuxea_position_shaanxi_1.json --execute
```

Verify:

```bash
[root@linuxea06 bin]# ./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --reassignment-json-file linuxea_position_shaanxi_1.json --verify
Status of partition reassignment:
Reassignment of partition linuxea_position_shaanxi_1-3 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-0 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-5 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-4 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-1 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-2 completed successfully
```

### Deleting a topic

List:

```bash
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
```

Delete:

```bash
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 17.168.0.174:2181 --topic test1
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
```

Create (--replication-factor 3 sets the replica count, --partitions 18 the partition count):

```bash
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 18 --topic topic1
```

### Deleting a group

Use kafka-consumer-groups.sh to list the consumers:

```bash
[root@linuxea06 bin]# ./kafka-consumer-groups.sh --bootstrap-server 172.16.100.9:9092 --list --command-config ../config/admin.conf
linuxea-python-consumer-group-position_3
spring-boot-group-position-agg-linuxea-pre
spring-boot-group-position-agg-linuxea-1
linuxea-consumer-third-party
spring-boot-alarm-outage-shaanxi-group1
spring-boot-alarm-offlinenotask-linuxea-group1
spring-boot-group-position-linuxea-2
spring-boot-alarm-outage-linuxea-group1
```

Then delete the offsets:

```bash
./kafka-consumer-groups.sh \
  --bootstrap-server <bootstrap-server-url> \
  --delete-offsets \
  --group spring-boot-group-position-agg-linuxea-1 \
  --topic linuxea_position_shaanxi_1
```

### Troubleshooting

```
Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:180)
        at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:61)
        at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
        at org.apache.kafka.clients.admin.KafkaAdminClient$23.handleFailure(KafkaAdminClient.java:2773)
        at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:641)
        at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:757)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:825)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1119)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
```

The command is missing the authentication settings; add --command-config ../config/admin.conf and it goes through.
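The --command-config file plays the same role as the SASL block in a Python client. As a sketch, here is the equivalent group listing with confluent_kafka's AdminClient; list_groups() is deprecated in recent confluent-kafka releases in favor of list_consumer_groups(), but it suffices for a quick check, and the credentials below are the markadmin examples from this article:

```python
# List consumer groups over the authenticated listener; the Python
# equivalent of passing --command-config admin.conf to the CLI.
from confluent_kafka.admin import AdminClient

admin = AdminClient({
    'bootstrap.servers': '172.16.100.9:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'markadmin',
    'sasl.password': 'MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG',
})

# Deprecated but still available; newer clients can use
# list_consumer_groups() instead.
for group in admin.list_groups(timeout=10):
    print(group.id)
```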
References:

- kafka-consumer-group-script-to-see-all-consumer-group-not-working
- how-to-remove-a-kafka-consumer-group-from-a-specific-topic
- kafka-consumer-groups.sh consumer group management
- Kafka 实现内外网访问流量分离 (separating internal and external Kafka access traffic)