暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

迈向主动智能:使用流式数据库构建事件驱动智能体

作者:Mike Wang|RisingWave Software Engineer

基于大语言模型(LLM)开发的智能问答助手,会在用户提问时生成回答,其表现非常出色,已经深度融入我们的日常生活。然而,实现这些”魔法“的前提是,我们需要向 LLM 提供高质量且描述精确的问题。

那么,能不能实现一种无需人类指令即可主动行动的智能助手呢?在现实中,每时每刻都有无数事件发生,主动型智能助手必须具备“眼睛”和“耳朵”,能够实时感知周围发生的一切,才能在事件发生的第一时间采取行动。

1主动智能助手

让我们看看几个关于主动智能助手的例子:

  • 销售额打破月度记录时,给我发邮件。
  • 我快到家时,打开空调。
  • TSLA 股票收益率超过 3% 时,卖出 40% 的股票。

目前,大多数 LLM 都能给你发邮件,或者通过 Home Assistant API 打开空调,甚至卖出 TSLA 股票。但问题在于,这些任务只有在你明确提出要求后才能执行。如果遇到具体的事件,比如“收益率超过 3%”,或者“你即将到家时”,它无法自动触发这些操作。虽然开发人员可以为每种特定情景单独开发功能,但如果想在其他条件满足时卖出股票呢?或者希望 LLM 处理所有事件呢?

这样的处理方式就显得不仅低效,而且成本高昂了,因为每秒钟可能有数千甚至数百万事件发生。实际上,LLM 需要的是一种机制,让它能够创建事件监听器。

2系统架构

如果你想在早上 7 点起床,一般你会设置一个闹钟。当时间到达 7 点时,闹钟会“触发”铃声将你叫醒。这里,事件是“时间”,而需要关注的事件是“时间到达 7 点”。

同样,对于 LLM,它可以设置一个事件监听器,当”需要关注的事件”发生时触发下一步行为。

事件驱动智能助手的工作流程

上图展示了事件驱动智能助手的工作流程。这里使用流式数据库来构建事件驱动智能助手,它能够存储、处理、转换事件,并支持用户查询数据。

3构建事件监听器

首先,用户需要描述要监控的事件,例如,“当有 @risingwave.com 的用户注册了就告诉我。” 在这个例子中,智能助手帮我们管理网站。它可以访问数据,以帮助确定刚刚在网站上注册的人来自哪家公司。这些数据可能是后端的日志,或者是用户表中提交的 INSERT 语句。

假设这个事件的具体形式如下:

{"id""x7cj7Hjis52-H""payload": {"email""peter.shen@example.com"}, "timestamp""2024-11-14T06:00:00"}
{"id""gdgs+52djKFO""payload": {"email""john.doe@example.com"}, "timestamp""2024-11-14T07:00:00"}
{"id""289ghnwNFoiu9dK""payload": {"email""example.user@risingwave.com"}, "timestamp""2024-11-14T08:00:00"}......

所有这些事件会被导入流式数据库。LLM 会在数据库中创建规则以筛选事件。当需要关注的事件发生时,流式数据库会通知事件处理器,事件处理器则会通过包含事件和上下文的提示词调用 LLM。例如:

发送消息给用户:example.user@risingwave.com 刚刚注册了网站。

接着,LLM 使用工具调用发送消息通知用户。以下是 SQL 示例:

SELECT
  '发送消息给用户:' || (payload).email || ' 刚刚注册了网站。' AS prompt
FROM register_events
WHERE created_at > '2024-11-14 20:00:00'
  AND (payload).email LIKE '%@risingwave.com'


这段 SQL 首先通过 created_at > '2024-11-14 20:00:00'
筛选出所有历史事件,因为用户希望从现在开始监控 @risingwave.com 用户的注册情况。接着,它只保留 payload
email
字段以 risingwave.com
结尾的事件,并将这些注册事件转换为触发 LLM 的提示。这就是主动型助手的“眼睛”和“耳朵”。

那么为什么这段 SQL 能够筛选事件并触发 LLM?这段 SQL 用于构建一个物化视图,其完整的 SQL 语句如下:

CREATE MATERIALIZED VIEW event_listener_risingwavecom_register AS
SELECT ... FROM ...

