简介
Compass是一个诊断大数据生态系统中计算引擎和调度器的平台,旨在提高故障排除的效率并降低问题调整的复杂性。它自动收集日志和指标,除了使用启发式规则来识别问题并提供调整建议,对于日志,还使用了ChatGPT还提供诊断建议, 日志将使用drain算法自动聚合为模板,可用于人工干预等,提升诊断自动化和优化方案能力。
其主要功能特性如下:
非侵入式,即时诊断,无需修改已有的调度平台,即可体验诊断效果。
支持多种主流调度平台,例如DolphinScheduler 2.x和3.x、Airflow或自研等。
支持多版本Spark、MapReduce、Flink、Hadoop 2.x和3.x 任务日志诊断和解析。
支持工作流层异常诊断,识别各种失败和基线耗时异常问题。
支持引擎层异常诊断,包含数据倾斜、大表扫描、内存浪费等14种异常类型。
支持各种日志匹配规则编写和异常阈值调整,可自行根据实际场景优化。
支持一键诊断全量(包含非调度平台提交任务)Spark/MapReduce任务。
支持ChatGPT对异常日志进行诊断,提供解决方案,使用了drain算法聚合模板,节约成本。
支持组件
社区
欢迎加入社区咨询使用或成为 Compass 开发者。以下是获得帮助的方法:
提交 issue。
提交 pull request, 请阅读 contributing guideline。
讨论 Idea & Question。
我们将会尽快回复。
罗盘已支持诊断类型概览:
| 引擎 | 诊断维度 | 诊断类型 | 类型说明 |
| Spark | 失败分析 | 运行失败 | 最终运行失败的任务 |
| 首次失败 | 重试次数大于1的成功任务 | ||
| 长期失败 | 最近10天运行失败的任务 | ||
| 耗时分析 | 基线时间异常 | 相对于历史正常结束时间,提前结束或晚点结束的任务 | |
| 基线耗时异常 | 相对于历史正常运行时长,运行时间过长或过短的任务 | ||
| 运行耗时长 | 运行时间超过2小时的任务 | ||
| 报错分析 | sql失败 | 因sql执行问题而导致失败的任务 | |
| shuffle失败 | 因shuffle执行问题而导致失败的任务 | ||
| 内存溢出 | 因内存溢出问题而导致失败的任务 | ||
| 资源分析 | 内存浪费 | 内存使用峰值与总内存占比过低的任务 | |
| CPU浪费 | driver/executor计算时间与总CPU计算时间占比过低的任务 | ||
| 效率分析 | 大表扫描 | 没有限制分区导致扫描行数过多的任务 | |
| OOM预警 | 广播表的累计内存与driver或executor任意一个内存占比过高的任务 | ||
| 数据倾斜 | stage中存在task处理的最大数据量远大于中位数的任务 | ||
| Job耗时异常 | job空闲时间与job运行时间占比过高的任务 | ||
| Stage耗时异常 | stage空闲时间与stage运行时间占比过高的任务 | ||
| Task长尾 | stage中存在task最大运行耗时远大于中位数的任务 | ||
| HDFS卡顿 | stage中存在task处理速率过慢的任务 | ||
| 推测执行Task过多 | stage中频繁出现task推测执行的任务 | ||
| 全局排序异常 | 全局排序导致运行耗时过长的任务 | ||
| MapReduce | 资源分析 | 内存浪费 | 内存使用峰值与总内存占比过低的任务 |
| 效率分析 | 大表扫描 | 扫描行数过多的任务 | |
| Task长尾 | map/reduce task最大运行耗时远大于中位数的任务 | ||
| 数据倾斜 | map/reduce task处理的最大数据量远大于中位数的任务 | ||
| 推测执行Task过多 | map/reduce task中频繁出现推测执行的任务 | ||
| GC异常 | GC时间相对CPU时间占比过高的任务 | ||
| Flink | 资源诊断 | 内存利用率高 | 计算内存的使用率,如果使用率高于阈值,则增加内存 |
| 内存利用率低 | 计算内存的使用率,如果使用率低于阈值,则降低内存 | ||
| JM内存优化 | 根据tm个数计算jm内存的建议值 | ||
| 作业无流量 | 检测作业的kafka source算子是否没有流量 | ||
| TM管理内存优化 | 计算作业管理内存的使用率,给出合适的管理内存建议值 | ||
| 部分TM空跑 | 检测是否有tm没有流量,并且cpu和内存也没有使用 | ||
| 并行度不够 | 检测作业是否因为并行度不够引起延迟 | ||
| CPU利用率高 | 计算作业的CPU均值使用率,如果高于阈值,则增加cpu | ||
| CPU利用率低 | 计算作业的CPU均值使用率,如果低于阈值,则降低cpu | ||
| CPU峰值利用率高 | 计算作业的CPU峰值使用率,如果高于阈值,则增加cpu | ||
| 异常诊断 | 存在慢算子 | 检测作业是否存在慢算子 | |
| 存在反压算子 | 检测作业是否存在反压算子 | ||
| 作业延迟高 | 检测作业的kafka延迟是否高于阈值 |
UI
Spark:

