暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

大数据诊断平台Compass

sg1234 2024-04-27
1476

简介

Compass是一个诊断大数据生态系统中计算引擎和调度器的平台,旨在提高故障排除的效率并降低问题调整的复杂性。它自动收集日志和指标,除了使用启发式规则来识别问题并提供调整建议,对于日志,还使用了ChatGPT还提供诊断建议, 日志将使用drain算法自动聚合为模板,可用于人工干预等,提升诊断自动化和优化方案能力。

其主要功能特性如下:

  • 非侵入式,即时诊断,无需修改已有的调度平台,即可体验诊断效果。

  • 支持多种主流调度平台,例如DolphinScheduler 2.x和3.x、Airflow或自研等。

  • 支持多版本Spark、MapReduce、Flink、Hadoop 2.x和3.x 任务日志诊断和解析。

  • 支持工作流层异常诊断,识别各种失败和基线耗时异常问题。

  • 支持引擎层异常诊断,包含数据倾斜、大表扫描、内存浪费等14种异常类型。

  • 支持各种日志匹配规则编写和异常阈值调整,可自行根据实际场景优化。

  • 支持一键诊断全量(包含非调度平台提交任务)Spark/MapReduce任务。

  • 支持ChatGPT对异常日志进行诊断,提供解决方案,使用了drain算法聚合模板,节约成本。

支持组件

社区

欢迎加入社区咨询使用或成为 Compass 开发者。以下是获得帮助的方法:

  • 提交 issue。

  • 提交 pull request, 请阅读 contributing guideline。

  • 讨论 Idea & Question。

我们将会尽快回复。

罗盘已支持诊断类型概览:

引擎诊断维度诊断类型类型说明
Spark失败分析运行失败最终运行失败的任务
首次失败重试次数大于1的成功任务
长期失败最近10天运行失败的任务
耗时分析基线时间异常相对于历史正常结束时间,提前结束或晚点结束的任务
基线耗时异常相对于历史正常运行时长,运行时间过长或过短的任务
运行耗时长运行时间超过2小时的任务
报错分析sql失败因sql执行问题而导致失败的任务
shuffle失败因shuffle执行问题而导致失败的任务
内存溢出因内存溢出问题而导致失败的任务
资源分析内存浪费内存使用峰值与总内存占比过低的任务
CPU浪费driver/executor计算时间与总CPU计算时间占比过低的任务
效率分析大表扫描没有限制分区导致扫描行数过多的任务
OOM预警广播表的累计内存与driver或executor任意一个内存占比过高的任务
数据倾斜stage中存在task处理的最大数据量远大于中位数的任务
Job耗时异常job空闲时间与job运行时间占比过高的任务
Stage耗时异常stage空闲时间与stage运行时间占比过高的任务
Task长尾stage中存在task最大运行耗时远大于中位数的任务
HDFS卡顿stage中存在task处理速率过慢的任务
推测执行Task过多stage中频繁出现task推测执行的任务
全局排序异常全局排序导致运行耗时过长的任务
MapReduce资源分析内存浪费内存使用峰值与总内存占比过低的任务
效率分析大表扫描扫描行数过多的任务
Task长尾map/reduce task最大运行耗时远大于中位数的任务
数据倾斜map/reduce task处理的最大数据量远大于中位数的任务
推测执行Task过多map/reduce task中频繁出现推测执行的任务
GC异常GC时间相对CPU时间占比过高的任务
Flink资源诊断内存利用率高计算内存的使用率,如果使用率高于阈值,则增加内存
内存利用率低计算内存的使用率,如果使用率低于阈值,则降低内存
JM内存优化根据tm个数计算jm内存的建议值
作业无流量检测作业的kafka source算子是否没有流量
TM管理内存优化计算作业管理内存的使用率,给出合适的管理内存建议值
部分TM空跑检测是否有tm没有流量,并且cpu和内存也没有使用
并行度不够检测作业是否因为并行度不够引起延迟
CPU利用率高计算作业的CPU均值使用率,如果高于阈值,则增加cpu
CPU利用率低计算作业的CPU均值使用率,如果低于阈值,则降低cpu
CPU峰值利用率高计算作业的CPU峰值使用率,如果高于阈值,则增加cpu
异常诊断存在慢算子检测作业是否存在慢算子
存在反压算子检测作业是否存在反压算子
作业延迟高检测作业的kafka延迟是否高于阈值

