暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 GreptimeDB MCP 审计 AWS CloudTrail 日志

GreptimeDB 2025-05-13
338

GreptimeDB 是一个高性能时序数据库,Greptime 团队提供了支持 MCP 协议的开源项目 greptimedb-mcp-server
[1]。

AWS CloudTrail 用于记录 AWS 上的操作,例如创建资源、权限授予等。

本文将 AWS CloudTrail 的日志导入到 GreptimeDB 中,并使用 GreptimeDB MCP Server,让大模型对 CloudTrail 日志进行分析,发现云安全威胁和恶意操作。

以下是一个聊天的效果,我们查询到 155.63.17.217
 这一个IP 地址的异常操作:

(图 1:某 IP 地址的异常操作)

环境准备

GreptimeDB 下载和安装

  1. 下载 GreptimeDB 二进制文件,解压后并进入对应的目录,使用以下的命令运行:
./greptime standalone start

(图 2:启动 GreptimeDB)
  1. 使用 GreptimeDB Dashboard 查询数据(可选)

GreptimeDB 兼容 MySQL 和 PostgrepSQL 可以用 Navicat、DBeaver 等工具连接,也可以使用 Greptime 提供的 GreptimeDB dashboard[2]。

(图 3:GreptimeDB Dashboard)

GreptimeDB MCP 服务器

GreptimeTeam/greptimedb-mcp-server
[1] 基于 Python 开发,可以使用 Git 克隆到本地:

git clone https://github.com/GreptimeTeam/greptimedb-mcp-server.git

确保本地已经安装 Python 的包管理工具 uv
,用于运行该项目,安装步骤可以查看这里[3]。

MCP 客户端

使用 MCP 的客户端来连接 MCP 服务器,例如 Claude Desktop、Cline、Cherry Studio、ChatWise。

本文以 ChatWise 为例,将下面的 JSON 修改 MCP Server 的路径后,复制导入 MCP 客户端:

{
  "mcpServers": {
    "greptimedb": {
      "command""uv",
      "args": [
        "--directory",
        "/path/to/greptimedb-mcp-server",
        "run",
        "-m",
        "greptimedb_mcp_server.server"
      ],
      "env": {
        "GREPTIMEDB_HOST""localhost",
        "GREPTIMEDB_PORT""4002",
        "GREPTIMEDB_USER""root",
        "GREPTIMEDB_PASSWORD""",
        "GREPTIMEDB_DATABASE""public"
      }
    }
  }
}

(图 4:在 ChatWise 中配置模型)

数据导入

下载数据集

下载 CloudTrail 的数据集来进行测试。

关于此数据集可参考:

  • Summit Route — 来自 flaws.cloud 的 Cloudtrail 日志的公共数据集[4]
  • Threat detection🕵️‍♂️ in AWS using Amazon Athena Service[5]

下载到本地后,对文此件进行解压,如果是 Windows 可以采用 7z 进行解压,如果是 Unix/Linux 系统可以使用以下命令:

find . -type f -exec gunzip {} \;

导入数据到 GreptimDB

利用 Python 脚本将 JSON 格式的 CloudTrail 日志导入到 GreptimeDB,把该脚本存入下载数据集的同一目录下并运行:

import os
import glob
import json
import pymysql

# 配置
GREPTIMEDB_HOST = 'localhost'
GREPTIMEDB_PORT = 4002
GREPTIMEDB_USER = 'root'
GREPTIMEDB_PASSWORD = ''
DB_NAME = 'public'
TABLE_NAME = 'cloudtrail_events'

DATA_DIR = os.path.expanduser('./')
BATCH_SIZE = 1000

FIELDS = [
    'eventTime''eventID''eventName''eventSource''sourceIPAddress',
    'userAgent''awsRegion''recipientAccountId''errorCode''errorMessage',
    'userIdentity''eventType''requestParameters''requestID',
    'responseElements''eventVersion'
]

JSON_FIELDS = {'userIdentity''requestParameters''responseElements'}


def connect_db():
    return pymysql.connect(
        host=GREPTIMEDB_HOST,
        port=GREPTIMEDB_PORT,
        user=GREPTIMEDB_USER,
        password=GREPTIMEDB_PASSWORD,
        database=DB_NAME,
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True
    )

