Spring集成Kafka示例
前言:随着实时数据处理需求的增长,Kafka 作为高吞吐、低延迟的消息中间件,已成为构建分布式系统的重要组件。Spring Boot 凭借其简洁的开发体验和强大的生态支持,成为集成 Kafka 的首选框架。本文将带你快速实现 Spring Boot 与 Kafka 的整合,涵盖生产者、消费者的基本使用及核心配置优化,助你构建可靠、高效的消息通信能力。
1. 添加 Maven 依赖
<dependencies>
    <!-- Spring Boot web starter: provides Spring MVC and an embedded server,
         which the REST controller (@RestController / @PostMapping) in this
         example requires. It pulls in the core spring-boot-starter transitively. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka: KafkaTemplate, @KafkaListener, producer/consumer factories.
         No version is given here; it is expected to come from the Spring Boot
         dependency management (parent POM or BOM). -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
2. application.yml 配置
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # Spring Boot binds serializers via key-serializer / value-serializer.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Entries under "properties" are passed through to the Kafka producer as-is.
      # NOTE: if the application defines its own ProducerFactory bean, Spring Boot's
      # auto-configured producer (and therefore this section) is not used.
      properties:
        acks: all                                  # wait for all in-sync replicas to acknowledge; improves durability
        retries: 5
        retry.backoff.ms: 1000
        enable.idempotence: true                   # de-duplicates retried sends; disable if idempotence is not needed
        max.in.flight.requests.per.connection: 5   # max unacknowledged requests per connection
        batch.size: 131072                         # 128 KB upper bound (in bytes) per batch; larger batches improve throughput
        linger.ms: 5                               # time to wait for more records before sending a batch
        compression.type: snappy                   # balances compression ratio against CPU cost
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      # Spring Boot binds deserializers via key-deserializer / value-deserializer.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. Kafka 配置类(KafkaConfig.java)
package com.example.kafkademo.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Programmatic Kafka producer configuration.
 *
 * <p>Declaring a {@link ProducerFactory} bean here replaces the one Spring Boot would
 * otherwise auto-configure from {@code spring.kafka.producer.*}, so every producer
 * setting the application relies on must be set explicitly in this class.
 */
@Configuration
public class KafkaConfig {

    /**
     * Builds the producer factory for String keys and String values.
     *
     * @return a factory whose producers use durable, idempotent, batched and
     *         snappy-compressed delivery
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Connection and serialization.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Reliability: mirrors the values in application.yml, which are not picked up
        // once this custom factory is defined.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000L);
        // Idempotent producers require at most 5 in-flight requests per connection.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Throughput: batching and compression.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072); // 128KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Exposes a {@link KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return the template used by application code to publish messages
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
4. Kafka 生产者(KafkaProducer.java)
package com.example.kafkademo.producer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Thin service wrapper around {@link KafkaTemplate} for publishing String messages.
 */
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used for all sends issued by this service
     */
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Hands the message to the template and returns without waiting for the result.
     * The future returned by the template is not inspected here.
     *
     * @param topic   destination topic
     * @param message payload to publish
     */
    public void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }

    /**
     * Sends the message and blocks the calling thread until the send future completes.
     *
     * @param topic   destination topic
     * @param message payload to publish
     * @throws Exception if waiting on the send future fails or is interrupted
     */
    public void sendMessageSync(String topic, String message) throws Exception {
        // Split into two steps: obtain the pending send, then wait on it.
        java.util.concurrent.Future<?> pendingSend = this.kafkaTemplate.send(topic, message);
        pendingSend.get();
    }
}
5. Kafka 消费者(KafkaConsumer.java)
package com.example.kafkademo.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Listener bean that receives String messages from {@code input-topic}
 * as a member of consumer group {@code demo-group}.
 */
@Service
public class KafkaConsumer {

    /**
     * Prints each received message to standard output.
     *
     * @param message the record value delivered by the listener container
     */
    @KafkaListener(topics = "input-topic", groupId = "demo-group")
    public void listen(String message) {
        String line = "Received message: " + message;
        System.out.println(line);
    }
}
6. 控制器(KafkaController.java)
package com.example.kafkademo.controller;
import com.example.kafkademo.producer.KafkaProducer;
import org.springframework.web.bind.annotation.*;
/**
 * REST endpoints under {@code /kafka} that publish the request body to Kafka
 * through {@link KafkaProducer}.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    /** Topic that both endpoints publish to. */
    private static final String TOPIC = "input-topic";

    private final KafkaProducer kafkaProducer;

    /**
     * @param kafkaProducer service used to publish messages
     */
    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * Publishes the request body without waiting for the send result.
     *
     * @param message raw request body to publish
     * @return a fixed confirmation string
     */
    @PostMapping("/send")
    public String send(@RequestBody String message) {
        this.kafkaProducer.sendMessage(TOPIC, message);
        return "Message sent asynchronously";
    }

    /**
     * Publishes the request body and returns only after the producer's
     * synchronous send has completed.
     *
     * @param message raw request body to publish
     * @return a fixed confirmation string
     * @throws Exception propagated from the synchronous send
     */
    @PostMapping("/send-sync")
    public String sendSync(@RequestBody String message) throws Exception {
        this.kafkaProducer.sendMessageSync(TOPIC, message);
        return "Message sent synchronously and acknowledged by Kafka";
    }
}
评论区