UI

Spark:       

图片

图片

图片

图片

图片

图片

图片

Flink:    

图片

图片

图片

图片

系统架构

系统架构图

图片

图片

架构说明

整体架构分3层:

  • 调度系统对接层:实现对接调度器、Yarn、Spark、Flink、HDFS等系统,同步任务及其日志元数据到诊断系统;

  • 服务层:包括数据采集、元数据关联&模型标准化、异常检测、资源诊断、Portal模块;

  • 基础组件层:包括MySQL、 OpenSearch、Kafka、Redis、Zookeeper等组件。

具体模块流程阶段:

  • 数据采集阶段:task-canal/adapter模块订阅同步调度系统的用户、DAG、作业、执行记录等工作流元数据同步至诊断平台;task-metadata模块定时同步Yarn ResourceManager、Spark HistoryServer App元数据至诊断系统,关联日志存储路径,为后续数据处理阶段作基础;

  • 数据关联与模型标准化阶段:task-syncer模块将同步的数据标准化为User、Project、Flow、Task、TaskInstance模型;task-application模块将工作流层与引擎层元数据关联;

  • 工作流层&引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步Workflow异常检测流程。task-detect模块进行工作流层异常任务检测,例如运行失败、基线耗时异常等;task-parser模块进行引擎层异常任务检测,例如SQL失败、Shuffle失败等;task-flink模块进行flink作业资源及异常检测,例如cpu利用率低,内存利用率低等;

  • 业务视图:task-portal模块提供用户报告总览、一键诊断、工作流层任务诊断、引擎层作业Application诊断、诊断建议和详细报告、白名单等功能。

Compass(罗盘) 部署指南

Compass 依赖了Canal、PostgreSQL(或MySQL)、Kafka、Redis、Zookeeper、OpenSearch,需要提前准备好相关环境。

环境要求

DependencyVersionOptionalDescription
Canalv1.1.6+yesneeded by Airflow,DolphinScheduler
MySQL5.7+yes
PostgreSQL10.0+no
Kafkaallno
Redisallnodeployed in cluster mode
Zookeeper3.4.5noneeded by canal
OpenSearch1.3.12no

OpenSearch兼容Elasticsearch 7.0+。
Compass 支持单机和集群部署,可按模块弹性扩缩容。

编译

请使用JDK 8以及maven 3.6.0+进行编译,构建流程步骤如下:

git clone https://github.com/cubefs/compass.git
cd compass
mvn clean package -DskipTests -Pdist
或者
mvn clean package -DskipTests -Pdist,spark (打包时web展示只有spark诊断页面)
或者
mvn clean package -DskipTests -Pdist,flink (打包时web展示只有flink诊断页面)

使用docker compose启动应用服务

cp dist/compass-v1.1.2.tar.gz docker/playground
cd docker/playground/
docker compose --profile dependencies up -d
docker compose --profile compass-demo up -d
多请查看文档 docker-playground

初始化数据库

支持 PostgreSQL(默认) 或 MySQL(需要手动下载mysql-connector-java复制到各模块lib目录下,canal除外) 作为元数据存储。

表结构由两部分组成:document/sql/compass_.sql,document/sql/dolphinscheduler_.sql(需要根据实际使用版本修改,支持2.x和3.x)或document/sql/airflow_*.sql(支持2.x)。

如果您使用的是自研调度平台,请参考上述sql表结构。

修改配置和启动

compass/bin 和 compass/conf 是作为公共脚本和配置使用,方便统一启停和配置管理。

# 启动所有模块
./bin/start_all.sh
# 停止所有模块
./bin/stop_all.sh

compass_env.sh 配置说明

Kafka需要预先创建好topic: mysqldata,task-instance,task-application,exception-log

#!/bin/bash

# dolphinscheduler or airflow or custom
export SCHEDULER="dolphinscheduler"
export SPRING_PROFILES_ACTIVE="hadoop,${SCHEDULER}"