def format_record(record):
    values = []
    for field in FIELDS:
        value = record.get(field)
        if field in JSON_FIELDS and isinstance(value, dict):
            values.append(json.dumps(value))  # Convert dict to JSON string
        elif value isNoneand field in JSON_FIELDS:
            values.append(None)  # Handle null JSON fields explicitly
        else:
            values.append(value)
    return values

def load_and_insert():
    conn = connect_db()
    cursor = conn.cursor()

    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
        event_time TIMESTAMP TIME INDEX,
        event_id VARCHAR PRIMARY KEY,
        event_name VARCHAR,
        event_source VARCHAR,
        source_ip VARCHAR,
        user_agent TEXT,
        aws_region VARCHAR,
        recipient_account_id VARCHAR,
        error_code VARCHAR,
        error_message TEXT,
        user_identity JSON,
        event_type VARCHAR,
        request_parameters JSON,
        request_id VARCHAR,
        response_elements JSON,
        event_version VARCHAR
    );
    """

    cursor.execute(create_table_sql)
    print("✅ Table checked/created.")

    batch = []
    for filename in glob.glob(os.path.join(DATA_DIR, '*.json')):
        print(f"📄 Processing {filename}")
        with open(filename, 'r'as f:
            data = json.load(f)
            for record in data.get('Records', []):
                batch.append(format_record(record))
                if len(batch) >= BATCH_SIZE:
                    insert_batch_fast(cursor, batch)
                    batch = []

    if batch:
        insert_batch_fast(cursor, batch)

    cursor.close()
    conn.close()

def insert_batch_fast(cursor, batch):
    ifnot batch:
        return

    placeholders = "(" + ", ".join(["%s"] * len(FIELDS)) + ")"
    all_placeholders = ", ".join([placeholders] * len(batch))
    flat_values = [item for row in batch for item in row]

    fields_list = ', '.join([
        'event_time''event_id''event_name''event_source',
        'source_ip''user_agent''aws_region''recipient_account_id',
        'error_code''error_message''user_identity''event_type',
        'request_parameters''request_id''response_elements''event_version'
    ])
    sql = f"INSERT INTO {TABLE_NAME} ({fields_list}) VALUES {all_placeholders}"
    cursor.execute(sql, flat_values)

def main():
    load_and_insert()

if __name__ == '__main__':
    main()

上述代码的作用是:

  • 创建 CloudTrail 的表;
  • 导入原始的 JSON 数据到 GreptimeDB 中。
(图 5:导入到 GreptimeDB)

在 GreptimeDB Dashboard 上查询数据,验证是否导入成功:

(图 6:查询导入结果)

大模型问答

参考 Promtp 如下:

# 监控指标分析模板
你是一安全日志分析助手,致力于帮助用户查询和分析 GreptimeDB 中的数据。您的任务是分析用户的数据需求,并提供 SQL 查询语句,从 GreptimeDB 中提取相关信息。


## 1. 概述
GreptimeDB 是一个统一指标、日志和事件的时间序列数据库。
1. 提示:此服务器提供提示,帮助您构建与 GreptimeDB 的交互。
2. 资源:您可以在资源中找到格式为"greptime://<table_name>/data"的表。
3. 工具:
- "execute_sql":执行 SQL 命令(MySQL 语法)。

## 2. 指导原则
1. 时间范围至关重要 - 请务必在查询中指定时间范围。
2. 要探索可用数据,请使用 SQL 命令,例如 (`DESCRIBE`、`SELECT`、`SHOW TABLES`)
3. 遵循 SQL 最佳实践:
- 使用适当的筛选条件来限制结果集
- 考虑对时间序列数据使用聚合函数
- 利用 GreptimeDB 的内置时间函数
4. 服务器将阻止危险操作。除非需要修改数据,否则请专注于读取操作
5. 使用简洁的 Markdown 格式,并添加适当的标题和项目符号。

## 3. 示例用例和查询
- 尝试通过特定用户 ARN 启动 EC2 实例
- 谁可以通过控制台访问
- 搜寻 AWS 控制台的登录失败
- 通过 AWS 管理控制台调查创建的事件(IAM、S3 和 EC2)资源。
- 通过 AWS 管理控制台、AWS CLI、SDK 或直接 API 调用调查创建的事件(IAM、S3 和 EC2)资源
- 搜寻 CloudTrail 中断
- 搜寻未经授权的呼叫
- 寻找 “whoami” 身份
- 通过创建 IAM 用户来入侵账户的努力
- 在 Secrets Manager 中搜寻访问密钥
- 搜寻 xlarge EC2 实例
- 搜寻 S3 存储桶暴力破解尝试
- 搜寻可疑用户代理(Kali、Parrot、PowerShell)
- 搜寻永久密钥创建
- 创建公有 S3 存储桶
- 暴力破解尝试代入角色
- 尝试对账户执行对帐作

```sql
-- 获取表结构
DESCRIBE ${table};
-- 获取近期数据样本
SELECT * FROM ${table}
${sample_queries} ORDER BY ${time_column} DESC LIMIT 100

-- 尝试启动 EC2实例由特定用户 ARN
SELECT 
  eventname, 
  count(*) AS eventcount 
FROM cloudtrail_logs WHERE 
  user_identity.arn = '${arn}' 
  AND source_ip='${ip}'
GROUP BY eventname ORDER BY eventcount DESC;

-- 日志配置变更的高风险操作
SELECT event_time, error_message, user_identity, source_ip, event_name, user_agent, aws_region FROM cloudtrail_events WHERE event_name IN ('StopLogging', 'DeleteTrail', 'UpdateTrail') AND event_time BETWEEN TIMESTAMP '2018-01-01T00:00:00Z' AND TIMESTAMP '2020-12-31T23:59:59Z' ORDER BY event_time DESC LIMIT 100

## 4. 补充说明
1. 如果您不知道如何回答特定问题,建议先探索模式以了解可用的数据结构。
2. 以清晰、翔实的方式解释查询结果。
3. 帮助他们有效地分析云安全日志数据。

示例

示例一

分析 cloudtrail_events
 表中在 2018 年 4 月存在的高危操作(注意这个表很大,谨慎使用 SQL)。

(图 7:示例一)

示例二

分析 AKIA01U43UX3RBRDXF4Q
 用户的操作是否存在高危(注意这个表很大,请谨慎使用 SQL)。

(图 8:示例二)

对上述的内容进行验证,确实是存在恶意的操作。来自 goodycy3’s gists 的博客分享了对关于数据集的分析,感兴趣的朋友可以尝试一下。


注:本文最终解释权归作者 JetDeng 和 GreptimeDB 所有。

Reference

[1] https://github.com/GreptimeTeam/greptimedb-mcp-server 

[2] https://github.com/GreptimeTeam/dashboard 

[3] https://docs.astral.sh/uv/getting-started/installation/ 

[4] https://summitroute.com/blog/2020/10/09/public_dataset_of_cloudtrail_logs_from_flaws_cloud/ 

[5] https://goodycyb.hashnode.dev/threat-detection-in-aws-using-amazon-athena-to-analyze-cloudtrail-logs-from-flawscloud


关于 Greptime

Greptime 格睿科技专注于打造新一代可观测数据库,服务开发者与企业用户,覆盖从从边缘设备到云端企业级部署的多样化需求。

  • GreptimeDB 开源版:开源、云原生,统一处理指标、日志和追踪数据,适合中小规模 IoT,个人项目与可观测性场景;
  • GreptimeDB 企业版:面向关键业务,提供更高性能、高安全性、高可用性和智能化运维服务;
  • GreptimeCloud 云服务:全托管云服务,零运维体验“企业级”可观测数据库,弹性扩展,按需付费。

欢迎加入开源社区参与贡献与交流!推荐从带有 good first issue
 标签的任务入手,一起共建可观测未来。

⭐ Star us on GitHub:https://github.com/GreptimeTeam/greptimedb 

📚 官网:https://greptime.cn/ 

📖 文档:https://docs.greptime.cn/ 

🌍 Twitter:https://twitter.com/Greptime 

💬 Slack:https://greptime.com/slack 

💼 LinkedIn:https://www.linkedin.com/company/greptime/


往期精彩文章:

点击「阅读原文」,立即体验 GreptimeDB!

文章转载自GreptimeDB,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论