Spring集成Kafka示例

前言:随着实时数据处理需求的增长,Kafka 作为高吞吐、低延迟的消息中间件,已成为构建分布式系统的重要组件。Spring Boot 凭借其简洁的开发体验和强大的生态支持,成为集成 Kafka 的首选框架。本文将带你快速实现 Spring Boot 与 Kafka 的整合,涵盖生产者、消费者的基本使用及核心配置优化,助你构建可靠、高效的消息通信能力。

1.添加Maven依赖

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

2. application.yml 配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-classname: org.apache.kafka.common.serialization.StringSerializer
      value-classname: org.apache.kafka.common.serialization.StringSerializer
      properties:
        acks: all # 确保写入所有副本才确认成功,提示数据可靠性
        retries: 5
        retry.backoff.ms: 1000
        enable.idempotence: true # 开启可以消息去重,防重发;若不需要幂等性可关闭
        max.in.flight.requests.per.connection: 5 # 控制并发批次
        batch.size: 131072  # 128KB 提升吞吐,控制每批容纳的消息数量
        linger.ms: 5 # 等待时间,用于攒批
        compression.type: snappy # 压缩算法,平衡压缩率与 CPU 开销
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-classname: org.apache.kafka.common.serialization.StringDeserializer
      value-classname: org.apache.kafka.common.serialization.StringDeserializer

3. Kafka 配置类(KafkaConfig.java

package com.example.kafkademo.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072); // 128KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4. Kafka 生产者(KafkaProducer.java

package com.example.kafkademo.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    // 同步发送(需要等待结果)
    public void sendMessageSync(String topic, String message) throws Exception {
        kafkaTemplate.send(topic, message).get();  // 同步等待结果
    }
}

5. Kafka 消费者(KafkaConsumer.java

package com.example.kafkademo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "input-topic", groupId = "demo-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 控制器(KafkaController.java

package com.example.kafkademo.controller;

import com.example.kafkademo.producer.KafkaProducer;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public String send(@RequestBody String message) {
        kafkaProducer.sendMessage("input-topic", message);
        return "Message sent asynchronously";
    }

    @PostMapping("/send-sync")
    public String sendSync(@RequestBody String message) throws Exception {
        kafkaProducer.sendMessageSync("input-topic", message);
        return "Message sent synchronously and acknowledged by Kafka";
    }
}