您的位置:首页 > 财经 > 产业 > 编写一个android应用程序_python编程代码大全_推广联盟平台_广州抖音推广公司

编写一个android应用程序_python编程代码大全_推广联盟平台_广州抖音推广公司

2025/5/22 21:49:21 来源:https://blog.csdn.net/weixin_50902636/article/details/145991886  浏览:    关键词:编写一个android应用程序_python编程代码大全_推广联盟平台_广州抖音推广公司
编写一个android应用程序_python编程代码大全_推广联盟平台_广州抖音推广公司

文章目录

  • 消息队列
  • kafka
    • kafka传统定义
    • kafka特性
    • Kafka 写流程
    • kafka读消息流程
    • kafka高性能的原因
    • kafka基础组成
    • kafka持久化
      • 如何在文件中找到消息?
    • kafka基础操作
      • Kafka分区数计算
      • Kafka机器数量计算
      • 1、用的较多的命令
      • 2、启动kafka
      • 3、Topic 管理指令
      • 补充:查看topic中消息内容
      • 4、消费组情况指令
      • 重设消费者组位移
      • 消息组消费情况
      • 5、增删节点后的数据均衡
      • 6、设置 Topic 过期时间
      • 7、kafka性能测试
      • 8、使用脚本生产/消费消息
      • 9、查询所有的groupid && 查询详情(Lag查询)
      • 10、动态删除zookeeper日志和快照数据
      • 11、查询一个topic有哪些consumer group在消费
      • 12、加鉴权命令
      • 13、查询消息是否发送成功
      • 14、查询消息生产时间
      • 15、kafka partition扩容 && kafka broker扩容
      • 16、平衡leader
      • 17、kafka 动态修改配置
    • kafka常用性能调优
      • 1、磁盘目录优化
      • 2、JVM参数配置
      • 3、日志数据刷盘策略
      • 4、日志保留时间
      • 5、参数调优
    • 数据可靠性保证方式
    • 什么是ISR?
      • ISR机制介绍
      • ISR作用
      • ISR的优缺点
    • kafka怎么解决高延迟的问题?
    • 消息积压问题排查处理
    • 消息丢失问题
    • 消息重复消费问题
  • zookeeper
    • zookeeper选举流程

消息队列

	消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件主要解决应用耦合、异步消息、流量削峰等问题,实现高性能、高可用,可伸缩的最终一致性架构。常用的消息队列有RocketMq、RabbitMq、kafka等。消息中间件是在消息的传输过程中保存消息的容器,消息中间件再将消息从它的源中继到它的的目标时充当中间人的作用。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功的传递为止,但消息队列保存消息也是有期限的。

消息队列组件

	producer 生产者  发送消息到消息队列consumer 消费者  从消息队列接收消息topic    主题    一种支持消息多个订阅者的机制.不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者message  消息    由消息描述符、消息体组成queue    队列	   队列是消息的安全存放地,队列存储消息直到被应用程序处理broker   队列管理器  负责消息存储、确认、重试等,包含多个队列channel  通道    队列管理器之间传递消息的管道

消息队列的作用

	解耦: 允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束削峰/缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。异步通信: 允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理它

消息队列的两种模式

1、点对点模式消息持久化到一个队列中,由消费者主动的去拉取消息进行消费,将有一个或多个消费者消费队列中的数据。但一条消息只能被消费一次,当消费者消费了队列中的某条数据之后,该条数据从消息队列中删除.
优点:消费者拉取消息的频率可以由自己控制
缺点:消息队列是否有消息需要被消费,在消费者端是无法感知,所有需要在消费者端进行监控

在这里插入图片描述

2、发布/订阅模式消息被持久化到一个topic中。消费者可以订阅一个或者多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,且被消费完后不会立马删除消息.类型是订阅微信公众号。
优点:消费者被动接受推送,无需感知消息队列是否有需要被消费的消息
缺点:由于消费者机器性能不一样,所有处理消息的能力也会不一样,且消息队列无法感知消费者的消费速度.

在这里插入图片描述

kafka

kafka传统定义

	kafka是一个分布式的基于发布/订阅模式的消息队列系统;是一个开源的分布式事件流平台,通过⾼性能TCP ⽹络协议进⾏通信的 服务器和 客户端组成,基于zookeeper协调的分布式日志系统。它适合离线和在线消费消息。kafka消息保留在磁盘上,并且在集群内复制防止数据丢失。
发布/订阅: 消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息

kafka特性

	1、持久性、可靠性: 以时间复杂度为O(1)的方式提供消息持久化能力。消息被持久化到本地磁盘,并且支持数据备份防止数据丢失,消息被消费仍然不会被立即删除,而是会有过期时间。2、高吞吐率、低延迟: 一个topic分为多个partitionKafka的写入操作实际上只把数据写入操作系统的page cache,再由操作系统决定什么时候将数据写入磁盘,减少了同IO的操作;同时写入操作是append追加的方式,避免磁盘随机写操作;保持为顺序读写操作Kafka消费端的读操作利用了零拷贝技术提升数据传输的效率,同时先尝试从OS的page cache中读取数据,减少了和磁盘的交互3、容错性: 允许集群中节点失败(若副本数量为 n,则允许 n-1 个节点失败)基于Zookeeper实现Kafka服务状态检测和维护,多副本的情况下,只要有一个副本正常,仍可以正常对外提供服务4、高并发: 支持数千个客户端同时读写队列模式:所有 consumer 都在一个队列,这样消息就在队内进行分区并行消费订阅-发布模式:所有 consumer 都不再一个队列,这样 topic 消息可以广播给所有订阅的消费者5、可扩展性: kafka集群支持热扩展,由分布式协调中心Zookeeper统一维护Kafka服务器的状态,扩展Kafka集群只需要启动新的Kafka服务
