暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 写入数据到 Kafka

TechStyle 2021-08-16
753

Kafka Setup

# Download Kafka 2.8.0 (Scala 2.12 build).
# downloads.apache.org only keeps current releases, so older versions such as
# 2.8.0 must be fetched from the permanent Apache archive.
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz


# tar -C requires the target directory to exist already
mkdir -p ~/app
tar zxvf kafka_2.12-2.8.0.tgz -C ~/app


cd ~/app/kafka_2.12-2.8.0


# start the ZooKeeper service
# (stays in the foreground: run each of the following commands in its own terminal)
bin/zookeeper-server-start.sh config/zookeeper.properties


# start the Kafka broker service (also stays in the foreground)
bin/kafka-server-start.sh config/server.properties


# create the topic
bin/kafka-topics.sh --create --topic flink_topic --bootstrap-server localhost:9092


# describe the topic
bin/kafka-topics.sh --describe --topic flink_topic --bootstrap-server localhost:9092


# produce messages (reads lines from stdin)
bin/kafka-console-producer.sh --topic flink_topic --bootstrap-server localhost:9092


# consume messages from the beginning of the topic
bin/kafka-console-consumer.sh --topic flink_topic --from-beginning --bootstrap-server localhost:9092


Maven Dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.fool</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Pin the source encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to bump the Flink version; all Flink artifacts must stay in sync. -->
        <flink.version>1.12.5</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Lombok is an annotation processor: needed at compile time only, not at runtime. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>


SRC

src/main/java/org/fool/flink/contract/Sensor.java

package org.fool.flink.contract;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


/**
 * A single sensor reading: an identifier, a timestamp and a temperature.
 *
 * <p>The Lombok annotations generate the accessors, {@code equals}/{@code hashCode},
 * {@code toString}, a no-argument constructor and an all-arguments constructor whose
 * parameter order follows the field declaration order below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Sensor {
// Identifier of the sensor that produced the reading.
private String id;
// Time of the reading. NOTE(review): the sample data looks like epoch seconds — confirm.
private Long timestamp;
// Measured temperature. NOTE(review): unit is not stated anywhere in this file — confirm.
private Double temperature;
}


src/main/java/org/fool/flink/sink/SinkKafkaTest.java

package org.fool.flink.sink;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.fool.flink.contract.Sensor;


import java.util.Objects;
import java.util.Properties;


/**
 * Flink job that reads {@code sensor.txt} from the classpath, parses each line into a
 * {@link Sensor}, and writes the sensor's {@code toString()} form to the Kafka topic
 * {@code flink_topic} on {@code localhost:9092}.
 */
public class SinkKafkaTest {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if {@code sensor.txt} is missing from the classpath or the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Producer configuration. Only the broker address is required here; consumer-only
        // settings (group.id, key/value deserializers, auto.offset.reset) do not apply to a
        // Kafka producer and were dropped.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        String inputPath = Objects.requireNonNull(
                ClassLoader.getSystemClassLoader().getResource("sensor.txt"),
                "sensor.txt not found on the classpath").getPath();

        DataStream<String> inputStream = environment.readTextFile(inputPath);

        DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return formatSensorLine(value);
            }
        });

        dataStream.addSink(new FlinkKafkaProducer<>("flink_topic", new SimpleStringSchema(), properties));

        environment.execute();
    }

    /**
     * Parses one comma-separated line ({@code id,timestamp,temperature}) into a {@link Sensor}
     * and returns its string form.
     *
     * @param line raw input line
     * @return {@code Sensor.toString()} of the parsed reading
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException if the timestamp or temperature field is not numeric
     */
    private static String formatSensorLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Expected 'id,timestamp,temperature' but got: '" + line + "'");
        }
        // valueOf replaces the boxing constructors new Long(..)/new Double(..), deprecated since Java 9.
        return new Sensor(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2])).toString();
    }
}


src/main/resources/sensor.txt

1,1628754086,35.8
2,1628754096,36.8
3,1628754106,37.8
4,1628754116,38.8
1,1628754186,36.6
2,1628754296,36.6
3,1628754306,37.6
4,1628754416,38.6
1,1628754986,25.8
1,1628754086,39.6
2,1628754996,26.8
3,1628754906,27.8
4,1628754916,28.8


Run

Console Output

Kafka Consumer





泰克风格 只讲干货 不弄玄虚


文章转载自TechStyle,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论