Kafka

default

Kafka 的“难”不在于入门,而在于精通和稳定运维。它的设计极为精妙,提供了极高的灵活性和控制力,但这也意味着责任从框架转移到了开发者/运维者身上。你必须非常清楚自己的业务在一致性、可用性、分区容错性(CAP) 上的权衡,并做出正确的配置选择。

基本概念

Broker:Kafka 集群中的节点,存储和转发消息。一个 broker 对应一个物理节点。

主题(Topic):消息的分类单元,所有消息都存储在主题中。

分区(Partition):主题的物理分片,用于实现并行处理和负载均衡。

副本(Replica):分区的备份,用于提高容错性。

生产者(Producer):负责向Kafka主题发送消息。

消费者(Consumer):从Kafka主题拉取消息进行消费。

消费者组(Consumer Group):多个消费者组成的组,用于实现消息的负载分配。需要小于等于分区数。

┌─────────────────────────────────────────────────────────────┐ │ Kafka Cluster │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │Broker 1 │ │Broker 2 │ │Broker 3 │ │ │ │ │ │ │ │ │ │ │ │ Topic A │ │ Topic A │ │ Topic A │ │ │ │ P0(P) │ │ P0(R) │ │ P1(P) │ │ │ │ P1(R) │ │ P1(P) │ │ P2(P) │ │ │ │ P2(R) │ │ P2(R) │ │ P0(R) │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ │ │ │ └──────────────┼──────────────┘ │ │ │ │ │ ┌──────┴──────┐ │ │ │ ZooKeeper │ (或 KRaft Controller) │ │ │ Ensemble │ │ │ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ Producer Consumer Group (生产者) (消费者组)


部署

准备三台服务器

node1: 192.168.1.101

node2: 192.168.1.102

node3: 192.168.1.103

下载

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 1. 下载 Kafka(所有节点相同版本!v3.3以后)
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
mv kafka_2.13-3.7.0 /opt/kafka
cd /opt/kafka

# 2. 创建数据和日志目录
sudo mkdir -p /data/kafka-logs
sudo chown -R $(whoami):$(whoami) /data/kafka-logs

# 3. 创建配置文件目录
mkdir -p /opt/kafka/config/kraft/cluster

生成集群ID(仅在一台服务器执行)

1
2
3
4
5
6
7
# 在 node1 上执行
cd /opt/kafka
CLUSTER_ID=$(./bin/kafka-storage.sh random-uuid)
echo "集群ID: $CLUSTER_ID"

# 记录这个ID,后续所有节点都要用
echo "export KAFKA_CLUSTER_ID=$CLUSTER_ID" >> ~/.bashrc

重要:记下输出的集群ID,例如:rK9sT9a3TlqjP7lVv7VZLQ

创建一键部署脚本

创建自动化部署脚本 deploy-cluster.sh。参考配置文件模板:/opt/kafka/config/kraft/cluster/template.properties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/bin/bash
# deploy-cluster.sh - 需要在每台服务器分别运行,传入不同参数

set -e  # 出错时退出

NODE_ID=$1
NODE_IP=$2
CLUSTER_ID=$3
OTHER_NODES=$4  # 格式: "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"

echo "部署节点 $NODE_ID, IP: $NODE_IP"

# 1. 生成配置
cat > /opt/kafka/config/kraft/server.properties << EOF
node.id=$NODE_ID
process.roles=controller,broker

listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://${NODE_IP}:9092
controller.listener.names=CONTROLLER

controller.quorum.voters=${OTHER_NODES}

log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=3
min.insync.replicas=2

# 性能配置
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

log.retention.hours=168
log.segment.bytes=1073741824
EOF

# 2. 格式化存储
echo "初始化存储..."
/opt/kafka/bin/kafka-storage.sh format \
    -t $CLUSTER_ID \
    -c /opt/kafka/config/kraft/server.properties \
    --ignore-formatted

# 3. 创建 systemd 服务(生产环境推荐)
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka (KRaft mode)
After=network.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF

echo "部署完成!"
echo "启动命令: sudo systemctl start kafka"
echo "查看日志: sudo journalctl -u kafka -f"

使用方法:

1
2
3
4
5
6
7
8
# 在 node1 上执行
./deploy-cluster.sh 1 192.168.1.101 "rK9sT9a3TlqjP7lVv7VZLQ" "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"

# 在 node2 上执行  
./deploy-cluster.sh 2 192.168.1.102 "rK9sT9a3TlqjP7lVv7VZLQ" "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"

# 在 node3 上执行
./deploy-cluster.sh 3 192.168.1.103 "rK9sT9a3TlqjP7lVv7VZLQ" "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"

防火墙配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# CentOS/RHEL
sudo firewall-cmd --permanent --add-port=9092/tcp
sudo firewall-cmd --permanent --add-port=9093/tcp
sudo firewall-cmd --reload

