1、代码逻辑实现
package day03;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;
/**
* @program: bigData_learn
* @description: 每个用户每5秒钟窗口的pv
* @author: Mr.逗
* @create: 2021-09-22 16:31
**/
public class FiveSecondWindow {
// SourceFunction并行度只能为1
// 自定义并行化版本的数据源,需要使用ParallelSourceFunction
public static class ClickSource implements SourceFunction<Event> {
private boolean running = true;
private String[] userArr = {"Mary", "Bob", "Alice", "Liz"};
private String[] urlArr = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
private Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (running) {
// collect方法,向下游发送数据
ctx.collect(
new Event(
userArr[random.nextInt(userArr.length)],
urlArr[random.nextInt(urlArr.length)],
Calendar.getInstance().getTimeInMillis()
)
);
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
public static class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Event{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", timestamp=" + new Timestamp(timestamp) +
'}';
}
}
public static class WindowResult extends ProcessWindowFunction<Event,String,String, TimeWindow>
{
@Override
public void process(String value, Context ctx, Iterable<Event> it, Collector<String> out) throws Exception {
long start = ctx.window().getStart();
long end = ctx.window().getEnd();
long count = it.spliterator().getExactSizeIfKnown();//迭代器里面共有多少条数据
out.collect("用户"+value+"在窗口"+new Timestamp(end)+new Timestamp(start)+""+"中的pv次数是:"+count);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> source = env.addSource(new ClickSource());
source.keyBy(v->v.user)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new WindowResult())
.print();
String name = FiveSecondWindow.class.getName();
env.execute(name);
}
}
2、结果展示
用户Bob在窗口2021-09-22 16:45:00.02021-09-22 16:44:55.0中的pv次数是:2
用户Mary在窗口2021-09-22 16:45:00.02021-09-22 16:44:55.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:45:05.02021-09-22 16:45:00.0中的pv次数是:2
用户Mary在窗口2021-09-22 16:45:05.02021-09-22 16:45:00.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:45:05.02021-09-22 16:45:00.0中的pv次数是:2
用户Liz在窗口2021-09-22 16:45:10.02021-09-22 16:45:05.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:45:10.02021-09-22 16:45:05.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:45:10.02021-09-22 16:45:05.0中的pv次数是:3
用户Bob在窗口2021-09-22 16:45:15.02021-09-22 16:45:10.0中的pv次数是:4
用户Mary在窗口2021-09-22 16:45:15.02021-09-22 16:45:10.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:45:20.02021-09-22 16:45:15.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:45:20.02021-09-22 16:45:15.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:45:20.02021-09-22 16:45:15.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:45:25.02021-09-22 16:45:20.0中的pv次数是:3
用户Liz在窗口2021-09-22 16:45:25.02021-09-22 16:45:20.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:45:25.02021-09-22 16:45:20.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:45:30.02021-09-22 16:45:25.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:45:30.02021-09-22 16:45:25.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:45:30.02021-09-22 16:45:25.0中的pv次数是:3
用户Mary在窗口2021-09-22 16:45:35.02021-09-22 16:45:30.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:45:35.02021-09-22 16:45:30.0中的pv次数是:2
用户Liz在窗口2021-09-22 16:45:35.02021-09-22 16:45:30.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:45:40.02021-09-22 16:45:35.0中的pv次数是:4
用户Alice在窗口2021-09-22 16:45:40.02021-09-22 16:45:35.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:45:45.02021-09-22 16:45:40.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:45:45.02021-09-22 16:45:40.0中的pv次数是:2
用户Mary在窗口2021-09-22 16:45:45.02021-09-22 16:45:40.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:45:45.02021-09-22 16:45:40.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:45:50.02021-09-22 16:45:45.0中的pv次数是:4
用户Liz在窗口2021-09-22 16:45:50.02021-09-22 16:45:45.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:45:55.02021-09-22 16:45:50.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:45:55.02021-09-22 16:45:50.0中的pv次数是:3
用户Bob在窗口2021-09-22 16:46:00.02021-09-22 16:45:55.0中的pv次数是:3
用户Mary在窗口2021-09-22 16:46:00.02021-09-22 16:45:55.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:46:00.02021-09-22 16:45:55.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:46:05.02021-09-22 16:46:00.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:46:05.02021-09-22 16:46:00.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:46:05.02021-09-22 16:46:00.0中的pv次数是:3
用户Alice在窗口2021-09-22 16:46:10.02021-09-22 16:46:05.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:46:10.02021-09-22 16:46:05.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:46:10.02021-09-22 16:46:05.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:46:10.02021-09-22 16:46:05.0中的pv次数是:2
用户Liz在窗口2021-09-22 16:46:15.02021-09-22 16:46:10.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:46:15.02021-09-22 16:46:10.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:46:15.02021-09-22 16:46:10.0中的pv次数是:2
用户Alice在窗口2021-09-22 16:46:20.02021-09-22 16:46:15.0中的pv次数是:2
用户Liz在窗口2021-09-22 16:46:20.02021-09-22 16:46:15.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:46:20.02021-09-22 16:46:15.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:46:20.02021-09-22 16:46:15.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:46:25.02021-09-22 16:46:20.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:46:25.02021-09-22 16:46:20.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:46:25.02021-09-22 16:46:20.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:46:30.02021-09-22 16:46:25.0中的pv次数是:2
用户Mary在窗口2021-09-22 16:46:30.02021-09-22 16:46:25.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:46:30.02021-09-22 16:46:25.0中的pv次数是:2
用户Bob在窗口2021-09-22 16:46:35.02021-09-22 16:46:30.0中的pv次数是:2
用户Mary在窗口2021-09-22 16:46:35.02021-09-22 16:46:30.0中的pv次数是:1
用户Alice在窗口2021-09-22 16:46:35.02021-09-22 16:46:30.0中的pv次数是:2
用户Liz在窗口2021-09-22 16:46:40.02021-09-22 16:46:35.0中的pv次数是:2
用户Mary在窗口2021-09-22 16:46:40.02021-09-22 16:46:35.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:46:40.02021-09-22 16:46:35.0中的pv次数是:2
用户Alice在窗口2021-09-22 16:46:45.02021-09-22 16:46:40.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:46:45.02021-09-22 16:46:40.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:46:45.02021-09-22 16:46:40.0中的pv次数是:3
用户Alice在窗口2021-09-22 16:46:50.02021-09-22 16:46:45.0中的pv次数是:1
用户Liz在窗口2021-09-22 16:46:50.02021-09-22 16:46:45.0中的pv次数是:1
用户Bob在窗口2021-09-22 16:46:50.02021-09-22 16:46:45.0中的pv次数是:1
用户Mary在窗口2021-09-22 16:46:50.02021-09-22 16:46:45.0中的pv次数是:2
文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