# Configuration for Scheduler MySQL, compass will subscribe data from scheduler database via canal
export SCHEDULER_MYSQL_ADDRESS="localhost:3306"
export SCHEDULER_MYSQL_DB="dolphinscheduler"
export SCHEDULER_DATASOURCE_URL="jdbc:mysql://${SCHEDULER_MYSQL_ADDRESS}/${SCHEDULER_MYSQL_DB}?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
export SCHEDULER_DATASOURCE_USERNAME=""
export SCHEDULER_DATASOURCE_PASSWORD=""

# Configuration for compass database(mysql or postgresql)
export DATASOURCE_TYPE="mysql"
export COMPASS_DATASOURCE_ADDRESS="localhost:3306"
export COMPASS_DATASOURCE_DB="compass"
export SPRING_DATASOURCE_URL="jdbc:${DATASOURCE_TYPE}://${COMPASS_DATASOURCE_ADDRESS}/${COMPASS_DATASOURCE_DB}"
export SPRING_DATASOURCE_USERNAME=""
export SPRING_DATASOURCE_PASSWORD=""

# Configuration for compass Kafka, used to subscribe data by canal and log queue, etc. (default version: 3.4.0)
export SPRING_KAFKA_BOOTSTRAPSERVERS="host1:port,host2:port"

# Configuration for compass redis, used to cache and log queue, etc . (cluster mode)
export SPRING_REDIS_CLUSTER_NODES="localhost:6379"
# Optional
export SPRING_REDIS_PASSWORD=""

# Zookeeper (cluster: 3.4.5, needed by canal)
export SPRING_ZOOKEEPER_NODES="localhost:2181"

# OpenSearch (default version: 1.3.12) or Elasticsearch (7.x~)
export SPRING_OPENSEARCH_NODES="localhost:9200"
# Optional
export SPRING_OPENSEARCH_USERNAME=""
# Optional
export SPRING_OPENSEARCH_PASSWORD=""
# Optional, needed by OpenSearch, keep empty if OpenSearch does not use truststore.
export SPRING_OPENSEARCH_TRUSTSTORE=""
# Optional, needed by OpenSearch, keep empty if OpenSearch does not use truststore.
export SPRING_OPENSEARCH_TRUSTSTOREPASSWORD=""

# Prometheus for flink, ignore it if you do not need flink.
export FLINK_PROMETHEUS_HOST="http://localhost:9090"
export FLINK_PROMETHEUS_TOKEN=""
export FLINK_PROMETHEUS_DATABASE=""

# Optional, needed by task-gpt module to get exception solution, ignore if you do not need it.
export CHATGPT_ENABLE=false
# Openai keys needed by enabling chatgpt, random access the key if there are multiple keys.
export CHATGPT_API_KEYS=sk-xxx1,sk-xxx2
# Optional, needed if setting proxy, or keep it empty.
export CHATGPT_PROXY="" # for example, https://proxy.ai
# chatgpt model
export CHATGPT_MODEL="gpt-3.5-turbo"
# chatgpt prompt
export CHATGPT_PROMPT="You are a senior expert in big data, teaching beginners. I will give you some anomalies and you will provide solutions to them."

# task-canal模块配置

# 调度平台MySQL订阅账号,确定是否已开启binlog
export CANAL_INSTANCE_MASTER_ADDRESS=${SCHEDULER_MYSQL_ADDRESS}
export CANAL_INSTANCE_DBUSERNAME=${SCHEDULER_DATASOURCE_USERNAME}
export CANAL_INSTANCE_DBPASSWORD=${SCHEDULER_DATASOURCE_PASSWORD}
# 需要订阅的库表配置过滤
if [ ${SCHEDULER} == "dolphinscheduler" ]; then
  export CANAL_INSTANCE_FILTER_REGEX="${SCHEDULER_MYSQL_DB}.t_ds_user,${SCHEDULER_MYSQL_DB}.t_ds_project,${SCHEDULER_MYSQL_DB}.t_ds_task_definition,${SCHEDULER_MYSQL_DB}.t_ds_task_instance,${SCHEDULER_MYSQL_DB}.t_ds_process_definition,${SCHEDULER_MYSQL_DB}.t_ds_process_instance,${SCHEDULER_MYSQL_DB}.t_ds_process_task_relation"
elif [ ${SCHEDULER} == "airflow" ]; then
  export CANAL_INSTANCE_FILTER_REGEX="${SCHEDULER_MYSQL_DB}.dag,${SCHEDULER_MYSQL_DB}.serialized_dag,${SCHEDULER_MYSQL_DB}.ab_user,${SCHEDULER_MYSQL_DB}.dag_run,${SCHEDULER_MYSQL_DB}.task_instance"
