暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

干货|ByteHouse+Airflow:六步实现自动化数据管理流程

503

火山引擎云原生数据仓库ByteHouse与Apache Airflow强强结合,为管理和执行数据流程提供了强大而高效的解决方案。本文将带来ByteHouse与Apache Airflow结合使用的主要优势和特点,展示如何简化数据工作流程,并推动业务成功。

文 | Aelfric Lin  火山引擎ByteHouse团队


一、可扩展可靠的数据流程:Apache Airflow提供了一个强大的平台,用于设计和编排数据流程,更轻松的处理复杂的工作流程。搭配ByteHouse的云原生数据仓库解决方案,可以高效地存储和处理大量数据,确保数据流程的可扩展性和可靠性。

二、自动化工作流管理:Airflow的直观界面通过可视化的DAG(有向无环图)编辑器,使得创建和调度数据工作流程变得容易。通过与ByteHouse集成,可以自动化提取、转换和加载(ETL)过程,减少手动工作量,实现更高效的数据管理。

三、简单的部署和管理:Apache Airflow和ByteHouse均设计为简单的部署和管理。Airflow可以部署在本地或云端,而ByteHouse提供完全托管的云原生数据仓库解决方案。这种组合使得数据基础设施的设置和维护变得无缝化。



/ 业务场景


在这个客户场景中,一家名为“数据洞察有限公司(化名)”的分析公司,他们将Apache Airflow作为数据管道编排工具,同时选择ByteHouse作为数据仓库解决方案,以利用其强大的分析和机器学习功能。

数据洞察有限公司在电子商务行业运营,需要收集存储在AWS S3中的大量客户和交易数据,他们会定期将这些数据加载到ByteHouse,并执行各种分析任务,做到对业务运营情况的技术洞察。

/ 数据链路


数据洞察有限公司使用Apache Airflow,设置了一个基于特定事件或时间表的数据加载管道。例如,他们可以配置Airflow在每天的特定时间触发数据加载过程,或者当新的数据文件添加到指定的AWS S3存储桶时触发

当触发事件发生时,Airflow通过从AWS S3中检索相关数据文件来启动数据加载过程。它使用适当的凭据和API集成确保与S3存储桶的安全身份验证和连接。一旦数据从AWS S3中获取,Airflow会协调数据的转换和加载到ByteHouse中。它利用ByteHouse的集成能力,根据预定义的模式和数据模型高效地存储和组织数据。

成功将数据加载到ByteHouse后,可以利用ByteHouse的功能进行分析和机器学习任务,使用ByteHouse的类SQL语言查询数据,进行复杂的分析后生成报告,并洞察客户、销售的趋势以及产品性能。

此外,数据洞察有限公司还利用ByteHouse的功能创建可视化的交互式仪表板。通过动态仪表板显示实时指标,监控关键绩效指标,并可与其他工作人员同时操作。

最后,数据洞察有限公司利用ByteHouse的机器学习功能来开发预测模型、推荐系统或客户细分算法。ByteHouse提供了必要的计算能力和存储基础设施,用于训练和部署机器学习模型,使数据洞察有限公司能够获得有价值的预测和算法。


/ 总结


通过使用Apache Airflow作为数据管道编排工具,并将其与ByteHouse集成,数据洞察有限公司实现了从AWS S3加载数据到ByteHouse的流畅自动化流程。他们充分利用ByteHouse的强大分析、机器学习和仪表板功能,获得有价值的洞察,并推动组织内的数据驱动。



/ 步骤一:先决条件


