Reading Kafka Data with Flink

努力为了自由 2019-09-27

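Flink offers several levels of abstraction for stream processing and batch processing.

The lowest level provides only stateful stream processing, which is embedded into the DataStream API through the process function. The DataStream API and the DataSet API provide the stream-processing and batch-processing APIs respectively. During processing, a data stream can be treated as a table, much like a table in a relational database, and Flink provides matching operations such as select, sum, and count. On top of that, SQL can be used to express the same processing.

A Flink program reads from one or more sources, applies a series of transformations, and finally writes to one or more sinks.

Flink stream processing is parallel and distributed. The number of subtasks of an operator is that operator's parallelism, and the parallelism of a stream is the parallelism of the operator that produces it. Different operators in the same program can have different parallelism.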


Streams can transport data between two operators in a one-to-one
(or forwarding) pattern, or in a redistributing pattern:
One-to-one streams (for example between the Source and the map() operators)
preserve the partitioning and ordering of the elements. That means that
subtask[1] of the map() operator will see the same elements in the same
order as they were produced by subtask[1] of the Source operator.


Redistributing streams (for example between map() and keyBy/window,
as well as between keyBy/window and Sink) change the partitioning
of streams. Each operator subtask sends data to different
target subtasks, depending on the selected transformation. Examples are
keyBy() (which re-partitions by hashing the key), broadcast(),
or rebalance() (which re-partitions round-robin). In a redistributing
exchange the ordering among the elements is only preserved within
each pair of sending and receiving subtasks
(for example, subtask[1] of map() and subtask[2] of keyBy/window).
So in this example, the ordering within each key is preserved,
but the parallelism does introduce non-determinism regarding the order
in which the aggregated results for different keys arrive at the sink.
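
To make the keyBy() exchange concrete, here is a minimal plain-Java sketch of routing records to subtasks by key. It is a simplified model only: the class name, the "key:value" record format, and the floorMod(hashCode) routing are assumptions made for illustration, and Flink's real routing goes through key groups with its own hash function. The point it shows is that all records with the same key land in the same subtask, in their original order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a keyBy() exchange (hypothetical class, not Flink code).
final class KeyByRoutingSketch {

    // Pick a target subtask for a key. Assumption: plain hashCode modulo
    // parallelism; Flink itself uses key groups and a different hash.
    static int targetSubtask(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route "key:value" records, in arrival order, into one list per subtask.
    static List<List<String>> route(List<String> records, int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (String record : records) {
            String key = record.substring(0, record.indexOf(':'));
            subtasks.get(targetSubtask(key, parallelism)).add(record);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a:1", "b:1", "a:2", "c:1", "b:2", "a:3");
        List<List<String>> routed = route(records, 2);
        for (int i = 0; i < routed.size(); i++) {
            System.out.println("subtask[" + i + "] " + routed.get(i));
        }
    }
}
```

Within each printed subtask the records of one key keep their relative order, while nothing is guaranteed about the order across subtasks.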

When referring to time in a streaming program (for example to 
define windows), one can refer to different notions of time:
Event Time is the time when an event was created. It is usually 
described by a timestamp in the events, for example attached by 
the producing sensor, or the producing service. Flink accesses 
event timestamps via timestamp assigners.
Ingestion time is the time when an event enters the Flink dataflow
at the source operator.
Processing Time is the local time at each operator that performs a
time-based operation.
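
The choice of time notion decides which window an event falls into. The following plain-Java sketch, under the assumption of tumbling windows aligned to the epoch with no offset, computes the window for a timestamp; the class and method names are made up for illustration. Fed an event timestamp it gives the event-time window, and fed the current wall-clock time it gives the processing-time window.

```java
// Hypothetical helper that assigns a timestamp to a tumbling window.
final class TumblingWindowSketch {

    // Inclusive start of the window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of the window that contains the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L;                       // 10-second windows
        long eventTime = 1_569_552_003_000L;       // timestamp carried by the event
        long processingTime = System.currentTimeMillis();

        System.out.println("event-time window:      ["
                + windowStart(eventTime, size) + ", " + windowEnd(eventTime, size) + ")");
        System.out.println("processing-time window: ["
                + windowStart(processingTime, size) + ", " + windowEnd(processingTime, size) + ")");
    }
}
```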

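Client: launches the program and submits the job to the JobManager.

JobManager: acts as the master. It is responsible for task scheduling, resource allocation, coordinating checkpoints, and failure recovery. It can run in standalone mode or be managed by YARN or Mesos.

TaskManager: acts as a worker. It executes tasks and periodically sends heartbeats to the JobManager.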



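In real business scenarios, data is usually delivered to the cluster through Kafka for processing. This article is a hands-on walkthrough of reading Kafka data with Flink.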

Environment

Standalone mode is used.

Environment configuration
192.168.127.128: hadoop
192.168.127.129: kafka (kafka_2.11-0.10.2.1)
Start Hadoop
cd to Hadoop's sbin directory and run ./start-all.sh
Start ZooKeeper
cd to ZooKeeper's bin directory and run ./zkServer.sh start
Start Kafka
./kafka-server-start.sh ../config/server.properties
List the Kafka topics
./kafka-topics.sh --list --zookeeper localhost:2181
Create a topic
./kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Delete a topic
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test2
Topic test2 is marked for deletion
At this point the topic is only marked for deletion and has not actually been removed (on this Kafka version this is what happens when delete.topic.enable is not set to true on the broker). To really delete it, remove its node in ZooKeeper.
cd to ZooKeeper's bin directory and run ./zkCli.sh to enter the ZooKeeper command line
Run ls /brokers/topics to see the paths of all topics
[test2, con, test, test1, __consumer_offsets]
Find the path of the topic to delete and run rmr /brokers/topics/test2
Run ls /brokers/topics again
[con, test, test1, __consumer_offsets]
test2 is no longer listed
Go back to the kafka/bin directory and run ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
con
test
test1
The list confirms that test2 has been deleted.

Code walkthrough

package com.cesgroup.example.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Reads "timestamp,key,value" messages from Kafka and emits the average
 * value per key for every 10-second event-time window.
 */
public class KafkaMessageStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.127.129:9092");
        props.setProperty("group.id", "test-consumer-group");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), props);
        // Timestamps and watermarks come from the first field of each message
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple2<String, Long>> kafkaStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)                      // key by the first tuple field
                .timeWindow(Time.seconds(10))  // 10-second tumbling window
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<Tuple2<String, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        // A window only fires with at least one element, so count > 0.
                        // Emit a new tuple instead of mutating an input record.
                        String key = input.iterator().next().f0;
                        out.collect(new Tuple2<String, Long>(key, sum / count));
                    }
                });

        kafkaStream.writeAsText("/opt/result.txt", FileSystem.WriteMode.OVERWRITE);
        kafkaStream.print();
        env.execute("Flink-Kafka demo");
    }
}
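
The arithmetic inside the window function can be checked without a cluster. The sketch below is a plain-Java stand-in written for illustration (the class and method names are hypothetical); it repeats the same sum / count computation and makes visible that, because both operands are integers, the average is truncated rather than rounded.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone version of the per-window average (hypothetical helper).
final class WindowAverageSketch {

    // Integer mean of all values collected in one window.
    static long average(List<Long> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("a window must contain at least one value");
        }
        long sum = 0L;
        for (long v : values) {
            sum += v;
        }
        return sum / values.size(); // long division truncates toward zero
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(10L, 11L, 12L)));
        System.out.println(average(Arrays.asList(1L, 2L))); // 3 / 2 is truncated
    }
}
```

If fractional averages matter, one option is to emit a Double instead of a Long; that would be a change to the job's output type, not something the original code does.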


package com.cesgroup.example.kafka;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Splits a "timestamp,key,value" message into a (key, value) tuple.
 */
public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            // Skip messages with fewer than three fields instead of failing on parts[2]
            if (parts.length >= 3) {
                out.collect(new Tuple2<String, Long>(parts[1], Long.parseLong(parts[2])));
            }
        }
    }
}
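
The splitter implies a message format of three comma-separated fields: a timestamp, a key, and a numeric value. As a rough illustration of that format, the plain-Java sketch below parses one line defensively; the class, the Reading type, and the sample values are assumptions for this example and are not part of the original job.

```java
import java.util.Optional;

// Hypothetical parser for "timestamp,key,value" lines.
final class MessageParseSketch {

    static final class Reading {
        final long timestamp;
        final String key;
        final long value;

        Reading(long timestamp, String key, long value) {
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Returns an empty Optional for null, short, or non-numeric input.
    static Optional<Reading> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",");
        if (parts.length < 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Reading(
                    Long.parseLong(parts[0].trim()),
                    parts[1].trim(),
                    Long.parseLong(parts[2].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Optional<Reading> reading = parse("1569552003000,host1,1024");
        reading.ifPresent(r -> System.out.println(r.key + " -> " + r.value));
        System.out.println(parse("not a valid message").isPresent());
    }
}
```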


package com.cesgroup.example.kafka;

import com.sun.management.OperatingSystemMXBean;

import java.lang.management.ManagementFactory;

public class MemoryUsageExtrator {

    private static OperatingSystemMXBean mxBean =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    /**
     * Get current free memory size in bytes
     * @return free RAM size
     */
    public static long currentFreeMemorySizeInBytes() {
        return mxBean.getFreePhysicalMemorySize();
    }
}
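
The article does not show how the test messages are produced. One plausible use of the free-memory reading, given that the job expects timestamp,key,value lines, is to format it into such a line before sending it to the topic. The sketch below is an assumption along those lines: the class name, the formatMessage helper, and the "host1" key are invented for illustration, and actually sending the line to Kafka is left out.

```java
import com.sun.management.OperatingSystemMXBean;

import java.lang.management.ManagementFactory;

// Hypothetical helper that builds one test message for the job.
final class MemoryMessageSketch {

    // Build a "timestamp,key,value" line in the format the job parses.
    static String formatMessage(long timestampMillis, String host, long freeBytes) {
        return timestampMillis + "," + host + "," + freeBytes;
    }

    public static void main(String[] args) {
        // Same MXBean call that MemoryUsageExtrator wraps.
        OperatingSystemMXBean os =
                (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        long freeBytes = os.getFreePhysicalMemorySize();

        // "host1" is a placeholder key; a real producer might use the host name.
        System.out.println(formatMessage(System.currentTimeMillis(), "host1", freeBytes));
    }
}
```

A line printed this way can be pasted into a console producer for the test1 topic to exercise the job by hand.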


package com.cesgroup.example.kafka;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

/**
 * Assigns event timestamps and emits a watermark for every well-formed message.
 * Both are taken from the first field of the "timestamp,key,value" message.
 */
public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        // Returning null means no new watermark is emitted for this element
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        // Malformed messages get timestamp 0
        return 0L;
    }
}
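
Because this emitter produces a watermark equal to each message's own timestamp, a 10-second window closes as soon as a message at or past the window's last millisecond arrives. The plain-Java sketch below models that rule: an event-time window with exclusive end E can fire once the watermark reaches E - 1. The class, method, and sample timestamps are made up for illustration.

```java
// Hypothetical model of when an event-time window may fire.
final class WatermarkFiringSketch {

    // A window [start, end) can fire once the watermark has reached its
    // last timestamp, which is end - 1.
    static boolean canFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;                   // window [0, 10000)
        long[] timestamps = {1_000L, 9_000L, 10_500L};

        for (long ts : timestamps) {
            // As in MessageWaterEmitter, the watermark equals the element timestamp.
            long watermark = ts;
            System.out.println("element at " + ts + " -> window can fire: "
                    + canFire(windowEnd, watermark));
        }
    }
}
```

One consequence of setting the watermark to the element timestamp is that there is no tolerance for out-of-order data: a message that arrives after its window has fired is treated as late.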


Reference:

https://www.cnblogs.com/huxi2b/p/7219792.html