kafka优点:1、多个生产者2、多个消费者(而且多个消费者之间互不影响)3、基于磁盘的数据存储4、伸缩性5、高性能

Kafka 写流程

	1.连接 zk 集群,从zk中拿到对应的 topic 的 partition 信息和 partition 的 leader 的相关信息。注:Kafka 2.8.0 已移出对 zookeeper 的依赖。2.向对应 broker 发消息3.客户端在发送消息时,必须指定消息所属的 Topic 和消息值 Value,此外还可以指定消息所属的 Partition 以及消息的 Key。4.对消息做序列化处理5.如果消息记录中指定了 Partition,则 Partitioner 不做任何事情;否则,Partitioner 根据消息的 key 得到一个 Partition。这时生产者就知道向哪个 Topic下的哪个 Partition 发送这条消息。6.消息被添加到相应的 batch 中,独立的线程将这些 batch 发送到 Broker 上( 注意: 消息不是一条一条发往 broker 的,而是会在 客户端本地缓存一批数量后,在发出去,因此客户端是以 批-batch (批量)为单位发送消息的,即一批当中包含一条或多条消息;同样,broker 也是以批为单位进行数据存储的)7.broker 收到消息会返回一个响应。如果消息成功写入 Kafka,则返回成功信息,内容包含了 Topic 信息、Patition信息、消息在 Partition 中的 Offset 信息;若失败返回一个错误。

kafka读消息流程

	1.连接 zk 集群,从 zk 中拿到对应的 topic 的 partition 信息和 partition 的 leader 的相关信息2.连接到对应的 leader 对应的 broker3.consumer 通过请求将希望读取的 topic、partition 以及对应的 offset 发送给 leader4.leader 根据 offset 等信息定位到 segment(索引文件和日志文件)5.根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给 consumer

kafka高性能的原因

1、分布式存储架构
2、磁盘顺序读写kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序读写
3、读写数据的批量batch处理以及压缩传输
4、数据传输的零拷贝

在这里插入图片描述

kafka基础组成

0.partition 分区同⼀个主题下的分区包含的消息是不同的。消息是以追加的形式存储在分区中。消息在被追加到分区⽇志⽂件的时候都会分配⼀个特定的偏移量(offset),offset是消息在分区中的唯⼀标识。Kafka保证分区有序,并不保证主题消息有序。分区分配策略: round-robin 和range
1.Topic主题一个主题(topic)的消息实际上由多个队列存储,一个队列在kafka上称为一个分区。一个topic可以分布到多个broker服务器上一个topic可以分为多个partition,每个分区是一个有序的队列。分区中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个分区中的顺序将消息发给消费者。简单理解为面向不同业务数据设置的标识符,内部会维持类似于队列的结构,生产者和消费者都是面向主题进行数据传送。Kafka可以将主题划分为多个分区(Partition),会根据分区规则选择把消息存储到哪个分区中,只要分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力,由于Producer和Consumer都只会与Leader角色的分区副本相连,所以kafka需要以集群的组织形式提供主题下的消息高可用。kafka支持主备复制,所以消息具备高可用和持久性。一个分区可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,Leader在这台broker上的分区都会变得不可用,kafka会通过ISR机制自动移除Leader,再其他副本中选一个作为新的Leader。
2. Broker(kafka所在的服务器节点 队列管理器)一个集群由多个broker组成,这样才能实现负载均衡以及容错,一个broker可以维护多个topic(或说更具体点是partition)每个Broker都能够处理读写请求,本质上Broker就是作为⼀个存储层来存放我们的数据, ⽣产者客户端把数据写⼊到Broker中, 然后消费组客户端去Broker中拉取数据进⾏消费broker是无状态的,它们通过Zookeeper来维护集群,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
3.producer(生产者)kafka向broker推送消息的客户端,可以对接数据上游,按批次(batch-size)发送数据
4.consumer(消费者)从kafka broker拉取(pull)消息的客户端,可以对接数据下游,按分区消费数据
5. partition(分区)将一个topic划分为多个partition并将其分布到不同的broker上.多分区解决了I/O性能瓶颈问题
6.consumer group(消费者组)在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据
7.replica(partiton数据副本)可以根据业务需求对topic下的partition设置副本以提高kafka集群数据的可用性。⼀个分区可以有多个副本,⼀个分区的多个副本分为 Leader副本、Follower副本, Leader副本负责提供读写能⼒,Follower副本会从Leader副本 同步数据⽤于备份。多副本解决了⾼可⽤问题
8.leader(副本老大)多个partition副本下负责对外服务的副本称为leader.只有Leader副本才会承担读写职责, Follower副本⽤于同步备份
9.follwer(副本小弟)负责实时从leader中同步数据,leader发生故障时,在ISR队列中的某个follower才会竞选成为新的leader
10.offset 偏移量kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置.

在这里插入图片描述

在这里插入图片描述

kafka持久化

	每个topic(主题)将消息分成多个分区,每个分区在存储层面是append log文件。任何发布到此分区的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),分区是以文件的形式存储在文件系统中,log是文件根据broker中的配置保留一定时间后删除来释放磁盘空间。

在这里插入图片描述

	由于每次生产者生产消息都会追加到log文件尾部,为了防止日志文件过大影响消息的查找定位效率,kafka引入了分片和索引的机制,将一个partition又分为若干个segment,每个segment文件为1G左右,每个segment包含一个存储数据的log文件和查找的index索引文件以及timeindex文件,这些文件位于一个文件夹下,文件夹的命名规则为topic名称+分区序号

在这里插入图片描述