Flink:
系统架构
系统架构图
架构说明
整体架构分3层:
调度系统对接层:实现对接调度器、Yarn、Spark、Flink、HDFS等系统,同步任务及其日志元数据到诊断系统;
服务层:包括数据采集、元数据关联&模型标准化、异常检测、资源诊断、Portal模块;
基础组件层:包括MySQL、 OpenSearch、Kafka、Redis、Zookeeper等组件。
具体模块流程阶段:
数据采集阶段:task-canal/adapter模块订阅同步调度系统的用户、DAG、作业、执行记录等工作流元数据同步至诊断平台;task-metadata模块定时同步Yarn ResourceManager、Spark HistoryServer App元数据至诊断系统,关联日志存储路径,为后续数据处理阶段作基础;
数据关联与模型标准化阶段:task-syncer模块将同步的数据标准化为User、Project、Flow、Task、TaskInstance模型;task-application模块将工作流层与引擎层元数据关联;
工作流层&引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步Workflow异常检测流程。task-detect模块进行工作流层异常任务检测,例如运行失败、基线耗时异常等;task-parser模块进行引擎层异常任务检测,例如SQL失败、Shuffle失败等;task-flink模块进行flink作业资源及异常检测,例如cpu利用率低,内存利用率低等;
业务视图:task-portal模块提供用户报告总览、一键诊断、工作流层任务诊断、引擎层作业Application诊断、诊断建议和详细报告、白名单等功能。
Compass(罗盘) 部署指南
Compass 依赖了Canal、PostgreSQL(或MySQL)、Kafka、Redis、Zookeeper、OpenSearch,需要提前准备好相关环境。
环境要求
| Dependency | Version | Optional | Description |
|---|---|---|---|
| Canal | v1.1.6+ | yes | needed by Airflow,DolphinScheduler |
| MySQL | 5.7+ | yes | |
| PostgreSQL | 10.0+ | no | |
| Kafka | all | no | |
| Redis | all | no | deployed in cluster mode |
| Zookeeper | 3.4.5 | no | needed by canal |
| OpenSearch | 1.3.12 | no |
OpenSearch兼容Elasticsearch 7.0+。
Compass 支持单机和集群部署,可按模块弹性扩缩容。
编译
请使用JDK 8以及maven 3.6.0+进行编译,构建流程步骤如下:
git clone https://github.com/cubefs/compass.git
cd compass
mvn clean package -DskipTests -Pdist
或者
mvn clean package -DskipTests -Pdist,spark (打包时web展示只有spark诊断页面)
或者
mvn clean package -DskipTests -Pdist,flink (打包时web展示只有flink诊断页面)
使用docker compose启动应用服务cp dist/compass-v1.1.2.tar.gz docker/playground
cd docker/playground/
docker compose --profile dependencies up -d
docker compose --profile compass-demo up -d多请查看文档 docker-playground
初始化数据库
支持 PostgreSQL(默认) 或 MySQL(需要手动下载mysql-connector-java复制到各模块lib目录下,canal除外) 作为元数据存储。
表结构由两部分组成:document/sql/compass_.sql,document/sql/dolphinscheduler_.sql(需要根据实际使用版本修改,支持2.x和3.x)或document/sql/airflow_*.sql(支持2.x)。
如果您使用的是自研调度平台,请参考上述sql表结构。
修改配置和启动
compass/bin 和 compass/conf 是作为公共脚本和配置使用,方便统一启停和配置管理。
# 启动所有模块
./bin/start_all.sh
# 停止所有模块
./bin/stop_all.sh
compass_env.sh 配置说明
Kafka需要预先创建好topic: mysqldata,task-instance,task-application,exception-log
#!/bin/bash
# dolphinscheduler or airflow or custom
export SCHEDULER="dolphinscheduler"
export SPRING_PROFILES_ACTIVE="hadoop,${SCHEDULER}"
# Configuration for Scheduler MySQL, compass will subscribe data from scheduler database via canal
export SCHEDULER_MYSQL_ADDRESS="localhost:3306"
export SCHEDULER_MYSQL_DB="dolphinscheduler"
export SCHEDULER_DATASOURCE_URL="jdbc:mysql://${SCHEDULER_MYSQL_ADDRESS}/${SCHEDULER_MYSQL_DB}?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
export SCHEDULER_DATASOURCE_USERNAME=""
export SCHEDULER_DATASOURCE_PASSWORD=""
# Configuration for compass database(mysql or postgresql)
export DATASOURCE_TYPE="mysql"
export COMPASS_DATASOURCE_ADDRESS="localhost:3306"
export COMPASS_DATASOURCE_DB="compass"
export SPRING_DATASOURCE_URL="jdbc:${DATASOURCE_TYPE}://${COMPASS_DATASOURCE_ADDRESS}/${COMPASS_DATASOURCE_DB}"
export SPRING_DATASOURCE_USERNAME=""
export SPRING_DATASOURCE_PASSWORD=""
# Configuration for compass Kafka, used to subscribe data by canal and log queue, etc. (default version: 3.4.0)
export SPRING_KAFKA_BOOTSTRAPSERVERS="host1:port,host2:port"
# Configuration for compass redis, used to cache and log queue, etc . (cluster mode)
export SPRING_REDIS_CLUSTER_NODES="localhost:6379"
# Optional
export SPRING_REDIS_PASSWORD=""
# Zookeeper (cluster: 3.4.5, needed by canal)
export SPRING_ZOOKEEPER_NODES="localhost:2181"
# OpenSearch (default version: 1.3.12) or Elasticsearch (7.x~)
export SPRING_OPENSEARCH_NODES="localhost:9200"
# Optional
export SPRING_OPENSEARCH_USERNAME=""
# Optional
export SPRING_OPENSEARCH_PASSWORD=""
# Optional, needed by OpenSearch, keep empty if OpenSearch does not use truststore.
export SPRING_OPENSEARCH_TRUSTSTORE=""
# Optional, needed by OpenSearch, keep empty if OpenSearch does not use truststore.
export SPRING_OPENSEARCH_TRUSTSTOREPASSWORD=""
# Prometheus for flink, ignore it if you do not need flink.
export FLINK_PROMETHEUS_HOST="http://localhost:9090"
export FLINK_PROMETHEUS_TOKEN=""
export FLINK_PROMETHEUS_DATABASE=""
# Optional, needed by task-gpt module to get exception solution, ignore if you do not need it.
export CHATGPT_ENABLE=false
# Openai keys needed by enabling chatgpt, random access the key if there are multiple keys.
export CHATGPT_API_KEYS=sk-xxx1,sk-xxx2
# Optional, needed if setting proxy, or keep it empty.
export CHATGPT_PROXY="" # for example, https://proxy.ai
# chatgpt model
export CHATGPT_MODEL="gpt-3.5-turbo"
# chatgpt prompt
export CHATGPT_PROMPT="You are a senior expert in big data, teaching beginners. I will give you some anomalies and you will provide solutions to them."
# task-canal模块配置
# 调度平台MySQL订阅账号,确定是否已开启binlog
export CANAL_INSTANCE_MASTER_ADDRESS=${SCHEDULER_MYSQL_ADDRESS}
export CANAL_INSTANCE_DBUSERNAME=${SCHEDULER_DATASOURCE_USERNAME}
export CANAL_INSTANCE_DBPASSWORD=${SCHEDULER_DATASOURCE_PASSWORD}
# 需要订阅的库表配置过滤
if [ ${SCHEDULER} == "dolphinscheduler" ]; then
export CANAL_INSTANCE_FILTER_REGEX="${SCHEDULER_MYSQL_DB}.t_ds_user,${SCHEDULER_MYSQL_DB}.t_ds_project,${SCHEDULER_MYSQL_DB}.t_ds_task_definition,${SCHEDULER_MYSQL_DB}.t_ds_task_instance,${SCHEDULER_MYSQL_DB}.t_ds_process_definition,${SCHEDULER_MYSQL_DB}.t_ds_process_instance,${SCHEDULER_MYSQL_DB}.t_ds_process_task_relation"
elif [ ${SCHEDULER} == "airflow" ]; then
export CANAL_INSTANCE_FILTER_REGEX="${SCHEDULER_MYSQL_DB}.dag,${SCHEDULER_MYSQL_DB}.serialized_dag,${SCHEDULER_MYSQL_DB}.ab_user,${SCHEDULER_MYSQL_DB}.dag_run,${SCHEDULER_MYSQL_DB}.task_instance"
else
export CANAL_INSTANCE_FILTER_REGEX=".*\\..*"
fi
application-hadoop.yml 说明hadoop:
# task-applicaiton & task-parser 模块配置依赖
namenodes:
- nameservices: logs-hdfs # dfs.nameservices 属性值
namenodesAddr: [ "machine1.example.com", "machine2.example.com" ] # dfs.namenode.rpc-address.[nameservice ID].[name node ID] 属性值
namenodes: ["nn1", "nn2"] # dfs.ha.namenodes.[nameservice ID] 属性值
user: hdfs # 用户
password: # 密码,如果没开启鉴权,则不需要
port: 8020 # 端口
matchPathKeys: [ "flume" ] # task-application模块使用,调度平台日志hdfs路径关键字
# kerberos
enableKerberos: false
# /etc/krb5.conf
krb5Conf: ""
# hdfs/*@EXAMPLE.COM
principalPattern: ""
# admin
loginUser: ""
# /var/kerberos/krb5kdc/admin.keytab
keytabPath: ""
# task-metadata 模块配置依赖
yarn:
- clusterName: "bigdata"
resourceManager: [ "ip:port" ] # yarn.resourcemanager.webapp.address 属性值
jobHistoryServer: "ip:port" # mapreduce.jobhistory.webapp.address 属性值
spark:
sparkHistoryServer: [ "ip:port" ] # spark history ui 地址Compass 模块介绍
工程目录
compass
├── bin
│ ├── compass_env.sh 环境变量,基础组件配置
│ ├── start_all.sh 启动脚本
│ └── stop_all.sh 停止脚本
├── conf
│ └── application-hadoop.yml hadoop相关配置
├── task-application 关联任务实例、applicationId、hdfs_log_path
├── task-canal 订阅调度平台MySQL表元数据到Kafka
├── task-canal-adapter 同步调度平台MySQL表元数据Compass平台
├── task-detect 工作流层异常类型检测
├── task-metadata 同步Yarn、Spark任务元数据到OpenSearch
├── task-parser 日志解析和Spark任务异常检测
├── task-portal 异常任务的可视化服务
├── task-flink Flink任务资源及异常诊断
├── task-flink-core Flink任务诊断规则逻辑
├── task-portal 异常任务的可视化服务
├── task-gpt 聚合日志模板,并使用chatgpt给模板解决方案
└── task-syncer 调度平台任务关系表的抽象和映射历史数据同步
如果调度平台的数据库是MySQL,Compass数据库是PostgreSQL,可使用pgloader创建依赖表和同步历史全量数据
同步dolphinscheduler表:
LOAD DATABASE
FROM mysql://root:password@localhost:3306/dolphinscheduler
INTO postgresql://postgres@localhost:5432/compass
ALTER SCHEMA 'dolphinscheduler' RENAME TO 'public'
INCLUDING ONLY TABLE NAMES MATCHING 't_ds_process_definition','t_ds_process_instance','t_ds_process_task_relation','t_ds_project','t_ds_task_definition','t_ds_task_instance','t_ds_user';同步airflow表:
LOAD DATABASE
FROM mysql://root:password@localhost:3306/airflow
INTO postgresql://postgres@localhost:5432/compass_airflow
ALTER SCHEMA 'airflow' RENAME TO 'public'
INCLUDING ONLY TABLE NAMES MATCHING 'task_instance','dag_run','ab_user','dag','serialized_dag'
ALTER TABLE NAMES MATCHING 'task_instance' RENAME TO 'tb_task_instance'
ALTER TABLE NAMES MATCHING 'dag_run' RENAME TO 'tb_dag_run'
ALTER TABLE NAMES MATCHING 'ab_user' RENAME TO 'tb_ab_user'
ALTER TABLE NAMES MATCHING 'dag' RENAME TO 'tb_dag'
ALTER TABLE NAMES MATCHING 'serialized_dag' RENAME TO 'tb_serialized_dag';如果调度平台的数据库是MySQL,Compass数据库是MySQL,可使用task-canal-adapter接口进行同步历史全量数据
同步dolphinscheduler表:
curl "localhost:8181/etl/rdb/mysql1/t_ds_process_definition.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/t_ds_process_instance.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/t_ds_process_task_relation.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/t_ds_project.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/t_ds_task_definition.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/t_ds_task_instance.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/t_ds_user.yml" -X POST
同步airflow表:curl "localhost:8181/etl/rdb/mysql1/airflow_db_ab_user.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/airflow_db_dag_run.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/airflow_db_dag.yml" -X POST
curl "localhost:8181/etl/rdb/mysql1/airflow_db_task_instance.yml" -X POSTtask-canal如果您使用的是DolphinScheduler 或者Airflow或者自研的等调度平台, 元数据存储在MySQL,可使用canal.deployer 订阅MySQL binlog同步到Kafka,默认topic是mysqldata。
task-canal
├── bin
│ ├── compass_env.sh compass环境变量
│ ├── init_canal.sh 下载canal.deployer依赖包
│ ├── restart.sh
│ ├── startup.sh
│ └── stop.sh
├── canal.deployer-1.1.6.tar.gz compass不提供canal依赖包,可通过init_canal.sh下载,若无网络则自行下载到task-canal根目录
├── conf
│ ├── example
│ │ ├── instance.properties 源MySQL配置和库表配置
│ ├── canal_local.properties zk,kafka等配置
│ ├── canal.properties
│ ├── logback.xml
├── lib
└── plugin核心配置
conf/example/instance.properties
canal.instance.master.address=localhost:33066
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.filter.regex=.*\\..*
canal.mq.topic=mysqldata
# 动态topic和分区默认不配置,若数据量比较大,可按表Hash到相同topic不同分区,避免单分区压力过大
canal.mq.dynamicTopic = mysqldata:db\\..*
canal.mq.partitionsNum = 12
canal.mq.partitionHash = .*\\..*conf/canal.properties
canal.zkServers = localhost:2181
canal.serverMode = kafka
kafka.bootstrap.servers = localhost:9092task-canal-adapter
canal.adapter模块作用: 同步依赖调度平台的元数据表到compass,只同步任务相关和用户表,其他表按需同步
例如对于DolphinScheduler:t_ds_project.yml 定义同步了 ds_project表,若需要同步其他表可参考conf/rdb下配置
项目中已提供DolphinScheduler和Airflow平台同步模板,若使用其他平台可参考模板
task-canal-adapter
├── bin
│ ├── compass_env.sh
│ ├── init_canal_adapter.sh 下载canal.adapter压缩包和解压相关lib和plugin
│ ├── restart.sh
│ ├── startup.sh
│ └── stop.sh
├── canal.adapter-1.1.6.tar.gz compass不提供canal依赖包,可通过init_canal.sh下载,若无网络则自行下载到task-canal-adapter根目录
├── conf
│ ├── application.yml
│ └── rdb
│ ├── airflow_db_ab_user.yml
│ ├── airflow_db_dag_run.yml
│ ├── airflow_db_dag.yml
│ ├── airflow_db_task_instance.yml
│ ├── t_ds_process_definition.yml
│ ├── t_ds_process_instance.yml
│ ├── t_ds_process_task_relation.yml
│ ├── t_ds_project.yml
│ ├── t_ds_task_definition.yml
│ ├── t_ds_task_instance.yml
│ ├── t_ds_user.yml
│ └── template.yml
├── lib
└── plugin表数据全量同步接口
示例:curl "localhost:8181/etl/rdb/mysql1/template.yml" -X POST
其中template.yml即为conf/rdb下的配置文件
核心配置
conf/application.yml
canal.conf:
srcDataSources:
defaultDS:
# Scheduling platform MySQL synchronization account
url: ${CANAL_ADAPTER_SOURCE_DATASOURCE_URL}
username: ${CANAL_ADAPTER_SOURCE_DATASOURCE_USERNAME}
password: ${CANAL_ADAPTER_SOURCE_DATASOURCE_PASSWORD}
canalAdapters:
- instance: mysqldata # kafka topic
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: mysql1
properties:
# Compass platform datasource account
jdbc.url: ${CANAL_ADAPTER_DESTINATION_DATASOURCE_URL}
jdbc.username: ${CANAL_ADAPTER_DESTINATION_DATASOURCE_USERNAME}
jdbc.password: ${CANAL_ADAPTER_DESTINATION_DATASOURCE_PASSWORD}conf/rdb/template.ymldataSourceKey: defaultDS
destination: mysqldata
groupId: g1
outerAdapterKey: mysql1
concurrent: false
dbMapping:
database: ${SCHEDULER_MYSQL_DB}
# 调度平台MySQL表
table: example
# compass平台MySQL表
targetTable: example
# 主键配置
targetPk:
id: id
mapAll: true
commitBatch: 1task-syncertask-syncer模块是关联调度平台和compass的抽象层,使得compass能够兼容和诊断不同的调度平台任务,该模块抽象定义了compass核心依赖的关系表:
user:登录和权限校验,隔离不同用户权限
project:项目关系
flow:工作流定义关系
task:具体任务定义关系
task_instance:任务运行实例
其中关系是user -> project -> flow -> task -> task_instance,可根据实际调度平台自行定义关系
task-syncer
├── bin
│ ├── compass_env.sh
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application-airflow.yml
│ ├── application-dolphinscheduler.yml
│ ├── application.yml
│ └── logback.xml
├── lib核心配置
conf/application-xxx.yml定义了数据同步表字段之间的映射关系,实现源表和目标表的转化,
columnMapping 实现了字段之间的映射
columnValueMapping 实现了字段值的映射
constantColumn 实现了常量列的映射
columnDep 实现了列字段值依赖查询,可自定义SQL实现表字段之间的关联
下面以同步DolphinScheduler调度平台示例说明
user表映射:
# DolphinScheduler库名
- schema: "dolphinscheduler"
# DolphinScheduler user表
table: "t_ds_user"
# compass user表
targetTable: "user"
# columnMapping用于字段映射,key是compass定义的字段,值是DolphinScheduler定义的字段
columnMapping:
user_id: "id"
username: "user_name"
password: "user_password"
is_admin: "user_type"
email: "email"
phone: "phone"
create_time: "create_time"
update_time: "update_time"
# 字段值映射, 目标字段值, 源字段值, 字段类型
columnValueMapping:
is_admin: [ { targetValue: "0", originValue: [ "0" ] }, { targetValue: "1", originValue: [ "1" ] } ]
# 常量列定义
constantColumn:
scheduler_type: "DolphinScheduler"task_instance表映射:
- schema: "dolphinscheduler"
table: "t_ds_task_instance"
targetTable: "task_instance"
columnMapping:
id: "id"
project_name: ""
flow_name: ""
task_name: "name"
start_time: "start_time"
end_time: "end_time"
execution_time: ""
task_state: "state"
task_type: "task_type"
retry_times: "retry_times"
max_retry_times: "max_retry_times"
worker_group: "worker_group"
create_time: "create_time"
update_time: "update_time"
columnValueMapping:
task_state:
- { targetValue: "success", originValue: [ "7", "14" ] }
- { targetValue: "fail", originValue: [ "6", "9" ] }
- { targetValue: "other", originValue: [ "0", "1", "2", "3", "4", "5", "8", "10", "11", "12", "13" ] }
columnDep:
# 列字段值依赖,由于该表缺失了project_name, flow_name, execution_time字段,因此需要关联其他表查询
columns: [ "project_name", "flow_name", "execution_time" ]
queries: [ "select t2.schedule_time as execution_time, t3.name as flow_name, t4.name as project_name from t_ds_task_instance as t1 inner join t_ds_process_instance as t2 on t1.process_instance_id = t2.id inner join t_ds_process_definition as t3 on t2.process_definition_code = t3.code inner join t_ds_project as t4 on t3.project_code=t4.code where t1.id=${id}" ]task-applicationtask-application模块关联task_name、applicationId、hdfs_log_path,该模块需要读取调度平台日志,推荐使用flume收集到hdfs,方便统一做日志诊断和分析。
task-application/
├── bin
│ ├── compass_env.sh
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application-airflow.yml
│ ├── application-dolphinscheduler.yml
│ ├── application-hadoop.yml
│ ├── application.yml
│ └── logback.xml
├── lib核心配置
conf/application-hadoop.yml
hadoop:
namenodes:
- nameservices: logs-hdfs
namenodesAddr: [ "host1", "host2" ]
namenodes: ["namenode1", "namenode2"]
user: hdfs
password:
port: 8020
matchPathKeys: [ "flume" ]conf/application-dolphinscheduler/airflow/custom.yml
该配置涉及日志路径规则的拼接,即日志绝对路径的确定。以flume收集dolphinscheduler到hdfs为例,airflow等同理。表t_ds_task_instance记录了日志路径log_path,但这个是worker主机中的目录,上传到hdfs的目录有所变化。
例如:
scheduler worker log_path: /home/service/app/dolphinscheduler/logs/8590950992992_2/33552/33934.log
hdfs log_path: hdfs://log-hdfs:8020/flume/dolphinscheduler/2023-03-30/8590950992992_2/33552/xxx
因此需要根据上面的变化关系,通过逐级目录确定绝对路径,然后最终确定 task_name,application_id,hdfs_log_path 之间的关系存储到表task_application中。
custom:
# 从上到下串行执行解析到任务的applicationId
rules:
- logPathDep:
# 变量依赖查询
query: "select CASE WHEN end_time IS NOT NULL THEN DATE_ADD(end_time, INTERVAL 1 second) ELSE start_time END as end_time,log_path from t_ds_task_instance where id=${id}" # 查询, id 是 task-instance表的id
logPathJoins:
# end_time: 2023-02-18 01:43:11
# log_path: ../logs/6354680786144_1/3/4.log
- { "column": "", "data": "/flume/dolphinscheduler" } # 配置存储调度日志的hdfs根目录
- { "column": "end_time", "regex": "^.*(?<date>\\d{4}-\\d{2}-\\d{2}).+$", "name": "date" }
- { "column": "log_path", "regex": "^.*logs/(?<logpath>.*)$", "name": "logpath" }
extractLog: # 根据组装的日志路径解析日志
regex: "^.*Submitted application (?<applicationId>application_[0-9]+_[0-9]+).*$" # 匹配规则
name: "applicationId" # 匹配文本名,最后必须有applicationId注意:原生flume-taildir-source插件是不支持递归遍历子目录文件的,需要进行改造。如果您日志已经收集,可忽略。如果您还没有收集,可修改TaildirMatcher.getMatchingFilesNoCache()方法实现。如果你使用的是Airflow,生成的日志目录可能包含不符合hdfs目录规则,sink to hdfs时需要修改替换目录特殊字符为下划线‘_’。
task-metadata
task-metadata模块是用于同步Yarn、Spark任务applicationId列表,关联applicationId的driver、executor、eventlog日志存储路径
task-metadata
├── bin
│ ├── compass_env.sh
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ └── logback.xml
├── lib核心配置
conf/application.yml
hadoop:
yarn:
- clusterName: "bigdata"
resourceManager: [ "ip:port" ]
jobHistoryServer: "ip:port"
spark:
sparkHistoryServer: [ "ip:port" ]task-detect
task-detect模块是针对工作流层异常检测,异常类型包括运行失败、基线时间异常、基线耗时异常、首次失败、长期失败、运行耗时长
task-detect
├── bin
│ ├── compass_env.sh
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
├── lib核心配置
conf/application.yml
custom:
detectionRule:
# 运行耗时长配置,单位小时
durationWarning: 2
# 长期失败配置,单位天
alwaysFailedWarning: 10task-parser
task-parser模块是针对Spark任务和相关日志进行解析诊断,异常类型包括:SQL失败、Shuffle失败、内存溢出、内存浪费、CPU浪费、大表扫描、OOM预警、 数据倾斜、Job耗时异常、Stage耗时异常、Task长尾、HDFS卡顿、推迟执行Task过多、全局排序异常等
task-parser
├── bin
│ ├── compass_env.sh
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── applicationbk.yml
│ ├── application-hadoop.yml
│ ├── application.yml
│ ├── logback.xml
│ ├── rules.json
│ └── scripts
│ └── logRecordConsumer.lua
├── lib核心配置conf/rules.json 该配置是用于编写日志解析规则
logType: scheduler/driver/executor/yarn 日志类型,若有其他日志,可自行实现
action: 定义每个匹配规则名称
desc:action描述
category:定义规则类型,例如shuffleFailed/sqlFailed等
step: 匹配顺序,默认升序
parserType: 匹配类型,默认 DEFAULT(按行或者块匹配),JOIN(把结果合并成一行再匹配)
parserTemplate: 文本解析模板,由首行、中间行和结束行组成。
如果只是简单按行匹配,则只需要填写parserTemplate.heads即可;
如果需要按文本块匹配,例如异常栈,则需要填写parserTemplate.heads和parserTemplate.tails确定首行和结束行规则;
如果需要在文本块中匹配某一行,则需要填写parserTemplate.middles中间行规则。
groupNames:用户提取正则匹配分组名称的值
children: 用于嵌套规则,例如文本中有多个相同的异常栈(开始和结束标志一样),如果需要区分成不同的action,那么就可以嵌套规则实现
{
"logType": "scheduler",
"actions": [
{
"action": "otherError",
"desc": "其他错误信息",
"category": "otherException",
"step": 1,
"skip": false,
"parserType": "DEFAULT",
"parserTemplate": {
"heads": [
"^.+ERROR.+$"
],
"middles": [],
"tails": []
},
"groupNames": [],
"children": []
}
]
}conf/application.yml
custom.detector用于配置检测Spark Event Log,比如Spark环境变量、内存浪费、大表扫描等异常检测类型
custom:
detector:
sparkEnvironmentConfig:
jvmInformation:
- Java Version
sparkProperties:
- spark.driver.memoryOverhead
- spark.driver.memory
- spark.executor.memoryOverhead
- spark.executor.memory
- spark.executor.cores
- spark.dynamicAllocation.maxExecutors
- spark.default.parallelism
- spark.sql.shuffle.partitions
systemProperties:
- sun.java.command
...task-gpttask-gpt模块用于聚合日志模板,并使用chatgpt给模板解决方案
核心配置
conf/application.yml
chatgpt:
enable: true
apiKeys: "sk-xxx1,sk-xxx2"
proxy: "https://proxy"
model: "gpt-3.5-turbo"
prompt: "你是一位资深大数据专家,教导初始者,我会给你一些异常,你将提供异常的解决方案"task-portal 与 task-ui
task-portal 与 task-ui 可视化前后端模块,提供诊断建议、报告总览、一键诊断、任务运行、APP运行、白名单等服务
task-ui前端默认一起编译放在task-portal/portal目录下
如果您需要单独部署前端,需要修改 task-ui/.env.production 下 VITE_APP_PROD_BACKEND,指定您的后端地址或者域名即可
web ui默认路径: http://localhost:7075/compass/
swagger ui默认路径:http://localhost:7075/compass/swagger-ui/index.html
关于用户和密码问题:
如果您是使用DolphinScheduler或Airflow调度平台,即compass_env.sh中配置export SCHEDULER="dolphinscheduler / airflow"时,账号密码和调度平台相同(需要已经同步数据)
如果您是自研调度或者测试,请设置 compass_env.sh 中 export SCHEDULER="custom",执行 document/sql/compass.sql 之后,默认账密是compass,compass,该模式没进行账号密码校验,请注意数据安全
task-portal
├── bin
│ ├── compass_env.sh
│ ├── startup.sh
│ └── stop.sh
├── conf
├── lib
├── portal
│ ├── assets
│ └── index.html离线任务上报元数据诊断
支持第三方上报Spark/MapReduce任务application元数据进行诊断,如果不需要同步调度平台元数据和日志,只要启动task-portal和task-parser模块。
请求接口:http://[compass_host]/compass/openapi/offline/app/metadata
请求方式: POST
参数类型 :JSON
原文转载地址 https://mp.weixin.qq.com/s/L93siT-eAVBNcJE24ktSeg