# Ubuntu/Debian
sudo ufw allow 9092/tcp
sudo ufw allow 9093/tcp
sudo ufw reload

# 检查端口
sudo netstat -tlnp | grep -E '9092|9093'

检查

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 查看所有Broker
./bin/kafka-cluster.sh cluster-id --bootstrap-server 192.168.1.101:9092

# 详细仲裁状态
./bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.101:9092 describe --replication

# 查看日志(实时)
tail -f /var/log/kafka/server.log | grep -E "INFO|ERROR|WARN"

# 检查节点连接
./bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --list

监控

kafka + prometheus + grafana

https://www.cnblogs.com/starsliao/p/18522791

使用

添加依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Kafka Starter -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- JSON 序列化支持 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: "*"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 可选:SSL配置
    # ssl:
    #   key-password: password
    #   keystore-location: classpath:/keystore.jks
    #   keystore-password: password
    #   truststore-location: classpath:/truststore.jks
    #   truststore-password: password
    # properties:
    #   security.protocol: SSL

消息实体类

1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
    private Long id;
    private String name;
    private String email;
}

Kafka 生产者

基础生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Service
public class KafkaProducerService {
    
    private static final String TOPIC = "user-topic";
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    // 发送字符串消息
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
        System.out.println("发送消息: " + message);
    }
    
    // 发送对象消息
    public void sendUserMessage(UserMessage user) {
        kafkaTemplate.send(TOPIC, user);
        System.out.println("发送用户消息: " + user);
    }
    
    // 发送消息到指定分区
    public void sendMessageWithPartition(String message, int partition) {
        kafkaTemplate.send(TOPIC, partition, null, message);
    }
    
    // 发送消息并获取发送结果
    public void sendMessageWithCallback(String message) {
        ListenableFuture<SendResult<String, Object>> future = 
            kafkaTemplate.send(TOPIC, message);
        
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送成功: " + message + 
                    ", offset: " + result.getRecordMetadata().offset());
            }
            
            @Override
            public void onFailure(Throwable ex) {
                System.err.println("发送失败: " + message + 
                    ", 错误: " + ex.getMessage());
            }
        });
    }
}

使用 JSON 序列化的生产者配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Kafka 消费者

基础消费者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class KafkaConsumerService {
    
    // 监听单个主题
    @KafkaListener(topics = "user-topic", groupId = "my-group")
    public void consumeMessage(String message) {
        System.out.println("收到消息: " + message);
    }
    
    // 监听多个主题
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group")
    public void consumeMultipleTopics(String message) {
        System.out.println("收到消息: " + message);
    }
    
    // 接收对象消息
    @KafkaListener(topics = "user-topic", groupId = "my-group")
    public void consumeUserMessage(UserMessage user) {
        System.out.println("收到用户消息: " + user);
    }
    
    // 带消息头信息的消费
    @KafkaListener(topics = "user-topic", groupId = "my-group")
    public void consumeWithHeaders(String message, 
                                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                   @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.printf("收到消息 [topic=%s, partition=%d, offset=%d]: %s%n", 
                         topic, partition, offset, message);
    }
}

批量消费

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
public class KafkaBatchConsumerService {
    
    // 批量消费配置
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
        batchListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // 启用批量监听
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                 StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                 StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量大小
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    // 批量消费消息
    @KafkaListener(topics = "batch-topic", 
                   containerFactory = "batchListenerContainerFactory")
    public void consumeBatch(List<String> messages) {
        System.out.println("批量收到 " + messages.size() + " 条消息");
        messages.forEach(System.out::println);
    }
}

手动提交偏移量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class ManualAckConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
        manualAckListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // 手动提交配置
        factory.getContainerProperties().setAckMode(
            ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        return factory;
    }
    
    @Service
    public class ManualAckConsumer {
        
        @KafkaListener(topics = "manual-topic", 
                      containerFactory = "manualAckListenerContainerFactory")
        public void consumeWithManualAck(String message, 
                                         Acknowledgment ack) {
            try {
                // 处理消息
                System.out.println("处理消息: " + message);
                // 手动提交偏移量
                ack.acknowledge();
            } catch (Exception e) {
                // 处理异常,不提交偏移量,消息会被重新消费
                System.err.println("处理消息失败: " + e.getMessage());
            }
        }
    }
}

错误处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Configuration
public class KafkaErrorHandlingConfig {
    
    // 自定义错误处理器
    @Bean
    public ConsumerAwareListenerErrorHandler listenErrorHandler() {
        return (message, exception, consumer) -> {
            System.err.println("消费消息失败: " + message.getPayload());
            System.err.println("异常信息: " + exception.getMessage());
            // 可以根据异常类型决定是否继续消费
            return null;
        };
    }
    
