简介
世界每时每刻都有大量的数据从各种数据源产生。但是从中提取和处理信息是非常繁琐的。为了解决这些问题,Apache Beam应运而生。
Apache Beam是一个开源的统一编程模型,用于定义和执行数据处理管道、转换,包括ETL和处理批处理和流式数据。因此,使用你最喜欢的编程语言(目前是Python、Java和Go),来使用Apache Beam SDK,并在你喜欢的Runner上执行管道,如Apache Spark、Apache Flink、Cloud Dataflow、Amazon Kinesis等等。
数据导入和数据类型
我们的数据有两种类型:批处理数据和流式数据。根据不同的场景,选择不同的架构模型来处理我们的数据。在这里,我们将通过使用Python代码来进一步操作。Apache Beam SDK需要Python 3.6或更高版本。现在,使用以下命令安装apache beam SDK。
本地
pip install apache-beam
Google云平台
pip install apache-beam[gcp]
AWS
pip install apache-beam[aws]
因此,对于I/O操作,可以从不同数据源读写数据,如Avro、Parquet、BigQuery、PubSub、MongoDB、TFRecord等。
批数据
首先将历史数据收集到数据湖中,也就是原始数据(未经处理的数据)。然后做一些处理和转换,把数据存到存储服务中(S3 Bucker,云存储,内部存储设备等)。这就是所谓的从数据湖中提取数据。
beam.io.ReadFromText(‘’)
流式数据
虽然这是从数据中心、汽车、地图、医疗保健、日志设备和传感器等产生的实时数据。因此,对于导入流式数据,使用Apache Kafka或任何其他消息服务(如云Pub/Sub、SNS)。在Pub/Sub中,你可以根据需要来过滤数据。
beam.io.ReadFromPubSub(subscription=subscription_name)
处理和转换
首先,创建一个Pipeline对象并设置pipeline执行环境(Apache Spark、Apache Flink、Cloud Dataflow和Amazon Kinesis等)。现在,从一些外部存储或数据源创建一个Pcollection,然后应用PTransforms来转换Pcollection中的每个元素,产生输出Pcollection。
然后,你可以过滤、分组、分析或对数据做其他处理。最后,使用I/O库将最终的Pcollection存储到外部存储系统。当你运行这个pipeline时,它会创建一个pipeline的工作流图,它在runner引擎上异步执行。
Pipelines - 它封装了从外部源读取有界或无界数据的整个过程,对其进行转换,并相应地将输出保存到外部存储源,如BigQuery等。
Pcollections - 它定义了数据管道工作的数据,它可以是有界数据或无界数据。我们可以从任何外部系统(数据湖,地理数据,医疗保健)创建Pcollections。
PTransforms - 它将Pcollection作为一个输入数据,并使用处理函数(ParDo, Map, Filter等),并产生另一个Pcollection。
Pipeline IO - 从向各种外部数据源读/写数据。
窗口
窗口是种处理流式数据或无界数据的机制。窗口是根据时间戳值来划分数据的。当你在无界数据上创建一个管道,并做一些聚合的转换,如groupByKey和CoGroupByKey,因为要基于一些关键值来聚合数据,窗口这时变得特别重要。
下面是4种不同的窗口,它们划分了Pcollection中的元素:
- 固定时间窗口 — 固定大小的非重叠窗口。
- 滑动时间窗口 — 窗口持续时间内固定大小的重叠窗口。
- 会话窗口 — 特定间隔时间内的元素。
- 单一全局窗口 — 默认情况下,Pcollection的所有元素都在一个单一全局窗口中。
from apache_beam.transforms.window import (
TimestampedValue,
Sessions,
Duration,
GlobalWindows, FixedWindows, SlidingWindows)
触发器
如果我们收到部分结果,它们可以在窗口结束前提前计算出来。因此,它将产生更早的结果。有一种叫做触发器的机制来控制迟到事件。
然而Apache Beam有4种不同类型的触发器:
- 事件时间触发器 — 基于事件时间。
- 处理时间触发器 — 基于处理时间。
- 数据驱动触发器 —基于到达的数据大小进行计算。
- SComposite触发器 — 允许把不同类型的触发器与谓词相组合。
部署在Google Cloud Dataflow Engine
一旦你完成了Beam pipeline,你可以在内部、云系统或本地系统上运行它。对于GCP环境,Cloud Dataflow是谷歌云提供的一项服务,用于执行Apache Beam pipeline。
现在,打开你的笔记本电脑,用Python创建Beam管道。
import apache_beam as beam
argv = [
‘–project={0}’.format(project),
‘–job_name=ch04timecorr’,
‘–save_main_session’,
‘–staging_location=gs://{0}/healthcare/staging/’.format(bucket),
‘–temp_location=gs://{0}/healthcare/temp/’.format(bucket),
‘–setup_file=./setup.py’,
‘–max_num_workers=8’,
‘–region={}’.format(region),
‘–autoscaling_algorithm=THROUGHPUT_BASED’,
‘–runner=DataflowRunner’]
def run(project, bucket, dataset, region):
pipeline = beam.Pipeline(argv=argv)
# do beam processing and operations
pipeline.run()
if __name__ == ‘__main__’:
import argparse
parser = argparse.ArgumentParser(description=’Run pipeline on the GCP)
parser.add_argument(‘-p’,’–project’, help=’Unique project ID’, required=True)
parser.add_argument(‘-b’,’–bucket’, help=’Bucket where your data were ingested.’, required=True)
parser.add_argument(‘-r’,’–region’, help=’Region in which to run the job.’, required=True)
parser.add_argument(‘-d’,’–dataset’, help=’Google BigQuery dataset’, default=’healthcare’)
args = vars(parser.parse_args())
run(project=args[‘project’], bucket=args[‘bucket’], dataset=args[‘dataset’], region=args[‘region’])
参考
https://beam.apache.org/blog/
https://www.loginradius.com/blog/engineering/apache-beam/
https://blog.knoldus.com/apache-beam-overview
原文标题:Big Data Processing with Apache Beam
原文作者:Shashikant Tanti
原文地址:https://blog.knoldus.com/big-data-processing-with-apache-beam/




