import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
 * Example Flink DataStream job that reads from an OceanBase CDC source and prints every
 * record as a JSON string.
 *
 * <p>The source is started in {@code StartupMode.INITIAL} and the job is submitted under the
 * name "Print OceanBase Snapshot + Commit Log".
 */
public class OceanBaseSourceExample {

    /**
     * Builds the OceanBase source, attaches it to a checkpointed streaming environment with a
     * print sink, and executes the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        SourceFunction<String> oceanBaseSource =
                OceanBaseSource.<String>builder()
                        // The value is a root server list (ip:rpc_port:sql_port), so it is
                        // passed through rsList(...); the builder has no configurl(...) method.
                        .rsList("127.0.0.1:2882:2881") // set root server list
                        .startupMode(StartupMode.INITIAL) // set startup mode
                        .username("user@test_tenant") // set cluster username
                        .password("pswd") // set cluster password
                        .tenantName("test_tenant") // set captured tenant name, does not support regex
                        .databaseName("test_db") // set captured database, supports regex
                        .tableName("test_table") // set captured table, supports regex
                        .hostname("127.0.0.1") // set hostname of OceanBase server or proxy
                        .port(2881) // set the sql port for OceanBase server or proxy
                        .logProxyHost("127.0.0.1") // set the hostname of log proxy
                        .logProxyPort(2983) // set the port of log proxy
                        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        // Print the captured records; the print sink is pinned to parallelism 1.
        env.addSource(oceanBaseSource).print().setParallelism(1);

        env.execute("Print OceanBase Snapshot + Commit Log");
    }
}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oceanbase-cdc</artifactId>
    <!-- Only stable releases of this dependency are published; a SNAPSHOT version has to be built by yourself. -->
    <version>2.2.1</version>
</dependency>
# Open a mysql client session as root against ${host}:${port}.
mysql -h${host} -P${port} -uroot
# Show the tenants.
mysql> SHOW TENANT;
# Create the user ${sys_username} with the password ${sys_password}.
mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
# Grant that user all privileges on every database and table, including the right to grant them to others.
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
 * Example Flink DataStream job that reads from an OceanBase CDC source and prints every
 * record as a JSON string.
 *
 * <p>The source is started in {@code StartupMode.INITIAL} and the job is submitted under the
 * name "Print OceanBase Snapshot + Commit Log".
 */
public class OceanBaseSourceExample {

    /**
     * Builds the OceanBase source, attaches it to a checkpointed streaming environment with a
     * print sink, and executes the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        SourceFunction<String> oceanBaseSource =
                OceanBaseSource.<String>builder()
                        // The value is a root server list (ip:rpc_port:sql_port), so it is
                        // passed through rsList(...); the builder has no configurl(...) method.
                        .rsList("127.0.0.1:2882:2881") // set root server list
                        .startupMode(StartupMode.INITIAL) // set startup mode
                        .username("user@test_tenant") // set cluster username
                        .password("pswd") // set cluster password
                        .tenantName("test_tenant") // set captured tenant name, does not support regex
                        .databaseName("test_db") // set captured database, supports regex
                        .tableName("test_table") // set captured table, supports regex
                        .hostname("127.0.0.1") // set hostname of OceanBase server or proxy
                        .port(2881) // set the sql port for OceanBase server or proxy
                        .logProxyHost("127.0.0.1") // set the hostname of log proxy
                        .logProxyPort(2983) // set the port of log proxy
                        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        // Print the captured records; the print sink is pinned to parallelism 1.
        env.addSource(oceanBaseSource).print().setParallelism(1);

        env.execute("Print OceanBase Snapshot + Commit Log");
    }
}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oceanbase-cdc</artifactId>
    <!-- Only stable releases of this dependency are published; a SNAPSHOT version has to be built by yourself. -->
    <version>2.2.1</version>
</dependency>
# Open a mysql client session as root against ${host}:${port}.
mysql -h${host} -P${port} -uroot
# Show the tenants.
mysql> SHOW TENANT;
# Create the user ${sys_username} with the password ${sys_password}.
mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
# Grant that user all privileges on every database and table, including the right to grant them to others.
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;