00
引言
01
Kafka消费滞后现象解析
分区滞后量 = 该分区最新消息位置(Log End Offset) - 消费者在该分区的消费位置(Consumer Offset)
02
Kafka消费滞后的常见原因分析
03
监控Kafka消费滞后的方法与工具
import org.apache.kafka.clients.admin.*;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.util.*;import java.util.concurrent.ExecutionException;public class KafkaLagMonitor {public static void main(String[] args) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (AdminClient adminClient = AdminClient.create(props)) {// 获取所有消费者组ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();Collection<ConsumerGroupListing> groups = groupsResult.all().get();for (ConsumerGroupListing group : groups) {String groupId = group.groupId();System.out.println("消费者组: " + groupId);// 获取消费者组的消费位置Map<TopicPartition, OffsetAndMetadata> offsets =adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();// 获取主题分区的最新位置Set<TopicPartition> partitions = offsets.keySet();Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =adminClient.listOffsets(partitions.stream().collect(HashMap::new,(m, tp) -> m.put(tp, OffsetSpec.latest()),HashMap::putAll)).all().get();// 计算并显示滞后量for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {TopicPartition partition = entry.getKey();long consumerOffset = entry.getValue().offset();long endOffset = endOffsets.get(partition).offset();long lag = endOffset - consumerOffset;System.out.printf("主题: %s, 分区: %d, 消费位置: %d, 最新位置: %d, 滞后量: %d%n",partition.topic(), partition.partition(), consumerOffset, endOffset, lag);}System.out.println("-----------------------------------");}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}}
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"下的records-lag-max和records-lag-avg:分别表示最大滞后量和平均滞后量。 kafka.consumer:type=consumer-coordinator-metrics,client-id="{client-id}"下的commit-latency-avg和commit-latency-max:表示提交消费位置的平均延迟和最大延迟。
import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.Set;

/**
 * Prints {@code records-lag-max} and {@code records-lag-avg} for every
 * {@code consumer-fetch-manager-metrics} MBean registered in the platform
 * MBean server of the JVM this class runs in.
 */
public class KafkaJmxLagMonitor {

    /** Placeholder printed when an MBean does not expose the requested attribute. */
    private static final String NOT_AVAILABLE = "N/A";

    public static void main(String[] args) throws Exception {
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

        // The trailing wildcard matches every MBean of this type regardless of its other
        // key properties, so some matches may not carry both attributes read below.
        ObjectName pattern = new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,*");
        Set<ObjectName> objectNames = mBeanServer.queryNames(pattern, null);

        for (ObjectName name : objectNames) {
            String clientId = name.getKeyProperty("client-id");
            Object maxLag = readAttribute(mBeanServer, name, "records-lag-max");
            Object avgLag = readAttribute(mBeanServer, name, "records-lag-avg");
            System.out.printf("消费者: %s, 最大滞后量: %s, 平均滞后量: %s%n", clientId, maxLag, avgLag);
        }
    }

    /**
     * Reads one attribute, returning {@link #NOT_AVAILABLE} instead of failing when the
     * MBean lacks the attribute or was unregistered between the query and the read.
     */
    private static Object readAttribute(MBeanServer server, ObjectName name, String attribute)
            throws MBeanException, ReflectionException {
        try {
            return server.getAttribute(name, attribute);
        } catch (AttributeNotFoundException | InstanceNotFoundException e) {
            return NOT_AVAILABLE;
        }
    }
}
# 这不是Java代码,而是配置示例
# Prometheus配置文件 prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-exporter:9308']

# Kafka Exporter启动命令
kafka-exporter --kafka.server=kafka:9092 --group.filter=.*
import org.apache.kafka.clients.admin.*;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.util.*;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class CustomLagMonitor {private final AdminClient adminClient;private final String groupId;private final String topic;private final long lagThreshold;public CustomLagMonitor(String bootstrapServers, String groupId, String topic, long lagThreshold) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);this.groupId = groupId;this.topic = topic;this.lagThreshold = lagThreshold;}public void startMonitoring(long intervalSeconds) {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(this::checkLag, 0, intervalSeconds, TimeUnit.SECONDS);}private void checkLag() {try {// 获取消费者组的消费位置Map<TopicPartition, OffsetAndMetadata> offsets =adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();// 过滤出指定主题的分区Set<TopicPartition> partitions = new HashSet<>();for (TopicPartition partition : offsets.keySet()) {if (partition.topic().equals(topic)) {partitions.add(partition);}}// 获取主题分区的最新位置Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =adminClient.listOffsets(partitions.stream().collect(HashMap::new,(m, tp) -> m.put(tp, OffsetSpec.latest()),HashMap::putAll)).all().get();// 计算总滞后量long totalLag = 0;for (TopicPartition partition : partitions) {long consumerOffset = offsets.get(partition).offset();long endOffset = endOffsets.get(partition).offset();long lag = endOffset - consumerOffset;totalLag += lag;System.out.printf("分区: %d, 滞后量: %d%n", partition.partition(), lag);}System.out.printf("主题: %s, 消费者组: %s, 总滞后量: %d%n", topic, groupId, totalLag);// 检查是否超过阈值,发送告警if (totalLag > lagThreshold) 
{sendAlert(topic, groupId, totalLag);}} catch (Exception e) {e.printStackTrace();}}private void sendAlert(String topic, String groupId, long lag) {// 实现告警逻辑,如发送邮件、短信、钉钉消息等System.out.printf("告警: 主题 %s 的消费者组 %s 滞后量达到 %d,超过阈值 %d%n",topic, groupId, lag, lagThreshold);}public static void main(String[] args) {CustomLagMonitor monitor = new CustomLagMonitor("localhost:9092", // Kafka服务器地址"my-consumer-group", // 消费者组ID"my-topic", // 主题名称1000 // 滞后阈值);monitor.startMonitoring(60); // 每60秒检查一次}}
04
解决Kafka消费滞后的策略与最佳实践
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.*;import java.util.concurrent.*;public class MultiThreadedConsumer {private final KafkaConsumer<String, String> consumer;private final ExecutorService executorService;private final int numWorkers;private final String topic;private volatile boolean running = true;public MultiThreadedConsumer(String bootstrapServers, String groupId, String topic, int numWorkers) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");this.consumer = new KafkaConsumer<>(props);this.executorService = Executors.newFixedThreadPool(numWorkers);this.numWorkers = numWorkers;this.topic = topic;}public void start() {consumer.subscribe(Collections.singletonList(topic));while (running) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {// 将消息分配给多个工作线程处理Map<TopicPartition, List<ConsumerRecord<String, String>>> partitionRecords = new HashMap<>();for (TopicPartition partition : records.partitions()) {partitionRecords.put(partition, records.records(partition));}CountDownLatch latch = new CountDownLatch(partitionRecords.size());for (Map.Entry<TopicPartition, List<ConsumerRecord<String, String>>> entry : partitionRecords.entrySet()) {TopicPartition partition = entry.getKey();List<ConsumerRecord<String, String>> partitionRecord = entry.getValue();executorService.submit(() -> {try {processRecords(partitionRecord);// 记录最后处理的偏移量long lastOffset = 
partitionRecord.get(partitionRecord.size() - 1).offset();consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));} catch (Exception e) {e.printStackTrace();} finally {latch.countDown();}});}try {// 等待所有分区的消息处理完成latch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}consumer.close();executorService.shutdown();}private void processRecords(List<ConsumerRecord<String, String>> records) {// 实际的消息处理逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());// 模拟处理时间try {Thread.sleep(10);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}public void stop() {running = false;}public static void main(String[] args) {MultiThreadedConsumer consumer = new MultiThreadedConsumer("localhost:9092", // Kafka服务器地址"multi-threaded-group", // 消费者组ID"my-topic", // 主题名称10 // 工作线程数);consumer.start();}}
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.*;public class BatchProcessingConsumer {private final KafkaConsumer<String, String> consumer;private final String topic;private final int batchSize;private volatile boolean running = true;public BatchProcessingConsumer(String bootstrapServers, String groupId, String topic, int batchSize) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSize));this.consumer = new KafkaConsumer<>(props);this.topic = topic;this.batchSize = batchSize;}public void start() {consumer.subscribe(Collections.singletonList(topic));while (running) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();// 按分区批量处理消息for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);processBatch(partitionRecords);long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();currentOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));}// 提交所有分区的偏移量consumer.commitSync(currentOffsets);}}consumer.close();}private void processBatch(List<ConsumerRecord<String, String>> records) {// 收集批量数据List<String> messages = new ArrayList<>(records.size());for (ConsumerRecord<String, String> record : records) {messages.add(record.value());}// 批量处理System.out.printf("批量处理 %d 条消息%n", 
messages.size());// 这里是实际的批量处理逻辑,如批量写入数据库、批量调用API等// 示例中只是简单打印for (int i = 0; i < messages.size(); i += 100) {int end = Math.min(i + 100, messages.size());List<String> batch = messages.subList(i, end);System.out.printf("处理子批次: %d-%d%n", i, end - 1);// 模拟批量处理try {Thread.sleep(50);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}public void stop() {running = false;}public static void main(String[] args) {BatchProcessingConsumer consumer = new BatchProcessingConsumer("localhost:9092", // Kafka服务器地址"batch-processing-group", // 消费者组ID"my-topic", // 主题名称1000 // 批次大小);consumer.start();}}
Properties props = new Properties();// 基本配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "optimized-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 性能优化配置// 每次拉取的最大消息数量,根据消息大小和处理能力调整props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");// 拉取请求的最大字节数,根据网络带宽和内存调整props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");// 消费者拉取超时时间,避免长时间阻塞props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");// 消费者会话超时时间,根据环境稳定性调整props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");// 心跳间隔时间,通常设置为会话超时时间的1/3props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");// 消费位置自动提交间隔,如果手动提交则禁用props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 消费者拉取消息的最大时间间隔,超过此时间将触发重平衡props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicBoolean;public class ScalableConsumerGroup {private final String bootstrapServers;private final String groupId;private final String topic;private final ExecutorService executorService;private final AtomicBoolean running = new AtomicBoolean(true);private final int maxConsumers;private int currentConsumers;public ScalableConsumerGroup(String bootstrapServers, String groupId, String topic, int initialConsumers, int maxConsumers) {this.bootstrapServers = bootstrapServers;this.groupId = groupId;this.topic = topic;this.maxConsumers = maxConsumers;this.currentConsumers = initialConsumers;this.executorService = Executors.newCachedThreadPool();}public void start() {// 启动初始数量的消费者for (int i = 0; i < currentConsumers; i++) {startConsumer();}// 启动监控线程,根据滞后情况动态调整消费者数量executorService.submit(this::monitorAndScale);}private void startConsumer() {executorService.submit(() -> {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(java.util.Collections.singletonList(topic));while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {processRecord(record);}}}});}private void processRecord(ConsumerRecord<String, String> record) 
{// 实际的消息处理逻辑System.out.printf("处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());// 模拟处理时间try {Thread.sleep(50);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void monitorAndScale() {while (running.get()) {try {// 获取当前消费滞后情况long currentLag = getCurrentLag();System.out.printf("当前滞后量: %d, 当前消费者数量: %d%n", currentLag, currentConsumers);// 根据滞后情况调整消费者数量if (currentLag > 10000 && currentConsumers < maxConsumers) {// 滞后量大,增加消费者int newConsumers = Math.min(currentConsumers + 2, maxConsumers);int consumersToAdd = newConsumers - currentConsumers;System.out.printf("滞后量过大,增加 %d 个消费者%n", consumersToAdd);for (int i = 0; i < consumersToAdd; i++) {startConsumer();}currentConsumers = newConsumers;} else if (currentLag < 1000 && currentConsumers > 1) {// 滞后量小,减少消费者(保留至少一个)currentConsumers = Math.max(currentConsumers - 1, 1);System.out.println("滞后量较小,减少消费者数量至: " + currentConsumers);// 注意:这里只是减少计数,实际的消费者会在下一次重平衡时自动调整}// 每分钟检查一次Thread.sleep(60000);} catch (Exception e) {e.printStackTrace();}}}private long getCurrentLag() {// 实现获取当前消费滞后量的逻辑// 这里可以使用前面介绍的监控方法// 简化起见,这里返回一个模拟值return (long) (Math.random() * 20000);}public void stop() {running.set(false);executorService.shutdown();}public static void main(String[] args) {ScalableConsumerGroup group = new ScalableConsumerGroup("localhost:9092", // Kafka服务器地址"scalable-group", // 消费者组ID"my-topic", // 主题名称2, // 初始消费者数量10 // 最大消费者数量);group.start();}}
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicLong;public class BackpressureExample {private final String bootstrapServers;private final String sourceTopic;private final String targetTopic;private final AtomicBoolean running = new AtomicBoolean(true);private final ExecutorService executorService = Executors.newFixedThreadPool(2);private final AtomicLong currentLag = new AtomicLong(0);private final long maxLag;public BackpressureExample(String bootstrapServers, String sourceTopic, String targetTopic, long maxLag) {this.bootstrapServers = bootstrapServers;this.sourceTopic = sourceTopic;this.targetTopic = targetTopic;this.maxLag = maxLag;}public void start() {// 启动监控线程executorService.submit(this::monitorLag);// 启动消费-生产线程executorService.submit(this::consumeAndProduce);}private void monitorLag() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-monitor");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(sourceTopic));while (running.get()) {// 获取分配的分区consumer.poll(Duration.ofMillis(0));consumer.assignment().forEach(partition -> {// 获取最新位置long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition);// 获取当前消费位置long 
currentOffset = consumer.position(partition);// 计算滞后量long lag = endOffset - currentOffset;// 更新当前滞后量currentLag.set(lag);System.out.printf("当前滞后量: %d%n", lag);});Thread.sleep(5000); // 每5秒检查一次}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void consumeAndProduce() {// 配置消费者Properties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "backpressure-consumer");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 配置生产者Properties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {consumer.subscribe(Collections.singletonList(sourceTopic));while (running.get()) {// 检查当前滞后量,如果超过最大值,则暂停消费long lag = currentLag.get();if (lag > maxLag) {System.out.println("滞后量过大,暂停消费");Thread.sleep(1000); // 暂停1秒continue;}// 正常消费ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息String processedValue = processRecord(record);// 发送到目标主题producer.send(new ProducerRecord<>(targetTopic, record.key(), processedValue),(metadata, exception) -> {if (exception != null) {exception.printStackTrace();}});}// 提交消费位置consumer.commitSync();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private String processRecord(ConsumerRecord<String, 
String> record) {// 实际的消息处理逻辑System.out.printf("处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());// 模拟处理时间try {Thread.sleep(50);} catch (InterruptedException e) {Thread.currentThread().interrupt();}return "processed-" + record.value();}public void stop() {running.set(false);executorService.shutdown();}public static void main(String[] args) {BackpressureExample example = new BackpressureExample("localhost:9092", // Kafka服务器地址"source-topic", // 源主题"target-topic", // 目标主题5000 // 最大滞后量);example.start();}}
import com.zaxxer.hikari.HikariConfig;import com.zaxxer.hikari.HikariDataSource;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.serialization.StringDeserializer;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.SQLException;import java.time.Duration;import java.util.Collections;import java.util.Properties;import java.util.concurrent.atomic.AtomicBoolean;public class OptimizedDatabaseConsumer {private final String bootstrapServers;private final String topic;private final String jdbcUrl;private final String dbUser;private final String dbPassword;private final AtomicBoolean running = new AtomicBoolean(true);private HikariDataSource dataSource;public OptimizedDatabaseConsumer(String bootstrapServers, String topic,String jdbcUrl, String dbUser, String dbPassword) {this.bootstrapServers = bootstrapServers;this.topic = topic;this.jdbcUrl = jdbcUrl;this.dbUser = dbUser;this.dbPassword = dbPassword;}public void start() {// 初始化数据库连接池initDatabaseConnectionPool();// 配置消费者Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-consumer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(topic));while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {// 批量写入数据库batchInsertToDatabase(records);// 提交消费位置consumer.commitSync();}}}}private void initDatabaseConnectionPool() {HikariConfig config = new 
HikariConfig();config.setJdbcUrl(jdbcUrl);config.setUsername(dbUser);config.setPassword(dbPassword);config.setMaximumPoolSize(20); // 根据需要调整连接池大小config.setMinimumIdle(5);config.setIdleTimeout(30000);config.setConnectionTimeout(10000);config.addDataSourceProperty("cachePrepStmts", "true");config.addDataSourceProperty("prepStmtCacheSize", "250");config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");dataSource = new HikariDataSource(config);}private void batchInsertToDatabase(ConsumerRecords<String, String> records) {String sql = "INSERT INTO messages (message_key, message_value, topic, partition, offset) VALUES (?, ?, ?, ?, ?)";try (Connection conn = dataSource.getConnection();PreparedStatement pstmt = conn.prepareStatement(sql)) {conn.setAutoCommit(false);int batchSize = 0;for (ConsumerRecord<String, String> record : records) {pstmt.setString(1, record.key());pstmt.setString(2, record.value());pstmt.setString(3, record.topic());pstmt.setInt(4, record.partition());pstmt.setLong(5, record.offset());pstmt.addBatch();batchSize++;// 每500条提交一次if (batchSize >= 500) {pstmt.executeBatch();batchSize = 0;}}// 提交剩余的批次if (batchSize > 0) {pstmt.executeBatch();}conn.commit();System.out.printf("成功写入 %d 条消息到数据库%n", records.count());} catch (SQLException e) {e.printStackTrace();}}public void stop() {running.set(false);if (dataSource != null) {dataSource.close();}}public static void main(String[] args) {OptimizedDatabaseConsumer consumer = new OptimizedDatabaseConsumer("localhost:9092", // Kafka服务器地址"db-topic", // 主题名称"jdbc:mysql://localhost:3306/kafka_messages", // 数据库URL"user", // 数据库用户名"password" // 数据库密码);consumer.start();}}
05
Kafka消费滞后的预防与长期维护策略
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;public class KafkaPerformanceTest {private final String bootstrapServers;private final String topic;private final int messageCount;private final int messageSize;private final int consumerCount;private final ExecutorService executorService;public KafkaPerformanceTest(String bootstrapServers, String topic,int messageCount, int messageSize, int consumerCount) {this.bootstrapServers = bootstrapServers;this.topic = topic;this.messageCount = messageCount;this.messageSize = messageSize;this.consumerCount = consumerCount;this.executorService = Executors.newFixedThreadPool(consumerCount + 1);}public void runTest() throws InterruptedException {// 创建测试主题(如果不存在)createTestTopic();// 启动消费者CountDownLatch consumerLatch = new CountDownLatch(consumerCount);AtomicLong totalConsumed = new AtomicLong(0);long startTime = System.currentTimeMillis();for (int i = 0; i < consumerCount; i++) {executorService.submit(() -> {try {long consumed = runConsumer();totalConsumed.addAndGet(consumed);} finally {consumerLatch.countDown();}});}// 启动生产者executorService.submit(() -> runProducer());// 等待所有消费者完成consumerLatch.await();long endTime = System.currentTimeMillis();long duration = endTime - startTime;// 计算性能指标double messagesPerSec = 1000.0 * totalConsumed.get() duration;double mbPerSec = messagesPerSec * messageSize (1024.0 * 1024.0);System.out.printf("性能测试结果:%n");System.out.printf("总消息数: %d%n", totalConsumed.get());System.out.printf("总耗时: %.2f 秒%n", duration 1000.0);System.out.printf("吞吐量: %.2f 消息/秒%n", messagesPerSec);System.out.printf("吞吐量: 
%.2f MB/秒%n", mbPerSec);executorService.shutdown();}private void createTestTopic() {// 实际应用中应使用AdminClient创建主题// 这里简化处理,假设主题已存在}private void runProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.LINGER_MS_CONFIG, "5");props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");// 生成测试消息StringBuilder messageBuilder = new StringBuilder();for (int i = 0; i < messageSize; i++) {messageBuilder.append('a');}String message = messageBuilder.toString();try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {for (int i = 0; i < messageCount; i++) {String key = "key-" + i;producer.send(new ProducerRecord<>(topic, key, message),(metadata, exception) -> {if (exception != null) {exception.printStackTrace();}});if (i % 10000 == 0) {System.out.printf("已生产 %d 条消息%n", i);}}}System.out.println("生产者完成");}private long runConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");long count = 0;try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(topic));while (count < messageCount) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 
模拟消息处理count++;if (count % 10000 == 0) {System.out.printf("消费者已处理 %d 条消息%n", count);}if (count >= messageCount) {break;}}}}return count;}public static void main(String[] args) throws InterruptedException {KafkaPerformanceTest test = new KafkaPerformanceTest("localhost:9092", // Kafka服务器地址"perf-test-topic", // 测试主题1000000, // 消息数量1024, // 消息大小(字节)3 // 消费者数量);test.runTest();}}
这个性能测试示例可以帮助评估Kafka生产者和消费者的处理能力。通过调整消息数量、消息大小和消费者数量等参数,可以模拟不同的负载场景,找出系统的性能瓶颈和最大处理能力。在实际应用中,应当根据性能测试结果,预留足够的处理能力冗余,以应对流量波动和突发情况。
import io.prometheus.client.Counter;import io.prometheus.client.Gauge;import io.prometheus.client.exporter.HTTPServer;import org.apache.kafka.clients.admin.*;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.io.IOException;import java.util.*;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class KafkaLagMonitoringSystem {private final AdminClient adminClient;private final Set<String> monitoredGroups;private final Set<String> monitoredTopics;private final ScheduledExecutorService scheduler;private HTTPServer server;// Prometheus指标private final Gauge lagGauge = Gauge.build().name("kafka_consumer_group_lag").help("Kafka消费者组滞后量").labelNames("group", "topic", "partition").register();private final Counter errorCounter = Counter.build().name("kafka_lag_monitor_errors").help("监控过程中的错误数").register();public KafkaLagMonitoringSystem(String bootstrapServers, Set<String> groups, Set<String> topics) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);this.monitoredGroups = groups;this.monitoredTopics = topics;this.scheduler = Executors.newScheduledThreadPool(1);}public void start(int intervalSeconds, int prometheusPort) throws IOException {// 启动Prometheus HTTP服务器server = new HTTPServer(prometheusPort);// 定期执行监控任务scheduler.scheduleAtFixedRate(this::monitorLag, 0, intervalSeconds, TimeUnit.SECONDS);System.out.printf("监控系统已启动,Prometheus指标暴露在端口 %d%n", prometheusPort);}private void monitorLag() {try {// 如果没有指定消费者组,则获取所有消费者组Set<String> groupsToMonitor = monitoredGroups;if (groupsToMonitor.isEmpty()) {ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();Collection<ConsumerGroupListing> groups = groupsResult.all().get();groupsToMonitor = new HashSet<>();for (ConsumerGroupListing group : groups) 
{groupsToMonitor.add(group.groupId());}}for (String groupId : groupsToMonitor) {// 获取消费者组的消费位置Map<TopicPartition, OffsetAndMetadata> offsets =adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();// 过滤出需要监控的主题Set<TopicPartition> partitionsToCheck = new HashSet<>();for (TopicPartition partition : offsets.keySet()) {if (monitoredTopics.isEmpty() || monitoredTopics.contains(partition.topic())) {partitionsToCheck.add(partition);}}if (partitionsToCheck.isEmpty()) {continue;}// 获取主题分区的最新位置Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =adminClient.listOffsets(partitionsToCheck.stream().collect(HashMap::new,(m, tp) -> m.put(tp, OffsetSpec.latest()),HashMap::putAll)).all().get();// 计算并记录滞后量for (TopicPartition partition : partitionsToCheck) {OffsetAndMetadata metadata = offsets.get(partition);if (metadata != null) {long consumerOffset = metadata.offset();long endOffset = endOffsets.get(partition).offset();long lag = endOffset - consumerOffset;// 更新Prometheus指标lagGauge.labels(groupId, partition.topic(), String.valueOf(partition.partition())).set(lag);System.out.printf("组: %s, 主题: %s, 分区: %d, 滞后量: %d%n",groupId, partition.topic(), partition.partition(), lag);}}}} catch (Exception e) {errorCounter.inc();e.printStackTrace();}}public void stop() {scheduler.shutdown();if (server != null) {server.stop();}adminClient.close();}public static void main(String[] args) throws IOException {// 配置要监控的消费者组和主题Set<String> groups = new HashSet<>(Arrays.asList("group1", "group2"));Set<String> topics = new HashSet<>(Arrays.asList("topic1", "topic2"));KafkaLagMonitoringSystem monitor = new KafkaLagMonitoringSystem("localhost:9092", // Kafka服务器地址groups, // 要监控的消费者组topics // 要监控的主题);monitor.start(30, 8080); // 每30秒监控一次,Prometheus指标暴露在8080端口}}
import io.kubernetes.client.openapi.ApiClient;import io.kubernetes.client.openapi.ApiException;import io.kubernetes.client.openapi.Configuration;import io.kubernetes.client.openapi.apis.AppsV1Api;import io.kubernetes.client.openapi.models.V1Deployment;import io.kubernetes.client.openapi.models.V1DeploymentSpec;import io.kubernetes.client.openapi.models.V1Scale;import io.kubernetes.client.openapi.models.V1ScaleSpec;import io.kubernetes.client.util.Config;import org.apache.kafka.clients.admin.*;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.io.IOException;import java.util.*;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class KafkaAutoScaler {private final AdminClient adminClient;private final String groupId;private final String topic;private final String namespace;private final String deploymentName;private final int minReplicas;private final int maxReplicas;private final long lagThresholdPerReplica;private final ScheduledExecutorService scheduler;private final AppsV1Api appsV1Api;public KafkaAutoScaler(String bootstrapServers, String groupId, String topic,String namespace, String deploymentName,int minReplicas, int maxReplicas, long lagThresholdPerReplica) throws IOException {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);this.groupId = groupId;this.topic = topic;this.namespace = namespace;this.deploymentName = deploymentName;this.minReplicas = minReplicas;this.maxReplicas = maxReplicas;this.lagThresholdPerReplica = lagThresholdPerReplica;this.scheduler = Executors.newScheduledThreadPool(1);// 初始化Kubernetes客户端ApiClient client = Config.defaultClient();Configuration.setDefaultApiClient(client);this.appsV1Api = new AppsV1Api();}public void start(int intervalSeconds) {// 
定期执行自动扩缩容任务scheduler.scheduleAtFixedRate(this::autoScale, 0, intervalSeconds, TimeUnit.SECONDS);System.out.printf("自动扩缩容系统已启动,每 %d 秒检查一次%n", intervalSeconds);}private void autoScale() {try {// 获取当前滞后量long totalLag = getTotalLag();System.out.printf("当前总滞后量: %d%n", totalLag);// 获取当前副本数int currentReplicas = getCurrentReplicas();System.out.printf("当前副本数: %d%n", currentReplicas);// 计算所需副本数int desiredReplicas = calculateDesiredReplicas(totalLag, currentReplicas);System.out.printf("期望副本数: %d%n", desiredReplicas);// 如果需要调整副本数,则更新Deploymentif (desiredReplicas != currentReplicas) {updateReplicas(desiredReplicas);System.out.printf("已将副本数从 %d 调整为 %d%n", currentReplicas, desiredReplicas);}} catch (Exception e) {e.printStackTrace();}}private long getTotalLag() throws Exception {// 获取消费者组的消费位置Map<TopicPartition, OffsetAndMetadata> offsets =adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();// 过滤出指定主题的分区Set<TopicPartition> partitionsToCheck = new HashSet<>();for (TopicPartition partition : offsets.keySet()) {if (partition.topic().equals(topic)) {partitionsToCheck.add(partition);}}if (partitionsToCheck.isEmpty()) {return 0;}// 获取主题分区的最新位置Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =adminClient.listOffsets(partitionsToCheck.stream().collect(HashMap::new,(m, tp) -> m.put(tp, OffsetSpec.latest()),HashMap::putAll)).all().get();// 计算总滞后量long totalLag = 0;for (TopicPartition partition : partitionsToCheck) {OffsetAndMetadata metadata = offsets.get(partition);if (metadata != null) {long consumerOffset = metadata.offset();long endOffset = endOffsets.get(partition).offset();long lag = endOffset - consumerOffset;totalLag += lag;}}return totalLag;}private int getCurrentReplicas() throws ApiException {V1Scale scale = appsV1Api.readNamespacedDeploymentScale(deploymentName, namespace, null);return scale.getSpec().getReplicas();}private int calculateDesiredReplicas(long totalLag, int currentReplicas) {// 根据滞后量计算所需副本数int desiredReplicas = 
(int) Math.ceil((double) totalLag lagThresholdPerReplica);// 确保副本数在最小值和最大值之间desiredReplicas = Math.max(desiredReplicas, minReplicas);desiredReplicas = Math.min(desiredReplicas, maxReplicas);// 避免频繁扩缩容,只有当需要增加或减少超过1个副本时才调整if (Math.abs(desiredReplicas - currentReplicas) <= 1) {return currentReplicas;}return desiredReplicas;}private void updateReplicas(int replicas) throws ApiException {V1Scale scale = new V1Scale();V1ScaleSpec spec = new V1ScaleSpec();spec.setReplicas(replicas);scale.setSpec(spec);appsV1Api.replaceNamespacedDeploymentScale(deploymentName, namespace, scale, null, null, null, null);}public void stop() {scheduler.shutdown();adminClient.close();}public static void main(String[] args) throws IOException {KafkaAutoScaler autoScaler = new KafkaAutoScaler("localhost:9092", // Kafka服务器地址"my-consumer-group", // 消费者组ID"my-topic", // 主题名称"default", // Kubernetes命名空间"kafka-consumer", // Deployment名称2, // 最小副本数10, // 最大副本数10000 // 每个副本的滞后阈值);autoScaler.start(60); // 每60秒检查一次}}
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.*;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicBoolean;public class DisasterRecoveryConsumer {private final String bootstrapServers;private final String primaryGroupId;private final String backupGroupId;private final String topic;private final long maxLagThreshold;private final long checkIntervalMs;private final AtomicBoolean running = new AtomicBoolean(true);private final ExecutorService executorService = Executors.newFixedThreadPool(2);private final AtomicBoolean backupActive = new AtomicBoolean(false);public DisasterRecoveryConsumer(String bootstrapServers, String primaryGroupId, String backupGroupId,String topic, long maxLagThreshold, long checkIntervalMs) {this.bootstrapServers = bootstrapServers;this.primaryGroupId = primaryGroupId;this.backupGroupId = backupGroupId;this.topic = topic;this.maxLagThreshold = maxLagThreshold;this.checkIntervalMs = checkIntervalMs;}public void start() {// 启动监控线程executorService.submit(this::monitorPrimaryConsumer);// 启动备用消费者线程executorService.submit(this::runBackupConsumer);System.out.println("灾备消费系统已启动");}private void monitorPrimaryConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());try (AdminClient adminClient = AdminClient.create(props)) {while (running.get()) {try {// 获取主消费者组的滞后情况Map<TopicPartition, OffsetAndMetadata> offsets =adminClient.listConsumerGroupOffsets(primaryGroupId).partitionsToOffsetAndMetadata().get();// 过滤出指定主题的分区Set<TopicPartition> partitions = new HashSet<>();for (TopicPartition partition : 
offsets.keySet()) {if (partition.topic().equals(topic)) {partitions.add(partition);}}if (partitions.isEmpty()) {System.out.println("主消费者组未消费指定主题,激活备用消费者");backupActive.set(true);Thread.sleep(checkIntervalMs);continue;}// 获取主题分区的最新位置Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =adminClient.listOffsets(partitions.stream().collect(HashMap::new,(m, tp) -> m.put(tp, OffsetSpec.latest()),HashMap::putAll)).all().get();// 计算总滞后量long totalLag = 0;for (TopicPartition partition : partitions) {long consumerOffset = offsets.get(partition).offset();long endOffset = endOffsets.get(partition).offset();long lag = endOffset - consumerOffset;totalLag += lag;}System.out.printf("主消费者组滞后量: %d%n", totalLag);// 检查是否需要激活备用消费者if (totalLag > maxLagThreshold) {System.out.println("主消费者组滞后量超过阈值,激活备用消费者");backupActive.set(true);} else {backupActive.set(false);}} catch (Exception e) {System.out.println("监控主消费者时出错,激活备用消费者");backupActive.set(true);e.printStackTrace();}Thread.sleep(checkIntervalMs);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void runBackupConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, backupGroupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(topic));while (running.get()) {// 检查备用消费者是否激活if (backupActive.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {// 处理消息for (ConsumerRecord<String, String> record : records) {processRecord(record);}// 
提交消费位置consumer.commitSync();System.out.printf("备用消费者处理了 %d 条消息%n", records.count());}} else {// 备用消费者未激活,短暂休眠Thread.sleep(1000);}}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void processRecord(ConsumerRecord<String, String> record) {// 实际的消息处理逻辑System.out.printf("备用消费者处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}public void stop() {running.set(false);executorService.shutdown();}public static void main(String[] args) {DisasterRecoveryConsumer consumer = new DisasterRecoveryConsumer("localhost:9092", // Kafka服务器地址"primary-group", // 主消费者组ID"backup-group", // 备用消费者组ID"important-topic", // 主题名称10000, // 最大滞后阈值30000 // 检查间隔(毫秒));consumer.start();}}
06
总结
07
加群请添加作者

08
获取文档资料

推荐阅读系列文章
建议收藏 | Dinky系列总结篇 建议收藏 | Flink系列总结篇 建议收藏 | Flink CDC 系列总结篇 建议收藏 | Doris实战文章合集 建议收藏 | Paimon 实战文章总结 建议收藏 | Fluss 实战文章总结 建议收藏 | Seatunnel 实战文章系列合集 建议收藏 | 实时离线输数仓(数据湖)总结篇 建议收藏 | 实时离线数仓实战第一阶段总结 超700star!电商项目数据湖建设实战代码 ,拿来即用! 从0到1建设电商项目数据湖实战教程 推荐一套开源电商项目数据湖建设实战代码
如果喜欢 请点个在看分享给身边的朋友
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