else
  export CANAL_INSTANCE_FILTER_REGEX=".*\\..*"
fi

application-hadoop.yml 说明

hadoop:
  # task-applicaiton & task-parser 模块配置依赖
  namenodes:
    - nameservices: logs-hdfs               # dfs.nameservices 属性值
      namenodesAddr: [ "machine1.example.com", "machine2.example.com" ]   # dfs.namenode.rpc-address.[nameservice ID].[name node ID] 属性值
      namenodes: ["nn1", "nn2"] # dfs.ha.namenodes.[nameservice ID] 属性值
      user: hdfs                            # 用户
      password:                             # 密码,如果没开启鉴权,则不需要
      port: 8020                            # 端口
      matchPathKeys: [ "flume" ]            # task-application模块使用,调度平台日志hdfs路径关键字
      # kerberos
      enableKerberos: false
      # /etc/krb5.conf
      krb5Conf: ""
      # hdfs/*@EXAMPLE.COM
      principalPattern:  ""
      # admin
      loginUser: ""
      # /var/kerberos/krb5kdc/admin.keytab
      keytabPath: ""
  
  # task-metadata 模块配置依赖
  yarn:
    - clusterName: "bigdata"
      resourceManager: [ "ip:port" ] # yarn.resourcemanager.webapp.address 属性值
      jobHistoryServer: "ip:port" # mapreduce.jobhistory.webapp.address 属性值
  spark:
    sparkHistoryServer: [ "ip:port" ] # spark history ui 地址

Compass 模块介绍

工程目录

compass
├── bin
│   ├── compass_env.sh                  环境变量,基础组件配置
│   ├── start_all.sh                    启动脚本
│   └── stop_all.sh                     停止脚本
├── conf
│   └── application-hadoop.yml          hadoop相关配置
├── task-application                    关联任务实例、applicationId、hdfs_log_path
├── task-canal                          订阅调度平台MySQL表元数据到Kafka
├── task-canal-adapter                  同步调度平台MySQL表元数据Compass平台
├── task-detect                         工作流层异常类型检测
├── task-metadata                       同步Yarn、Spark任务元数据到OpenSearch
├── task-parser                         日志解析和Spark任务异常检测
├── task-portal                         异常任务的可视化服务
├── task-flink                          Flink任务资源及异常诊断
├── task-flink-core                     Flink任务诊断规则逻辑
├── task-portal                         异常任务的可视化服务
├── task-gpt                            聚合日志模板,并使用chatgpt给模板解决方案
└── task-syncer                         调度平台任务关系表的抽象和映射

历史数据同步

如果调度平台的数据库是MySQL,Compass数据库是PostgreSQL,可使用pgloader创建依赖表和同步历史全量数据

同步dolphinscheduler表:

LOAD DATABASE
     FROM mysql://root:password@localhost:3306/dolphinscheduler
     INTO postgresql://postgres@localhost:5432/compass
     ALTER SCHEMA 'dolphinscheduler' RENAME TO 'public'
     INCLUDING ONLY TABLE NAMES MATCHING 't_ds_process_definition','t_ds_process_instance','t_ds_process_task_relation','t_ds_project','t_ds_task_definition','t_ds_task_instance','t_ds_user';

同步airflow表:

LOAD DATABASE
     FROM mysql://root:password@localhost:3306/airflow
     INTO postgresql://postgres@localhost:5432/compass_airflow
     ALTER SCHEMA 'airflow' RENAME TO 'public'
     INCLUDING ONLY TABLE NAMES MATCHING 'task_instance','dag_run','ab_user','dag','serialized_dag'
     ALTER TABLE NAMES MATCHING 'task_instance' RENAME TO 'tb_task_instance'
     ALTER TABLE NAMES MATCHING 'dag_run'  RENAME TO 'tb_dag_run'
     ALTER TABLE NAMES MATCHING 'ab_user'  RENAME TO 'tb_ab_user'
     ALTER TABLE NAMES MATCHING 'dag'  RENAME TO 'tb_dag'
     ALTER TABLE NAMES MATCHING 'serialized_dag'  RENAME TO 'tb_serialized_dag';

