暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

吞吐提升 80%——GreptimeDB Java 客户端 Bulk Stream Insert 写入指南

GreptimeDB 2025-08-20
81

v0.16 更新请参考这篇文章:GreptimeDB v0.16 重磅发布:PromQL 与可观测性功能增强;Rust 客户端的用户请参考这篇文章:吞吐提升 50%—— GreptimeDB Rust 客户端 Bulk Stream Insert 写入指南

p.s. GreptimeDB 已经更新到 v0.16,欢迎各位用户更新到最新版本使用。本篇文章作为 v0.15 功能的介绍在 v0.16 中依然适用。

引言

GreptimeDB 是一款开源、云原生的统一可观测性数据库,专为指标(Metrics)、日志(Logs)和链路追踪(Traces)数据而设计,能够在任何规模下提供从边缘到云端的实时洞察。

为了满足不同场景下的数据写入需求,GreptimeDB 的 Java 客户端提供了两种方式:

  • Regular Insert API:低延迟,适合实时场景;
  • Bulk Stream Insert API:高吞吐,面向批量写入场景。

本文将深入探讨 Bulk API 的设计与使用,并通过测试对比两种写入方式的性能差异。

Java Ingester 简介

GreptimeDB Java Ingester 是一款轻量级、高性能的客户端,专为高效时序数据写入而设计。它基于 gRPC 协议,提供非阻塞、纯异步的 API,具备良好的扩展性与易用性。

架构概览

(图 1:Java Ingester 架构设计)
  • API Layer:提供客户端应用程序与 GreptimeDB 交互的 High Level 接口;
  • Data Model:使用表(Table)和 Schema 定义时序数据的组织结构;
  • Transport Layer:处理通信逻辑、请求分发,并负责 Client 管理;
  • Network Layer:基于 Arrow Flight 和 gRPC 进行底层协议通信。

创建客户端

GreptimeDB Ingester Java 客户端入口为 GreptimeDB
 类。用户调用带有适当配置选项的静态 create
 方法来创建客户端实例,全局使用单例即可:

String database = "public";
String[] endpoints = {"127.0.0.1:4001"};
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database)
        .build();

// Initialize the client
GreptimeDB client = GreptimeDB.create(opts);

写入方式概览

所有写入操作均基于 Table
 抽象构建。使用时需要:

  1. 创建表结构 TableSchema
  2. 根据 Schema 创建 Table
  3. 插入行数据并提交。

⚠️ 注意:

  • Table
     不可复用,需每次重新创建;
  • TableSchema
     可复用,推荐复用以减少开销。
// Create a table schema
TableSchema schema = TableSchema.newBuilder("metrics")
    .addTag("host", DataType.String)
    .addTag("region", DataType.String)
    .addField("cpu_util", DataType.Float64)
    .addField("memory_util", DataType.Float64)
    .addTimestamp("ts", DataType.TimestampMillisecond)
    .build();

// Create a table from the schema
Table table = Table.from(schema);

// Add rows to the table
// The values must be provided in the same order as defined in the schema
// In this case: addRow(host, region, cpu_util, memory_util, ts)
table.addRow("host1""us-west-1"0.420.78, System.currentTimeMillis());
table.addRow("host2""us-west-2"0.460.66, System.currentTimeMillis());
// Add more rows
// ..

// Complete the table to make it immutable. This finalizes the table for writing.
table.complete();

常规 Insert API

适用场景: 实时应用、IoT 传感器和交互式系统。

性能特点

  • 延迟:亚毫秒级;
  • 吞吐量:1k~10k 行/秒;
  • 网络模式:一请求一响应(Request-Response);
  • 内存占用:内存低且恒定,高并发/高吞吐可能会导致内存积压。

代码示例为:

// Add rows to the table
for (int row = 0; row < 100; row++) {
    Object[] rowData = generateRow(batch, row);
    table.addRow(rowData);
}

// Write the table to the database
CompletableFuture<Result<WriteOk, Err>> future = client.write(table);

高吞吐 Bulk Stream Insert API

Bulk API 提供了一种高性能、内存高效的大规模数据写入机制,利用 Java 堆外内存管理,在批量写入数据时能够实现最佳吞吐量。

适用场景: ETL 操作、数据迁移、批处理和日志摄取。

性能特征

  • 延迟:100~10,000毫秒(批处理);
  • 吞吐量:>10k 行/秒;
  • 网络模式:并行请求,流式传输,一张表独占一个流;
  • 内存占用:稳定,具备反压机制。

核心优势:

  • 并行处理:支持多个请求同时进行,提升整体吞吐;
  • 流式传输:基于 Apache Arrow Flight 流式协议;
  • 压缩传输:支持 Zstd 压缩算法;
  • 异步提交:非阻塞式请求提交,支持反压。

使用 Bulk Insert API 的典型模式