物化视图用于存储查询的最新结果。你可以直接查询这个物化视图,也可以基于它创建新的物化视图,例如:

SELECT * FROM event_listener_risingwavecom_register

在流式数据库中,物化视图会在上游数据发生变化时自动、增量地更新。因此,当 register_events
表插入一条新事件时,流式数据库会按照 SQL 语句中定义的规则处理该事件。如果新插入行的 payload
email
字段以 “@risingwave.com” 结尾,那么物化视图中将插入一个新的提示。而物化视图的更新本身也是一个事件,RisingWave 提供了发布/订阅机制,可以在物化视图更新时通知下游处理。

4为什么选择流式数据库

流式数据库是实现这些“魔法”的核心。它具备我们需要的一切:流处理引擎、数据存储、数据服务以及 SQL 接口。虽然市面上有许多流处理引擎,但拥有 SQL 接口的引擎可以大幅减少开发事件驱动助手所需的时间,而 RisingWave 就是一个绝佳的选择。

SQL 接口至关重要

SQL 是一种描述目标结果的语言,无需手动编写计算过程。数据库会根据 SQL 自动生成优化的计算图。例如,如果想获取销售金额最高的前 10 条交易记录,只需写下:

SELECT * FROM sales_global ORDER BY amount_usd LIMIT 10;


还可以用 MATERIALIZED VIEW
来维护这个查询的最新结果:
CREATE MATERIALIZED VIEW top_10_sales AS
SELECT * FROM sales_global ORDER BY amount_usd LIMIT 10;


每次查询物化视图 SELECT * FROM top_10_sales
都会得到最新的前 10 条销售记录。当上游数据发生变化时,物化视图会自动增量更新。

如果让 LLM 写 Python 代码来实现这个需求,LLM 不仅需要写出正确的计算过程,还需要实现一个分布式的增量更新 Top-K 算法来处理事件。这需要复杂的提示工程,让 LLM 成为流处理和分布式系统的专家。而使用 SQL 时,LLM 只需描述需要的结果。这意味着更少的 LLM 逻辑步骤、更少的提示工程、更少的领域知识需求,从而大幅降低成本。

输入与输出

RisingWave 与 Postgres 兼容,因此可以无缝融入 Postgres 生态系统。你可以通过工具(如 psql、dbt)或编程语言中的 Postgres 客户端库访问它,也可以用作 Grafana 的 Postgres 数据源或 Postgres FDW。此外,RisingWave 支持创建数据源从外部数据存储(如 Kafka、MySQL、Clickhouse、Iceberg 表等)拉取数据,或将数据写入外部存储。在上述案例中,我们将使用插入语句 (INSERT) 进行数据写入,并通过订阅机制通知事件处理器。

Streaming joins

流式联接(Streaming joins)是事件驱动应用中常见的需求,RisingWave 专为流处理设计,其在流式联接场景中表现出色。以下是流式联接的示例:

假设你拥有一家跨国公司,产品在全球范围内销售,而你现在位于北京总部,希望实时监控销售金额(以人民币计算)。数据库中有两张表可用:

  • sales_global:记录所有销售交易。
  • exchange_rate:记录特定时间点的最新汇率。

sales_global 和 exchange_rate 表

由于销售发生在全球范围内,交易可能以任意货币结算。在汇总销售金额之前,需要用当时的最新汇率将交易金额转换为人民币。例如,若要转换 2024-11-05T07:00:00, 10 USD
,需找到 exchange_rates
表中满足以下条件的行:

  • currency_from
    为 USD;
  • currency_to
    为 CNY;
  • ts
    是不超过 2024-11-05T07:00:00
    的最大时间戳。 

对应的 SQL 语句

这样的 Streaming join 对于列存储数据库来说就已经无法高效处理了在实时事件处理场景中,情况可能更加复杂,尤其当你的数据库中有大量维表时,联接表可能不止一个。

5Demo
为了帮助大家更好地理解如何通过流式数据库监控事件并结合 LLM 提供服务,我们构建了一个可运行的 Demo[1], 欢迎点击链接开启主动智能之旅。

参考资料
[1]

运行的 Demo: https://github.com/cloudcarver/event-driven-agent-demo

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

文章转载自RisingWave中文开源社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论