如果调度平台的数据库是MySQL,Compass数据库是MySQL,可使用task-canal-adapter接口进行同步历史全量数据

同步dolphinscheduler表:

curl "localhost:8181/etl/rdb/mysql1/t_ds_process_definition.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/t_ds_process_instance.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/t_ds_process_task_relation.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/t_ds_project.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/t_ds_task_definition.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/t_ds_task_instance.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/t_ds_user.yml" -X POST


同步airflow表:
curl "localhost:8181/etl/rdb/mysql1/airflow_db_ab_user.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/airflow_db_dag_run.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/airflow_db_dag.yml" -X POST

curl "localhost:8181/etl/rdb/mysql1/airflow_db_task_instance.yml" -X POST
task-canal

如果您使用的是DolphinScheduler 或者Airflow或者自研的等调度平台, 元数据存储在MySQL,可使用canal.deployer 订阅MySQL binlog同步到Kafka,默认topic是mysqldata。

task-canal
├── bin
│   ├── compass_env.sh           compass环境变量
│   ├── init_canal.sh            下载canal.deployer依赖包
│   ├── restart.sh         
│   ├── startup.sh
│   └── stop.sh
├── canal.deployer-1.1.6.tar.gz   compass不提供canal依赖包,可通过init_canal.sh下载,若无网络则自行下载到task-canal根目录
├── conf
│   ├── example
│   │   ├── instance.properties   源MySQL配置和库表配置
│   ├── canal_local.properties    zk,kafka等配置
│   ├── canal.properties
│   ├── logback.xml
├── lib
└── plugin

核心配置

conf/example/instance.properties

canal.instance.master.address=localhost:33066
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.filter.regex=.*\\..*
canal.mq.topic=mysqldata

# 动态topic和分区默认不配置,若数据量比较大,可按表Hash到相同topic不同分区,避免单分区压力过大
canal.mq.dynamicTopic = mysqldata:db\\..*
canal.mq.partitionsNum =  12
canal.mq.partitionHash = .*\\..*

conf/canal.properties

canal.zkServers = localhost:2181
canal.serverMode = kafka
kafka.bootstrap.servers = localhost:9092

task-canal-adapter

canal.adapter模块作用: 同步依赖调度平台的元数据表到compass,只同步任务相关和用户表,其他表按需同步

例如对于DolphinScheduler:t_ds_project.yml 定义同步了 ds_project表,若需要同步其他表可参考conf/rdb下配置

项目中已提供DolphinScheduler和Airflow平台同步模板,若使用其他平台可参考模板

task-canal-adapter
├── bin
│   ├── compass_env.sh
│   ├── init_canal_adapter.sh       下载canal.adapter压缩包和解压相关lib和plugin
│   ├── restart.sh
│   ├── startup.sh
│   └── stop.sh
├── canal.adapter-1.1.6.tar.gz      compass不提供canal依赖包,可通过init_canal.sh下载,若无网络则自行下载到task-canal-adapter根目录
├── conf
│   ├── application.yml
│   └── rdb
│       ├── airflow_db_ab_user.yml
│       ├── airflow_db_dag_run.yml
│       ├── airflow_db_dag.yml
│       ├── airflow_db_task_instance.yml
│       ├── t_ds_process_definition.yml
│       ├── t_ds_process_instance.yml
│       ├── t_ds_process_task_relation.yml
│       ├── t_ds_project.yml
│       ├── t_ds_task_definition.yml
│       ├── t_ds_task_instance.yml
│       ├── t_ds_user.yml
│       └── template.yml
├── lib
└── plugin

表数据全量同步接口

示例:curl "localhost:8181/etl/rdb/mysql1/template.yml" -X POST

其中template.yml即为conf/rdb下的配置文件

核心配置

conf/application.yml

canal.conf:
  srcDataSources:
    defaultDS:
      # Scheduling platform MySQL synchronization account
      url: ${CANAL_ADAPTER_SOURCE_DATASOURCE_URL}
      username: ${CANAL_ADAPTER_SOURCE_DATASOURCE_USERNAME}
      password: ${CANAL_ADAPTER_SOURCE_DATASOURCE_PASSWORD}

  canalAdapters:
  - instance: mysqldata # kafka topic
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          # Compass platform datasource account
          jdbc.url: ${CANAL_ADAPTER_DESTINATION_DATASOURCE_URL}
          jdbc.username: ${CANAL_ADAPTER_DESTINATION_DATASOURCE_USERNAME}
          jdbc.password: ${CANAL_ADAPTER_DESTINATION_DATASOURCE_PASSWORD}
