一、上下文
从《Kafka-Streams初识》中我们已经大致了解了Kafka-Streams,下面我们通过官网继续深入学习它
二、概念
Kafka Streams是一个客户端库,用于处理和分析存储在Kafka中的数据。它建立在重要的流处理概念之上,例如正确区分事件时间和流转时长、窗口支持以及简单而高效的应用程序状态管理和实时查询。且Kafka Streams的进入门槛很低,在单台机器上就可以进行快速验证。
Kafka Streams 优点:
1、设计为一个简单轻量级的客户端库,可以轻松嵌入到任何Java应用程序中,并与用户用于流式应用程序的任何现有打包、部署和操作工具集成
2、除了作为内部消息传递层的Apache Kafka本身之外,对系统没有外部依赖;值得注意的是,它使用Kafka的分区模型来水平扩展处理,同时保持强大的排序保证。
3、支持容错本地状态,这可以实现非常快速和高效的有状态操作,例如窗口连接和聚合。
4、支持精确一次处理语义学,以保证即使Streams客户端或Kafka broker在处理过程中出现故障,每条记录也只能处理一次。
5、使用one-record-at-a-time处理来实现毫秒级的处理延迟,并支持基于事件时间的窗口操作,记录无序到达。
6、提供必要的流处理原语,以及高级流DSL和低级处理器API
流处理拓扑
流是Kafka Streams提供的最重要的抽象:它代表一个无界的、持续更新的数据集。流是不可变数据记录的有序、可重放和容错序列,其中数据记录被定义为键值对。
流处理应用程序是利用Kafka Streams库的任何程序。它通过一个或多个处理器拓扑定义其计算逻辑,其中处理器拓扑是由流(边)连接的流处理器(节点)的图。
流处理器是处理器拓扑中的节点;它表示通过在拓扑中一次从其上游处理器接收一个输入记录来转换流中的数据的处理步骤,将其操作应用于它,并且随后可以产生一个或多个输出记录到其下游处理器。
拓扑中有两个特殊的处理器:
Source Processor:源处理器是一种特殊类型的流处理器,没有任何上游处理器。它通过消费来自一个或多个Kafka topic的记录并将它们转发到它的下游处理器,从一个或多个Kafka topic生成一个输入流到它的拓扑。
Sink Processor:接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的Kafka topic
请注意,在普通处理器节点中,在处理当前记录时也可以访问其他远程系统。因此,处理后的结果可以流式传输回Kafka或写入外部系统。
以下是官方给的示例图:
Kafka Streams提供了两种定义流处理拓扑的方法:
1、Kafka Streams DSL提供了最常见的数据转换操作,如映射、过滤、连接和开箱即用的聚合;
2、较低级别的处理器API允许开发人员定义和连接自定义处理器以及与状态存储进行交互。
处理器拓扑只是流处理代码的逻辑抽象。在运行时,逻辑拓扑在应用程序内部被实例化和复制以进行并行处理。
时间
时间是流处理中的一个关键概念,例如,一些操作(如窗口)是基于时间边界定义的。
时间在流中的常见概念如下:
Event time:
事件或数据记录发生的时间点,即最初是在“源头”创建的。例如:如果事件是汽车中全球定位系统传感器报告的地理位置变化,那么相关的事件时间将是全球定位系统传感器捕获位置变化的时间。
Processing time:
事件或数据记录恰好被流处理应用程序处理的时间点,即记录正在被消费的时间点。流转时长可能是比原始事件时间晚几毫秒、几小时或几天等。示例:想象一个分析应用程序,它读取并处理从汽车传感器报告的地理位置数据,以将其呈现给车队管理仪表板。这里,分析应用程序中的处理时间可能是事件时间后的毫秒或秒(例如,对于基于Apache Kafka和Kafka Streams的实时管道)或小时(例如,对于基于Apache Hadoop或Apache Spark的批处理管道)。
Ingestion time:
Kafka broker 将事件或数据记录存储在Topic分区中的时间点。与事件时间的区别在于,此摄取时间戳是在Kafka代理将记录附加到目标主题时生成的,而不是在“在源”创建记录时生成的。与流转时长的区别在于流转时长是流处理应用程序处理记录的时间。例如,如果一条记录从未被处理过,则它没有流转时长的概念,但它仍然有一个摄取时间。
事件时间和摄取时间之间的选择实际上是通过Kafka(不是Kafka Streams)的配置完成的:从Kafka 0.10. x开始,时间戳会自动嵌入到Kafka消息中。根据Kafka的配置,这些时间戳代表事件时间或摄取时间。可以在broker级别或每个Topic上指定相应的Kafka配置设置。Kafka Streams中的默认时间戳提取器将按原样检索这些嵌入的时间戳。因此,应用程序的有效时间语义学取决于这些嵌入时间戳的有效Kafka配置。
Kafka Streams通过TimestampExtractor接口为每条数据记录分配一个时间戳。这些每条记录的时间戳描述了流在时间方面的进度,并被时间相关的操作(如窗口操作)所利用。因此,只有当新记录到达处理器时,这个时间才会提前。我们将这个数据驱动的时间称为应用程序的流时间,以区别于该应用程序实际执行时的挂钟时间。TimestampExtractor接口的具体实现将为流时间定义提供不同的语义学。
例如,根据数据记录的实际内容(如嵌入的时间戳字段)检索或计算时间戳以提供事件时间语义学,并返回当前的挂钟时间,从而产生流转时长语义学。因此,开发人员可以根据自己的业务需求实施不同的时间概念。
最后,每当Kafka Streams应用程序将记录写入Kafka时,它也会为这些新记录分配时间戳。时间戳的分配方式取决于上下文:
流表二元性
在工作实践中,流和数据库表都是经常会用到的。流和表之间实际上存在密切的关系,即所谓的流表二元性。Kafka在许多方面利用了这种二元性:例如,使您的应用程序具有弹性,支持容错有状态处理,或者针对您的应用程序的最新处理结果运行交互式查询。此外,除了内部使用之外,Kafka Streams API还允许开发人员在自己的应用程序中利用这种二元性。
因此,流可以被视为表,而表也可以被视为流。例如,Kafka的日志压缩功能利用了这种流表二元性。
表的简单形式是键值对的集合,也称为map或关联array。这样的表可能如下所示:
Stream as Table:
流可以被视为表的更改日志,其中流中的每个数据记录捕获表的状态更改。因此,流是变相的表,通过从头到尾重播更改日志以重建表,它可以很容易地变成“真正的”表。比如这个例子:聚合流中的数据记录——例如从页面浏览事件流中计算用户的页面浏览总数——将返回一个表(这里的键和值分别是用户及其相应的页面浏览次数)。
Table as Stream:
表可以被认为是流中每个键的最新值在某个时间点的快照(流的数据记录是键值对)。因此,表是变相的流,通过迭代表中的每个键值条目,它可以很容易地变成“真实”流。
让我们用一个例子来说明这一点。想象一个按用户跟踪页面浏览总数的表(下面图表的第一列)。随着时间的推移,每当处理新的页面浏览事件时,表的状态都会相应地更新。在这里,不同时间点之间的状态变化——以及表的不同修订——可以表示为更改日志流(第二列)。
有趣的是,由于流表二元性,可以使用相同的流来重建原始表(第三列):
例如,使用相同的机制通过更改数据捕获(CDC)复制数据库,并在Kafka Streams中跨机器复制其所谓的状态存储以实现容错。流表二元性是一个如此重要的概念,以至于Kafka Streams通过KStream、KTable和GlobalKTable接口对其进行显式建模。
聚合操作
聚合操作接受一个输入流或表,并通过将多个输入记录组合成一个输出记录来生成一个新表。聚合的示例是计算计数或总和。类似关系型数据库中的sum() 、count() 操作。
在Kafka Streams DSL中,聚合的输入流可以是KStream或KTable,但输出流将始终是KTable。这允许Kafka Streams在生成和发出值后,在进一步记录无序到达时更新聚合值。当这种无序到达发生时,聚合的KStream或KTable发出一个新的聚合值。因为输出是KTable,所以在后续处理步骤中,新值被认为用相同的键覆盖旧值。
窗口
窗口化允许我们控制如何将具有相同键的记录分组以进行有状态操作,例如聚合或连接到所谓的窗口中。每个记录键跟踪Windows。
Kafka Streams DSL中提供了窗口操作。使用窗口时,您可以为窗口指定宽限期。此宽限期控制Kafka Streams将等待给定窗口的乱序数据记录多长时间。如果一条记录在窗口的宽限期之后到达,则该记录将被丢弃,并且不会在该窗口中进行处理。具体来说,如果一条记录的时间戳表明它属于一个窗口,但当前的流时间大于窗口结束加上宽限期,则该记录将被丢弃。
乱序记录在现实世界中总是可能的,应该在您的应用程序中正确考虑。这取决于如何处理乱序记录的有效时间语义学。在处理时间的情况下,语义学是“当记录被处理时”,这意味着乱序记录的概念不适用,因为根据定义,没有记录可以乱序。因此,乱序记录只能被认为是事件时间的乱序记录。在这两种情况下,Kafka Streams都能够正确处理乱序记录。
状态
一些流处理应用程序不需要状态,这意味着消息的处理独立于所有其他消息的处理。然而,能够维护状态为复杂的流处理应用程序开辟了许多可能性:您可以加入输入流,或者对数据记录进行分组和聚合。Kafka Streams DSL提供了许多这样的有状态运算符。
Kafka Streams提供了所谓的状态存储,流处理应用程序可以使用它来存储和查询数据。这是实现有状态操作时的一个重要能力。Kafka Streams中的每个任务都嵌入了一个或多个状态存储,可以通过API访问这些状态存储,以存储和查询处理所需的数据。这些状态存储可以是持久的键值存储、内存中的哈希图或另一种方便的数据结构。Kafka Streams为本地状态存储提供了容错和自动恢复。
Kafka Streams允许通过创建状态存储的流处理应用程序外部的方法、线程、进程或应用程序对状态存储进行直接只读查询。这是通过称为交互式查询的功能提供的。所有存储都被命名,交互式查询仅公开底层实现的读取操作。
乱序处理
除了保证每条记录将被精确地处理一次之外,许多流处理应用程序将面临的另一个问题是如何处理可能影响其业务逻辑的乱序数据。在Kafka Streams中,有两个原因可能导致其时间戳方面的乱序数据到达:
1、在Topic分区内,记录的时间戳可能不会随着偏移量的增加而单调增加。由于Kafka Streams将始终尝试按照偏移顺序处理主题分区内的记录,因此可能会导致在同一Topic分区中,时间戳较大(但偏移量较小)的记录比时间戳较小(但偏移量较大)的记录更早被处理。
2、在可能正在处理多个Topic分区的流任务中,如果用户将应用程序配置为不等待所有分区包含一些缓冲数据,并从时间戳最小的分区中选择处理下一条记录,那么稍后当为其他Topic分区获取一些记录时,它们的时间戳可能小于从另一个Topic分区获取的已处理记录。
对于无状态操作,乱序数据不会影响处理逻辑,因为一次只考虑一条记录,而不会查看过去处理记录的历史记录;然而,对于聚合和连接等有状态操作,乱序数据可能会导致处理逻辑不正确。如果用户想要处理这种乱序数据,通常他们需要允许他们的应用程序等待更长的时间,同时在等待时间内记录他们的状态,即在延迟、成本和正确性之间做出权衡决定。特别是在Kafka Streams中,用户可以为窗口聚合配置他们的窗口运算符,以实现这种权衡(。至于Join,用户可以使用版本化的状态存储来解决乱序数据的问题,但默认情况下不会处理乱序数据:
对于Stream-Stream连接,所有三种类型(内部、外部、左侧)都正确处理乱序记录。
对于Stream-Table连接,如果不使用版本化存储,则不会处理乱序记录(即,Streams应用程序不会检查乱序记录,只是按偏移顺序处理所有记录),因此可能会产生不可预测的结果。使用版本化存储,流端乱序数据将通过在表中执行基于时间戳的查找来正确处理。表端乱序数据仍然没有处理。
三、架构
Kafka Streams通过构建Kafka生产者和消费者库并利用Kafka的本机功能来提供数据并行、分布式协调、容错和操作简单性,从而简化了应用程序开发。下面我们来看看Kafka Streams是如何在幕后工作的。
下图是官方给的使用Kafka Streams库的应用程序的结构图。让我们来看看一些细节。
流的分区和任务
Kafka的消息传递层对数据进行分区,用于存储和传输数据。Kafka Streams对数据进行分区,用于处理数据。在这两种情况下,这种分区都支持数据局部性、弹性、可扩展性、高性能和容错性。Kafka Streams使用分区和任务的概念作为其基于Kafka Topic分区的并行模型的逻辑单元。在并行性的上下文中,Kafka Streams和Kafka之间有密切的联系:
- 每个流分区都是一个完全有序的数据记录序列,并映射到一个Kafka Topic分区。
- 流中的数据记录映射到来自该Topic的Kafka消息。
- 数据记录的key决定了Kafka和Kafka Streams中数据的分区,即如何将数据路由到Topic中的特定分区
流任务可以独立并行处理,无需人工干预:
Kafka Streams根据应用程序的输入流分区创建固定数量的任务,每个任务分配一个来自输入流的分区列表(即Kafka Topic)。任务的分区分配永远不会改变,因此每个任务都是应用程序的固定并行单元。然后,任务可以根据分配的分区实例化自己的处理器拓扑;它们还为每个分配的分区维护一个缓冲区,并一次性处理来自这些记录缓冲区的消息。
因此,应用程序可以运行的最大并行度受流任务的最大数量限制,流任务本身由应用程序正在读取的输入Topic的最大分区数决定。例如,如果您的输入Topic有5个分区,那么您最多可以运行5个应用程序实例。这些实例将协作处理Topic的数据。如果您运行的应用程序实例数量大于输入主题的分区,“多余”的应用程序实例将启动但保持空闲;但是,如果其中一个繁忙实例出现故障,其中一个空闲实例将恢复前者的工作
Kafka Streams不是资源管理器,而是一个在其流处理应用程序运行的任何地方“运行”的库。任务可以由库自动分配给正在运行的应用程序实例。任务的分区分配永远不会改变;如果应用程序实例失败,其所有分配的任务将在其他实例上自动重新启动,并继续从同一流分区中使用。
注意:Topic分区被分配给任务,任务被分配给所有实例上的所有线程,以尽最大努力权衡有状态任务的负载平衡和粘度。对此分配,Kafka Streams使用StreamsPartitionAssignor类,不允许您更改为不同的分配者。如果您尝试使用不同的分配者,Kafka Streams会忽略它。
下图是官方例子:显示了两个任务,每个任务都分配有输入流的一个分区
线程模型
Kafka Streams允许用户配置库可用于在应用程序实例中并行处理的线程数。每个线程可以使用其处理器拓扑独立执行一个或多个任务。例如,下图显示了一个流线程运行两个流任务。
启动更多流线程或应用程序的更多实例仅仅相当于复制拓扑并让它处理Kafka分区的不同子集,有效地并行化处理。值得注意的是,线程之间没有共享状态,因此不需要线程间协调。这使得跨应用程序实例和线程并行运行拓扑变得非常简单。利用Kafka的协调功能,Kafka Streams透明地处理了各种流线程之间Kafka Topic分区的分配。
从Kafka 2.8开始,可以像扩展Kafka Stream客户端一样扩展流线程。只需添加或删除流线程,Kafka Streams将负责重新分配分区。另外还可以添加线程来替换已死亡的流线程,从而无需重新启动客户端以恢复正在运行的线程数。
本地状态存储
Kafka Streams提供了所谓的状态存储,流处理应用程序可以使用它来存储和查询数据,这是实现有状态操作时的一个重要功能。例如,当您调用有状态运算符(如join() 或 aggregate() 或者当您对流进行窗口化时,Kafka Streams DSL会自动创建和管理此类状态存储。
Kafka Streams应用程序中的每个流任务都可以嵌入一个或多个本地状态存储,可以通过API访问这些存储和查询处理所需的数据。Kafka Streams为此类本地状态存储提供容错和自动恢复。
下图显示了两个流任务及其专用的本地状态存储
容错
Kafka Streams建立在Kafka中本地集成的容错功能之上。Kafka分区高度可用和复制;因此,当流数据持久化到Kafka时,即使应用程序发生故障并需要重新处理它,它也可用。Kafka Streams中的任务利用Kafka消费者客户端提供的容错功能来处理故障。如果任务在失败的机器上运行,Kafka Streams会自动在应用程序的剩余运行实例之一中重新启动任务。
此外,Kafka Streams还确保本地状态存储对故障具有鲁棒性。对于每个状态存储,它维护一个复制的更改日志Kafka Topic,在其中跟踪任何状态更新。这些更改日志Topic也被分区,以便每个本地状态存储实例以及访问存储的任务都有自己的专用更改日志Topic分区。更改日志Topic上启用了日志压缩,以便可以安全地清除旧数据,以防止Topic无限期增长。
如果任务在一台失败的机器上运行并在另一台机器上重新启动,Kafka Streams保证通过在恢复对新启动的任务的处理之前重播相应的更改日志Topic来将其关联的状态存储恢复到故障前的内容。因此,故障处理对最终用户完全透明。
请注意,任务(重新)初始化的成本通常主要取决于通过重播状态存储相关的更改日志Topic来恢复状态的时间。为了最大限度地减少此恢复时间,用户可以将其应用程序配置为具有本地状态的备用副本(即状态的完全复制副本)。当任务迁移发生时,Kafka Streams会将任务分配给已经存在此类备用副本的应用程序实例,以最大限度地降低任务(重新)初始化成本。
从2.6开始,Kafka Streams将保证任务只分配给具有完全捕获状态的本地副本的实例(如果存在这样的实例)。备用任务将增加在故障情况下存在捕获实例的可能性。
另外还可以配置具有机架感知功能的备用副本。配置后,Kafka Streams将尝试将备用任务分配到与活动任务不同的“机架”上,从而在活动任务的机架发生故障时具有更快的恢复时间