您的位置:首页 > 新闻 > 会展 > zookeeper+kafka消息队列集群部署

zookeeper+kafka消息队列集群部署

2024/10/14 10:14:57 来源:https://blog.csdn.net/henanxiaoman/article/details/140439734  浏览:    关键词:zookeeper+kafka消息队列集群部署

一.消息队列

1、什么是消息队列

        消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

        消息队列(MessageQueue)是一种在软件系统中用于传递消息和实现异步通信的技术。它通常被用来解耦不同组件或服务之间的通信,从而提高系统的可靠性、扩展性和灵活性。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

2、特征

(1)存储:消息队列作为缓冲层,可以处理生产者和消费者的速度差异,防止消息丢失或处理能力不足的问题。

(2)异步:消息队列实现了生产者和消费者的异步通信,生产者不需要等待消费者处理完消息就可以继续工作。

(3)解藕:消息队列能够解耦生产者和消费者,使它们不需要直接知道彼此的存在,从而降低系统组件之间的耦合度。

3、为什么使用消息队列

(1)解耦

        允许你独立的扩展或修改两边的处理过程,只要确保它们遵守相同的接口约束。

(2)冗余

        消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。

(3)扩展性

        因为消息队列解耦了你的处理过程,所以增大消息入队的处理的频率是很容易的,只要另外增加处理过程即可

(4)实现削峰填谷

        在系统的流量剧增时,消息队列能够缓冲请求,平滑处理峰值流量,避免直接将大量请求发送给后端服务导致服务不可用或性能下降。

(5)可恢复性

        系统的一部分组件失效后,不会影响整个系统,消息队列降低了进程间的耦合度,所以即时一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

(6)顺序保证

        消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(kafka保证一个partition内的消息的有序性)

(7)缓冲

        有助于控制和优化数据流经过系统的速度,解决生产消息和处理消息速度不一致的问题。

(8)异步通信

        提供了异步处理机制,允许用户把一个消息放入队列,但不立即处理它,在需要的时候处理。

二.kafka

1.概念

Kafka 是一个高性能、分布式的消息队列系统,最初由LinkedIn开发并开源。它具有以下几个显著的特点和优势:

  1. 高吞吐量:Kafka 能够处理非常高的消息吞吐量。它的设计目标是每秒处理数百万条消息。

  2. 分布式:Kafka 是一个分布式系统,消息被分布存储在多个节点上,支持水平扩展。这使得它能够处理大规模数据和流量。

  3. 持久性:Kafka 将消息持久化到磁盘,保证了消息在发送和消费过程中的持久性和可靠性。即使消费者离线一段时间,也能确保不丢失消息。

  4. 多订阅者支持:Kafka 的消息可以被多个消费者组消费,每个消费者组可以独立消费消息,支持发布-订阅和队列模式。

  5. 可扩展性:Kafka 可以通过增加节点来水平扩展集群,以支持更大的负载和更高的吞吐量。

  6. 低延迟:Kafka 的设计追求低延迟,适用于需要快速数据传输和处理的场景。

  7. 消息保留:Kafka 允许消息在一定时间内保留在系统中,消费者可以根据需要重放过去的消息。

  8. 社区活跃:作为开源项目,Kafka 拥有一个活跃的社区,提供了丰富的文档、示例和支持。

 2.kafka角色术语

(1)broker:kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)。

(2)Topic:每条发布到kafka集群的消息都有一个分类,这个分类被称为Topic(主题)。

(3)Producer:消息的生产者,负责发布消息到kafka broker。

(4)Consumer:指消息的消费者,从kafka broker拉取数据,并消费这些已发布的消息

(5)partition:partition是物理上的概念,每个Topic包含一个或多个Partition,每个Partition都是一个有序队列,Partition中的每条消息都会被分配一个有序的id(offset)

(6)Consumer Group:消费者组,默认属于group组。

(7)Message:消息,通信的基本单位,每个producer可以向一个topic发布一些信息

三.zookeeper 

1、概述

        zookeeper是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让它们有序的去访问某个共享资源,防止造成资源竞争(脑裂)的后果。

2、zookeeper工作原理

(1)master启动

        在分布式系统中引入Zookeeper以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是主节点A和主节点B,当两个主节点都启动后,它们都会向Zookeeper中注册节点信息。我们假设主节点A锁注册的节点信息是master001,主节点B注册的节点信息是master002,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法为例,编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节点A将会获得锁成为主节点,然后主节点B将被阻塞成为一个备用节点。这样,通过这种方式Zookeeper就完成了对两个Master进程的调度。完成了主、备节点的分配和协作

(2)master故障

        如果主节点A发生了故障,这时候它在Zookeeper所注册的节点信息会被自动删除,而Zookeeper会自动感知节点的变化,发现主节点A故障后,会再次发出选举,这时候主节点B将在选举中获胜,替代主节点A成为新的主节点,这样就完成了主、被节点的重新选举

(3)master恢复 

        如果主节点恢复了,它会再次向zookeeper注册自身的节点信息,只不过这时候它注册的节点信息将会变成master003,而不是原来的信息。zookeeper会感知节点的变化再次发动选举,这时候主节点B在选举过程中再次获胜继续担任主节点,节点A担任备份节点。