如何在文件中找到消息?

	Producer将数据写入kafka后,集群会将数据保存到磁盘中。每个Topic有多个partition,每个partition在服务器上的表现形式为一个个文件夹,每个文件夹下有多个segment组,每组segment文件中又包含.index文件、.log文件、.timeindex文件三个文件,log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

在这里插入图片描述

当查找消息时,配置segment+offset进行查找,如上图所示,查找一个offset为221118的message,过程如下:1、先找到offset为221118的message所在的segment文件(利用二分法查找),这里在第二个segment文件2、找到对应segment的.index文件,起始位置偏移量为221113+1,要查找的offset为221118的message在该index的偏移量为5。同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引3、根据找到的相对offset为4的索引(offset为221117)确定message存储的物理偏移位置为256。打开数据文件log,从位置为256的那个地方开始顺序扫描直到找到offset为221118的那条Message。因此,消费者利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来实现高效的查找数据和消费。

kafka基础操作

参考文档: https://www.cnblogs.com/yeyuzhuanjia/p/18005645

Kafka分区数计算

(1)创建一个只有1个分区的topic(2)测试这个topic的producer吞吐量和consumer吞吐量。(3)假设他们的值分别是Tp和Tc,单位可以是MB/s。(4)然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)例如: producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区
分区数一般设置为: 3-10个

Kafka机器数量计算

#先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
Kafka机器数量(经验公式)= 2 *(峰值生产速度 * 副本数 / 100)+ 11)峰值生产速度峰值生产速度可以压测得到。
2)副本数副本数默认是1个,在企业里面2-3个都有,2个居多。副本多可以提高可靠性,但是会降低网络传输效率。比如我们的峰值生产速度是50M/s。副本数为2。
Kafka机器数量 = 2 *(50 * 2 / 100)+ 1 = 3台

1、用的较多的命令

   Kafka-console-consumer.sh   --消费者工具kafka-console-producer.sh   --生产者工具kafka-consumer-groups.sh    --消费者组工具Kafka-topics.sh             --topics的增删改查   Kafka-producer-perf-test.sh --生产压力测试Kafka-consumer-perf-test.sh --消费压力测试  

2、启动kafka

bin/kafka-server-start.sh -daemon  conf/server.properties
然后执行jps   看有无kafa进程多一个  出来 

3、Topic 管理指令

在这里插入图片描述

管理 Topic ,包括 创建、删除、分区扩容、查询 Topic 详细信息、查看 Topic 列表 等 
⽀持正则表达式匹配Topic#1、列一下 125这台上的所有topics  --exclude-internal 排除kafka内部topic,⽐如 __consumer_offsets-*./kafka-topics.sh --list --bootstrap-server  10.208.37.125:9092  --exclude-internal
 #2、创建一个 topic 叫 test 两个分区 两个副本(leader+follower数而不是1个leader 2个follower) ./kafka-topics.sh --create  --bootstrap-server  10.208.37.125:9092   --topic test  --partitions  2  --replication-factor 2
 #3、通过连接到zk,获取某个主题的详细信息./kafka-topics.sh --describe   --zookeeper 10.208.37.125:2181  --topic test2 --exclude-internal./kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal
 #4、删除topic ./kafka-topics.sh  --delete   --zookeeper 10.208.37.125:2181   --topic test./kafka-topics.sh  --delete   --bootstrap-server xxxx:9092 --topic test
 #5、Topic 分区扩容#单个topic分区扩容./kafka-topics.sh --zookeeper  localhost:2181 --alter --topic test --partitions 4 #扩容全部topic./kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

补充:查看topic中消息内容

#查看指定topic中的消息内容
/usr/hdp/3.1.4.0-315/kafka/bin/kafka-console-consumer.sh --bootstrap-server  10.8.21.208:9092,10.8.21.209:9092,10.8.21.211:9092 --
topic "af3cff45-16ad-4532-ac76-c3b8bda80970" --from-beginning #查看指定topic中的所匹配的消息内容
/usr/hdp/3.1.4.0-315/kafka/bin/kafka-console-consumer.sh --bootstrap-server  10.8.21.208:9092,10.8.21.209:9092,10.8.21.211:9092 --
topic "af3cff45-16ad-4532-ac76-c3b8bda80970" --from-beginning  |grep "xxxx"

4、消费组情况指令

#查看group的消费情况先查询消费者组./bin/kafka-consumer-groups.sh --bootstrap-server  10.208.37.125:9092 --list在查看对应消费者组的消息消费情况./bin/kafka-consumer-groups.sh --bootstrap-server  10.208.37.125:9092 --describe --group groupname#删除group消费者组./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group test-group

重设消费者组位移

前提:consumer group状态必须是inactive的,即不能是处于正在工作中的状态。总体来说,重设位移的流程由3步组成,如下图所示:

在这里插入图片描述

确定topic作用域,当前有3种作用域指定方式:--all-topics(为consumer group下所有topic的所有分区调整位移)--topic t1 --topic t2(为指定的若干个topic的所有分区调整位移)--topic t1:0,1,2(为指定的topic分区调整位移)

在这里插入图片描述

#注意:--execute:执行真正的位移调整--export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用Earliest策略: 把位移调整到当前最早位移处$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
Latest策略: 把位移调整到当前最新位移处$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
Current策略: 把位移调整到当前最新提交位移处$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略: 把位移调整到指定位移处$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
Shift-By-N策略: 把位移调整到当前位移+N处(N可以是负值)bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group  --topic test --reset-offsets --shift-by <offset_N> --execute
DateTime策略: (把位移调整到大于给定时间的最小位移处)时间需要减8bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group   --topic test --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

消息组消费情况

