暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何通过 OceanBase CDC连接器快速查询数据?

SQL学习者 2023-08-01
471

特性

NO.1 At-Least-Once 处理

OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,然后再读取变化事件,并进行 At-Least-Once 处理。

OceanBase 数据库是一个分布式数据库,它的日志也分散在不同的服务器上。由于没有类似 MySQL binlog 偏移量的位置信息,OceanBase 数据库用时间戳作为位置标记。为确保读取完整的数据,liboblog(读取 OceanBase 日志记录的 C++ 库)可能会在给定的时间戳之前读取一些日志数据。因此,OceanBase 数据库可能会读到起始点附近时间戳的重复数据,从而保证了 At-Least-Once 处理。

NO.2 启动模式

配置选项 scan.startup.mode 指定 OceanBase CDC 连接器的启动模式。可用取值包括:

  • initial(默认):在首次启动时对受监视的数据库表执行初始快照,并继续读取最新的事务日志。
  • latest-offset:首次启动时,不对受监视的数据库表执行快照,仅从连接器启动时读取事务日志。
  • timestamp:在首次启动时不对受监视的数据库表执行初始快照,仅从指定的 scan.startup.timestamp 读取事务日志。

NO.3 消费事务日志

OceanBase CDC 连接器使用 oblogclient 消费 oblogproxy 中的事务日志。

NO.4 DataStream Source

OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以创建一个 SourceFunction,例如:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OceanBaseSourceExample {

  public static void main(String[] args) throws Exception {
    SourceFunction<String> oceanBaseSource =
        OceanBaseSource.<String>builder()
            .configurl("127.0.0.1:2882:2881")  // set root server list, OceanBase Enterprise Edition
            .startupMode(StartupMode.INITIAL) // set startup mode
            .username("user@test_tenant")  // set cluster username
            .password("pswd")  // set cluster password
            .tenantName("test_tenant")  // set captured tenant name, do not support regex
            .databaseName("test_db")  // set captured database, support regex
            .tableName("test_table")  // set captured table, support regex
            .hostname("127.0.0.1")  // set hostname of OceanBase server or proxy
            .port(2881)  // set the sql port for OceanBase server or proxy
            .logProxyHost("127.0.0.1")  // set the hostname of log proxy
            .logProxyPort(2983)  // set the port of log proxy
            .deserializer(new JsonDebeziumDeserializationSchema())  // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env.addSource(oceanBaseSource).print().setParallelism(1);

    env.execute("Print OceanBase Snapshot + Commit Log");
  }
}

数据类型映射

当启动模式不是 INITIAL 时,连接器无法获得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的FLink 类型。例如,BOOLEAN、TINYINT(1) 或 BIT(1) 均会转换成 BOOLEAN。在 OceanBase 数据库中,BOOLEAN 等同于 TINYINT(1),所以 BOOLEAN 和 TINYINT 类型的列在 Flink 中会被映射为 TINYINT,而 BIT(1) 在 Flink 中会被映射为 BINARY(1)。

体验 Flink OceanBase CDC 连接器

NO.1 前提条件

在迁移 OceanBase 的数据之前,您需要确认以下信息:

  • 您已安装 OceanBase 数据库。更多信息,参考 OceanBase 数据库文档。
  • 您已安装 Elasticsearch。更多信息,参考 Elasticsearch 文档。
  • 您已安装 OceanBase 增量日志拉取组件 oblogproxy。更多信息,参考 oblogproxy 文档(社区版) 或 oblogproxy 文档(企业版)。
  • 您已安装 Flink。更多信息,参考 Flink 文档。

OceanBase CDC 连接器支持从 OceanBase 数据库读取快照数据和增量数据。本节介绍如何设置 OceanBase CDC 连接器,以在 OceanBase 数据库中查询数据。

NO.2 依赖

要设置 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和使用 SQL JAR 包捆绑的 SQL 客户端。

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oceanbase-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.2.1</version>
</dependency>

NO.3 下载 SQL 客户端 JAR 包

点击 flink-sql-connector-oceanbase-cdc-2.2.1.jar

下载 JAR 包至 <FLINK_HOME>/lib/.

