GreptimeDB 是一个高性能时序数据库,Greptime 团队提供了支持 MCP 协议的开源项目 greptimedb-mcp-server
[1]。
AWS CloudTrail 用于记录 AWS 上的操作,例如创建资源、权限授予等。
本文将 AWS CloudTrail 的日志导入到 GreptimeDB 中,并使用 GreptimeDB MCP Server,让大模型对 CloudTrail 日志进行分析,发现云安全威胁和恶意操作。
以下是一个聊天的效果,我们查询到 155.63.17.217
这一个IP 地址的异常操作:

环境准备
GreptimeDB 下载和安装
下载 GreptimeDB 二进制文件,解压后并进入对应的目录,使用以下的命令运行:
./greptime standalone start

使用 GreptimeDB Dashboard 查询数据(可选)
GreptimeDB 兼容 MySQL 和 PostgrepSQL 可以用 Navicat、DBeaver 等工具连接,也可以使用 Greptime 提供的 GreptimeDB dashboard[2]。

GreptimeDB MCP 服务器
GreptimeTeam/greptimedb-mcp-server
[1] 基于 Python 开发,可以使用 Git 克隆到本地:
git clone https://github.com/GreptimeTeam/greptimedb-mcp-server.git
确保本地已经安装 Python 的包管理工具 uv
,用于运行该项目,安装步骤可以查看这里[3]。
MCP 客户端
使用 MCP 的客户端来连接 MCP 服务器,例如 Claude Desktop、Cline、Cherry Studio、ChatWise。
本文以 ChatWise 为例,将下面的 JSON 修改 MCP Server 的路径后,复制导入 MCP 客户端:
{
"mcpServers": {
"greptimedb": {
"command": "uv",
"args": [
"--directory",
"/path/to/greptimedb-mcp-server",
"run",
"-m",
"greptimedb_mcp_server.server"
],
"env": {
"GREPTIMEDB_HOST": "localhost",
"GREPTIMEDB_PORT": "4002",
"GREPTIMEDB_USER": "root",
"GREPTIMEDB_PASSWORD": "",
"GREPTIMEDB_DATABASE": "public"
}
}
}
}

