Kafka 的“难”不在于入门,而在于精通和稳定运维。它的设计极为精妙,提供了极高的灵活性和控制力,但这也意味着责任从框架转移到了开发者/运维者身上。你必须非常清楚自己的业务在一致性、可用性、分区容错性(CAP) 上的权衡,并做出正确的配置选择。
基本概念
Broker:Kafka 集群中的节点,存储和转发消息。一个 broker 对应一个物理节点。
主题(Topic):消息的分类单元,所有消息都存储在主题中。
分区(Partition):主题的物理分片,用于实现并行处理和负载均衡。
副本(Replica):分区的备份,用于提高容错性。
生产者(Producer):负责向Kafka主题发送消息。
消费者(Consumer):从Kafka主题拉取消息进行消费。
消费者组(Consumer Group):多个消费者组成的组,用于实现消息的负载分配。需要小于等于分区数。
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Broker 1 │ │Broker 2 │ │Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic A │ │ Topic A │ │ Topic A │ │
│ │ P0(P) │ │ P0(R) │ │ P1(P) │ │
│ │ P1(R) │ │ P1(P) │ │ P2(P) │ │
│ │ P2(R) │ │ P2(R) │ │ P0(R) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ ZooKeeper │ (或 KRaft Controller) │
│ │ Ensemble │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
│ │
│ │
Producer Consumer Group
(生产者) (消费者组)
部署
准备三台服务器
node1: 192.168.1.101
node2: 192.168.1.102
node3: 192.168.1.103
下载
1
2
3
4
5
6
7
8
9
10
11
12
|
# 1. 下载 Kafka(所有节点相同版本!v3.3以后)
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
mv kafka_2.13-3.7.0 /opt/kafka
cd /opt/kafka
# 2. 创建数据和日志目录
sudo mkdir -p /data/kafka-logs
sudo chown -R $(whoami):$(whoami) /data/kafka-logs
# 3. 创建配置文件目录
mkdir -p /opt/kafka/config/kraft/cluster
|
生成集群ID(仅在一台服务器执行)
1
2
3
4
5
6
7
|
# 在 node1 上执行
cd /opt/kafka
CLUSTER_ID=$(./bin/kafka-storage.sh random-uuid)
echo "集群ID: $CLUSTER_ID"
# 记录这个ID,后续所有节点都要用
echo "export KAFKA_CLUSTER_ID=$CLUSTER_ID" >> ~/.bashrc
|
重要:记下输出的集群ID,例如:rK9sT9a3TlqjP7lVv7VZLQ
创建一键部署脚本
创建自动化部署脚本 deploy-cluster.sh。参考配置文件模板:/opt/kafka/config/kraft/cluster/template.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
#!/bin/bash
# deploy-cluster.sh - 需要在每台服务器分别运行,传入不同参数
set -e # 出错时退出
NODE_ID=$1
NODE_IP=$2
CLUSTER_ID=$3
OTHER_NODES=$4 # 格式: "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"
echo "部署节点 $NODE_ID, IP: $NODE_IP"
# 1. 生成配置
cat > /opt/kafka/config/kraft/server.properties << EOF
node.id=$NODE_ID
process.roles=controller,broker
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://${NODE_IP}:9092
controller.listener.names=CONTROLLER
controller.quorum.voters=${OTHER_NODES}
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
# 性能配置
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.retention.hours=168
log.segment.bytes=1073741824
EOF
# 2. 格式化存储
echo "初始化存储..."
/opt/kafka/bin/kafka-storage.sh format \
-t $CLUSTER_ID \
-c /opt/kafka/config/kraft/server.properties \
--ignore-formatted
# 3. 创建 systemd 服务(生产环境推荐)
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka (KRaft mode)
After=network.target
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
EOF
echo "部署完成!"
echo "启动命令: sudo systemctl start kafka"
echo "查看日志: sudo journalctl -u kafka -f"
|
使用方法:
1
2
3
4
5
6
7
8
|
# 在 node1 上执行
./deploy-cluster.sh 1 192.168.1.101 "rK9sT9a3TlqjP7lVv7VZLQ" "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"
# 在 node2 上执行
./deploy-cluster.sh 2 192.168.1.102 "rK9sT9a3TlqjP7lVv7VZLQ" "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"
# 在 node3 上执行
./deploy-cluster.sh 3 192.168.1.103 "rK9sT9a3TlqjP7lVv7VZLQ" "1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093"
|
防火墙配置
1
2
3
4
5
6
7
8
9
10
11
12
|
# CentOS/RHEL
sudo firewall-cmd --permanent --add-port=9092/tcp
sudo firewall-cmd --permanent --add-port=9093/tcp
sudo firewall-cmd --reload
# Ubuntu/Debian
sudo ufw allow 9092/tcp
sudo ufw allow 9093/tcp
sudo ufw reload
# 检查端口
sudo netstat -tlnp | grep -E '9092|9093'
|
检查
1
2
3
4
5
6
7
8
9
10
11
|
# 查看所有Broker
./bin/kafka-cluster.sh cluster-id --bootstrap-server 192.168.1.101:9092
# 详细仲裁状态
./bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.101:9092 describe --replication
# 查看日志(实时)
tail -f /var/log/kafka/server.log | grep -E "INFO|ERROR|WARN"
# 检查节点连接
./bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --list
|
监控
kafka + prometheus + grafana
https://www.cnblogs.com/starsliao/p/18522791
使用
添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Kafka Starter -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- JSON 序列化支持 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
|
配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
spring.json.trusted.packages: "*"
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 可选:SSL配置
# ssl:
# key-password: password
# keystore-location: classpath:/keystore.jks
# keystore-password: password
# truststore-location: classpath:/truststore.jks
# truststore-password: password
# properties:
# security.protocol: SSL
|
消息实体类
1
2
3
4
5
6
7
8
|
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
private Long id;
private String name;
private String email;
}
|
Kafka 生产者
基础生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
@Service
public class KafkaProducerService {
private static final String TOPIC = "user-topic";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送字符串消息
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
System.out.println("发送消息: " + message);
}
// 发送对象消息
public void sendUserMessage(UserMessage user) {
kafkaTemplate.send(TOPIC, user);
System.out.println("发送用户消息: " + user);
}
// 发送消息到指定分区
public void sendMessageWithPartition(String message, int partition) {
kafkaTemplate.send(TOPIC, partition, null, message);
}
// 发送消息并获取发送结果
public void sendMessageWithCallback(String message) {
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(TOPIC, message);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送成功: " + message +
", offset: " + result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
System.err.println("发送失败: " + message +
", 错误: " + ex.getMessage());
}
});
}
}
|
使用 JSON 序列化的生产者配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
|
Kafka 消费者
基础消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@Service
public class KafkaConsumerService {
// 监听单个主题
@KafkaListener(topics = "user-topic", groupId = "my-group")
public void consumeMessage(String message) {
System.out.println("收到消息: " + message);
}
// 监听多个主题
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group")
public void consumeMultipleTopics(String message) {
System.out.println("收到消息: " + message);
}
// 接收对象消息
@KafkaListener(topics = "user-topic", groupId = "my-group")
public void consumeUserMessage(UserMessage user) {
System.out.println("收到用户消息: " + user);
}
// 带消息头信息的消费
@KafkaListener(topics = "user-topic", groupId = "my-group")
public void consumeWithHeaders(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
System.out.printf("收到消息 [topic=%s, partition=%d, offset=%d]: %s%n",
topic, partition, offset, message);
}
}
|
批量消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@Service
public class KafkaBatchConsumerService {
// 批量消费配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
batchListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // 启用批量监听
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量大小
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
return new DefaultKafkaConsumerFactory<>(props);
}
// 批量消费消息
@KafkaListener(topics = "batch-topic",
containerFactory = "batchListenerContainerFactory")
public void consumeBatch(List<String> messages) {
System.out.println("批量收到 " + messages.size() + " 条消息");
messages.forEach(System.out::println);
}
}
|
手动提交偏移量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
@Configuration
public class ManualAckConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
manualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 手动提交配置
factory.getContainerProperties().setAckMode(
ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Service
public class ManualAckConsumer {
@KafkaListener(topics = "manual-topic",
containerFactory = "manualAckListenerContainerFactory")
public void consumeWithManualAck(String message,
Acknowledgment ack) {
try {
// 处理消息
System.out.println("处理消息: " + message);
// 手动提交偏移量
ack.acknowledge();
} catch (Exception e) {
// 处理异常,不提交偏移量,消息会被重新消费
System.err.println("处理消息失败: " + e.getMessage());
}
}
}
}
|
错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@Configuration
public class KafkaErrorHandlingConfig {
// 自定义错误处理器
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return (message, exception, consumer) -> {
System.err.println("消费消息失败: " + message.getPayload());
System.err.println("异常信息: " + exception.getMessage());
// 可以根据异常类型决定是否继续消费
return null;
};
}
// 死信队列配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
retryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 配置重试
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()),
new FixedBackOff(1000L, 3L)); // 重试3次,间隔1秒
factory.setCommonErrorHandler(errorHandler);
return factory;
}
// 死信队列消费者
@KafkaListener(topics = "user-topic.DLT", groupId = "dlt-group")
public void processDltMessage(String message) {
System.out.println("收到死信队列消息: " + message);
// 处理死信消息
}
}
|
事务支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@Configuration
public class KafkaTransactionConfig {
@Bean
public ProducerFactory<String, Object> transactionalProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
DefaultKafkaProducerFactory<String, Object> factory =
new DefaultKafkaProducerFactory<>(configProps);
factory.setTransactionIdPrefix("tx-");
return factory;
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
return new KafkaTransactionManager<>(transactionalProducerFactory());
}
@Service
public class TransactionalService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public void sendWithTransaction(UserMessage user) {
// 发送多条消息,要么全部成功,要么全部失败
kafkaTemplate.send("topic1", user);
kafkaTemplate.send("topic2", user);
// 如果这里抛出异常,之前发送的消息都会回滚
}
}
}
|
测试示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@SpringBootTest
class KafkaTest {
@Autowired
private KafkaProducerService producerService;
@Test
void testSendMessage() {
UserMessage user = new UserMessage(1L, "张三", "zhangsan@example.com");
producerService.sendUserMessage(user);
// 验证消息是否发送成功
// 可以使用 Mockito 或嵌入式 Kafka 进行测试
}
}
|
常用配置项说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
spring:
kafka:
# 消费者配置
consumer:
auto-offset-reset: earliest # 从最早开始消费
enable-auto-commit: false # 关闭自动提交
max-poll-records: 500 # 每次拉取最大记录数
fetch-max-wait: 500 # 拉取最大等待时间(ms)
heartbeat-interval: 3000 # 心跳间隔
session-timeout: 10000 # 会话超时时间
# 生产者配置
producer:
acks: all # 消息确认机制
retries: 3 # 重试次数
batch-size: 16384 # 批量大小
buffer-memory: 33554432 # 缓冲区大小
linger-ms: 1 # 发送延迟
compression-type: snappy # 压缩类型
# 监听器配置
listener:
type: single # 或 batch
concurrency: 3 # 并发消费者数量
poll-timeout: 3000 # 轮询超时
ack-mode: manual_immediate # 确认模式
|
注意点
- 生产环境配置
集群大小:至少3个broker以确保高可用性
副本因子:重要topic设置replication-factor≥3
分区数:根据吞吐量需求合理设置,避免过多或过少
数据保留策略:设置合理的retention.ms和retention.bytes
- 生产者注意事项
1
2
3
4
5
6
|
// 关键配置
props.put("acks", "all"); // 数据可靠性
props.put("retries", 3); // 重试机制
props.put("max.in.flight.requests.per.connection", 1); // 消息顺序性
props.put("enable.idempotence", true); // 幂等性
props.put("compression.type", "snappy"); // 压缩
|
- 消费者注意事项
消费者组管理:合理规划consumer group
提交策略:根据业务选择auto或manual commit
处理偏移量:注意处理offset提交失败的情况
反压处理:控制消费速度,避免内存溢出
- 性能优化
批处理:合理设置batch.size和linger.ms
分区策略:自定义分区器优化数据分布
监控指标:监控生产/消费延迟、吞吐量、lag等
JVM调优:设置合适的堆内存和GC策略
- 数据可靠性
副本同步:确保min.insync.replicas配置合理
数据丢失预防:生产者使用acks=all,消费者及时提交offset
磁盘配置:使用多块磁盘,配置多个log.dirs
- 运维管理
监控告警:监控broker状态、topic大小、consumer lag
版本升级:注意版本兼容性,尤其是跨大版本升级
安全配置:启用SSL、SASL等安全机制
容量规划:定期评估存储需求,及时扩容
- 常见陷阱
消息顺序:分区内有序,跨分区无序
重复消费:at-least-once语义可能导致重复
数据倾斜:避免partition数据分布不均
消费者再平衡:rebalance期间暂停服务
- Kafka Streams注意事项
状态管理:合理设置state store清理策略
容错处理:处理KStreams应用失败恢复
资源隔离:避免与普通consumer/producer竞争资源
- Connect和Schema Registry
连接器配置:合理设置task.max
Schema演进:定义明确的兼容性策略
版本管理:Schema版本控制
- 监控和调试
关键指标:
生产/消费延迟
Consumer lag
Broker磁盘使用率
网络吞吐量
日志级别:适当调整log4j配置
工具使用:熟练掌握kafka-topics、kafka-consumer-groups等工具