暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

通过Apache Beam进行大数据处理

原创 不言 2022-09-19
672

简介

世界每时每刻都有大量的数据从各种数据源产生。但是从中提取和处理信息是非常繁琐的。为了解决这些问题,Apache Beam应运而生。

Apache Beam是一个开源的统一编程模型,用于定义和执行数据处理管道、转换,包括ETL和处理批处理和流式数据。因此,使用你最喜欢的编程语言(目前是Python、Java和Go),来使用Apache Beam SDK,并在你喜欢的Runner上执行管道,如Apache Spark、Apache Flink、Cloud Dataflow、Amazon Kinesis等等。

数据导入和数据类型

我们的数据有两种类型:批处理数据和流式数据。根据不同的场景,选择不同的架构模型来处理我们的数据。在这里,我们将通过使用Python代码来进一步操作。Apache Beam SDK需要Python 3.6或更高版本。现在,使用以下命令安装apache beam SDK。

本地

pip install apache-beam

Google云平台

pip install apache-beam[gcp]

AWS

pip install apache-beam[aws]

因此,对于I/O操作,可以从不同数据源读写数据,如Avro、Parquet、BigQuery、PubSub、MongoDB、TFRecord等。

批数据

首先将历史数据收集到数据湖中,也就是原始数据(未经处理的数据)。然后做一些处理和转换,把数据存到存储服务中(S3 Bucker,云存储,内部存储设备等)。这就是所谓的从数据湖中提取数据。

beam.io.ReadFromText(‘’)

流式数据

虽然这是从数据中心、汽车、地图、医疗保健、日志设备和传感器等产生的实时数据。因此,对于导入流式数据,使用Apache Kafka或任何其他消息服务(如云Pub/Sub、SNS)。在Pub/Sub中,你可以根据需要来过滤数据。

beam.io.ReadFromPubSub(subscription=subscription_name)

处理和转换

首先,创建一个Pipeline对象并设置pipeline执行环境(Apache Spark、Apache Flink、Cloud Dataflow和Amazon Kinesis等)。现在,从一些外部存储或数据源创建一个Pcollection,然后应用PTransforms来转换Pcollection中的每个元素,产生输出Pcollection。

然后,你可以过滤、分组、分析或对数据做其他处理。最后,使用I/O库将最终的Pcollection存储到外部存储系统。当你运行这个pipeline时,它会创建一个pipeline的工作流图,它在runner引擎上异步执行。

Pipelines - 它封装了从外部源读取有界或无界数据的整个过程,对其进行转换,并相应地将输出保存到外部存储源,如BigQuery等。

Pcollections - 它定义了数据管道工作的数据,它可以是有界数据或无界数据。我们可以从任何外部系统(数据湖,地理数据,医疗保健)创建Pcollections。

PTransforms - 它将Pcollection作为一个输入数据,并使用处理函数(ParDo, Map, Filter等),并产生另一个Pcollection。

Pipeline IO - 从向各种外部数据源读/写数据。

窗口

窗口是种处理流式数据或无界数据的机制。窗口是根据时间戳值来划分数据的。当你在无界数据上创建一个管道,并做一些聚合的转换,如groupByKey和CoGroupByKey,因为要基于一些关键值来聚合数据,窗口这时变得特别重要。

下面是4种不同的窗口,它们划分了Pcollection中的元素:

  1. 固定时间窗口 — 固定大小的非重叠窗口。
  2. 滑动时间窗口 — 窗口持续时间内固定大小的重叠窗口。
  3. 会话窗口 — 特定间隔时间内的元素。
  4. 单一全局窗口 — 默认情况下,Pcollection的所有元素都在一个单一全局窗口中。
from apache_beam.transforms.window import ( TimestampedValue, Sessions, Duration, GlobalWindows, FixedWindows, SlidingWindows)

触发器

如果我们收到部分结果,它们可以在窗口结束前提前计算出来。因此,它将产生更早的结果。有一种叫做触发器的机制来控制迟到事件。

然而Apache Beam有4种不同类型的触发器:

  1. 事件时间触发器 — 基于事件时间。
  2. 处理时间触发器 — 基于处理时间。
  3. 数据驱动触发器 —基于到达的数据大小进行计算。
  4. SComposite触发器 — 允许把不同类型的触发器与谓词相组合。

部署在Google Cloud Dataflow Engine

一旦你完成了Beam pipeline,你可以在内部、云系统或本地系统上运行它。对于GCP环境,Cloud Dataflow是谷歌云提供的一项服务,用于执行Apache Beam pipeline。

现在,打开你的笔记本电脑,用Python创建Beam管道。

import apache_beam as beam
argv = [ ‘–project={0}’.format(project), ‘–job_name=ch04timecorr’, ‘–save_main_session’, ‘–staging_location=gs://{0}/healthcare/staging/’.format(bucket), ‘–temp_location=gs://{0}/healthcare/temp/’.format(bucket), ‘–setup_file=./setup.py’, ‘–max_num_workers=8’, ‘–region={}’.format(region), ‘–autoscaling_algorithm=THROUGHPUT_BASED’, ‘–runner=DataflowRunner’]
def run(project, bucket, dataset, region):
pipeline = beam.Pipeline(argv=argv) # do beam processing and operations pipeline.run()
if __name__ == ‘__main__’: import argparse parser = argparse.ArgumentParser(description=’Run pipeline on the GCP) parser.add_argument(‘-p’,’–project’, help=’Unique project ID’, required=True) parser.add_argument(‘-b’,’–bucket’, help=’Bucket where your data were ingested.’, required=True) parser.add_argument(‘-r’,’–region’, help=’Region in which to run the job.’, required=True) parser.add_argument(‘-d’,’–dataset’, help=’Google BigQuery dataset’, default=’healthcare’) args = vars(parser.parse_args()) run(project=args[‘project’], bucket=args[‘bucket’], dataset=args[‘dataset’], region=args[‘region’])

参考

https://beam.apache.org/blog/

https://www.loginradius.com/blog/engineering/apache-beam/

https://blog.knoldus.com/apache-beam-overview

原文标题:Big Data Processing with Apache Beam
原文作者:Shashikant Tanti
原文地址:https://blog.knoldus.com/big-data-processing-with-apache-beam/

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论