您的位置:首页 > 文旅 > 旅游 > springboot引入kafka

springboot引入kafka

2024/10/4 1:06:55 来源:https://blog.csdn.net/jifgnie/article/details/140602277  浏览:    关键词:springboot引入kafka

一. Kafka 简介

什么是 Kafka?
Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年开源。它用于构建实时数据管道和流应用,能够处理和分析流数据。Kafka 的核心概念
Producer(生产者):发送消息到 Kafka 主题。
Consumer(消费者):从 Kafka 主题中读取消息。
Broker(代理):Kafka 服务器,负责接收和存储消息。
Topic(主题):消息分类的逻辑单元。
Partition(分区):主题的物理分区,便于并行处理。
Offset(偏移量):每条消息在分区中的唯一标识符。

二、Spring Boot 2.7.0 集成 Kafka

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>
spring:kafka:bootstrap-servers: localhost:9092consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: default-group  # 默认的消费者组consumer-group-1:group-id: group1topic: topic1consumer-group-2:group-id: group2topic: topic2producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic

Kafka 配置类

 import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Bean(name = "kafkaListenerContainerFactoryGroup1")public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup1(@Value("${spring.kafka.consumer-group-1.group-id}") String groupId) {return createFactory(groupId);}@Bean(name = "kafkaListenerContainerFactoryGroup2")public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup2(@Value("${spring.kafka.consumer-group-2.group-id}") String groupId) {return createFactory(groupId);}private ConcurrentKafkaListenerContainerFactory<String, String> createFactory(String groupId) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();Map<String, Object> props = new HashMap<>(consumerConfigs());props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));return factory;}
}
auto.offset.reset 配置选项
earliest:当没有初始偏移量或者当前偏移量超出范围时,消费者将从最早的可用数据开始读取。这通常用于新消费者加入时,从头开始读取所有历史数据。
latest:当没有初始偏移量或者当前偏移量超出范围时,消费者将从最新的数据开始读取。这通常用于新消费者加入时,只读取从现在开始的数据。
none:如果消费者没有找到当前偏移量或偏移量超出范围,则会抛出异常。这要求消费者必须有有效的偏移量。
anything else:其他值将导致消费者抛出异常。

Kafka 消费者服务

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "${spring.kafka.consumer-group-1.topic}", containerFactory = "kafkaListenerContainerFactoryGroup1")public void listenGroup1(String message) {System.out.println("Received message in group1: " + message);}@KafkaListener(topics = "${spring.kafka.consumer-group-2.topic}", containerFactory = "kafkaListenerContainerFactoryGroup2")public void listenGroup2(String message) {System.out.println("Received message in group2: " + message);}
}

创建 Kafka 生产者

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String sendMessage(@RequestParam("message") String message) {kafkaTemplate.send("my-topic", message);return "Message sent to Kafka topic";}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com