暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

使用 Databricks 和 Apache Airflow 更轻松地构建数据和 ML 管道

原创 谭磊Terry 恩墨学院 2022-09-23
811

我们很高兴地宣布 Apache Airflow 对 Databricks 支持的一系列增强。这些新功能使在流行的开源编排器中构建强大的数据和机器学习 (ML) 管道变得容易。借助最新的增强功能(例如新的 DatabricksSqlOperator),客户现在可以使用 Airflow 在 Databricks 上使用标准 SQL 查询和摄取数据,在笔记本上运行分析和 ML 任务,触发 Delta Live Tables 以转换 Lakehouse 中的数据等等。

Apache Airflow 是一个流行的可扩展平台,可使用 Python 以编程方式创作、调度和监控数据和机器学习管道(在 Airflow 中称为 DAG)。Airflow 包含大量内置操作符,可以轻松与从数据库到云存储的所有内容进行交互。Databricks 自 2017 年以来一直支持Airflow,使 Airflow 用户能够在 Databricks 的 Lakehouse 平台上触发结合笔记本、JAR 和 Python 脚本的工作流,该平台可扩展到地球上最具挑战性的数据和 ML 工作流。

让我们通过一个真实的任务来了解新功能:构建一个简单的数据管道,将 API 中的新到达天气数据加载到 Delta 表中,而无需使用 Databricks 笔记本来执行该工作。出于这篇博文的目的,我们将在 Azure 上完成所有工作,但在 AWS 和 GCP 上的过程几乎相同。此外,我们将在SQL 端点上执行所有步骤,但如果您更喜欢使用通用 Databricks 集群,则该过程非常相似。最后一个示例 DAG 在 Airflow UI 中将如下所示:

image.png

为简洁起见,我们将从这篇博文中省略一些代码。

安装和配置 Airflow

这篇博文假设您安装了 Airflow 2.1.0 或更高版本并配置了Databricks 连接。为 Apache Airflow 安装最新版本的 Databricks 提供程序:

pip install apache-airflow-providers-databricks

创建一个表来存储天气数据

我们将 Airflow DAG 定义为每天运行。第一个任务create_table运行一条 SQL 语句,如果该表不存在,则在默认模式中创建一个名为airflow_weather的表。此任务演示了DatabricksSqlOperator,它可以在 Databricks 计算上运行任意 SQL 语句,包括 SQL 端点。

with DAG(
        "load_weather_into_dbsql",
        start_date=days_ago(0),
        schedule_interval="@daily",
        default_args=default_args,
        catchup=False,
) as dag:
  table = "default.airflow_weather"
  schema = "date date, condition STRING, humidity double, precipitation double, " \
           "region STRING, temperature long, wind long, " \
           "next_days ARRAY<struct>" 

  create_table = DatabricksSqlOperator(
    task_id="create_table",
    sql=[f"create table if not exists {table}({schema}) using delta"],
  )
</struct>

从 API 检索天气数据并上传到云存储

接下来,我们使用 PythonOperator 向天气 API 发出请求,将结果存储在一个临时位置的 JSON 文件中。

一旦我们在本地获得天气数据,我们使用 LocalFilesystemToWasbOperator 将其上传到云存储,因为我们使用的是 Azure 存储。当然,Airflow 也支持将文件上传到 Amazon S3 或 Google Cloud Storage:

get_weather_data = PythonOperator(task_id="get_weather_data",
                                  python_callable=get_weather_data,
                                  op_kwargs={"output_path": "/tmp/{{ds}}.json"},
                                  )

copy_data_to_adls = LocalFilesystemToWasbOperator(
  task_id='upload_weather_data',
  wasb_conn_id='wasbs-prod,
  file_path="/tmp/{{ds}}.json",
  container_name='test',
  blob_name="airflow/landing/{{ds}}.json",
)

请注意,上面使用 {{ds}} 变量来指示 Airflow 将变量替换为计划任务运行的日期,从而为我们提供一致、不冲突的文件名。

将数据提取到表中

最后,我们准备将数据导入表中。为此,我们使用方便的DatabricksCopyIntoOperator,它会生成一个COPY INTO SQL 语句。COPY INTO 命令是一种简单而强大的方法,可以将文件从云存储中幂等地摄取到表中:

import_weather_data = DatabricksCopyIntoOperator(
    task_id="import_weather_data",
    expression_list="date::date, * except(date)",
    table_name=table,
    file_format="JSON",
     file_location="abfss://mycontainer@mystoreaccount.dfs.core.windows.net/airflow/landing/", files=["{{ds}}.json"])

就是这样!我们现在有一个可靠的数据管道,只需几行代码,就可以将 API 中的数据提取到表中。

但这还不是全部……

我们也很高兴地宣布改进,使 Airflow 与 Databricks 的集成变得轻而易举。

  • DatabricksSubmitRunOperator 已升级为使用最新的 Jobs API v2.1。借助新 API,可以更轻松地为使用 DatabricksSubmitRunOperator 提交的作业配置访问控制,因此开发人员或支持团队可以轻松访问作业 UI 和日志。
  • Airflow 现在可以触发 Delta Live Table 管道。
  • Airflow DAG 现在可以为 JAR 任务类型传递参数。
  • 可以将 Databricks Repos 更新到特定的分支或标签,以确保作业始终使用最新版本的代码。
  • 在 Azure 上,可以使用 Azure Active Directory 令牌而不是个人访问令牌 (PAT)。例如,如果 Airflow 在具有托管标识的 Azure VM 上运行,Databricks 操作员可以使用托管标识向 Azure Databricks 进行身份验证,而无需 PAT 令牌。在此处了解有关此和其他身份验证增强功能的更多信息。

Databricks 上的 Airflow 用户的未来一片光明

我们对这些改进感到兴奋,并期待看到 Airflow 社区使用 Databricks 构建的内容。我们很想听听您对我们接下来应该添加哪些功能的反馈。

原文标题:Build Data and ML Pipelines More Easily With Databricks and Apache Airflow
原文作者:Alex Ott, Bilal Aslam, Lennart Kats, Shant Hovsepian and Robert Saxby
原文地址:https://www.databricks.com/blog/2022/04/29/build-data-and-ml-pipelines-more-easily-with-databricks-and-apache-airflow.html

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论