抱歉,这是一篇迟来的实用操作贴,原付费内容,现在免费送给大家(已付费的用户别急,拿好手中剩余的ORC,等升值)。

鱼跃龙门,一门之隔,就是龙和鱼。同样,技术升职到最后也是有瓶颈,借用阿里职级来说,有些人一辈子就停留在p7了。
技术岗到了最后,除了你要在技术领域有自己的开源作品,还要在业内有一定的影响力。这也是我长期坚持写开源文章的动力之一。同样,如果您也是一位热爱分享的人,欢迎加入。
开始
通过传入开始和结束时间戳,重新消费kafka历史消息。
public static void main(String[] args) {String startTime = "2018-08-14 21:19:09"; // 开始时间String endTime = "2018-08-14 21:20:59"; // 结束时间// endTime = ""; // 不设置结束时间new ReConsumerByTime().start(startTime, endTime);}
详细代码:
package com.system.kafka.clients.demo.producer.reconsumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndTimestamp;import org.apache.kafka.common.TopicPartition;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.*;/*** 通过开始时间和结束时间,重新消费kafka消息*/public class ReConsumerByTime {KafkaConsumer<String, String> consumer;final String topic = "mytopic";final String groupId = "test";final int partitionNum = 4;final String bootstrapServers = "10.211.55.5:9092";public static void main(String[] args) {String startTime = "2018-08-14 21:19:09"; // 开始时间String endTime = "2018-08-14 21:20:59"; // 结束时间// endTime = ""; 不设置结束时间new ReConsumerByTime().start(startTime, endTime);}public void start(String startTime, String endTime) {init();start(timeFormat(startTime), timeFormat(endTime));}public void start(long startTime, long endTime) {Map<TopicPartition, Long> startMap = new HashMap<>();for (int i = 0; i < partitionNum; i++) startMap.put(new TopicPartition(topic, i), startTime);Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap);List<TopicPartition> topicPartitions = new ArrayList<>();startMap.forEach((k, v) -> {topicPartitions.add(k);});consumer.assign(topicPartitions);startMap.forEach((k, v) -> {consumer.seek(k, startOffsetMap.get(k).offset());System.out.println(k + ", offsets:" + startOffsetMap.get(k).offset());});while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {try {if (endTime == 0 || record.timestamp() <= endTime) {System.out.printf("offset = %d,p = %d, key = %s, value = %s \r\n", record.offset(), record.partition(), record.key(), record.value());}} catch (Exception e) {e.printStackTrace();}}}}// 格式化时间public long timeFormat(String dateTime) {try {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");return sdf.parse(dateTime).getTime();} catch (ParseException e) {e.printStackTrace();return 0;}}// 初始化消费者连接public void init() {Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("group.id", groupId);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumer = new KafkaConsumer<>(props);}}
作者:半兽人
链接:http://orchome.com/1004
来源:OrcHome
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
文章转载自orchome,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