conf/rdb/template.yml
dataSourceKey: defaultDS
destination: mysqldata
groupId: g1
outerAdapterKey: mysql1
concurrent: false
dbMapping:
  database: ${SCHEDULER_MYSQL_DB}
  # 调度平台MySQL表
  table: example
  # compass平台MySQL表
  targetTable: example
  # 主键配置
  targetPk:
    id: id
  mapAll: true
  commitBatch: 1
task-syncer

task-syncer模块是关联调度平台和compass的抽象层,使得compass能够兼容和诊断不同的调度平台任务,该模块抽象定义了compass核心依赖的关系表:

user:登录和权限校验,隔离不同用户权限

project:项目关系

flow:工作流定义关系

task:具体任务定义关系

task_instance:任务运行实例

其中关系是user -> project -> flow -> task -> task_instance,可根据实际调度平台自行定义关系

task-syncer
├── bin
│   ├── compass_env.sh
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application-airflow.yml
│   ├── application-dolphinscheduler.yml
│   ├── application.yml
│   └── logback.xml
├── lib

核心配置

conf/application-xxx.yml定义了数据同步表字段之间的映射关系,实现源表和目标表的转化,

columnMapping 实现了字段之间的映射

columnValueMapping 实现了字段值的映射

constantColumn 实现了常量列的映射

columnDep 实现了列字段值依赖查询,可自定义SQL实现表字段之间的关联

下面以同步DolphinScheduler调度平台示例说明

user表映射:

# DolphinScheduler库名
- schema: "dolphinscheduler" 
  # DolphinScheduler user表
  table: "t_ds_user"
  # compass user表        
  targetTable: "user"
  # columnMapping用于字段映射,key是compass定义的字段,值是DolphinScheduler定义的字段   
  columnMapping: 
    user_id: "id"           
    username: "user_name"
    password: "user_password"
    is_admin: "user_type"
    email: "email"
    phone: "phone"
    create_time: "create_time"
    update_time: "update_time"
  # 字段值映射, 目标字段值, 源字段值, 字段类型
  columnValueMapping:
    is_admin: [ { targetValue: "0", originValue: [ "0" ] }, { targetValue: "1", originValue: [ "1" ] } ]
  # 常量列定义
  constantColumn:
    scheduler_type: "DolphinScheduler"

task_instance表映射:

- schema: "dolphinscheduler"
  table: "t_ds_task_instance"
  targetTable: "task_instance"
  columnMapping:
    id: "id"
    project_name: ""
    flow_name: ""
    task_name: "name"
    start_time: "start_time"
    end_time: "end_time"
    execution_time: ""
    task_state: "state"
    task_type: "task_type"
    retry_times: "retry_times"
    max_retry_times: "max_retry_times"
    worker_group: "worker_group"
    create_time: "create_time"
    update_time: "update_time"
  columnValueMapping:
    task_state:
      - { targetValue: "success", originValue: [ "7", "14" ] }
      - { targetValue: "fail", originValue: [ "6", "9" ] } 
      - { targetValue: "other", originValue: [ "0", "1", "2", "3", "4", "5", "8", "10", "11", "12", "13" ] }
  columnDep:
    # 列字段值依赖,由于该表缺失了project_name, flow_name, execution_time字段,因此需要关联其他表查询
    columns: [ "project_name", "flow_name", "execution_time" ]
    queries: [ "select t2.schedule_time as execution_time, t3.name as flow_name, t4.name as project_name from t_ds_task_instance as t1 inner join t_ds_process_instance as t2 on t1.process_instance_id = t2.id inner join t_ds_process_definition as t3 on t2.process_definition_code = t3.code inner join t_ds_project as t4 on t3.project_code=t4.code where t1.id=${id}" ]
task-application

task-application模块关联task_name、applicationId、hdfs_log_path,该模块需要读取调度平台日志,推荐使用flume收集到hdfs,方便统一做日志诊断和分析。