    // 死信队列配置
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
        retryListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // 配置重试
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()), 
            new FixedBackOff(1000L, 3L)); // 重试3次,间隔1秒
        
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
    
    // 死信队列消费者
    @KafkaListener(topics = "user-topic.DLT", groupId = "dlt-group")
    public void processDltMessage(String message) {
        System.out.println("收到死信队列消息: " + message);
        // 处理死信消息
    }
}

事务支持

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
public class KafkaTransactionConfig {
    
    @Bean
    public ProducerFactory<String, Object> transactionalProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        DefaultKafkaProducerFactory<String, Object> factory = 
            new DefaultKafkaProducerFactory<>(configProps);
        factory.setTransactionIdPrefix("tx-");
        return factory;
    }
    
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(transactionalProducerFactory());
    }
    
    @Service
    public class TransactionalService {
        
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
        
        @Transactional
        public void sendWithTransaction(UserMessage user) {
            // 发送多条消息,要么全部成功,要么全部失败
            kafkaTemplate.send("topic1", user);
            kafkaTemplate.send("topic2", user);
            // 如果这里抛出异常,之前发送的消息都会回滚
        }
    }
}

测试示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@SpringBootTest
class KafkaTest {
    
    @Autowired
    private KafkaProducerService producerService;
    
    @Test
    void testSendMessage() {
        UserMessage user = new UserMessage(1L, "张三", "zhangsan@example.com");
        producerService.sendUserMessage(user);
        
        // 验证消息是否发送成功
        // 可以使用 Mockito 或嵌入式 Kafka 进行测试
    }
}

常用配置项说明

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
  kafka:
    # 消费者配置
    consumer:
      auto-offset-reset: earliest  # 从最早开始消费
      enable-auto-commit: false    # 关闭自动提交
      max-poll-records: 500        # 每次拉取最大记录数
      fetch-max-wait: 500          # 拉取最大等待时间(ms)
      heartbeat-interval: 3000     # 心跳间隔
      session-timeout: 10000       # 会话超时时间
      
    # 生产者配置  
    producer:
      acks: all                    # 消息确认机制
      retries: 3                   # 重试次数
      batch-size: 16384            # 批量大小
      buffer-memory: 33554432      # 缓冲区大小
      linger-ms: 1                 # 发送延迟
      compression-type: snappy     # 压缩类型
      
    # 监听器配置
    listener:
      type: single                 # 或 batch
      concurrency: 3               # 并发消费者数量
      poll-timeout: 3000           # 轮询超时
      ack-mode: manual_immediate   # 确认模式

注意点

  1. 生产环境配置

集群大小:至少3个broker以确保高可用性

副本因子:重要topic设置replication-factor≥3

分区数:根据吞吐量需求合理设置,避免过多或过少

数据保留策略:设置合理的retention.ms和retention.bytes

  1. 生产者注意事项
1
2
3
4
5
6
// 关键配置
props.put("acks", "all");          // 数据可靠性
props.put("retries", 3);           // 重试机制
props.put("max.in.flight.requests.per.connection", 1); // 消息顺序性
props.put("enable.idempotence", true); // 幂等性
props.put("compression.type", "snappy"); // 压缩
  1. 消费者注意事项

消费者组管理:合理规划consumer group

提交策略:根据业务选择auto或manual commit

处理偏移量:注意处理offset提交失败的情况

反压处理:控制消费速度,避免内存溢出

  1. 性能优化

批处理:合理设置batch.size和linger.ms

分区策略:自定义分区器优化数据分布

监控指标:监控生产/消费延迟、吞吐量、lag等

JVM调优:设置合适的堆内存和GC策略

  1. 数据可靠性

副本同步:确保min.insync.replicas配置合理

数据丢失预防:生产者使用acks=all,消费者及时提交offset

磁盘配置:使用多块磁盘,配置多个log.dirs

  1. 运维管理

监控告警:监控broker状态、topic大小、consumer lag

版本升级:注意版本兼容性,尤其是跨大版本升级

安全配置:启用SSL、SASL等安全机制

容量规划:定期评估存储需求,及时扩容

  1. 常见陷阱

消息顺序:分区内有序,跨分区无序

重复消费:at-least-once语义可能导致重复

数据倾斜:避免partition数据分布不均

消费者再平衡:rebalance期间暂停服务

  1. Kafka Streams注意事项

状态管理:合理设置state store清理策略

容错处理:处理KStreams应用失败恢复

资源隔离:避免与普通consumer/producer竞争资源

  1. Connect和Schema Registry

连接器配置:合理设置task.max

Schema演进:定义明确的兼容性策略

版本管理:Schema版本控制

  1. 监控和调试

关键指标:

生产/消费延迟

Consumer lag

Broker磁盘使用率

网络吞吐量

日志级别:适当调整log4j配置

工具使用:熟练掌握kafka-topics、kafka-consumer-groups等工具

Licensed under CC BY-NC-SA 4.0
Gear(夕照)的博客。记录开发、生活,以及一些不足为道的思考……