// Create a BulkStreamWriter with the table schema
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(schema)) {
    // Write multiple batches
    for (int batch = 0; batch < batchCount; batch++) {
        // Get a TableBufferRoot for this batch
        Table.TableBufferRoot table = writer.tableBufferRoot(1000); // column buffer size

        // Add rows to the batch
        for (int row = 0; row < rowsPerBatch; row++) {
            Object[] rowData = generateRow(batch, row);
            table.addRow(rowData);
        }

        // Complete the table to prepare for transmission
        table.complete();

        // Send the batch and get a future for completion
        CompletableFuture<Integer> future = writer.writeNext();

        // Wait for the batch to be processed (optional)
        Integer affectedRows = future.get();

        System.out.println("Batch " + batch + " wrote " + affectedRows + " rows");
    }

    // Signal completion of the stream
    writer.completed();
}

Bulk API 深入解析

1. 表创建要求

  • 不会自动创建表,须提前使用 SQL DDL 创建,不支持自动的 Schema 变更;
  • 不支持主键列(Tag),每行数据须包含所有列。

目前 Bulk API 处于比较早期的阶段,这些限制会在未来的版本更新中逐步完善。

2. 配置选项

Bulk API 可以通过多种选项配置来优化性能:

BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
        .allocatorInitReservation(64 * 1024 * 1024L// 自定义内存分配:初始预留 64MB
        .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L// 自定义内存分配:最大分配 4GB
        .timeoutMsPerMessage(60 * 1000// 每个请求超时时间为 60 秒(侧重吞吐,建议适当容忍延迟)
        .maxRequestsInFlight(10// 并发控制:配置最多允许 10 个并发(在途)请求
        .build();
// 启用 Zstd 压缩
Context ctx = Context.newDefault().withCompression(Compression.Zstd);

BulkStreamWriter writer = greptimeDB.bulkStreamWriter(schema, cfg, ctx);

3. 性能调优建议

  • 压缩选项:建议网络 IO 瓶颈时启用 Zstd 压缩;
  • 并发度配置
// 假设单实例单表写入,CPU cores = 4

// 网络绑定型 (Network-Bound):Bulk Stream Insert 主要等待网络传输
BulkWrite.Config network_bound_options = BulkWrite.Config.newBuilder()
    .maxRequestsInFlight(8)  // 推荐 8-16,充分利用网络带宽
    .build();

// CPU 密集型 (CPU-Intensive):如果数据写入前需要大量计算处理
BulkWrite.Config network_bound_options = BulkWrite.Config.newBuilder()
    .maxRequestsInFlight(8)  // 推荐 CPU 核心数
    .build();

// 混合负载:根据实际瓶颈调整
BulkWrite.Config network_bound_options = BulkWrite.Config.newBuilder()
    .maxRequestsInFlight(8)  // 在网络和 CPU 之间平衡
    .build();

  • 批次大小:Ingester 的 Table
     本质上是一个 buffer
    。小批次适合低延迟场景(实时性要求较高但数据量较小);大批次适合高吞吐场景,但是会加大延迟。

性能对比实验

为了更好地说明 Bulk API 的使用场景,我们在 greptimedb-ingester-java
 仓库中构建了一个简单的日志场景测试工具。

日志场景测试工具:https://github.com/GreptimeTeam/greptimedb-ingester-java/tree/main/ingester-example/src/main/java/io/greptime/bench

该工具提供了 TableDataProvider
,一个为性能测试设计的高性能日志数据生成器,能够生成包含 15 个字段的合成日志数据,模拟真实分布式系统的日志场景,表结构如下👇

表结构

CREATE TABLEIFNOTEXISTS`my_bench_table` (
`log_ts`TIMESTAMP(3NOTNULL,
`business_name`STRINGNULL,
`app_name`STRINGNULL,
`host_name`STRINGNULL,
`log_message`STRINGNULL,
`log_level`STRINGNULL,
`log_name`STRINGNULL,
`uri`STRINGNULL,
`trace_id`STRINGNULL,
`span_id`STRINGNULL,
`errno`STRINGNULL,
`trace_flags`STRINGNULL,
`trace_state`STRINGNULL,
`pod_name`STRINGNULL,
TIMEINDEX (`log_ts`)
)

ENGINE=mito

数据特点

关键特性:大型 log_message
 字段

  • 目标长度:2,000 字符;
  • 内容生成:基于模板的系统,根据日志级别生成不同类型的消息;

字段基数/维度分布

  • 高基数字段(近乎唯一)
    • trace_id
      span_id
      :使用 64 位随机数生成 ;
    • log_ts
      :基于毫秒时间戳。 低基数等字段不在这里一一列举。

运行 Benchmark 测试

  1. 启动 GreptimeDB;
  2. 建表;
  3. 按照顺序启动 Bulk &Regular API Benchmark;
  4. 启动命令:
# Bulk API Benchmark
运行 io.greptime.bench.benchmark.BatchingWriteBenchmark

# Regular API Benchmark
运行 io.greptime.bench.benchmark.BatchingWriteBenchmark

  1. 我的本地测试结果如下:
API 类型
吞吐量
总耗时
提升幅度
Bulk API
180,962 rows/s
27.630 s
+83%
Regular API
98,868 rows/s
50.572 s
基准线

结果表明:

  • Bulk API 吞吐量提升约 80%+;
  • Regular API 适合小规模、低延迟场景;
  • Bulk API 适合高吞吐、大批量写入场景。

Bulk API 结果

 - === Running Bulk API Log Data Benchmark ===
 - Setting up bulk writer...
 - Starting bulk API benchmark: RandomTableDataProvider
 - Table: my_bench_table (14 columns)
 - Target rows: 5000000
 - Batch size: 65536
 - Parallelism: 4
// 省略无关日志...
 - → Batch 165536 rows processed (47012 rows/sec)
 - → Batch 2: 131072 rows processed(75199 rows/sec)
// 省略部分过程日志 ...
 - → Batch 73: 4784128 rows processed(180744 rows/sec)
 - → Batch 74: 4849664 rows processed(180748 rows/sec)
 - → Batch 75: 4915200 rows processed(180699 rows/sec)
 - Completing bulk write operation, signaling end of transmission
 - Waiting for server to complete processing
 - → Batch 76: 4980736 rows processed(180972 rows/sec)
 - → Batch 77: 5000000 rows processed(181015 rows/sec)

 - Finishing bulk writer and waiting for all responses...
 - All bulk writes completed successfully
 - Cleaning up data provider...
 - Bulk API benchmark completed successfully!
 - 
=== Benchmark Result ===
 - Table: my_bench_table
 - 
 - Provider                          Rows Duration(ms)      Throughput     Status
 - --------------------------------------------------------------------------
 - RandomTableDataProvider         5000000        27630       180962 r/s    SUCCESS

Regular API 结果

 - === Running Batching API Log Data Benchmark ===
 - Setting up batching writer...
 - Starting batching API benchmark: RandomTableDataProvider
 - Table: my_bench_table (14 columns)
 - Target rows: 5000000
 - Batch size: 65536
 - Concurrency: 4
// 省略无关日志 ...
 - → Batch 165536 rows processed (17415 rows/sec)
 - → Batch 2: 131072 rows processed(32031 rows/sec)
// 省略部分过程日志 ...
 - → Batch 73: 4784128 rows processed(98586 rows/sec)
 - → Batch 74: 4849664 rows processed(98638 rows/sec)
 - → Batch 75: 4915200 rows processed(98776 rows/sec)
 - → Batch 76: 4934464 rows processed(97574 rows/sec)
 - Finishing batching writer and waiting for all responses...
 - → Batch 77: 5000000 rows processed(98868 rows/sec)
 - All batching writes completed successfully
 - Cleaning up data provider...
 - Batching API benchmark completed successfully!
 - 
=== Benchmark Result ===
 - Table: my_bench_table
 - 
 - Provider                          Rows Duration(ms)      Throughput     Status
 - --------------------------------------------------------------------------
 - RandomTableDataProvider         5000000        50572        98868 r/s    SUCCESS

API 使用选择指南

使用场景(特征)
推荐 API
理由
实时监控告警
Regular API
低延迟,立即响应
IoT 传感器数据
Regular API
数据量小,实时性强
交互式仪表板
Regular API
即时反馈
ETL 数据管道
Bulk API
数据量大,可容忍延迟
日志收集系统
Bulk API
高吞吐量,批处理
历史数据迁移
Bulk API
一次性大量数据操作

总结

GreptimeDB 提供两种互补的写入方式:

  • Regular API:低延迟,适合实时性要求高的小规模写入;
  • Bulk API:高吞吐,适合大规模批量导入和日志处理。

开发者可根据业务需求灵活选择或组合使用,从而在不同场景下兼顾系统的可扩展性与高性能目标。


关于 Greptime

Greptime 格睿科技专注于打造新一代可观测数据库,服务开发者与企业用户,覆盖从从边缘设备到云端企业级部署的多样化需求。

  • GreptimeDB 开源版:开源、云原生,统一处理指标、日志和追踪数据,适合中小规模 IoT,个人项目与可观测性场景;
  • GreptimeDB 企业版:面向关键业务,提供更高性能、高安全性、高可用性和智能化运维服务;
  • GreptimeCloud 云服务:全托管云服务,零运维体验“企业级”可观测数据库,弹性扩展,按需付费。

欢迎加入开源社区参与贡献与交流!推荐从带有 good first issue
 标签的任务入手,一起共建可观测未来。


⭐ Star us on GitHub:https://github.com/GreptimeTeam/greptimedb

📚 官网:https://greptime.cn/

📖 文档:https://docs.greptime.cn/

🌍 Twitter:https://twitter.com/Greptime

💬 Slack:https://greptime.com/slack

💼 LinkedIn:https://www.linkedin.com/company/greptime/


往期精彩文章:

点击「阅读原文」,立即体验 GreptimeDB!

文章转载自GreptimeDB,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论