task-application/
├── bin
│   ├── compass_env.sh
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application-airflow.yml
│   ├── application-dolphinscheduler.yml
│   ├── application-hadoop.yml
│   ├── application.yml
│   └── logback.xml
├── lib

核心配置

conf/application-hadoop.yml

hadoop:
  namenodes:
    - nameservices: logs-hdfs
      namenodesAddr: [ "host1", "host2" ]
      namenodes: ["namenode1", "namenode2"]
      user: hdfs
      password:
      port: 8020
      matchPathKeys: [ "flume" ]

conf/application-dolphinscheduler/airflow/custom.yml

该配置涉及日志路径规则的拼接,即日志绝对路径的确定。以flume收集dolphinscheduler到hdfs为例,airflow等同理。表t_ds_task_instance记录了日志路径log_path,但这个是worker主机中的目录,上传到hdfs的目录有所变化。

例如:

scheduler worker log_path: /home/service/app/dolphinscheduler/logs/8590950992992_2/33552/33934.log

hdfs log_path: hdfs://log-hdfs:8020/flume/dolphinscheduler/2023-03-30/8590950992992_2/33552/xxx

因此需要根据上面的变化关系,通过逐级目录确定绝对路径,然后最终确定 task_name,application_id,hdfs_log_path 之间的关系存储到表task_application中。

custom:
  # 从上到下串行执行解析到任务的applicationId
  rules:
    - logPathDep:
        # 变量依赖查询
        query: "select CASE WHEN end_time IS NOT NULL THEN DATE_ADD(end_time, INTERVAL 1 second) ELSE start_time END as end_time,log_path from t_ds_task_instance where id=${id}"     # 查询, id 是 task-instance表的id
      logPathJoins: 
        # end_time: 2023-02-18 01:43:11
        # log_path: ../logs/6354680786144_1/3/4.log
        - { "column": "", "data": "/flume/dolphinscheduler" } # 配置存储调度日志的hdfs根目录
        - { "column": "end_time", "regex": "^.*(?<date>\\d{4}-\\d{2}-\\d{2}).+$", "name": "date" }
        - { "column": "log_path", "regex": "^.*logs/(?<logpath>.*)$", "name": "logpath" }
      extractLog: # 根据组装的日志路径解析日志
        regex: "^.*Submitted application (?<applicationId>application_[0-9]+_[0-9]+).*$"     # 匹配规则
        name: "applicationId"      # 匹配文本名,最后必须有applicationId

注意:原生flume-taildir-source插件是不支持递归遍历子目录文件的,需要进行改造。如果您日志已经收集,可忽略。如果您还没有收集,可修改TaildirMatcher.getMatchingFilesNoCache()方法实现。如果你使用的是Airflow,生成的日志目录可能包含不符合hdfs目录规则,sink to hdfs时需要修改替换目录特殊字符为下划线‘_’。

task-metadata

task-metadata模块是用于同步Yarn、Spark任务applicationId列表,关联applicationId的driver、executor、eventlog日志存储路径

task-metadata
├── bin
│   ├── compass_env.sh
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   └── logback.xml
├── lib

核心配置

conf/application.yml

hadoop:
  yarn:
    - clusterName: "bigdata"
      resourceManager: [ "ip:port" ]
      jobHistoryServer: "ip:port"

  spark:
    sparkHistoryServer: [ "ip:port" ]

task-detect

task-detect模块是针对工作流层异常检测,异常类型包括运行失败、基线时间异常、基线耗时异常、首次失败、长期失败、运行耗时长

task-detect
├── bin
│   ├── compass_env.sh
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
├── lib

核心配置

conf/application.yml

custom:
  detectionRule:
    # 运行耗时长配置,单位小时
    durationWarning: 2
    # 长期失败配置,单位天
    alwaysFailedWarning: 10 

task-parser

task-parser模块是针对Spark任务和相关日志进行解析诊断,异常类型包括:SQL失败、Shuffle失败、内存溢出、内存浪费、CPU浪费、大表扫描、OOM预警、 数据倾斜、Job耗时异常、Stage耗时异常、Task长尾、HDFS卡顿、推迟执行Task过多、全局排序异常等

