作者:Mike Wang|RisingWave Software Engineer
基于大语言模型(LLM)开发的智能问答助手,会在用户提问时生成回答,其表现非常出色,已经深度融入我们的日常生活。然而,实现这些”魔法“的前提是,我们需要向 LLM 提供高质量且描述精确的问题。
那么,能不能实现一种无需人类指令即可主动行动的智能助手呢?在现实中,每时每刻都有无数事件发生,主动型智能助手必须具备“眼睛”和“耳朵”,能够实时感知周围发生的一切,才能在事件发生的第一时间采取行动。
1主动智能助手
让我们看看几个关于主动智能助手的例子:
销售额打破月度记录时,给我发邮件。 我快到家时,打开空调。 TSLA 股票收益率超过 3% 时,卖出 40% 的股票。
目前,大多数 LLM 都能给你发邮件,或者通过 Home Assistant API 打开空调,甚至卖出 TSLA 股票。但问题在于,这些任务只有在你明确提出要求后才能执行。如果遇到具体的事件,比如“收益率超过 3%”,或者“你即将到家时”,它无法自动触发这些操作。虽然开发人员可以为每种特定情景单独开发功能,但如果想在其他条件满足时卖出股票呢?或者希望 LLM 处理所有事件呢?
这样的处理方式就显得不仅低效,而且成本高昂了,因为每秒钟可能有数千甚至数百万事件发生。实际上,LLM 需要的是一种机制,让它能够创建事件监听器。
2系统架构
如果你想在早上 7 点起床,一般你会设置一个闹钟。当时间到达 7 点时,闹钟会“触发”铃声将你叫醒。这里,事件是“时间”,而需要关注的事件是“时间到达 7 点”。
同样,对于 LLM,它可以设置一个事件监听器,当”需要关注的事件”发生时触发下一步行为。

事件驱动智能助手的工作流程
上图展示了事件驱动智能助手的工作流程。这里使用流式数据库来构建事件驱动智能助手,它能够存储、处理、转换事件,并支持用户查询数据。
3构建事件监听器
首先,用户需要描述要监控的事件,例如,“当有 @risingwave.com 的用户注册了就告诉我。” 在这个例子中,智能助手帮我们管理网站。它可以访问数据,以帮助确定刚刚在网站上注册的人来自哪家公司。这些数据可能是后端的日志,或者是用户表中提交的 INSERT 语句。
假设这个事件的具体形式如下:
{"id": "x7cj7Hjis52-H", "payload": {"email": "peter.shen@example.com"}, "timestamp": "2024-11-14T06:00:00"}
{"id": "gdgs+52djKFO", "payload": {"email": "john.doe@example.com"}, "timestamp": "2024-11-14T07:00:00"}
{"id": "289ghnwNFoiu9dK", "payload": {"email": "example.user@risingwave.com"}, "timestamp": "2024-11-14T08:00:00"}......
所有这些事件会被导入流式数据库。LLM 会在数据库中创建规则以筛选事件。当需要关注的事件发生时,流式数据库会通知事件处理器,事件处理器则会通过包含事件和上下文的提示词调用 LLM。例如:
发送消息给用户:example.user@risingwave.com 刚刚注册了网站。
接着,LLM 使用工具调用发送消息通知用户。以下是 SQL 示例:
SELECT
'发送消息给用户:' || (payload).email || ' 刚刚注册了网站。' AS prompt
FROM register_events
WHERE created_at > '2024-11-14 20:00:00'
AND (payload).email LIKE '%@risingwave.com'
这段 SQL 首先通过 created_at > '2024-11-14 20:00:00'
筛选出所有历史事件,因为用户希望从现在开始监控 @risingwave.com 用户的注册情况。接着,它只保留 payload
中 email
字段以 risingwave.com
结尾的事件,并将这些注册事件转换为触发 LLM 的提示。这就是主动型助手的“眼睛”和“耳朵”。
那么为什么这段 SQL 能够筛选事件并触发 LLM?这段 SQL 用于构建一个物化视图,其完整的 SQL 语句如下:
CREATE MATERIALIZED VIEW event_listener_risingwavecom_register AS
SELECT ... FROM ...
物化视图用于存储查询的最新结果。你可以直接查询这个物化视图,也可以基于它创建新的物化视图,例如:
SELECT * FROM event_listener_risingwavecom_register
在流式数据库中,物化视图会在上游数据发生变化时自动、增量地更新。因此,当 register_events
表插入一条新事件时,流式数据库会按照 SQL 语句中定义的规则处理该事件。如果新插入行的 payload
中 email
字段以 “@risingwave.com” 结尾,那么物化视图中将插入一个新的提示。而物化视图的更新本身也是一个事件,RisingWave 提供了发布/订阅机制,可以在物化视图更新时通知下游处理。
4为什么选择流式数据库
SQL 接口至关重要
SQL 是一种描述目标结果的语言,无需手动编写计算过程。数据库会根据 SQL 自动生成优化的计算图。例如,如果想获取销售金额最高的前 10 条交易记录,只需写下:
SELECT * FROM sales_global ORDER BY amount_usd LIMIT 10;
MATERIALIZED VIEW来维护这个查询的最新结果:
CREATE MATERIALIZED VIEW top_10_sales AS
SELECT * FROM sales_global ORDER BY amount_usd LIMIT 10;
每次查询物化视图 SELECT * FROM top_10_sales
都会得到最新的前 10 条销售记录。当上游数据发生变化时,物化视图会自动增量更新。
如果让 LLM 写 Python 代码来实现这个需求,LLM 不仅需要写出正确的计算过程,还需要实现一个分布式的增量更新 Top-K 算法来处理事件。这需要复杂的提示工程,让 LLM 成为流处理和分布式系统的专家。而使用 SQL 时,LLM 只需描述需要的结果。这意味着更少的 LLM 逻辑步骤、更少的提示工程、更少的领域知识需求,从而大幅降低成本。
输入与输出
Streaming joins
流式联接(Streaming joins)是事件驱动应用中常见的需求,RisingWave 专为流处理设计,其在流式联接场景中表现出色。以下是流式联接的示例:
假设你拥有一家跨国公司,产品在全球范围内销售,而你现在位于北京总部,希望实时监控销售金额(以人民币计算)。数据库中有两张表可用:
sales_global:记录所有销售交易。 exchange_rate:记录特定时间点的最新汇率。

sales_global 和 exchange_rate 表
由于销售发生在全球范围内,交易可能以任意货币结算。在汇总销售金额之前,需要用当时的最新汇率将交易金额转换为人民币。例如,若要转换 2024-11-05T07:00:00, 10 USD
,需找到 exchange_rates
表中满足以下条件的行:
currency_from
为 USD;currency_to
为 CNY;ts
是不超过2024-11-05T07:00:00
的最大时间戳。

这样的 Streaming join 对于列存储数据库来说就已经无法高效处理了。而在实时事件处理场景中,情况可能更加复杂,尤其当你的数据库中有大量维表时,联接表可能不止一个。

可运行的 Demo: https://github.com/cloudcarver/event-driven-agent-demo
关于 RisingWave


往期推荐
技术内幕