在您的虚拟/本地环境中安装pip。
在您的虚拟/本地环境中安装ByteHouse CLI并登录到ByteHouse账户。
参考ByteHouse CLI以获取安装帮助。
macOS上使用Homebrew的示例
    brew install bytehouse-cli

    / 步骤二:安装Apache Airflow


    在本教程中,我们使用pip在您的本地或虚拟环境中安装Apache Airflow。了解更多信息,请参阅官方Airflow文档。
      # airflow需要一个目录,~/airflow是默认目录,
      # 但如果您喜欢,可以选择其他位置
      #(可选)
      export AIRFLOW_HOME=~/airflow


      AIRFLOW_VERSION=2.1.3
      PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"


      # 例如:3.6
      CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
      pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
      如果使用pip无法安装,请尝试使用pip3 install进行安装。安装完成后,运行命令airflow info以获取有关Airflow的更多信息。


      / 步骤三:Airflow初始化


      通过执行以下命令来初始化Airflow的Web服务器:
        # 初始化数据库
        airflow db init




        airflow users create \
        --username admin \
        --firstname admin \
        --lastname admin \
        --role Admin \
        --email admin


        # 启动Web服务器,默认端口是8080
        # 或修改airflow.cfg设置web_server_port
        airflow webserver --port 8080

        设置好Web服务器后,您可以访问http://localhost:8080/使用先前设置的用户名和密码登录Airflow控制台。


        在新的终端中,使用以下命令设置Airflow调度器。然后刷新http://localhost:8080/


        / 步骤四:YAML配置


        使用 cd ~/airflow 命令进入Airflow文件夹。
        打开名为 airflow.cfg 的配置文件,添加配置并连接到数据库。默认情况下,可以使用SQLite,但也可以连接到MySQL。
          # 默认情况下是SQLite,也可以连接到MySQL
          sql_alchemy_conn = mysql+pymysql://airflow:airflow@xxx.xx.xx.xx:8080/airflow


          # authenticate = False
          # 禁用Alchemy连接池以防止设置Airflow调度器时出现故障 https://github.com/apache/airflow/issues/10055
          sql_alchemy_pool_enabled = False


          # 存放Airflow流水线的文件夹,通常是代码库中的子文件夹。该路径必须是绝对路径。
          dags_folder = home/admin/airflow/dags



          / 步骤五:创建有向无环图(DAG)作业


          在Airflow路径下创建一个名为dags的文件夹,然后创建test_bytehouse.py以启动一个新的DAG作业。
            ~/airflow
            mkdir dags
            cd dags
            nano test_bytehouse.py
            在test_bytehouse.py中添加以下代码,该作业可以连接到ByteHouse CLI,并使用BashOperator运行任务、查询或将数据加载到ByteHouse中。
              from datetime import timedelta
              from textwrap import dedent


              from airflow import DAG
              from airflow.operators.bash import BashOperator
              from airflow.utils.dates import days_ago


              default_args = {
              'owner': 'airflow',
              'depends_on_past': False,
              'email': ['airflow@example.com'],
              'email_on_failure': False,
              'email_on_retry': False,
              'retries': 1,
              'retry_delay': timedelta(minutes=5),
              }
              with DAG(
              'test_bytehouse',
              default_args=default_args,
              description='A simple tutorial DAG',
              schedule_interval=timedelta(days=1),
              start_date=days_ago(1),
              tags=['example'],
              ) as dag:

              tImport = BashOperator(
              task_id='ch_import',
              depends_on_past=False,
              bash_command='$Bytehouse_HOME/bytehouse-cli -cf root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',
              )


              tSelect = BashOperator(
              task_id='ch_select',
              depends_on_past=False,
              bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "'
              )

              tSelect >> tImport

              在当前文件路径下运行python test_bytehouse.py 以在Airflow中创建DAG。
              在浏览器中刷新网页,可以在DAG列表中看到新创建的名为test_bytehouse的DAG。


              / 步骤六:执行DAG /


              在终端中运行以下Airflow命令,来查看DAG列表和test_bytehouse DAG中的子任务,再分别测试查询执行和数据导入任务。
                #打印"test_bytehouse" DAG中的任务列表
                [root@VM-64-47-centos dags]# airflow tasks list test_bytehouse
                ch_import
                ch_select


                #打印"test_bytehouse" DAG中任务的层次结构
                [root@VM-64-47-centos dags]# airflow tasks list test_bytehouse --tree
                <Task(BashOperator): ch_select>
                <Task(BashOperator): ch_import>

                运行完DAG后,查看ByteHouse账户中的查询历史页面和数据库模块,获取查询/加载数据成功执行的结果。


                产品介绍

                火山引擎ByteHouse
                统一的大数据分析平台。目前提供企业版和云数仓两种版本,企业版是基于开源的企业级分析型数据库,支持用户交互式分析PB级别数据,通过多种自研表引擎,灵活支持各类数据分析和应用;云数仓版作为云原生的数据分析平台,实现统一的离线和实时数据分析,并通过弹性扩展的计算层和分布式存储层,有效降低企业大数据分析。后台回复数字“6”了解产品



                文章转载自字节跳动数据平台,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论