在本博客中,我们探讨了如何将 Databricks 强大的作业 API 与 Amazon Managed Apache Airflow (MWAA) 结合使用,并与 Cloudwatch 集成以监控具有基于 Databricks 的任务的有向无环图 (DAG)。此外,我们将展示如何根据 DAG 性能指标创建警报。
在我们进入本指南的操作部分之前,让我们快速了解什么是 Databricks 作业编排和 Amazon Managed Airflow (MWAA)?
Databricks 编排和警报
Databricks 中的作业编排是一项完全集成的功能。客户可以使用 Jobs API 或 UI 来创建和管理作业和功能,例如用于监控的电子邮件警报。借助这种强大的 API 驱动方法,Databricks 作业可以编排任何具有 API 的内容(例如,从 CRM 中提取数据)。Databricks 编排可以支持具有单任务或多任务选项的作业,以及具有 Delta Live Tables 的新增作业。
亚马逊托管 Airflow
Amazon Managed Workflows for Apache Airflow (MWAA) 是 Apache Airflow 的托管编排服务。MWAA 以 AWS 的安全性、可用性和可扩展性代表客户管理开源 Apache Airflow 平台。MWAA 通过预先存在的插件为客户提供与 AWS 服务和各种第三方服务轻松集成的额外好处,允许客户创建复杂的数据处理管道。
高级架构图
我们将创建一个简单的 DAG,它启动一个 Databricks 集群并执行一个笔记本。MWAA 监控执行情况。注意:我们有一个简单的作业定义,但 MWAA 可以编排各种复杂的工作负载。

设置环境
该博客假定您有权访问 Databricks 工作区。在此处免费注册并配置Databricks 集群。此外,创建一个 API 令牌以用于在 MWAA 中配置连接。

要创建 MWAA 环境,请遵循这些说明。
如何创建 Databricks 连接
第一步是在 MWAA 中配置 Databricks 连接。

示例 DAG
接下来将您的 DAG 上传到您在创建 MWAA 环境时指定的 S3 存储桶文件夹中。您的 DAG 将自动出现在 MWAA UI 上。

这是一个 Airflow DAG 示例,它为新的 Databricks 作业集群 Databricks 笔记本任务创建配置,并提交笔记本任务以在 Databricks 中执行。
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator, DatabricksRunNowOperator
from datetime import datetime, timedelta
#Define params for Submit Run Operator
new_cluster = {
'spark_version': '7.3.x-scala2.12',
'num_workers': 2,
'node_type_id': 'i3.xlarge',
"aws_attributes": {
"instance_profile_arn": "arn:aws:iam::XXXXXXX:instance-profile/databricks-data-role"
}
}
notebook_task = {
'notebook_path': '/Users/xxxxx@XXXXX.com/test',
}
#Define params for Run Now Operator
notebook_params = {
"Variable":5
}
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=2)
}
with DAG('databricks_dag',
start_date=datetime(2021, 1, 1),
schedule_interval='@daily',
catchup=False,
default_args=default_args
) as dag:
opr_submit_run = DatabricksSubmitRunOperator(
task_id='submit_run',
databricks_conn_id='databricks_default',
new_cluster=new_cluster,
notebook_task=notebook_task
)
opr_submit_run
从 GitHub链接获取最新版本的文件。
在 MWAA 中触发 DAG

触发后,您可以在 Databricks 集群 UI 页面上看到作业集群。

故障排除
Amazon MWAA 将 Amazon CloudWatch 用于所有 Airflow 日志。这些有助于对 DAG 故障进行故障排除。

CloudWatch 指标和警报
接下来,我们创建一个指标来监控 DAG 的成功完成。Amazon MWAA 支持许多指标。

我们使用 TaskInstanceFailures 创建警报。

对于阈值,我们选择零(即,我们希望在一小时内发生任何故障时得到通知)。

最后,我们选择电子邮件通知作为操作。

以下是 DAG 失败时生成的 Cloudwatch 电子邮件通知的示例。
您收到此电子邮件是因为您在美国东部(弗吉尼亚北部)区域的 Amazon CloudWatch 警报“DatabricksDAGFailure”已进入 ALARM 状态,因为“阈值已超过”。
结论
在这篇博客中,我们展示了如何创建一个 Airflow DAG,它创建、配置和提交一个新的 Databricks 作业集群、Databricks 笔记本任务以及在 Databricks 中执行的笔记本任务。我们利用 MWAA 与 CloudWatch 的开箱即用集成来监控我们的示例工作流并在出现故障时接收通知。
原文标题:Orchestrating Databricks Workloads on AWS With Managed Workflows for Apache Airflow
原文作者:Naseer Ahmed and Igor Alekseev
原文地址:https://www.databricks.com/blog/2022/01/27/orchestrating-databricks-workloads-on-aws-with-managed-workflows-for-apache-airflow.html