说明:下载链接仅适用于稳定发行版本。

flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 快照版本与开发分支的版本对应。要使用快照版本,您必须自行下载并编译源代码。推荐使用稳定发行版本,例如 flink-sql-connector-oceanbase-cdc-2.2.1.jar。您可以在 Maven 中央仓库中找到使用稳定发行版本。

NO.4 配置 OceanBase 数据库和 oblogproxy

1. 按照部署文档配置 OceanBase 集群。

2.在 sys 租户中,为 oblogproxy 创建用户。更多信息,参考 用户管理文档。

mysql -h${host} -P${port} -uroot
mysql> SHOW TENANT;
mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;

3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。

4. 获取 rootservice_list 或 config-url 的值。

如果您是社区版用户,使用以下命令:

mysql> SHOW PARAMETERS LIKE 'rootservice_list';

如果您是企业版用户,使用以下命令:

mysql> SHOW PARAMETERS LIKE 'obconfig_url';

5. 按照 oblogproxy 文档 配置 oblogproxy。

NO.5 创建 OceanBase CDC 表

使用以下命令,创建 OceanBase CDC 表:

-- 每 3000 毫秒检查一次                   
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- 在 Flink SQL 中创建 OceanBase 表 `orders`
Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = 'test_db',
    'table-name' = 'orders',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');

-- 从表 orders 中读取快照数据和 binlogs
Flink SQL> SELECT * FROM orders;               

您也可以访问 Flink 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 Flink 官网文档。

Flink OceanBase CDC 连接器配置项

支持的元数据

在创建表时,您可以使用以下格式的元数据作为只读列(VIRTUAL)。

如下 SQL 展示了如何在表中使用这些元数据列:


特性

NO.1 At-Least-Once 处理

OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,然后再读取变化事件,并进行 At-Least-Once 处理。

OceanBase 数据库是一个分布式数据库,它的日志也分散在不同的服务器上。由于没有类似 MySQL binlog 偏移量的位置信息,OceanBase 数据库用时间戳作为位置标记。为确保读取完整的数据,liboblog(读取 OceanBase 日志记录的 C++ 库)可能会在给定的时间戳之前读取一些日志数据。因此,OceanBase 数据库可能会读到起始点附近时间戳的重复数据,从而保证了 At-Least-Once 处理。

NO.2 启动模式

配置选项 scan.startup.mode 指定 OceanBase CDC 连接器的启动模式。可用取值包括:

  • initial(默认):在首次启动时对受监视的数据库表执行初始快照,并继续读取最新的事务日志。
  • latest-offset:首次启动时,不对受监视的数据库表执行快照,仅从连接器启动时读取事务日志。
  • timestamp:在首次启动时不对受监视的数据库表执行初始快照,仅从指定的 scan.startup.timestamp 读取事务日志。

NO.3 消费事务日志

OceanBase CDC 连接器使用 oblogclient 消费 oblogproxy 中的事务日志。

NO.4 DataStream Source

OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以创建一个 SourceFunction,例如:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OceanBaseSourceExample {

  public static void main(String[] args) throws Exception {
    SourceFunction<String> oceanBaseSource =
        OceanBaseSource.<String>builder()
            .configurl("127.0.0.1:2882:2881")  // set root server list, OceanBase Enterprise Edition
            .startupMode(StartupMode.INITIAL) // set startup mode
            .username("user@test_tenant")  // set cluster username
            .password("pswd")  // set cluster password
            .tenantName("test_tenant")  // set captured tenant name, do not support regex
            .databaseName("test_db")  // set captured database, support regex
            .tableName("test_table")  // set captured table, support regex
            .hostname("127.0.0.1")  // set hostname of OceanBase server or proxy
            .port(2881)  // set the sql port for OceanBase server or proxy
            .logProxyHost("127.0.0.1")  // set the hostname of log proxy
            .logProxyPort(2983)  // set the port of log proxy
            .deserializer(new JsonDebeziumDeserializationSchema())  // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env.addSource(oceanBaseSource).print().setParallelism(1);

    env.execute("Print OceanBase Snapshot + Commit Log");
  }
}

数据类型映射

