设计一个物流公司车队管理解决方案,其中车队中的车辆启用了无线网络功能。每辆车定期报告其地理位置和操作参数,如燃油水平、速度、加速度、航向(bearing)、发动机温度等。物流公司希望利用这一遥测数据流实现一系列应用程序,以帮助他们管理业务的运营和财务方面。例如,监测运输车辆是否超速。接下来创建一个简单的实时流应用程序,计算每辆运输车辆每几秒的平均速度(本例中,窗口内平均速度超过120即视为超速)。在本方案中,使用Kafka作为流数据源,从Kafka的cars主题读取这些事件。同时也将Kafka作为流的Data Sink,检测出的超速事件将写入Kafka的fastcars主题。车辆超速实时监测流程序的架构如图所示。

为了模拟车辆发送传感器数据,首先将创建一个Kafka Producer,它将id、speed、acceleration和timestamp写入Kafka的cars主题。
Java代码如下:
import org.apache.kafka.clients.producer. KafkaProducer;import org.apache.kafka.clients.producer. ProducerRecord;import Java.util.Properties;import Java. util.Random;public class RandomCarsKafkaProducer {private static final String TOPIC_CARS = "cars";//topicpublic static void main(String[ ] args){Properties props = new Properties();props.put("Bootstrap. servers", "localhost:9092");props.put("key. serializer","org. apache.kafka.common. serialization. StringSerializer");props.put("value.serializer","org.apache.kafka.common. serialization. StringSerializer");KafkaProducer < String, String> producer = new KafkaProducer<>(props);//int numRecsToProduce= -1;//-1 = infiniteint numRecsToProduce = 1000;//连续产生1000条数据produceRecord(producer, numRecsToProduce);}private static void produceRecord ( KafkaProducer <String, String> producer, int recordNum){int interval = 1000;//生成有限数据记录if(recordNum > 0){//生成一条数据,发送一条数据producer.send(generateCarRecord(TOPIC_CARS));try{Thread. sleep( interval);} catch(InterruptedException e) {e.printStackTrace();}produceRecord( producer,recordNum - 1);}//生成无限数据记录else if(recordNum < 0) {producer.send(generateCarRecord(TOPIC_CARS));try {Thread.sleep(interval);} catch (InterruptedException e) {e.printStackTrace();}produceRecord(producer, -1);}}//生成一条车辆监测信息的方法private static ProducerRecord<String, String> generateCarRecord(String topic) {Random r = new Random();String carName = "car" + r.nextInt(10);int speed = r.nextInt(150);float acc = r.nextFloat() * 100;long ts = System.currentTimeMillis();String value = carName + "," + speed + "," + acc + "," + ts;System.out.println("== Writing==:" + value);float d = r.nextFloat() * 100;if (d < 2) {//产生随机时延System.out.println("抱歉!有一些网络时延!");try {Thread.sleep(Float.value0f(d * 100).longValue());} catch (InterruptedException e) {e.printStackTrace();}}return new ProducerRecord<>(topic, "key", value);}}
注意,这里的时间戳是在源处生成事件(消息)的时间。
下面是流处理管道的实现。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple. Tuple2;import org.apache.flink.connector.kafka. source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator. initializer.OffsetsInitializer;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.jsonProcessingException;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTineWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.Windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import org.apache.flink.util.Collector;import org.apache.kafka.clients.producer.ProducerRecord;import Java.sql.Timestamp;import Java.time.Duration;import Java. 
util.Properties;/***物流公司车辆超速检测程序*/public class FastCarsDetect {//POJO类,检测到的车速数据类型public static class CarEvent {public String carId;//车辆idpublic int speed;//速度public double acceleration;//加速度public long timestamp;//时间戳public CarEvent(){}public CarEvent(String carId, int speed, double acceleration, long timestamp) {this.carId = carId;this.speed = speed;this.acceleration = acceleration;this.timestamp = timestamp;@Overridepublic String toString() {return "CarEvent{" + "carId= '" + carId + ",speed = " + speed + ", acceleration= " + acceleration + ", timestamp=" + timestamp + '}'";}}POJO类,检测到的平均车速数据类型public static class CarAvgEvent {public String carId;车辆idpublic double avgSpeed;//平均速度public String start;//计算平均值的时间范围下限public String end;//计算平均值的时间范围上限public CarAvgEvent( ){}public CarAvgEvent(String carId, double avgSpeed, String start, String end) {this.carId = carId;this.avgSpeed = avgSpeed;this.start = start;this.end = end;@Overridepublic String toString () {return "CarEvent{" +"carId= '" + carId + '\'' +",avgSpeed =" + avgSpeed +", start = '" + start + '\'' +", end = '" + end + '\'' +'}';}}public static void main(String[ ] args) throws Exception {//设置流执行环境final StreanExecutionEnvironment env =StreamExecutionEnvironment. getExecutionEnvironment();//Kafka SourceKafkaSource < String> source = KafkaSource.<String>builder(). setBootstrapServers("localhost:9092"). setTopics("cars"). setGroupId("group- test"). setStarting0ffsets(OffsetsInitializer.earliest( )). setValueOnlyDeserializer(new SimpleStringSchema()).build();//Kafka SinkProperties properties = new Properties();properties. 
setProperty("Bootstrap, servers","localhost:9092");//Kafka Brokers默认的最大事务超时(transaction,max.timeout.ms)为15min//当使用Semantic.EXACTLY_ONCE语义时,下面这个属性值不能超过15minproperties.setProperty("transaction.timeout.ms",String.value0f(5 * 60 * 100));FlinkKafkaProducer myProducer = new FlinkKafkaProducer <CarAvgEvent>("fastcars", //目标topicnew ObjSerializationSchema("fastcars"), //序列化 schenaproperties, //producer配置FlinkKafkaProducer. Semantic.EXACTLY_ONCE //容错性);myProducer. setWriteTimestampToKafka(true); //水印策略WatermarkStrategy<String> watermarkStrategy =WatermarkStrategy.<String> forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner ( new SerializableTimestampAssigner < String>() {@Overridepublic long extractTimestamp(String s, long l) {return Long. parseLong(s.split(",")[3]);}});env//指定Kafka数据源.fromSource(source, watermarkStrategy, "from cars topic")//转换为DataStream< SensorReading.map(new MapFunction<String,CarEvent>(){@Overridepublic CarEvent map(String s) throws Exception {String[] fields = s.split(",");return new CarEvent(fields[0],Integer.parseInt( fields[1]),Double. parseDouble(fields[2]),Long. parseLong(fields[3]));}})//转换为KeyedStream.keyBy( new KeySelector < CarEvent, String>() {@Overridepublic String getKey(CarEvent carEvent) throws Exception {return carEvent.carId;}})//大小为5s,滑动为2s的滑动窗口.window(SlidingEventTimeWindows, of(Time.seconds(5),Time.seconds(2)))//执行增量聚合. aggregate( new AvgSpeedAggFun(), new AvgSpeedProcessFun( )).filter(new FilterFunction< CarAvgEvent>(){@Overridepublic boolean filter(CarAvgEvent carAvgEvent) throws Exception {return carAvgEvent. 
avgSpeed >120.0;}})//.print().addSink(myProducer);//触发流程序执行env.execute("Flink Sensor Temperature Demo");}//增量处理函数public static class AvgSpeedAggFun implements AggregateFunction<CarEvent, //inputTuple2 < Double, Long>, //acc,<sum,count>Double> { //output,avg//创建初始ACC@Overridepublic Tuple2<Double, Long> createAccumulator() {return new Tuple2<>(0.0, 0L);}//累加每个传感器(每个分区)的事件@Overridepublic Tuple2<Double, Long> add(CarEvent carEvent, Tuple2<Double, Long> acc) {return new Tuple2<>(carEvent.speed + acc.f0, acc.f1 + 1);}//分区合并@Overridepublic Tuple2<Double, Long> merge(Tuple2<Double, Long> accl,Tuple2<Double, Long> acc2) {return new Tuple2<>(accl.f0 + acc2.f0, accl.f1 + acc2.f1);}//返回每个车辆的平均温度@Overridepublic Double getResult(Tuple2<Double, Long> t2) {return t2.f0 / t2.f1;}}//窗口处理函数public static class AvgSpeedProcessFun extends ProcessWindowFunction<Double,//input typeCarAvgEvent,//output typeString,//key typeTimeWindow> { //window type@Overridepublic void process(String id,Context context,Iterable < Double> events,Collector < CarAvgEvent> out) {double average = Math.round(events.iterator().next()*100) /100.0;out, collect(new CarAvgEvent(id, average,new Timestamp(context, window( ).getStart()).toString(),new Tinestamp(context, window( ).getEnd()).toString()));}//自定义的序列化模式public static class ObjSerializationSchenaimplements KafkaSerializationSchema <CarAvgEvent> {private String topic;private ObjectMapper mapper;public ObjSerializationSchema(String topic) {super();this. topic = topic;}@Overridepublic ProducerRecord < Byte[],Byte[]> serialize(CarAvgEvent obj, Long timestamp) {Byte[] b = null;if (mapper == null) {mapper = new ObjectMapper();}try {b = mapper.writeValueAsBytes(obj);} catch (JsonProcessingException e) {//TODO}return new ProducerRecord<>(topic, b);}}}
要运行以上程序,建议按以下步骤执行:
(1)启动ZooKeeper,命令如下:
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
(2)启动Kafka,命令如下:
$ ./bin/kafka-server-start.sh config/server.properties
(3)创建两个topic,命令如下:
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic cars
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic fastcars
(Kafka 3.0及以上版本的kafka-topics.sh已移除--zookeeper参数,应改用--bootstrap-server localhost:9092。)
(4)先在一个新的终端窗口中执行消费者脚本,以此来拉取fastcars topic数据,命令如下:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fastcars
建议保持窗口运行。
(5)执行前面编写的流计算程序。
(6)执行前面编写的数据源程序,命令如下:
$ cd cars
$ java RandomCarsKafkaProducer
(7)回到消费者脚本执行窗口(第4步的窗口),查看超速数据。如果一切正常,则应该看到在fastcars主题收到的超速数据如下(部分):
{"carId":"car2","avgSpeed":144.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
{"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
{"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:50.0","end":"2021-08-27 12:20:55.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:56.0","end":"2021-08-27 12:21:01.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:58.0","end":"2021-08-27 12:21:03.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
{"carId":"car1","avgSpeed":126.5,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
{"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:02.0","end":"2021-08-27 12:21:07.0"}
{"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:04.0","end":"2021-08-27 12:21:09.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:12.0","end":"2021-08-27 12:21:17.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:14.0","end":"2021-08-27 12:21:19.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
{"carId":"car6","avgSpeed":125.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
{"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:20.0","end":"2021-08-27 12:21:25.0"}
{"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:22.0","end":"2021-08-27 12:21:27.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:40.0","end":"2021-08-27 12:21:45.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:42.0","end":"2021-08-27 12:21:47.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:44.0","end":"2021-08-27 12:21:49.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:46.0","end":"2021-08-27 12:21:51.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:48.0","end":"2021-08-27 12:21:53.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:50.0","end":"2021-08-27 12:21:55.0"}
以上。
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




