1. Download docker-compose.yml:
version: '2.1'
services:
  mongo:
    image: "mongo:4.0-xenial"
    command: --replSet rs0 --smallfiles --oplogSize 128
    ports:
      - "27017:27017"
    environment:
      - MONGO_INITDB_ROOT_USERNAME=mongouser
      - MONGO_INITDB_ROOT_PASSWORD=mongopw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
2. Enter the MongoDB container, then initialize the replica set and the data:
docker-compose exec mongo /usr/bin/mongo -u mongouser -p mongopw
// 1. Initialize the replica set
rs.initiate();
rs.status();

// 2. Switch to the database
use mgdb;

// 3. Initialize the data
db.orders.insertMany([
  {
    order_id: 101,
    order_date: ISODate("2020-07-30T10:08:22.001Z"),
    customer_id: 1001,
    price: NumberDecimal("50.50"),
    product: {
      name: 'scooter',
      description: 'Small 2-wheel scooter'
    },
    order_status: false
  },
  {
    order_id: 102,
    order_date: ISODate("2020-07-30T10:11:09.001Z"),
    customer_id: 1002,
    price: NumberDecimal("15.00"),
    product: {
      name: 'car battery',
      description: '12V car battery'
    },
    order_status: false
  },
  {
    order_id: 103,
    order_date: ISODate("2020-07-30T12:00:30.001Z"),
    customer_id: 1003,
    price: NumberDecimal("25.25"),
    product: {
      name: 'hammer',
      description: '16oz carpenter hammer'
    },
    order_status: false
  }
]);

db.customers.insertMany([
  {
    customer_id: 1001,
    name: 'Jark',
    address: 'Hangzhou'
  },
  {
    customer_id: 1002,
    name: 'Sally',
    address: 'Beijing'
  },
  {
    customer_id: 1003,
    name: 'Edward',
    address: 'Shanghai'
  }
]);
Download the following JAR packages to <FLINK_HOME>/lib/:
Note: download links are available only for released versions; SNAPSHOT versions must be built locally.
• flink-sql-connector-elasticsearch7-1.16.0.jar
• flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar
Then start the Flink cluster, followed by the SQL CLI.
-- Flink SQL
-- Set the checkpoint interval to 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
-- Set the local time zone to Asia/Shanghai
Flink SQL> SET table.local-time-zone = Asia/Shanghai;
Flink SQL> CREATE TABLE orders (
   _id STRING,
   order_id INT,
   order_date TIMESTAMP_LTZ(3),
   customer_id INT,
   price DECIMAL(10, 5),
   product ROW<name STRING, description STRING>,
   order_status BOOLEAN,
   PRIMARY KEY (_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
   'hosts' = 'localhost:27017',
   'username' = 'mongouser',
   'password' = 'mongopw',
   'database' = 'mgdb',
   'collection' = 'orders'
 );

Flink SQL> CREATE TABLE customers (
   _id STRING,
   customer_id INT,
   name STRING,
   address STRING,
   PRIMARY KEY (_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
   'hosts' = 'localhost:27017',
   'username' = 'mongouser',
   'password' = 'mongopw',
   'database' = 'mgdb',
   'collection' = 'customers'
 );

Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP_LTZ(3),
   customer_id INT,
   price DECIMAL(10, 5),
   product ROW<name STRING, description STRING>,
   order_status BOOLEAN,
   customer_name STRING,
   customer_address STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'elasticsearch-7',
   'hosts' = 'http://localhost:9200',
   'index' = 'enriched_orders'
 );

Flink SQL> INSERT INTO enriched_orders
   SELECT o.order_id,
     o.order_date,
     o.customer_id,
     o.price,
     o.product,
     o.order_status,
     c.name,
     c.address
   FROM orders AS o
   LEFT JOIN customers AS c ON o.customer_id = c.customer_id;
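To make the shape of the joined rows concrete, the following plain-JavaScript sketch models the LEFT JOIN from the INSERT statement on the sample data loaded earlier. It is an illustration only, not Flink code: `leftJoinOrders` is a hypothetical helper, prices are kept as strings, order_date and product.description are omitted for brevity, and customer_id is assumed to be unique in customers.

```javascript
// Sample data taken from the MongoDB initialization step (abridged).
const orders = [
  { order_id: 101, customer_id: 1001, price: "50.50", product: { name: "scooter" }, order_status: false },
  { order_id: 102, customer_id: 1002, price: "15.00", product: { name: "car battery" }, order_status: false },
  { order_id: 103, customer_id: 1003, price: "25.25", product: { name: "hammer" }, order_status: false },
];

const customers = [
  { customer_id: 1001, name: "Jark", address: "Hangzhou" },
  { customer_id: 1002, name: "Sally", address: "Beijing" },
  { customer_id: 1003, name: "Edward", address: "Shanghai" },
];

// Hypothetical helper: models "orders LEFT JOIN customers ON customer_id".
// Assumes at most one customer per customer_id.
function leftJoinOrders(orderRows, customerRows) {
  const customerById = new Map(customerRows.map((c) => [c.customer_id, c]));
  return orderRows.map((o) => {
    const c = customerById.get(o.customer_id);
    return {
      ...o,
      // A LEFT JOIN keeps the order even when no customer matches;
      // the customer columns are then NULL.
      customer_name: c ? c.name : null,
      customer_address: c ? c.address : null,
    };
  });
}

const enriched = leftJoinOrders(orders, customers);
console.log(enriched);
```

Each element of `enriched` corresponds to one document that the job is expected to write to the enriched_orders index, keyed by order_id.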
Modify the data in MongoDB and observe the results in Elasticsearch.
db.orders.insert({
  order_id: 104,
  order_date: ISODate("2020-07-30T12:00:30.001Z"),
  customer_id: 1004,
  price: NumberDecimal("25.25"),
  product: {
    name: 'rocks',
    description: 'box of assorted rocks'
  },
  order_status: false
});

db.customers.insert({
  customer_id: 1004,
  name: 'Jacob',
  address: 'Shanghai'
});

db.orders.updateOne(
  { order_id: 104 },
  { $set: { order_status: true } }
);

db.orders.deleteOne(
  { order_id : 104 }
);
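As a rough guide to what to look for in Elasticsearch after each of the four statements above, here is a small in-memory model in plain JavaScript. All names (`ordersState`, `customersState`, `sinkIndex`, `upsertOrder`, and so on) are hypothetical and are not Flink or Elasticsearch APIs. The sketch recomputes the join after every change, whereas Flink maintains it incrementally, so it only models the state of the index keyed by order_id, assuming customer_id is unique.

```javascript
// Hypothetical in-memory model of the pipeline state.
const ordersState = new Map();    // order_id -> latest order document
const customersState = new Map(); // customer_id -> latest customer document
const sinkIndex = new Map();      // order_id -> enriched document (stands in for the enriched_orders index)

// Rebuild the sink from the current state of both collections.
function refreshSink() {
  sinkIndex.clear();
  for (const order of ordersState.values()) {
    const customer = customersState.get(order.customer_id);
    sinkIndex.set(order.order_id, {
      ...order,
      customer_name: customer ? customer.name : null,
      customer_address: customer ? customer.address : null,
    });
  }
}

// Insert a new order or merge changed fields into an existing one.
function upsertOrder(order) {
  ordersState.set(order.order_id, { ...ordersState.get(order.order_id), ...order });
  refreshSink();
}

function upsertCustomer(customer) {
  customersState.set(customer.customer_id, customer);
  refreshSink();
}

function deleteOrder(orderId) {
  ordersState.delete(orderId);
  refreshSink();
}

// Return a copy of the sink document for an order, or null if it is absent.
function lookup(orderId) {
  const doc = sinkIndex.get(orderId);
  return doc ? { ...doc } : null;
}

// Replay the four changes made in the mongo shell (fields abridged).
upsertOrder({ order_id: 104, customer_id: 1004, order_status: false });
const afterOrderInsert = lookup(104);    // customer fields are still null

upsertCustomer({ customer_id: 1004, name: "Jacob", address: "Shanghai" });
const afterCustomerInsert = lookup(104); // customer fields are now filled in

upsertOrder({ order_id: 104, order_status: true });
const afterUpdate = lookup(104);         // order_status flipped to true

deleteOrder(104);
const afterDelete = lookup(104);         // document removed from the index

console.log({ afterOrderInsert, afterCustomerInsert, afterUpdate, afterDelete });
```

In the real pipeline the intermediate state with empty customer fields may be visible only briefly, or not at all, depending on how quickly the two inserts are executed and on the checkpoint interval.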
This article is reposted from 大数据技能圈.