当启动模式不是 INITIAL 时,连接器无法获得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的FLink 类型。例如,BOOLEAN、TINYINT(1) 或 BIT(1) 均会转换成 BOOLEAN。在 OceanBase 数据库中,BOOLEAN 等同于 TINYINT(1),所以 BOOLEAN 和 TINYINT 类型的列在 Flink 中会被映射为 TINYINT,而 BIT(1) 在 Flink 中会被映射为 BINARY(1)。

体验 Flink OceanBase CDC 连接器

NO.1 前提条件

在迁移 OceanBase 的数据之前,您需要确认以下信息:

  • 您已安装 OceanBase 数据库。更多信息,参考 OceanBase 数据库文档。
  • 您已安装 Elasticsearch。更多信息,参考 Elasticsearch 文档。
  • 您已安装 OceanBase 增量日志拉取组件 oblogproxy。更多信息,参考 oblogproxy 文档(社区版) 或 oblogproxy 文档(企业版)。
  • 您已安装 Flink。更多信息,参考 Flink 文档。

OceanBase CDC 连接器支持从 OceanBase 数据库读取快照数据和增量数据。本节介绍如何设置 OceanBase CDC 连接器,以在 OceanBase 数据库中查询数据。

NO.2 依赖

要设置 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和使用 SQL JAR 包捆绑的 SQL 客户端。

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oceanbase-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.2.1</version>
</dependency>

NO.3 下载 SQL 客户端 JAR 包

点击 flink-sql-connector-oceanbase-cdc-2.2.1.jar

下载 JAR 包至 <FLINK_HOME>/lib/.

说明:下载链接仅适用于稳定发行版本。

flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 快照版本与开发分支的版本对应。要使用快照版本,您必须自行下载并编译源代码。推荐使用稳定发行版本,例如 flink-sql-connector-oceanbase-cdc-2.2.1.jar。您可以在 Maven 中央仓库中找到使用稳定发行版本。

NO.4 配置 OceanBase 数据库和 oblogproxy

1. 按照部署文档配置 OceanBase 集群。

2.在 sys 租户中,为 oblogproxy 创建用户。更多信息,参考 用户管理文档。

mysql -h${host} -P${port} -uroot
mysql> SHOW TENANT;
mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;

3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。

4. 获取 rootservice_list 或 config-url 的值。

如果您是社区版用户,使用以下命令:

mysql> SHOW PARAMETERS LIKE 'rootservice_list';

如果您是企业版用户,使用以下命令:

mysql> SHOW PARAMETERS LIKE 'obconfig_url';

5. 按照 oblogproxy 文档 配置 oblogproxy。

NO.5 创建 OceanBase CDC 表

使用以下命令,创建 OceanBase CDC 表:

-- 每 3000 毫秒检查一次                   
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- 在 Flink SQL 中创建 OceanBase 表 `orders`
Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = 'test_db',
    'table-name' = 'orders',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');

-- 从表 orders 中读取快照数据和 binlogs
Flink SQL> SELECT * FROM orders;               

您也可以访问 Flink 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 Flink 官网文档。

Flink OceanBase CDC 连接器配置项

支持的元数据

在创建表时,您可以使用以下格式的元数据作为只读列(VIRTUAL)。

如下 SQL 展示了如何在表中使用这些元数据列:

CREATE TABLE products (
    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
   'connector' = 'oceanbase-cdc',
   'scan.startup.mode' = 'initial',
   'username' = 'user@test_tenant',
   'password' = 'pswd',
   'tenant-name' = 'test_tenant',
   'database-name' = 'test_db',
   'table-name' = 'orders',
   'hostname' = '127.0.0.1',
   'port' = '2881',
   'rootserver-list' = '127.0.0.1:2882:2881',
   'logproxy.host' = '127.0.0.1',
   'logproxy.port' = '2983');
CREATE TABLE products ( tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL, db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'user@test_tenant', 'password' = 'pswd', 'tenant-name' = 'test_tenant', 'database-name' = 'test_db', 'table-name' = 'orders', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', 'logproxy.host' = '127.0.0.1', 'logproxy.port' = '2983');
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论