暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink实现运输公司车辆超速实时监测

大数据技能圈 2023-11-13
51

设计一个物流公司车队管理解决方案,车队中的车辆均具备无线联网功能。每辆车定期报告其地理位置和运行参数,如燃油水平、速度、加速度、航向(bearing)、发动机温度等。物流公司希望利用这一遥测数据流构建一系列应用程序,以帮助管理业务的运营和财务。例如,监测运输车辆是否超速。接下来创建一个简单的实时流应用程序,每隔几秒计算一次每辆运输车辆的平均速度。在本方案中,使用Kafka作为流数据源,从Kafka的cars主题读取这些事件;同时也将Kafka作为流的Data Sink,检测出的超速事件将写入Kafka的fastcars主题。车辆超速实时监测流程序的架构如图所示。


为了模拟车辆发送传感器数据,首先将创建一个Kafka Producer,它将id、speed、acceleration和timestamp写入Kafka的cars主题。

Java代码如下:

    import java.util.Properties;
    import java.util.Random;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * Simulates vehicle telemetry by writing random CSV readings of the form
     * {@code carId,speed,acceleration,timestampMillis} to the Kafka {@code cars} topic.
     */
    public class RandomCarsKafkaProducer {
        /** Topic the simulated readings are written to. */
        private static final String TOPIC_CARS = "cars";
        /** Pause between two consecutive records, in milliseconds. */
        private static final long SEND_INTERVAL_MS = 1000L;
        /** Single random source reused for every generated record. */
        private static final Random RANDOM = new Random();

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Number of records to produce; any negative value (e.g. -1) means "produce forever".
            int numRecsToProduce = 1000;
            // send() is asynchronous; closing the producer (try-with-resources) flushes the
            // buffered records so the last ones are not lost when main() returns.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                produceRecord(producer, numRecsToProduce);
            }
        }

        /**
         * Sends {@code recordNum} readings, one every {@link #SEND_INTERVAL_MS} ms.
         *
         * <p>A negative {@code recordNum} keeps producing until the thread is interrupted. The loop
         * is iterative, so unlike a recursive implementation the unbounded mode cannot overflow
         * the stack.
         *
         * @param producer  open producer used to send the records
         * @param recordNum number of records to send; negative means unbounded, 0 sends nothing
         */
        private static void produceRecord(KafkaProducer<String, String> producer, int recordNum) {
            final boolean unbounded = recordNum < 0;
            for (int sent = 0; unbounded || sent < recordNum; sent++) {
                producer.send(generateCarRecord(TOPIC_CARS));
                if (!pause(SEND_INTERVAL_MS)) {
                    return; // interrupted: stop producing
                }
            }
        }

        /**
         * Sleeps for the given number of milliseconds.
         *
         * @return {@code true} if the full pause elapsed, {@code false} if the thread was
         *     interrupted; in that case the interrupt flag is restored for callers
         */
        private static boolean pause(long millis) {
            try {
                Thread.sleep(millis);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Builds one random reading for a car named {@code car0}..{@code car9}.
         *
         * @param topic destination topic of the record
         * @return record with the constant key {@code "key"} and a CSV value
         */
        private static ProducerRecord<String, String> generateCarRecord(String topic) {
            String carName = "car" + RANDOM.nextInt(10);
            int speed = RANDOM.nextInt(150); // 0..149
            float acc = RANDOM.nextFloat() * 100;
            // Event time is captured here, at the source, before any simulated delay.
            long ts = System.currentTimeMillis();
            String value = carName + "," + speed + "," + acc + "," + ts;
            System.out.println("== Writing==:" + value);
            float d = RANDOM.nextFloat() * 100;
            if (d < 2) {
                // About 2% of readings are held back for d * 100 ms (< 200 ms) to simulate
                // network latency, so they reach Kafka after their event timestamp.
                System.out.println("抱歉!有一些网络时延!");
                // If interrupted here, the restored flag makes the caller's next pause() stop the loop.
                pause((long) (d * 100));
            }
            return new ProducerRecord<>(topic, "key", value);
        }
    }

    注意,这里的时间戳是在源处生成事件(消息)的时间,下面的流处理程序将把它作为事件时间(Event Time)来分配时间戳和生成水印。

    下面是流处理管道的实现。

      import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.functions.AggregateFunction;
      import org.apache.flink.api.common.functions.FilterFunction;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.api.java.functions.KeySelector;
      import org.apache.flink.api.java.tuple. Tuple2;
      import org.apache.flink.connector.kafka. source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator. initializer.OffsetsInitializer;
      import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.jsonProcessingException;
      import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
      import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTineWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.Windows.TimeWindow;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
      import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
      import org.apache.flink.util.Collector;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import Java.sql.Timestamp;
      import Java.time.Duration;
      import Java. util.Properties;
      /**
      *物流公司车辆超速检测程序
      */
      public class FastCarsDetect {
      //POJO类,检测到的车速数据类型
      public static class CarEvent {
      public String carId;
      //车辆id
      public int speed;
      //速度
      public double acceleration;
      //加速度
      public long timestamp;
      //时间戳
      public CarEvent(){}
      public CarEvent(String carId, int speed, double acceleration, long timestamp) {
      this.carId = carId;
      this.speed = speed;
      this.acceleration = acceleration;
      this.timestamp = timestamp;
      @Override
      public String toString() {
      return "CarEvent{" + "carId= '" + carId + ",speed = " + speed + ", acceleration= " + acceleration + ", timestamp=" + timestamp + '}'";
      }
      }
      POJO类,检测到的平均车速数据类型
      public static class CarAvgEvent {
      public String carId;
      车辆id
      public double avgSpeed;
      //平均速度
      public String start;
      //计算平均值的时间范围下限
      public String end;
      //计算平均值的时间范围上限
      public CarAvgEvent( ){}
      public CarAvgEvent(String carId, double avgSpeed, String start, String end) {
      this.carId = carId;
      this.avgSpeed = avgSpeed;
      this.start = start;
      this.end = end;
      @Override
      public String toString () {
      return "CarEvent{" +
      "carId= '" + carId + '\'' +
      ",avgSpeed =" + avgSpeed +
      ", start = '" + start + '\'' +
      ", end = '" + end + '\'' +
      '}';
      }
      }
      public static void main(String[ ] args) throws Exception {
      //设置流执行环境
      final StreanExecutionEnvironment env =
      StreamExecutionEnvironment. getExecutionEnvironment();
      //Kafka Source
      KafkaSource < String> source = KafkaSource.<String>builder()
      . setBootstrapServers("localhost:9092")
      . setTopics("cars")
      . setGroupId("group- test")
      . setStarting0ffsets(OffsetsInitializer.earliest( ))
      . setValueOnlyDeserializer(new SimpleStringSchema())
      .build();
      //Kafka Sink
      Properties properties = new Properties();
      properties. setProperty("Bootstrap, servers","localhost:9092");
      //Kafka Brokers默认的最大事务超时(transaction,max.timeout.ms)为15min
      //当使用Semantic.EXACTLY_ONCE语义时,下面这个属性值不能超过15min
      properties.setProperty("transaction.timeout.ms",String.value0f(5 * 60 * 100));
      FlinkKafkaProducer myProducer = new FlinkKafkaProducer <CarAvgEvent>(
      "fastcars", //目标topic
      new ObjSerializationSchema("fastcars"), //序列化 schena
      properties, //producer配置
      FlinkKafkaProducer. Semantic.EXACTLY_ONCE //容错性
      );
      myProducer. setWriteTimestampToKafka(true); //水印策略
      WatermarkStrategy<String> watermarkStrategy =WatermarkStrategy
      .<String> forBoundedOutOfOrderness(Duration.ofSeconds(2))
      .withTimestampAssigner ( new SerializableTimestampAssigner < String>() {
      @Override
      public long extractTimestamp(String s, long l) {
      return Long. parseLong(s.split(",")[3]);
      }
      });
      env
      //指定Kafka数据源
      .fromSource(source, watermarkStrategy, "from cars topic")
      //转换为DataStream< SensorReading
      .map(new MapFunction<String,CarEvent>(){
      @Override
      public CarEvent map(String s) throws Exception {
      String[] fields = s.split(",");
      return new CarEvent(fields[0],
      Integer.parseInt( fields[1]),
      Double. parseDouble(fields[2]),
      Long. parseLong(fields[3]));
      }
      })
      //转换为KeyedStream
      .keyBy( new KeySelector < CarEvent, String>() {
      @Override
      public String getKey(CarEvent carEvent) throws Exception {
      return carEvent.carId;
      }
      })
      //大小为5s,滑动为2s的滑动窗口
      .window(SlidingEventTimeWindows, of(Time.seconds(5),Time.seconds(2)))
      //执行增量聚合
      . aggregate( new AvgSpeedAggFun(), new AvgSpeedProcessFun( ))
      .filter(new FilterFunction< CarAvgEvent>(){
      @Override
      public boolean filter(CarAvgEvent carAvgEvent) throws Exception {
      return carAvgEvent. avgSpeed >120.0;
      }
      })
      //.print()
      .addSink(myProducer);
      //触发流程序执行
      env.execute("Flink Sensor Temperature Demo");
      }
      //增量处理函数
      public static class AvgSpeedAggFun implements AggregateFunction<
      CarEvent, //input
      Tuple2 < Double, Long>, //acc,<sum,count>
      Double> { //output,avg
      //创建初始ACC
      @Override
      public Tuple2<Double, Long> createAccumulator() {
      return new Tuple2<>(0.0, 0L);
      }


      //累加每个传感器(每个分区)的事件
      @Override
      public Tuple2<Double, Long> add(CarEvent carEvent, Tuple2<Double, Long> acc) {
      return new Tuple2<>(carEvent.speed + acc.f0, acc.f1 + 1);
      }


      //分区合并
      @Override
      public Tuple2<Double, Long> merge(
      Tuple2<Double, Long> accl,
      Tuple2<Double, Long> acc2) {
      return new Tuple2<>(accl.f0 + acc2.f0, accl.f1 + acc2.f1);
      }


      //返回每个车辆的平均温度
      @Override
      public Double getResult(Tuple2<Double, Long> t2) {
      return t2.f0 / t2.f1;
      }
      }
      //窗口处理函数
      public static class AvgSpeedProcessFun extends ProcessWindowFunction<
      Double,//input type
      CarAvgEvent,//output type
      String,//key type
      TimeWindow> { //window type
      @Override
      public void process(String id,
      Context context,
      Iterable < Double> events,
      Collector < CarAvgEvent> out) {
      double average = Math.round(events.iterator().next()*100) /100.0;
      out, collect(new CarAvgEvent(id, average,
      new Timestamp(context, window( ).getStart()).toString(),
      new Tinestamp(context, window( ).getEnd()).toString()));
      }
      //自定义的序列化模式
      public static class ObjSerializationSchena
      implements KafkaSerializationSchema <CarAvgEvent> {
      private String topic;
      private ObjectMapper mapper;
      public ObjSerializationSchema(String topic) {
      super();
      this. topic = topic;
      }
      @Override
      public ProducerRecord < Byte[],Byte[]> serialize(CarAvgEvent obj, Long timestamp) {
      Byte[] b = null;
      if (mapper == null) {
      mapper = new ObjectMapper();
      }
      try {
      b = mapper.writeValueAsBytes(obj);
      } catch (JsonProcessingException e) {
      //TODO
      }
      return new ProducerRecord<>(topic, b);
      }
      }
                  }

      要运行以上程序,建议按以下步骤执行:

      (1)启动ZooKeeper,命令如下:

        $ ./bin/zookeeper-server-start.sh config/zookeeper.properties

        (2)启动Kafka,命令如下:

          $ ./bin/kafka-server-start.sh config/server.properties

          (3)创建两个topic,命令如下(Kafka 3.0及以上版本已移除kafka-topics.sh的--zookeeper选项,请改用--bootstrap-server localhost:9092):

            $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
            $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic cars
            $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic fastcars

            (4)先在一个新的终端窗口中执行消费者脚本,以此来拉取fastcars topic数据,命令如下:

              $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fastcars

              建议保持窗口运行。

              (5)执行前面编写的流计算程序。

              (6)执行前面编写的数据源程序,命令如下:

                $ cd cars
                $ java RandomCarsKafkaProducer

                (7)回到消费者脚本执行窗口(第4步的窗口),查看超速数据。如果一切正常,应该能在fastcars主题中看到如下超速数据(部分):

                  {"carId":"car2","avgSpeed":144.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
                  {"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
                  {"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:50.0","end":"2021-08-27 12:20:55.0"}
                  {"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:56.0","end":"2021-08-27 12:21:01.0"}
                  {"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:58.0","end":"2021-08-27 12:21:03.0"}
                  {"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
                  {"carId":"car1","avgSpeed":126.5,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
                  {"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:02.0","end":"2021-08-27 12:21:07.0"}
                  {"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:04.0","end":"2021-08-27 12:21:09.0"}
                  {"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:12.0","end":"2021-08-27 12:21:17.0"}
                  {"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:14.0","end":"2021-08-27 12:21:19.0"}
                  {"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
                  {"carId":"car6","avgSpeed":125.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
                  {"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:20.0","end":"2021-08-27 12:21:25.0"}
                  {"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:22.0","end":"2021-08-27 12:21:27.0"}
                  {"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:40.0","end":"2021-08-27 12:21:45.0"}
                  {"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:42.0","end":"2021-08-27 12:21:47.0"}
                  {"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:44.0","end":"2021-08-27 12:21:49.0"}
                  {"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:46.0","end":"2021-08-27 12:21:51.0"}
                  {"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:48.0","end":"2021-08-27 12:21:53.0"}
                  {"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:50.0","end":"2021-08-27 12:21:55.0"}

                  以上。


                  文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论