3.zookeeper集群架构

Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。

follower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。

observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步leader状态,但是不参与投票。bserver日的是扩展系统,提高伸缩性。

client:客户端角色,用于向zookeeper发起请求 。

4.zookeeper工作流程

        Zookeeper修改数据的流程:Zookeeper集群中每个Server在内存中存储了一份数据,在Zookeeper启动时,将从实例中选举一个Server作为1eader,Leader负责处理数据更新等操作,当且仅当大多数Server在内存中成功修改数据,才认为数据修改成功。

        Zookeeper写的流程为:客户端Client首先和一个Server或者bserve通信,发起写请求,然后Server将写请求转发给Leader,Leader再将写请求转发给其它Server,其它Server在接收到写请求后写入数据并响应Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应client,完成一次写操作过程。

四、zookeeper在kafka中的作用

1.Broker注册

        Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:
/brokers/ids
        每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0……N]
        Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的BrokerID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker岩机,则对应的临时节点也会被自动删除。

2.Topic注册

        在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录。

3.生产者负载均衡

        由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡

(1)四层负载均衡

        根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除

(2)使用Zookeeper进行负载均衡

        由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制

4.消费者负载均衡

        与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的个消费者,不同的消费者分组消费白己特定的Topic下面的消息,互不干扰

5.记录信息分区与消费者关系

      消费组(ConsumerGroup)下有多个Consumer(消费者)。对于每个消费者组(ConsumerGroup),Kafka都会为其分配一个全局唯一的GroupID,Group内部的所有消费者共享该ID。订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)。
      同时,Kafka为每个消费者分配一个ConsumerID,通常采用Hostname:UUID形式表示
      在Kafka中,规定了每个消总分区只能被同组的一个消费者进行消费,因此,需要在
 Zookeeper上记录消息分区与consumer之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其consumer ID写入到Zookeeper对应消息分区的临时节点上,例如:
 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该消息分区上消费者的ConsumerID。

6、消息消费进度Offset记录

        在消费者对指定消分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。ffset在Zookeeper中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值

7.消费者注册

消费者服务器在初始化启动时加入消费者分组的步骤如下:

(1)注册到消费者分组

        每个消费者服务器启动时,都会到zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

(2)对消费者分组中的消费者的变化注册监听

        每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少就触发消费者的负载均衡。

(3)对Broker服务器变化注册监听

        消费者需要对/broker/ids/[-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡

(4)进行消费者负载均衡

为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消
息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡

 五.集群部署kafka

1:修改主机hosts文件(所有主机都配置)

[root@kafka1 ~]# vim /etc/hosts
192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3

2:zookeeper的部署

(1)安装zookeeper(三个节点的配置相同)
[root@kafka1 ~]# systemctl stop firewalld
[root@kafka1 ~]# setenforce 0
[root@kafka1 ~]# yum -y install java
[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
 (2)创建数据保存目录(三个节点的配置相同)
[root@kafka1 ~]# cd /etc/zookeeper/
[root@kafka1 zookeeper]# mkdir zookeeper-data
(3)修改配置文件(三个节点的配置相同)
[root@kafka1 zookeeper]# cd /etc/zookeeper/conf
[root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg
[root@kafka1 ~]# vim zoo.cfg 
dataDir=/etc/zookeeper/zookeeper-data
clientPort=2181
server.1=192.168.10.101:2888:3888
server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888

注释:zookeeper只用的端口

2181:对cline端提供服务

3888:选举leader使用

2888:集群内机器通讯使用(Leader监听此端口)

(4)创建节点id文件(按server编号设置这个id,三个机器不同)

节点1:

[root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid

节点2:

[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid

节点3:

[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid
(5)三个节点启动zookeeper进程
[root@kafka1 conf]# cd /etc/zookeeper/
[root@kafka1 zookeeper]# ./bin/zkServer.sh start
[root@kafka1 zookeeper]# ./bin/zkServer.sh status

 3:kafka的部署

(1)kafka的安装(三个节点的配置相同)
[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
(2)修改配置文件
[root@kafka1 ~]# cd /etc/kafka/[root@kafka2 kafka]# vim config/server.propertiesbroker.id=1##21行  修改,注意其他两个的id分别是2和3listeners=PLAINTEXT://192.168.10.101:9092#31行  修改,其他节点改成各自的IP地址log.dirs=/etc/kafka/kafka-logs## 60行  修改num.partitions=1##65行 分片数量,不能超过节点数zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218##123行,填写集群中各节点的地址和端口

注释:

9092是kafka的监听端口

(3)创建日志目录(三个节点的配置相同)
[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
(4)在所有kafka节点上执行开启命令,生成kafka群集(三个节点的配置相同)
[root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties &

如果启动不了,可以将/etc/kafka/kafka-logs中的数据清除再试试

4:测试

创建topic(任意一个节点)

bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test

列出topic(任意一个节点)

bin/kafka-topics.sh --list --zookeeper kafka1:2181bin/kafka-topics.sh --list --zookeeper kafka2:2181bin/kafka-topics.sh --list --zookeeper kafka3:2181

生产消息

bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test

消费消息

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

输入完成会可以去生产者方测试输入,看消费者方有无信息 

删除topic

bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com