task-parser
├── bin
│   ├── compass_env.sh
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── applicationbk.yml
│   ├── application-hadoop.yml
│   ├── application.yml
│   ├── logback.xml
│   ├── rules.json
│   └── scripts
│       └── logRecordConsumer.lua
├── lib
核心配置

conf/rules.json 该配置是用于编写日志解析规则

logType: scheduler/driver/executor/yarn 日志类型,若有其他日志,可自行实现

action: 定义每个匹配规则名称

desc:action描述

category:定义规则类型,例如shuffleFailed/sqlFailed等

step: 匹配顺序,默认升序

parserType: 匹配类型,默认 DEFAULT(按行或者块匹配),JOIN(把结果合并成一行再匹配)

parserTemplate: 文本解析模板,由首行、中间行和结束行组成。

如果只是简单按行匹配,则只需要填写parserTemplate.heads即可;

如果需要按文本块匹配,例如异常栈,则需要填写parserTemplate.heads和parserTemplate.tails确定首行和结束行规则;

如果需要在文本块中匹配某一行,则需要填写parserTemplate.middles中间行规则。

groupNames:用户提取正则匹配分组名称的值

children: 用于嵌套规则,例如文本中有多个相同的异常栈(开始和结束标志一样),如果需要区分成不同的action,那么就可以嵌套规则实现

  { 
    "logType": "scheduler",
    "actions": [
      {
        "action": "otherError",
        "desc": "其他错误信息",
        "category": "otherException",
        "step": 1,
        "skip": false,
        "parserType": "DEFAULT",
        "parserTemplate": {
          "heads": [
            "^.+ERROR.+$"
          ],
          "middles": [],
          "tails": []
        },
        "groupNames": [],
        "children": []
      }
    ]
  }

conf/application.yml

custom.detector用于配置检测Spark Event Log,比如Spark环境变量、内存浪费、大表扫描等异常检测类型

custom:
  detector:
    sparkEnvironmentConfig:
      jvmInformation:
        - Java Version
      sparkProperties:
        - spark.driver.memoryOverhead
        - spark.driver.memory
        - spark.executor.memoryOverhead
        - spark.executor.memory
        - spark.executor.cores
        - spark.dynamicAllocation.maxExecutors
        - spark.default.parallelism
        - spark.sql.shuffle.partitions
      systemProperties:
        - sun.java.command
     ...
task-gpt

task-gpt模块用于聚合日志模板,并使用chatgpt给模板解决方案

核心配置

conf/application.yml

chatgpt:
  enable: true
  apiKeys: "sk-xxx1,sk-xxx2"
  proxy: "https://proxy"
  model: "gpt-3.5-turbo"
  prompt: "你是一位资深大数据专家,教导初始者,我会给你一些异常,你将提供异常的解决方案"

task-portal 与 task-ui

task-portal 与 task-ui 可视化前后端模块,提供诊断建议、报告总览、一键诊断、任务运行、APP运行、白名单等服务

task-ui前端默认一起编译放在task-portal/portal目录下

如果您需要单独部署前端,需要修改 task-ui/.env.production 下 VITE_APP_PROD_BACKEND,指定您的后端地址或者域名即可

web ui默认路径: http://localhost:7075/compass/

swagger ui默认路径:http://localhost:7075/compass/swagger-ui/index.html

关于用户和密码问题:

如果您是使用DolphinScheduler或Airflow调度平台,即compass_env.sh中配置export SCHEDULER="dolphinscheduler / airflow"时,账号密码和调度平台相同(需要已经同步数据)

如果您是自研调度或者测试,请设置 compass_env.sh 中 export SCHEDULER="custom",执行 document/sql/compass.sql 之后,默认账密是compass,compass,该模式没进行账号密码校验,请注意数据安全

task-portal
├── bin
│   ├── compass_env.sh
│   ├── startup.sh
│   └── stop.sh
├── conf
├── lib
├── portal
│   ├── assets
│   └── index.html

离线任务上报元数据诊断

支持第三方上报Spark/MapReduce任务application元数据进行诊断,如果不需要同步调度平台元数据和日志,只要启动task-portal和task-parser模块。

请求接口:http://[compass_host]/compass/openapi/offline/app/metadata

请求方式: POST

参数类型 :JSON





原文转载地址 https://mp.weixin.qq.com/s/L93siT-eAVBNcJE24ktSeg

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论