Kafka Setup
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgztar zxvf kafka_2.12-2.8.0.tgz -C ~/appcd ~/app/kafka_2.12-2.8.0# start the zookeeper servicebin/zookeeper-server-start.sh config/zookeeper.properties# start the Kafka broker servicebin/kafka-server-start.sh config/server.properties# create topicbin/kafka-topics.sh --create --topic flink_topic --bootstrap-server localhost:9092# describle messagebin/kafka-topics.sh --describe --topic flink_topic --bootstrap-server localhost:9092# produce messagebin/kafka-console-producer.sh --topic flink_topic --bootstrap-server localhost:9092# consumer messagebin/kafka-console-consumer.sh --topic flink_topic --from-beginning --bootstrap-server localhost:9092
Maven Dependency
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.fool</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency></dependencies></project>
SRC
src/main/java/org/fool/flink/contract/Sensor.java
package org.fool.flink.contract;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data@NoArgsConstructor@AllArgsConstructorpublic class Sensor {private String id;private Long timestamp;private Double temperature;}
src/main/java/org/fool/flink/sink/SinkKafkaTest.java
package org.fool.flink.sink;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.fool.flink.contract.Sensor;import java.util.Objects;import java.util.Properties;public class SinkKafkaTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");String inputPath = Objects.requireNonNull(ClassLoader.getSystemClassLoader().getResource("sensor.txt")).getPath();DataStream<String> inputStream = environment.readTextFile(inputPath);DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {String[] fields = value.split(",");return new Sensor(fields[0], new Long(fields[1]), new Double(fields[2])).toString();}});dataStream.addSink(new FlinkKafkaProducer<>("flink_topic", new SimpleStringSchema(), properties));environment.execute();}}
src/main/resources/sensor.txt
1,1628754086,35.82,1628754096,36.83,1628754106,37.84,1628754116,38.81,1628754186,36.62,1628754296,36.63,1628754306,37.64,1628754416,38.61,1628754986,25.81,1628754086,39.62,1628754996,26.83,1628754906,27.84,1628754916,28.8
Run
Console Output

Kafka Consumer

泰克风格 只讲干货 不弄玄虚
文章转载自TechStyle,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




