暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink操练(二十八)之ListState求平均值

逗先生大数据 2021-11-12
1691

1、代码实现逻辑

package day03;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Random;

/**
* @program: Flink_learn
* @description: 使用列表状态变量求平均值
* @author: Mr.逗
* @create: 2021-09-17 16:41
**/

public class AvgByListState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.addSource(new SourceFunction<Integer>() {
private boolean isRunning = true;
private Random random = new Random();

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (isRunning) {
ctx.collect(random.nextInt(10));
Thread.sleep(300L);
}
}

@Override
public void cancel() {
isRunning = false;
}
});
source.keyBy(v->1)
.process(new KeyedProcessFunction<Integer, Integer, String>() {
private ListState<Integer> listState;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
listState=getRuntimeContext().getListState(new ListStateDescriptor<Integer>("list-state", Types.INT));
}
@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
listState.add(value);
Integer sum=0;
Integer count=0;
Iterator<Integer> it = listState.get().iterator();
while (it.hasNext())
{
sum+=it.next();
count+=1;
}
Double avg=(double)sum/count;
out.collect("元素和为:"+sum+",其元素个数为:"+count+"其平均值"+avg);
}
}).print();
String name = AvgByListState.class.getName();
env.execute(name);
}
}

2、结果之展示

"C:\Program Files\Java\jdk1.8.0_191\bin\java.exe" "-javaagent:F:\app\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=50113:F:\app\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_191\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\rt.jar;D:\bigData\bigData_learn\Flink_learn\target\classes;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-java\1.13.0\flink-java-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-core\1.13.0\flink-core-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-annotations\1.13.0\flink-annotations-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-metrics-core\1.13.0\flink-metrics-core-1.13.0.jar;C:\Users\Administrator\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\Administrator\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\Administrator\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\Administrator\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-compress\1.20\commons-compress-1.20.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;C:\Users\Administrator\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\force-shading\1.13.0\force-shading-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-streaming-java_2.12\1.13.0\flink-streaming-java_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-file-sink-common\1.13.0\flink-file-sink-common-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-runtime_2.12\1.13.0\flink-runtime_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.13.0\flink-queryable-state-client-java-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-hadoop-fs\1.13.0\flink-hadoop-fs-1.13.0.jar;C:\Users\Administrator\.m2\repository\commons-io\commons-io\2.7\commons-io-2.7.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.49.Final-13.0\flink-shaded-netty-4.1.49.Final-13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-jackson\2.12.1-13.0\flink-shaded-jackson-2.12.1-13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-13.0\flink-shaded-zookeeper-3-3.4.14-13.0.jar;C:\Users\Administrator\.m2\repository\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;C:\Users\Administrator\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;C:\Users\Administrator\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\Administrator\.m2\repository\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;C:\Users\Administrator\.m2\repository\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;C:\Users\Administrator\.m2\repository\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;C:\Users\Administrator\.m2\repository\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;C:\Users\Administrator\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-13.0\flink-shaded-guava-18.0-13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-clients_2.12\1.13.0\flink-clients_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-optimizer_2.12\1.13.0\flink-optimizer_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-api-java-bridge_2.12\1.13.0\flink-table-api-java-bridge_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-api-java\1.13.0\flink-table-api-java-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-planner-blink_2.12\1.13.0\flink-table-planner-blink_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-api-scala_2.12\1.13.0\flink-table-api-scala_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-api-scala-bridge_2.12\1.13.0\flink-table-api-scala-bridge_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-runtime-blink_2.12\1.13.0\flink-table-runtime-blink_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\codehaus\janino\janino\3.0.11\janino-3.0.11.jar;C:\Users\Administrator\.m2\repository\org\codehaus\janino\commons-compiler\3.0.11\commons-compiler-3.0.11.jar;C:\Users\Administrator\.m2\repository\org\apache\calcite\avatica\avatica-core\1.17.0\avatica-core-1.17.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-streaming-scala_2.12\1.13.0\flink-streaming-scala_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-scala_2.12\1.13.0\flink-scala_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-reflect\2.12.7\scala-reflect-2.12.7.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-compiler\2.12.7\scala-compiler-2.12.7.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\modules\scala-xml_2.12\1.0.6\scala-xml_2.12-1.0.6.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-table-common\1.13.0\flink-table-common-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-files\1.13.0\flink-connector-files-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-asm-7\7.1-13.0\flink-shaded-asm-7-7.1-13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-cep_2.12\1.13.0\flink-cep_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-csv\1.13.0\flink-csv-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-kafka_2.12\1.13.0\flink-connector-kafka_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\2.4.1\kafka-clients-2.4.1.jar;C:\Users\Administrator\.m2\repository\com\github\luben\zstd-jni\1.4.3-1\zstd-jni-1.4.3-1.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-base\1.13.0\flink-connector-base-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\apache\bahir\flink-connector-redis_2.11\1.0\flink-connector-redis_2.11-1.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.2.0\flink-streaming-java_2.11-1.2.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-runtime_2.11\1.2.0\flink-runtime_2.11-1.2.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-hadoop2\1.2.0\flink-shaded-hadoop2-1.2.0.jar;C:\Users\Administrator\.m2\repository\org\tukaani\xz\1.0\xz-1.0.jar;C:\Users\Administrator\.m2\repository\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;C:\Users\Administrator\.m2\repository\commons-codec\commons-codec\1.4\commons-codec-1.4.jar;C:\Users\Administrator\.m2\repository\commons-net\commons-net\3.1\commons-net-3.1.jar;C:\Users\Administrator\.m2\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;C:\Users\Administrator\.m2\repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;C:\Users\Administrator\.m2\repository\com\sun\jersey\jersey-core\1.9\jersey-core-1.9.jar;C:\Users\Administrator\.m2\repository\commons-el\commons-el\1.0\commons-el-1.0.jar;C:\Users\Administrator\.m2\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;C:\Users\Administrator\.m2\repository\com\jamesmurty\utils\java-xmlbuilder\0.4\java-xmlbuilder-0.4.jar;C:\Users\Administrator\.m2\repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;C:\Users\Administrator\.m2\repository\commons-configuration\commons-configuration\1.7\commons-configuration-1.7.jar;C:\Users\Administrator\.m2\repository\commons-digester\commons-digester\1.8.1\commons-digester-1.8.1.jar;C:\Users\Administrator\.m2\repository\org\codehaus\jackson\jackson-core-asl\1.8.8\jackson-core-asl-1.8.8.jar;C:\Users\Administrator\.m2\repository\org\codehaus\jackson\jackson-mapper-asl\1.8.8\jackson-mapper-asl-1.8.8.jar;C:\Users\Administrator\.m2\repository\org\apache\avro\avro\1.7.7\avro-1.7.7.jar;C:\Users\Administrator\.m2\repository\com\thoughtworks\paranamer\paranamer\2.3\paranamer-2.3.jar;C:\Users\Administrator\.m2\repository\com\jcraft\jsch\0.1.42\jsch-0.1.42.jar;C:\Users\Administrator\.m2\repository\commons-beanutils\commons-beanutils-bean-collections\1.8.3\commons-beanutils-bean-collections-1.8.3.jar;C:\Users\Administrator\.m2\repository\commons-daemon\commons-daemon\1.0.13\commons-daemon-1.0.13.jar;C:\Users\Administrator\.m2\repository\javax\xml\bind\jaxb-api\2.2.2\jaxb-api-2.2.2.jar;C:\Users\Administrator\.m2\repository\javax\xml\stream\stax-api\1.0-2\stax-api-1.0-2.jar;C:\Users\Administrator\.m2\repository\javax\activation\activation\1.1\activation-1.1.jar;C:\Users\Administrator\.m2\repository\io\netty\netty-all\4.0.27.Final\netty-all-4.0.27.Final.jar;C:\Users\Administrator\.m2\repository\com\data-artisans\flakka-actor_2.11\2.3-custom\flakka-actor_2.11-2.3-custom.jar;C:\Users\Administrator\.m2\repository\com\data-artisans\flakka-remote_2.11\2.3-custom\flakka-remote_2.11-2.3-custom.jar;C:\Users\Administrator\.m2\repository\io\netty\netty\3.8.0.Final\netty-3.8.0.Final.jar;C:\Users\Administrator\.m2\repository\org\uncommons\maths\uncommons-maths\1.2.2a\uncommons-maths-1.2.2a.jar;C:\Users\Administrator\.m2\repository\com\data-artisans\flakka-slf4j_2.11\2.3-custom\flakka-slf4j_2.11-2.3-custom.jar;C:\Users\Administrator\.m2\repository\org\clapper\grizzled-slf4j_2.11\1.0.2\grizzled-slf4j_2.11-1.0.2.jar;C:\Users\Administrator\.m2\repository\com\github\scopt\scopt_2.11\3.2.0\scopt_2.11-3.2.0.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.7.4\jackson-core-2.7.4.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.7.4\jackson-databind-2.7.4.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.7.0\jackson-annotations-2.7.0.jar;C:\Users\Administrator\.m2\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;C:\Users\Administrator\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\Administrator\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar;C:\Users\Administrator\.m2\repository\com\twitter\chill_2.11\0.7.4\chill_2.11-0.7.4.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-clients_2.11\1.2.0\flink-clients_2.11-1.2.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-optimizer_2.11\1.2.0\flink-optimizer_2.11-1.2.0.jar;C:\Users\Administrator\.m2\repository\org\apache\sling\org.apache.sling.commons.json\2.0.6\org.apache.sling.commons.json-2.0.6.jar;C:\Users\Administrator\.m2\repository\mysql\mysql-connector-java\8.0.21\mysql-connector-java-8.0.21.jar;C:\Users\Administrator\.m2\repository\com\google\protobuf\protobuf-java\3.11.4\protobuf-java-3.11.4.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-jdbc_2.12\1.13.0\flink-connector-jdbc_2.12-1.13.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-log4j12\1.7.30\slf4j-log4j12-1.7.30.jar;C:\Users\Administrator\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;C:\Users\Administrator\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.14.0\log4j-to-slf4j-2.14.0.jar;C:\Users\Administrator\.m2\repository\org\apache\logging\log4j\log4j-api\2.14.0\log4j-api-2.14.0.jar;C:\Users\Administrator\.m2\repository\redis\clients\jedis\2.9.0\jedis-2.9.0.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-pool2\2.4.2\commons-pool2-2.4.2.jar;C:\Users\Administrator\.m2\repository\com\google\code\gson\gson\2.8.5\gson-2.8.5.jar" day03.AvgByListState
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
元素和为:2,其元素个数为:1其平均值2.0
元素和为:9,其元素个数为:2其平均值4.5
元素和为:14,其元素个数为:3其平均值4.666666666666667
元素和为:23,其元素个数为:4其平均值5.75
元素和为:29,其元素个数为:5其平均值5.8
元素和为:34,其元素个数为:6其平均值5.666666666666667
元素和为:42,其元素个数为:7其平均值6.0
元素和为:50,其元素个数为:8其平均值6.25
元素和为:50,其元素个数为:9其平均值5.555555555555555
元素和为:54,其元素个数为:10其平均值5.4
元素和为:55,其元素个数为:11其平均值5.0
元素和为:58,其元素个数为:12其平均值4.833333333333333
元素和为:64,其元素个数为:13其平均值4.923076923076923
元素和为:69,其元素个数为:14其平均值4.928571428571429
元素和为:77,其元素个数为:15其平均值5.133333333333334
元素和为:81,其元素个数为:16其平均值5.0625
元素和为:89,其元素个数为:17其平均值5.235294117647059
元素和为:98,其元素个数为:18其平均值5.444444444444445
元素和为:101,其元素个数为:19其平均值5.315789473684211
元素和为:102,其元素个数为:20其平均值5.1
元素和为:108,其元素个数为:21其平均值5.142857142857143
元素和为:109,其元素个数为:22其平均值4.954545454545454
元素和为:116,其元素个数为:23其平均值5.043478260869565


文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论