暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

flink入门(六到八)

逗先生大数据 2020-09-22
99

点击上方蓝字  关注我们
flink



 Data Sink 介绍(六)

首先 Sink 的意思是:

大概可以猜到了吧!Data sink 有点把数据存储下来(落库)的意思。

如上图,Source 就是数据的来源,中间的 Compute 其实就是 Flink 干的事情,可以做一系列的操作,操作完后就把计算后的数据结果 Sink 到某个地方。(可以是 MySQL、ElasticSearch、Kafka、Cassandra 等)。这里我说下自己目前做告警这块就是把 Compute 计算后的结果 Sink 直接告警出来了(发送告警消息到钉钉群、邮件、短信等),这个 sink 的意思也不一定非得说成要把数据存储到某个地方去。其实官网用的 Connector 来形容要去的地方更合适,这个 Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。

Flink Data Sink

前面文章 《从0到1学习Flink》—— Data Source 介绍 介绍了 Flink Data Source 有哪些,这里也看看 Flink Data Sink 支持的有哪些。

看下源码有哪些呢?

可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。

SinkFunction

从上图可以看到 SinkFunction 接口有 invoke 方法,它有一个 RichSinkFunction 抽象类。

上面的那些自带的 Sink 可以看到都是继承了 RichSinkFunction 抽象类,实现了其中的方法,那么我们要是自己定义自己的 Sink 的话其实也是要按照这个套路来做的。

这里就拿个较为简单的 PrintSinkFunction 源码来讲下:



@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;

private static final boolean STD_OUT = false;
private static final boolean STD_ERR = true;

private boolean target;
private transient PrintStream stream;
private transient String prefix;

/**
* Instantiates a print sink function that prints to standard out.
*/
public PrintSinkFunction() {}

/**
* Instantiates a print sink function that prints to standard out.
*
* @param stdErr True, if the format should print to standard error instead of standard out.
*/
public PrintSinkFunction(boolean stdErr) {
target = stdErr;
}

public void setTargetToStandardOut() {
target = STD_OUT;
}

public void setTargetToStandardErr() {
target = STD_ERR;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
// get the target stream
stream = target == STD_OUT ? System.out : System.err;

// set the prefix if we have a >1 parallelism
prefix = (context.getNumberOfParallelSubtasks() > 1) ?
((context.getIndexOfThisSubtask() + 1) + "> ") : null;
}

@Override
public void invoke(IN record) {
if (prefix != null) {
stream.println(prefix + record.toString());
}
else {
stream.println(record.toString());
}
}

@Override
public void close() {
this.stream = null;
this.prefix = null;
}

@Override
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
}

可以看到它就是实现了 RichSinkFunction 抽象类,然后实现了 invoke 方法,这里 invoke 方法就是把记录打印出来了就是,没做其他的额外操作。

如何使用?

1
SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();

这样就可以了,如果是其他的 Sink Function 的话需要换成对应的。

使用这个 Function 其效果就是打印从 Source 过来的数据,和直接 Source.print() 效果一样。

下篇文章我们将讲解下如何自定义自己的 Sink Function,并使用一个 demo 来教大家,让大家知道这个套路,且能够在自己工作中自定义自己需要的 Sink Function,来完成自己的工作需求。

最后

本文主要讲了下 Flink 的 Data Sink,并介绍了常见的 Data Sink,也看了下源码的 SinkFunction,介绍了一个简单的 Function 使用, 告诉了大家自定义 Sink Function 的套路,下篇文章带大家写个。

如何自定义 Data Sink(七)

介绍了 Flink Data Sink,也介绍了 Flink 自带的 Sink,那么如何自定义自己的 Sink 呢?这篇文章将写一个 demo 教大家将从 Kafka Source 的数据 Sink 到 MySQL 中去。

准备工作

我们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你需要安装好了 FLink 和 Kafka 。

运行启动 Flink、Zookepeer、Kafka,

好了,都启动了!

数据库建表


DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
 `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
 `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
 `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
 `age` int(10) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

实体类

Student.java


package com.zhisheng.flink.model;

/**
* Desc:
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class Student {
   public int id;
   public String name;
   public String password;
   public int age;

   public Student() {
   }

   public Student(int id, String name, String password, int age) {
       this.id = id;
       this.name = name;
       this.password = password;
       this.age = age;
   }

@Override
   public String toString() {
       return "Student{" +
               "id=" + id +
               ", name='" + name + '\'' +
               ", password='" + password + '\'' +
               ", age=" + age +
               '}';
   }

   public int getId() {
       return id;
   }

   public void setId(int id) {
       this.id = id;
   }

   public String getName() {
       return name;
   }

   public void setName(String name) {
       this.name = name;
   }

   public String getPassword() {
       return password;
   }

   public void setPassword(String password) {
       this.password = password;
   }

   public int getAge() {
       return age;
   }

   public void setAge(int age) {
       this.age = age;
   }
}

工具类

工具类往 kafka topic student 发送数据


import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* 往kafka中写数据
* 可以使用这个main函数进行测试一下
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class KafkaUtils2 {
   public static final String broker_list = "localhost:9092";
   public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topic

   public static void writeToKafka() throws InterruptedException {
       Properties props = new Properties();
       props.put("bootstrap.servers", broker_list);
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       KafkaProducer producer = new KafkaProducer<String, String>(props);

       for (int i = 1; i <= 100; i++) {
           Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
           ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
           producer.send(record);
           System.out.println("发送数据: " + JSON.toJSONString(student));
       }
       producer.flush();
   }

   public static void main(String[] args) throws InterruptedException {
       writeToKafka();
   }
}

SinkToMySQL

该类就是 Sink Function,继承了 RichSinkFunction ,然后重写了里面的方法。在 invoke 方法中将数据插入到 MySQL 中。


package com.zhisheng.flink.sink;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
* Desc:
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class SinkToMySQL extends RichSinkFunction<Student> {
   PreparedStatement ps;
   private Connection connection;

/**
    * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
    *
    * @param parameters
    * @throws Exception
    */
@Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       connection = getConnection();
       String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
       ps = this.connection.prepareStatement(sql);
   }

@Override
   public void close() throws Exception {
       super.close();
//关闭连接和释放资源
       if (connection != null) {
           connection.close();
       }
       if (ps != null) {
           ps.close();
       }
   }

/**
    * 每条数据的插入都要调用一次 invoke() 方法
    *
    * @param value
    * @param context
    * @throws Exception
    */
@Override
   public void invoke(Student value, Context context) throws Exception {
//组装数据,执行插入操作
       ps.setInt(1, value.getId());
       ps.setString(2, value.getName());
       ps.setString(3, value.getPassword());
       ps.setInt(4, value.getAge());
       ps.executeUpdate();
   }

   private static Connection getConnection() {
       Connection con = null;
       try {
           Class.forName("com.mysql.jdbc.Driver");
           con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
       } catch (Exception e) {
           System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
       }
       return con;
   }
}

Flink 程序

这里的 source 是从 kafka 读取数据的,然后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 student 对象,然后在 addSink 中使用我们创建的 SinkToMySQL,这样就可以把数据存储到 MySQL 了。


package com.zhisheng.flink;

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
* Desc:
* weixin: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class Main3 {
   public static void main(String[] args) throws Exception {
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");
       props.put("zookeeper.connect", "localhost:2181");
       props.put("group.id", "metric-group");
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("auto.offset.reset", "latest");

       SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
               "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
               new SimpleStringSchema(),
               props)).setParallelism(1)
               .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象

       student.addSink(new SinkToMySQL()); //数据 sink 到 mysql

       env.execute("Flink add sink");
   }
}

结果

运行 Flink 程序,然后再运行 KafkaUtils2.java 工具类,这样就可以了。

如果数据插入成功了,那么我们查看下我们的数据库:

数据库中已经插入了 100 条我们从 Kafka 发送的数据了。证明我们的 SinkToMySQL 起作用了。是不是很简单?

项目结构

怕大家不知道我的项目结构,这里发个截图看下:

最后

本文主要利用一个 demo,告诉大家如何自定义 Sink Function,将从 Kafka 的数据 Sink 到 MySQL 中,如果你项目中有其他的数据来源,你也可以换成对应的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那么依旧是这个套路:继承 RichSinkFunction 抽象类,重写 invoke 方法。

Flink Data transformation(八)

就说过 Flink 程序的结构

Flink 应用程序结构就是如上图所示:

1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

2、Transformation:数据转换的各种操作,有 Map FlatMap Filter KeyBy Reduce Fold Aggregations Window WindowAll Union Window join Split Select Project 等,操作很多,可以将数据转换计算成你想要的数据。

3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 Sink。

那么这篇文章我们就来看下 Flink Data Transformation 吧,数据转换操作还是蛮多的,需要好好讲讲!

Transformation

Map

这是最简单的转换之一,其中输入是一个数据流,输出的也是一个数据流:

还是拿上一篇文章的案例来将数据进行 map 转换操作:

1
2
3
4
5
6
7
8
9
10
11
12
SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
@Override
   public Student map(Student value) throws Exception {
       Student s1 = new Student();
       s1.id = value.id;
       s1.name = value.name;
       s1.password = value.password;
       s1.age = value.age + 5;
       return s1;
   }
});
map.print();

将每个人的年龄都增加 5 岁,其他不变。

FlatMap

FlatMap 采用一条记录并输出零个,一个或多个记录。

1
2
3
4
5
6
7
8
9
SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
@Override
   public void flatMap(Student value, Collector<Student> out) throws Exception {
       if (value.id % 2 == 0) {
           out.collect(value);
       }
   }
});
flatMap.print();

这里将 id 为偶数的聚集出来。

Filter

Filter 函数根据条件判断出结果。

1
2
3
4
5
6
7
8
9
10
SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
@Override
   public boolean filter(Student value) throws Exception {
       if (value.id > 95) {
           return true;
       }
       return false;
   }
});
filter.print();

这里将 id 大于 95 的过滤出来,然后打印出来。

KeyBy

KeyBy 在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。

1
2
3
4
5
6
7
KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
@Override
   public Integer getKey(Student value) throws Exception {
       return value.age;
   }
});
keyBy.print();

上面对 student 的 age 做 KeyBy 操作分区

Reduce

Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
@Override
   public Integer getKey(Student value) throws Exception {
       return value.age;
   }
}).reduce(new ReduceFunction<Student>() {
@Override
   public Student reduce(Student value1, Student value2) throws Exception {
       Student student1 = new Student();
       student1.name = value1.name + value2.name;
       student1.id = (value1.id + value2.id) 2;
       student1.password = value1.password + value2.password;
       student1.age = (value1.age + value2.age) 2;
       return student1;
   }
});
reduce.print();

上面先将数据流进行 keyby 操作,因为执行 reduce 操作只能是 KeyedStream,然后将 student 对象的 age 做了一个求平均值的操作。

Fold

Fold 通过将最后一个文件夹流与当前记录组合来推出 KeyedStream。 它会发回数据流。

1
2
3
4
5
6
KeyedStream.fold("1", new FoldFunction<Integer, String>() {
@Override
   public String fold(String accumulator, Integer value) throws Exception {
       return accumulator + "=" + value;
   }
})

Aggregations

DataStream API 支持各种聚合,例如 min,max,sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

1
2
3
4
5
6
7
8
9
10
KeyedStream.sum(0) 
KeyedStream.sum("key")
KeyedStream.min(0)
KeyedStream.min("key")
KeyedStream.max(0)
KeyedStream.max("key")
KeyedStream.minBy(0)
KeyedStream.minBy("key")
KeyedStream.maxBy(0)
KeyedStream.maxBy("key")

max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。

Window

Window 函数允许按时间或其他条件对现有 KeyedStream 进行分组。 以下是以 10 秒的时间窗口聚合:

1
inputStream.keyBy(0).window(Time.seconds(10));

Flink 定义数据片段以便(可能)处理无限数据流。 这些切片称为窗口。 此切片有助于通过应用转换处理数据块。 要对流进行窗口化,我们需要分配一个可以进行分发的键和一个描述要对窗口化流执行哪些转换的函数

要将流切片到窗口,我们可以使用 Flink 自带的窗口分配器。 我们有选项,如 tumbling windows, sliding windows, global 和 session windows。 Flink 还允许您通过扩展 WindowAssginer 类来编写自定义窗口分配器。 这里先预留下篇文章来讲解这些不同的 windows 是如何工作的。

WindowAll

windowAll 函数允许对常规数据流进行分组。 通常,这是非并行数据转换,因为它在非分区数据流上运行。

与常规数据流功能类似,我们也有窗口数据流功能。 唯一的区别是它们处理窗口数据流。 所以窗口缩小就像 Reduce 函数一样,Window fold 就像 Fold 函数一样,并且还有聚合。

1
inputStream.keyBy(0).windowAll(Time.seconds(10));

Union

Union 函数将两个或多个数据流结合在一起。 这样就可以并行地组合数据流。 如果我们将一个流与自身组合,那么它会输出每个记录两次。

1
inputStream.union(inputStream1, inputStream2, ...);

Window join

我们可以通过一些 key 将同一个 window 的两个数据流 join 起来。

1
2
3
4
inputStream.join(inputStream1)
          .where(0).equalTo(1)
          .window(Time.seconds(5))    
          .apply (new JoinFunction () {...});

以上示例是在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。

Split

此功能根据条件将流拆分为两个或多个流。 当您获得混合流并且您可能希望单独处理每个数据流时,可以使用此方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
@Override
   public Iterable<String> select(Integer value) {
       List<String> output = new ArrayList<String>();
       if (value % 2 == 0) {
           output.add("even");
       }
       else {
           output.add("odd");
       }
       return output;
   }
});

Select

此功能允许您从拆分流中选择特定流。

1
2
3
4
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Project

Project 函数允许您从事件流中选择属性子集,并仅将所选元素发送到下一个处理流。

1
2
DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函数从给定记录中选择属性号 2 和 3。 以下是示例输入和输出记录:

1
2
(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)

最后

本文主要介绍了 Flink Data 的常用转换方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。并用了点简单的 demo 介绍了如何使用,具体在项目中该如何将数据流转换成我们想要的格式,还需要根据实际情况对待。







文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论