数据导入
下载数据集
下载 CloudTrail 的数据集来进行测试。
关于此数据集可参考:
Summit Route — 来自 flaws.cloud 的 Cloudtrail 日志的公共数据集[4] Threat detection🕵️♂️ in AWS using Amazon Athena Service[5]
下载到本地后,对文此件进行解压,如果是 Windows 可以采用 7z 进行解压,如果是 Unix/Linux 系统可以使用以下命令:
find . -type f -exec gunzip {} \;
导入数据到 GreptimDB
利用 Python 脚本将 JSON 格式的 CloudTrail 日志导入到 GreptimeDB,把该脚本存入下载数据集的同一目录下并运行:
import os
import glob
import json
import pymysql
# 配置
GREPTIMEDB_HOST = 'localhost'
GREPTIMEDB_PORT = 4002
GREPTIMEDB_USER = 'root'
GREPTIMEDB_PASSWORD = ''
DB_NAME = 'public'
TABLE_NAME = 'cloudtrail_events'
DATA_DIR = os.path.expanduser('./')
BATCH_SIZE = 1000
FIELDS = [
'eventTime', 'eventID', 'eventName', 'eventSource', 'sourceIPAddress',
'userAgent', 'awsRegion', 'recipientAccountId', 'errorCode', 'errorMessage',
'userIdentity', 'eventType', 'requestParameters', 'requestID',
'responseElements', 'eventVersion'
]
JSON_FIELDS = {'userIdentity', 'requestParameters', 'responseElements'}
def connect_db():
return pymysql.connect(
host=GREPTIMEDB_HOST,
port=GREPTIMEDB_PORT,
user=GREPTIMEDB_USER,
password=GREPTIMEDB_PASSWORD,
database=DB_NAME,
cursorclass=pymysql.cursors.DictCursor,
autocommit=True
)
def format_record(record):
values = []
for field in FIELDS:
value = record.get(field)
if field in JSON_FIELDS and isinstance(value, dict):
values.append(json.dumps(value)) # Convert dict to JSON string
elif value isNoneand field in JSON_FIELDS:
values.append(None) # Handle null JSON fields explicitly
else:
values.append(value)
return values
def load_and_insert():
conn = connect_db()
cursor = conn.cursor()
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
event_time TIMESTAMP TIME INDEX,
event_id VARCHAR PRIMARY KEY,
event_name VARCHAR,
event_source VARCHAR,
source_ip VARCHAR,
user_agent TEXT,
aws_region VARCHAR,
recipient_account_id VARCHAR,
error_code VARCHAR,
error_message TEXT,
user_identity JSON,
event_type VARCHAR,
request_parameters JSON,
request_id VARCHAR,
response_elements JSON,
event_version VARCHAR
);
"""
cursor.execute(create_table_sql)
print("✅ Table checked/created.")
batch = []
for filename in glob.glob(os.path.join(DATA_DIR, '*.json')):
print(f"📄 Processing {filename}")
with open(filename, 'r') as f:
data = json.load(f)
for record in data.get('Records', []):
batch.append(format_record(record))
if len(batch) >= BATCH_SIZE:
insert_batch_fast(cursor, batch)
batch = []
if batch:
insert_batch_fast(cursor, batch)
cursor.close()
conn.close()
def insert_batch_fast(cursor, batch):
ifnot batch:
return
placeholders = "(" + ", ".join(["%s"] * len(FIELDS)) + ")"
all_placeholders = ", ".join([placeholders] * len(batch))
flat_values = [item for row in batch for item in row]
fields_list = ', '.join([
'event_time', 'event_id', 'event_name', 'event_source',
'source_ip', 'user_agent', 'aws_region', 'recipient_account_id',
'error_code', 'error_message', 'user_identity', 'event_type',
'request_parameters', 'request_id', 'response_elements', 'event_version'
])
sql = f"INSERT INTO {TABLE_NAME} ({fields_list}) VALUES {all_placeholders}"
cursor.execute(sql, flat_values)
def main():
load_and_insert()
if __name__ == '__main__':
main()
上述代码的作用是:
创建 CloudTrail 的表; 导入原始的 JSON 数据到 GreptimeDB 中。

在 GreptimeDB Dashboard 上查询数据,验证是否导入成功:

大模型问答
参考 Promtp 如下:
# 监控指标分析模板
你是一安全日志分析助手,致力于帮助用户查询和分析 GreptimeDB 中的数据。您的任务是分析用户的数据需求,并提供 SQL 查询语句,从 GreptimeDB 中提取相关信息。
## 1. 概述
GreptimeDB 是一个统一指标、日志和事件的时间序列数据库。
1. 提示:此服务器提供提示,帮助您构建与 GreptimeDB 的交互。
2. 资源:您可以在资源中找到格式为"greptime://<table_name>/data"的表。
3. 工具:
- "execute_sql":执行 SQL 命令(MySQL 语法)。
## 2. 指导原则
1. 时间范围至关重要 - 请务必在查询中指定时间范围。
2. 要探索可用数据,请使用 SQL 命令,例如 (`DESCRIBE`、`SELECT`、`SHOW TABLES`)
3. 遵循 SQL 最佳实践:
- 使用适当的筛选条件来限制结果集
- 考虑对时间序列数据使用聚合函数
- 利用 GreptimeDB 的内置时间函数
4. 服务器将阻止危险操作。除非需要修改数据,否则请专注于读取操作
5. 使用简洁的 Markdown 格式,并添加适当的标题和项目符号。
## 3. 示例用例和查询
- 尝试通过特定用户 ARN 启动 EC2 实例
- 谁可以通过控制台访问
- 搜寻 AWS 控制台的登录失败
- 通过 AWS 管理控制台调查创建的事件(IAM、S3 和 EC2)资源。
- 通过 AWS 管理控制台、AWS CLI、SDK 或直接 API 调用调查创建的事件(IAM、S3 和 EC2)资源
- 搜寻 CloudTrail 中断
- 搜寻未经授权的呼叫
- 寻找 “whoami” 身份
- 通过创建 IAM 用户来入侵账户的努力
- 在 Secrets Manager 中搜寻访问密钥
- 搜寻 xlarge EC2 实例
- 搜寻 S3 存储桶暴力破解尝试
- 搜寻可疑用户代理(Kali、Parrot、PowerShell)
- 搜寻永久密钥创建
- 创建公有 S3 存储桶
- 暴力破解尝试代入角色
- 尝试对账户执行对帐作
```sql
-- 获取表结构
DESCRIBE ${table};
-- 获取近期数据样本
SELECT * FROM ${table}
${sample_queries} ORDER BY ${time_column} DESC LIMIT 100
-- 尝试启动 EC2实例由特定用户 ARN
SELECT
eventname,
count(*) AS eventcount
FROM cloudtrail_logs WHERE
user_identity.arn = '${arn}'
AND source_ip='${ip}'
GROUP BY eventname ORDER BY eventcount DESC;
-- 日志配置变更的高风险操作
SELECT event_time, error_message, user_identity, source_ip, event_name, user_agent, aws_region FROM cloudtrail_events WHERE event_name IN ('StopLogging', 'DeleteTrail', 'UpdateTrail') AND event_time BETWEEN TIMESTAMP '2018-01-01T00:00:00Z' AND TIMESTAMP '2020-12-31T23:59:59Z' ORDER BY event_time DESC LIMIT 100
## 4. 补充说明
1. 如果您不知道如何回答特定问题,建议先探索模式以了解可用的数据结构。
2. 以清晰、翔实的方式解释查询结果。
3. 帮助他们有效地分析云安全日志数据。
示例
示例一
分析 cloudtrail_events
表中在 2018 年 4 月存在的高危操作(注意这个表很大,谨慎使用 SQL)。

示例二
分析 AKIA01U43UX3RBRDXF4Q
用户的操作是否存在高危(注意这个表很大,请谨慎使用 SQL)。

对上述的内容进行验证,确实是存在恶意的操作。来自 goodycy3’s gists 的博客分享了对关于数据集的分析,感兴趣的朋友可以尝试一下。
注:本文最终解释权归作者 JetDeng 和 GreptimeDB 所有。
Reference
[1] https://github.com/GreptimeTeam/greptimedb-mcp-server
[2] https://github.com/GreptimeTeam/dashboard
[3] https://docs.astral.sh/uv/getting-started/installation/
[4] https://summitroute.com/blog/2020/10/09/public_dataset_of_cloudtrail_logs_from_flaws_cloud/
[5] https://goodycyb.hashnode.dev/threat-detection-in-aws-using-amazon-athena-to-analyze-cloudtrail-logs-from-flawscloud
关于 Greptime
Greptime 格睿科技专注于打造新一代可观测数据库,服务开发者与企业用户,覆盖从从边缘设备到云端企业级部署的多样化需求。
GreptimeDB 开源版:开源、云原生,统一处理指标、日志和追踪数据,适合中小规模 IoT,个人项目与可观测性场景; GreptimeDB 企业版:面向关键业务,提供更高性能、高安全性、高可用性和智能化运维服务; GreptimeCloud 云服务:全托管云服务,零运维体验“企业级”可观测数据库,弹性扩展,按需付费。
欢迎加入开源社区参与贡献与交流!推荐从带有 good first issue
标签的任务入手,一起共建可观测未来。

⭐ Star us on GitHub:https://github.com/GreptimeTeam/greptimedb
📚 官网:https://greptime.cn/
📖 文档:https://docs.greptime.cn/
🌍 Twitter:https://twitter.com/Greptime
💬 Slack:https://greptime.com/slack
💼 LinkedIn:https://www.linkedin.com/company/greptime/
往期精彩文章:



点击「阅读原文」,立即体验 GreptimeDB!