在这里插入图片描述

# TOPIC: group对应的topic
# PARTITION:partition编号,从0开始0-5表示有6个partition
# CURRENT-OFFSET:此消费者当前已消费的offset
# LOG-END-OFFSET:生产者在此partition分区上已提交确认的offset
# LAG:两个offset的差值,就是常说的积压。此数值过大为异常。
# HOST:消费者所在的服务器ip 
# CLIENT-ID:消费者的信息

5、增删节点后的数据均衡

#增加数据节点后,虽然新节点上已经启动了broker,但 kafka 不会自动均衡数据,需要手动执行。命令工具:kafka-reassign-partitions.sh编写配置文件 move-json-file.json ,告诉 kafka 你希望哪些 topic 要重新分区:
{"topics": [{"topic": "xxt_topic"},{"topic": "xxfile_topic"},{"topic": "xxem_topic"}],"version": 1
}#执行命令生成分配信息:要注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它
# 下面 --broker-list 参数 对应的是 brokerid
$ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ~/mv.json --broker-list "1001,1002" --generate
Current partition replica assignment #当前分配信息
{"version":1,"partitions":[{"topic":"event_topic","partition":2,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1003,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":7,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1003,1002],"log_dirs":["any","any"]}]}Proposed partition reassignment configuration #分配后的信息
{"version":1,"partitions":[{"topic":"event_topic","partition":7,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]}]}将上面得到期望的重新分配方式文件保存在一个 json 文件里面reassignment-json-file.json,然后通过参数 --execute 执行分配该命令也可以用于以下使用场景:•给分区增加副本,你只需要在 第 2 步生成的内容里面,在 replicas 参数中加入你想要增加的 副本所在 broker id 信息即可,这样执行的时候会自动在 对应 broker 上创建副本。•重新分配分区

6、设置 Topic 过期时间

# 设置 topic 过期时间(单位 毫秒)
### 3600000 毫秒 = 1小时
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics --add-config retention.ms=3600000# 查看 topic 配置
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-type topics --entity-name topic-devops-elk-log-hechuan-huanbao

7、kafka性能测试

# 测试生产者
# 向指定主题发送了 1 千万条消息,每条消息大小是 1KB
# 它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时
结论:1、Producer端传输效率和partition数成正比2、Producer端传输效率和replication数量成反比3、Producer端传输数量和单条记录大小成正比参数解释:--topic topic名称,本例为test-topic--num-records 总共需要发送的消息数,本例为100000--record-size 每个记录的字节数,本例为1024,即1kb--throughput 每秒钟发送的记录数,本例为-1--producer-props  发送端的配置信息acks 确认机制,此处为-1,即所有follower同步完成后返回ack确认linger.ms 控制消息发送的延迟时间conpression.type 将消息压缩。数据将由 producer 压缩,以压缩格式写入服务器,并由 consumer 解压缩。压缩将提高 consumer 的吞吐量,但需付出一定的解压成本。 命令:bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
#测试消费者性能
结论:1、Customer端消费效率和partitions数成正比(但影响不大)2、Cumsomer端消费速率和thread成正比,但是达到一定数量(parttion数量)以后趋于平稳,再增加也不会继续变大命令:bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic --threads 1 --num-fetch-threads 1 Kafka性能测试结论:由于每个kafka集群环境差异都很大,本文不代表所有情况。但是一个通常的kafka生产集群以下特性是应该达到的:1.单个consumer的消费速率必须远大于单个producer的生产速率。2.单个broker数据生产效率不应小于50M/s,否则增加JVM内存,并增加缓冲区大小。

8、使用脚本生产/消费消息

# 连接到test-topic,然后通过输入+会车生产消息
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic  --producer-property
># --from-beginning: 指定从开始消费消息,否则会从最新的地方开始消费消息
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property

9、查询所有的groupid && 查询详情(Lag查询)

==============list==================
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
==============describe==============
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ${consumer-group}
====================================
bin/kafka-consumer-offset-checker.sh --zookeeper ${zk_ip:zk_port/chroot} --group {group} --topic {topic} 查询特定topic下consumer group的消费情况

10、动态删除zookeeper日志和快照数据

./bin/zkCleanup.sh /export/servers/zookeeper-3.4.6/data -n 1000  [-n 1000表示保留快照日志的数量]

11、查询一个topic有哪些consumer group在消费

方法一:
#! /bin/bash
topic=resource-create  (要查询的topic)
ConsumerGroups=`bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092`(注意服务是否为9092端口,下同)
for consumergroup in $ConsumerGroups
doresult=`bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group $consumergroup | grep $topic`if [ -n "$result" ]thenecho "topic:$topic"echo "group:$consumergroup"  (输出消费topic的消费者组)fi
done方法二:for i in `bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list`;do echo $i;bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $i 2>/dev/null |grep  要查询的topic;done

12、加鉴权命令

==============启动kafka=================
bin/secured-kafka-server-start.sh -daemon config/server.properties==============查看Group详情==============
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --command-config config/consumer.properties --describe --group ${group}==============生产消息===================
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_midop==============消费消息===================
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_midop==============授权生产者write,describe权限==============
bin/kafka-acls.sh --add --authorizer-properties zookeeper.connect=${zk_ip:zk_port/chroot} --allow-principal User:jvessel --producer --topic=*==============授权消费者read权限,需要授权topic与group权限==============
bin/kafka-acls.sh --add --authorizer-properties zookeeper.connect=${zk_ip:zk_port/chroot} --allow-principal User:jvessel --consumer --topic=* --group=*==============查看授权====================
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${zk_ip:zk_port/chroot} --list -cluster==============取消授权==============
--------------取消授权topic--------------
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${zk_ip:zk_port/chroot} --remove --topic *--------------取消授权group--------------
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${zk_ip:zk_port/chroot} --remove --group *--------------取消授权集群--------------
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${zk_ip:zk_port/chroot} --remove --cluster--------------生产压测--------------
bin/kafka-producer-perf-test.sh --topic test_midop --num-records 100 --record-size 100 --throughput 20 --producer.config config/producer.properties  --producer-props bootstrap.servers=${brokerip}:9092

13、查询消息是否发送成功

	bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --from-beginning --topic ${topic_name} | grep ${message-key-word}

14、查询消息生产时间

./kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /data/${partition_name}/*.log./kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/xxxxxxxxxxx.log

15、kafka partition扩容 && kafka broker扩容

a) broker扩容
kakfa集群有3个节点,下面我们将节点数目扩容为5个,操作如下:
【操作前】
查看zookeeper当中broker数目,执行如下命令,获取输出内容:
bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /kafka/brokers/ids
[1, 2, 3]
【第1步】
在三台机器上安装kafka程序
【第2步】
由于zookeeper存放kafka的broker节点信息、集群信息等元数据信息,因此kafka扩容不需要对zookeeper进行扩容,所以kafka扩容机器同样使用原zookeeper即可,即新安装的三个kafka配置信息中zookeeper.connect仍然保持不变如下所示:
zookeeper.connect=jd1:2181,jd2:2181,jd3:2181/kafka【第3步】
将三台机器的broker.id在原id为1,2,3的基础上设置成4,5,详细配置如下:
############################# Server Basics #############################
broker.id=4
default.replication.factor=3
num.partitions=2
auto.create.topics.enable=True
delete.topic.enable=true
queued.max.requests=1000
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092
port=9092
advertised.host.name=192.168.0.7
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/export/work/kafka_9092/data
num.recovery.threads.per.data.dir=1
############################# Log Retention Policy #############################
log.retention.minutes=5
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=jd1:2181,jd2:2181,jd3:2181/kafka
zookeeper.connection.timeout.ms=6000
############################# END #############################
修改配置后,重新启动集群即完成broker扩展
bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /kafka/brokers/ids
[1, 2, 3]【第4步】
kafka扩容后原topic仍然记录为3个分区状态因此需要对topic进行重新分配即replica扩容
b) replica扩容
下面我们将testjd的副本数进行扩展,将replica原来的2个扩展为3个,操作如下:
【操作前】
Topic:testjd    PartitionCount:2        ReplicationFactor:2     Configs:Topic: testjd   Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: testjd   Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3【第1步】生成reassign plan
编辑文件指定待处理的topic,格式是json,格式如下,vim execute.json:
{"version": 1,"partitions": [{"topic": "testjd","partition": 0,"replicas": [1,2,3]},{"topic": "testjd","partition": 1,"replicas": [2,1,3]}]
}【第2步】执行扩容execute.jsonbin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --execute \
--reassignment-json-file execute.json
【注意】broker-id的顺序,排在第一个的id就是leader节点的id,不同分区的第一个id打散分布。【第3步】查看执行结果
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka  -verify\
--reassignment-json-file plan.json
Status of partition reassignment:  
Reassignment of partition [testjd,0] completed successfully
Reassignment of partition [testjd,1] completed successfully【第4步】验证:查看拓扑分布是否符合预期,收发消息是否正常,日志滚动是否正常(无Error)
Topic:testjd    PartitionCount:2        ReplicationFactor:3     Configs:Topic: testjd   Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3Topic: testjd   Partition: 1    Leader: 2       Replicas: 2,1,3 Isr: 2,3,1
c) partition扩容 下面我们将testjd的分区数进行扩展,将partitions由原来的2个扩展为7个,操作如下:
【操作前】
Topic: testjd        PartitionCount:2        ReplicationFactor:3     Configs:
Topic: testjd        Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
Topic: testjd        Partition: 1    Leader: 1       Replicas: 0,1,2 Isr: 1,2,0
【执行命令】
bin/kafka-topics.sh --alter  \
--zookeeper *** \
--topic XXX \
--partitions {N}
【操作后】
Topic:testjd PartitionCount:7        ReplicationFactor:3     Configs:
Topic: testjd        Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
Topic: testjd        Partition: 1    Leader: 1       Replicas: 0,1,2 Isr: 1,2,0
Topic: testjd        Partition: 2    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
Topic: testjd        Partition: 3    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
Topic: testjd        Partition: 4    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
Topic: testjd        Partition: 5    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

16、平衡leader

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/kafka 
==============平衡某partition===========
bin/kafka-preferred-replica-election --zookeeper zk_host:port/kafka --path-to-json-file xx.json

17、kafka 动态修改配置

==============cleanup.policy==============
bin/kafka-configs.sh --alter --zookeeper ${zk_ip:zk_port/chroot} --entity-type topics --entity-name ${topic_name} --add-config cleanup.policy=delete
bin/kafka-configs.sh --alter --zookeeper ${zk_ip:zk_port/chroot} --entity-type topics --entity-name ${topic-name} --add-config cleanup.policy=compact
bin/kafka-configs.sh --alter --zookeeper ${zk_ip:zk_port/chroot} --entity-type topics --entity-name ${topic_name} --add-config cleanup.policy=[compact,delete] 通过加方括号[]添加多个参数==============日志保存时间==============
bin/kafka-configs.sh --alter --zookeeper ${zk_ip:zk_port/chroot} --entity-type topics --entity-name ${topic_name} --add-config retention.ms=${xxxx}==============删除动态修改配置==============
bin/kafka-configs.sh --alter --zookeeper ${zk_ip:zk_port/chroot} --entity-type topics --entity-name ${topic_name} --delete-config retention.ms==============消息添加服务器时间==============
bin/kafka-configs.sh --alter --zookeeper ${zk_ip:zk_port/chroot} --entity-type topics --add-config message.timestamp.type=LogAppendTime --entity-name ${topic_name}

kafka常用性能调优

1、磁盘目录优化

	kafka读写的单位是partition,因此将一个topic拆分为多个partition可以提高吞吐量。但是这里有个前提,就是不同 partition 需要位于不同的磁盘(可以在同一个机器)。如果多个 partition 位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性

2、JVM参数配置

修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,推荐使用最新的 G1 来代替 CMS 作为垃圾回收器,
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,
当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,
需要给操作系统的缓存留出几个G。
KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"G1相比较于CMS的优势:•G1 是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力•对于内存的划分方法不同,Eden, Survivor, Old 区域不再固定,使用内存会更高效。G1 通过对内存进行 Region 的划分,有效避免了内存碎片问题。•G1 可以指定GC时可用于暂停线程的时间(不保证严格遵守)。而 CMS 并不提供可控选项。•CMS 只有在 FullGC 之后会重新合并压缩内存,而G1把回收和合并集合在一起。•CMS 只能使用在Old区,在清理Young 时一般是配合使用 ParNew,而 G1 可以统一两类分区的回收算法。
•G1的适用场景:•JVM占用内存较大(At least 4G)•应用本身频繁申请、释放内存,进而产生大量内存碎片时。•对于GC时间较为敏感的应用。

3、日志数据刷盘策略

为了大幅度提高 producer 写入吞吐量,需要定期批量写文件。
总之,Kafka 要么是定量刷盘,要么是定时刷盘。
有 3 个参数可配置:•log.flush.interval.messages = 100000每当 producer 写入100000 条数据时,就把数据刷到磁盘控制消息到多少条就要强制刷新磁盘。Kafka 会在写入 page cache 的时候顺便检测一下•log.flush.interval.ms=500每隔 500 秒,就刷一次盘•log.flush.scheduler.interval.ms=200间隔多少毫秒,就检测数据是否需要刷新到磁盘上
注意:	1、后面两个参数是要配合在一起的。举个例子,假如说 log.flush.internval.ms 设置为 500,而 log.flush.schduler.interval.ms 设置为 200。也就是说,Kafka 每隔 200 毫秒检查一下,如果上次刷新到现在已经过了 500 毫秒,那么就刷新一次磁盘。2、log.flush.interval.messages 和 log.flush.interval.ms 都设置了的话,那么就是它们俩之间任何一个条件满足了,都会刷新磁盘。		

4、日志保留时间

	当 kafka server 的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能导致磁盘空间不够用,kafka 默认是保留7天。参数: log.retention.hours = 168

5、参数调优

高并发下建议配置:
1、调整broker配置broker处理消息的最大线程数,默认值为3num.network.threads=xxxbroker处理磁盘IO的线程数,默认值为8num.io.threads=xxxbroker 用于发送网络数据的 socket 发送缓冲区大小,单位是字节.默认值为100kb,即102400字节socket.send.buffer.bytes=102400broker 用于接收网络数据的 socket 接收缓冲区大小,单位是字节.默认值为100kb,即102400字节socket.receive.buffer.bytes=102400控制 Kafka broker 接受的请求的最大大小.默认值为100MB,即 104857600 字节。通过调整此参数,可以防止客户端发送过大的消息socket.request.max.bytes=104857600
建议配置一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1. num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.
2、调整Replica 复制配置:每个follow 从leader拉取数据进行同步数据,follow 同步性能由这几个参数决定:num.replica.fetchers: 拉取线程数replica.fetch.min.bytes: 拉取最小字节数replica.fetch.max.bytes: 拉取最大字节数replica.fetch.wait.max.ms: 最大等待时间优化建议:num.replica.fetchers配置多可以提高follower的I/O并发度,单位时间内leader持有更多的请求,相应的负载会增大,需要根据机器硬件资源做权衡。replica.fetch.min.bytes=1 默认配置是1字节,否则读取消息不及时replica.fetch.max.bytes=5*1024*1024 默认是1MB,这个值太小了,5MB为宜,根据业务情况调整。replica.fetch.wait.max.ms  follower 拉取速率,频率过高,会导致CPU飙升,因为leader无数据同步,leader会积压大量无效请求情况。
3、调整生产者及消费者配置
生产者配置优化:acks=all                提供最高的可靠性,但会增加延迟compression.type=snappy  启用压缩可以提高吞吐量并减少带宽消耗batch.size=16384         通过批量发送消息,提高吞吐量,但增加了延迟linger.ms=5              控制批次的等待时间,增加延迟以提高吞吐量max.request.size=1048576 限制请求的最大大小,防止过大的消息请求影响性能
消费者配置优化:fetch.min.bytes 消费者每次从服务器拉取最小数据量,增加这个值,可以减少消费者请求次数,降低网络消耗,提升消费者处理性能。fetch.max.bytes 与上面配置相对应,这是消费者每次从服务器拉取最大数据量,增加这个值,也有同样的效果。fetch.max.wait.ms 这个配置指定了消费者在读取到 fetch.min.bytes 设置的数据量之前,最多可以等待的时间。增加这个值,也有同样的效果。max.poll.records 消费者每次可以拉取的最大记录数,增加这个值,也有同样的效果,不过会增加每次消息处理的时间。max.partition.fetch.bytes 消费者从每个分区里拉取的最大数据量

数据可靠性保证方式

那么partition所在的broker什么时候发送ack给producer?有什么选择方案?
方式问题优点缺点
leader落盘后直接返回ack如果producer收到ack后leader挂了,follower还未同步成功,造成消息丢失。低延迟、性能高容易造成消息丢失
follower全部同步成功后返回ack不会出现消息丢失问题leader选举时容忍n台节点故障,需要n+1个副本,随便选一个follower即可。高延迟,需要等待follower都同步成功。
半数以上follower同步成功后返回ack不会出现消息丢失问题低延迟选举新leader时,容忍n台节点故障,需要2n+1个副本。

什么是ISR?

ISR机制介绍

	ISR 的全称叫做In-Sync Replicas(同步副本集),ISR 动态维护了一个和 Leader 副本保持同步副本集合,ISR 中的副本全部都和 Leader 的数据保持同步。ISR 机制通过副本冗余机制,提供了 kafka 消息的高可靠性,做到故障转移,保障服务的可用性。ISR 平衡了主从架构下,复制方案的选择(同步 / 异步 / 少数服从多数),让使用者根据参数自行选择。

ISR作用

1、生产消息时的ACK确认机制当我们生产消息的时候,到底要写入多少副本才能算成功呢?通过 ISR 就可以知晓了哪些 follower 与 Leader 保持着同步,在写入消息的时候,设置写入处于 ISR 中所有的副本才算成功2、Leader选举当 Leader 挂了之后,我们应该选择哪个 follower 来成为新的 Leader 呢?从 ISR 中选择对应的 follower 成为新的 Leader。3、最小 ISR副本数配置Kafka提供了 min.insync.replicas 参数配置,这个参数可以配置最少 ISR 中需要多少个副本,才能继续提供写服务。如果设置为 2,一旦 ISR 中的个数小于 2,那么就不再提供写服务,牺牲一定的可用性,来保障这种高可靠的场景需求。

ISR的优缺点

ISR机制的优点:1、提供了消息的高可靠性,即使部分副本失效,只要ISR中还有副本存活,消息就不会丢失。2、支持故障转移,当Leader失效时,ISR中的Follower可以顺利接替成为新的Leader提高了系统的可用性。3、通过ACK机制,生产者可以根据自己的需求,在可靠性和吞吐量之间进行权衡。ISR机制的缺点:1、同步复制会增加消息发送的延迟,因为生产者需要等待所有ISR中的副本完成复制。2、ISR中的副本越多,消息发送的延迟就越高。3、ISR中的副本数量受限于min.insync.replicas参数,如果副本数量低于该值,就无法提供写服务,会降低系统的可用性。

kafka怎么解决高延迟的问题?

	如果一台follower因为某种原因迟迟不能同步数据,那么leader就需要一直等它而不能发送ack,为了解决这个问题,kafka采用了一个ISR方案。leader维护了一个动态的in-sync-replica (与leader保持同步的副本集合,该集合中的副本和leader数据是一致的),当isr中的follower和leader数据同步完成之后,leader就会向follower发送ack,如果isr中的follower长时间未向leader发送同步完成消息,leader会将其从isr中剔除,等待的时长由replica.time.max.ms参数设定。leader故障之后,就会从isr中选举出新的leader。

消息积压问题排查处理

在这里插入图片描述

可能导致出现消息挤压的情景:1、线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。解决方案:此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。如图上所示2、由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息解决方案:此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。这个死信队列,kafka并没有提供,需要整合第三方插件
#排查kafka消息积压的方式
1、使用./bin/kafka-consumer-groups.sh --bootstrap-server  10.208.37.125:9092 --describe --group groupname命令行查看LAG项是否存在积压2、提供对应的服务日志给研发,让研发进行排查确认
#处理积压问题
1、简单粗暴法。跟研发确认是否清空消息进行重新消费,如果可以的话则直接删除
2、consumer导致kafka积压了大量消息方法:1.增大partion数量kafka-topics.sh --bootstrap-server <broker-list> --alter --topic <topic-name> --partitions <new-number-of-partitions>2.通过修改代码增加消费者加并发,服务, 扩大消费线程3.通过修改代码增加消费组服务数量4.kafka单机升级成了集群5.避免消费者消费消息时间过长,导致超时6.使Kafka分区之间的数据均匀分布场景:1.如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数,同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)2.若是下游数据处理不及时,则提高每批次拉取的数量。批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。通过修改下面的消费者配置,可以提高消费者处理速度:fetch.min.bytes 消费者每次从服务器拉取最小数据量,增加这个值,可以减少消费者请求次数,降低网络消耗,提升消费者处理性能。fetch.max.bytes 与上面配置相对应,这是消费者每次从服务器拉取最大数据量,增加这个值,也有同样的效果。fetch.max.wait.ms 这个配置指定了消费者在读取到 fetch.min.bytes 设置的数据量之前,最多可以等待的时间。增加这个值,也有同样的效果。max.poll.records 消费者每次可以拉取的最大记录数,增加这个值,也有同样的效果,不过会增加每次消息处理的时间。max.partition.fetch.bytes 消费者从每个分区里拉取的最大数据量

消息丢失问题

kafka在生产端发送消息 和 消费端消费消息时都可能会丢失一些消息
可能会出现消息丢失的原因:1、生产者丢失消息: 生产者在发送消息时,会有一个ack机制,当ack=0或者ack=1时,都有可能丢失消息a、acks=0:表示生产者producer不需要等待任何broker确认收到消息的回复,就可以继续发送消息,性能最高,但最容易丢失消息,如果实际环境对性能要求高、对丢失消息不敏感的情况可以使用这种b、acks=1:至少要等待leader已经成功将数据写入到本地log,但是不需要等待所有follower是否成功写入,就可以继续发送消息。这种情况下,如果follower没有成功备份数据且leader此时宕机,则消息也会丢失c、acks=-1或者all:leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。2、消费端丢失消息: 消费端丢消息最主要体现在消费端offset的自动提交,如果开启了自动提交,万一消费到数据还没处理完,此时consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了,因为offset已经提交完毕,下次会从offset出开始消费新消息。解决办法://手动提交offset/*** 注意如果要使用手动提交offset,需要以下三点* ①:配置文件配置手动提交方式* ②:加上参数Acknowledgment ack* ③:方法中使用ack.acknowledge();手动提交*/ack.acknowledge();3、刷盘参数配置不合理:为了减少磁盘写入的次数,broker 会将消息暂时缓存起来(存储在 page cache 中),当消息的个数达到一定阈值或者过了一定的时间间隔后,再 flush 到磁盘。当遇到断电或机器故障的情况,PageCache 上的数据可能未来得及刷新到磁盘,会造成消息丢失。

消息重复消费问题

消息的重复消费在生产端和消费端都可能发生
1、生产端消息重复发送情景:发送消息如果配置了重试机制,比如由于网络波动,生产者未收到来自broker收到消息的返回响应,就会触发重试机制,3秒后再次发送此消息。broker之前已经收到过这个消息,但生产者由于触发了重试机制,就导致了消息的重复发送。那么broker就会在磁盘缓存多条同样的消息,消费端从broker拉取消息进行消费时,就会造成重复消费。注意: kafka新版本已经在broker中保证了接收消息的幂等性(比如2.4版本),只需在生产者加上以下参数enable.idempotence=ture // 1. 设置幂等  acks=all // 2. 当 enable.idempotence 为 true,这里默认为 all  max.in.flight.requests.per.connection=5 // 3. 注意这个参数需要 <= 5,否则会抛异常 OutOfOrderSequenceException新版kafka的broker幂等性具体实现原理:kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。PID: 每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID对用户完全是透明的。生产者如果重启则会生成新的PID。Sequence Number: 对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。2、消费端消息重复消费情景:对于消费端消息的重复消费问题,如果消费端拉取了一部分数据,消费完毕后,准备执行手动提交(或自动提交)时,消费者挂掉了!此时offset还未提交呢,那么当服务重启时,还是会拉取相同的一批数据重复处理!造成消息重复消费无论是生产者还是消费者的重复消息,一般都会在消费端卡死,做幂等性处理。
幂等性可以用redis的setnx分布式锁来实现。比如操作订单消息,可以把订单id作为key,在消费消息时,通过setnx命令设置一下,
offset提交完成后,在redis中删除订单id的key。setnx命令保证同样的订单消息,只有一个能被消费,可有效保证消费的幂等性

zookeeper

ZooKeeper是一个开放源码的分布式应用程序协调服务。保证服务之间的同步,它是一个为分布式应用提供一致性服务的软件.
主要用来管理和协调broker,并存储Kafka的元数据(例如 有多少topic、partition、consumer)1. Broker之间互相感知每个Broker启动的时候都会在Zookeeper中注册⼀个临时节点, 然后关闭的时候就会删除这个节点。所以Broker们通过Zookeeper中的节点数据就可以指定当前哪些Broker在线。2. Topic的基本信息 当创建⼀个新的Topic的时候,它的⼀些基本信息也会在Zookeeper中注册节点并存储这些信息。3. Controller注册 每个集群中都会选择⼀个Broker来承担Controller的⻆⾊, Controller的选举就是通过在Zookeeper创建节点来实现的,谁先注册这个节点谁就是Controller, 当Controller宕机了,其他Broker就会收到监听则⽴⻢会再去尝试注册节点来竞选这个⻆⾊。主要包含三个角色和四种状态:领导者(leader): 负责进行投票的发起和决议,更新系统状态学习者(learner): follower(跟随者) 用于接收客户请求并向客户端返回结果,在选举中参与投票observer(观察者) 可以接收客户端连接,将写请求转发给leader节点,但observer不参与投票,只同步leader状态,提高读取速度客户端(client): 请求发起方四种状态: leading、following、observing、lookingLOOKING: 当前Server不知道leader是谁,正在搜寻。LEADING: 当前Server即为选举出来的leader。FOLLOWING: leader已经选举出来,当前Server与之同步。OBSERVING: observer的行为在大多数情况下与follower完全一致,但是他们不参加选举和投票,而仅仅接受(observing)选举和投票的结果

zookeeper选举流程

选举有以下两种场景

第一种: 服务器初始化启动的Leader选举。(1) 每个Server发出一个投票投给自己。由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和ZXID,使用(myid, ZXID)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:1、优先检查ZXID。ZXID比较大的服务器优先作为Leader。2、如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了Leader。(5) 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING。
第二种: 服务器运行时期的Leader选举(服务器运行期间无法和Leader保持连接)。在Zookeeper运行期间,即便当有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设正在运行的有Server1、Server2、Server3三台服务器,当前Leader是Server2,若某一时刻Leader挂了,此时便开始Leader选举。选举过程如下:(1)变更状态。Leader挂后,余下的非Observer服务器都会将自己的服务器状态变更为LOOKING,然后开始进入Leader选举流程。(2)每个Server会发出一个投票。在这个过程中,需要生成投票信息(myid,ZXID)每个服务器上的ZXID可能不同,我们假定Server1的ZXID为123,而Server3的ZXID为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1, 123),(3, 122),然后各自将投票发送给集群中所有机器。(3)接收来自各个服务器的投票。与启动时过程相同。(4)处理投票。与启动时过程相同,此时,Server1将会成为Leader。(5)统计投票。与启动时过程相同。(6)改变服务器的状态。与启动时过程相同。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com