
Serverless in Practice: FaaS (Function as a Service)

火火日记 2020-06-28
With the rapid development of cloud computing in recent years, Serverless architecture has become one of the hottest technologies around.


So what exactly is Serverless? Below is the definition from Baidu Baike.
Serverless computing is a cloud service in which the hosting provider allocates sufficient resources for you in real time, rather than having you pay up front for dedicated servers or capacity. Serverless does not mean there are no servers (literally, it means you don't have to manage them); it builds a new abstraction layer on top of cloud infrastructure, executing the developer's code with exactly the compute resources the task needs, no more and no less. The serverless platform runs the task when a predefined event that triggers the code occurs.

Personally, I don't find that definition very easy to grasp. Serverless is a product of the evolution of cloud computing, and its core purpose is to cut costs and improve efficiency. I think the concept is easier to understand with an everyday analogy alongside the cloud computing models. Let's compare IDC, IaaS, PaaS, and FaaS using the example of opening a supermarket!
  • IDC: We do everything ourselves: construct the building, fit it out, stock the goods, arrange the shelves, and handle sales.

  • IaaS: No need to construct the building; we rent a storefront, then handle the fit-out, stocking, shelving, and sales ourselves.

  • PaaS: The rented storefront comes already fitted out by the landlord; we only need to stock the goods, arrange the shelves, and sell.

  • FaaS: Like joining a supermarket franchise: the store is already fitted out and the shelving and sales plans are fixed; all we need to do is stock the goods.


That is the idea behind Serverless. Amazon was the first to offer a Serverless service, launching AWS Lambda in 2014. Many IaaS and PaaS vendors quickly followed suit: Google Cloud Functions, Azure Functions, Oracle Functions, IBM OpenWhisk, Alibaba Cloud Function Compute. Within just a few years, Serverless products have blossomed everywhere!



Serverless architecture is widely used in web applications, real-time IoT data processing, audio/video transcoding, and more. The figure below shows an AWS real-time stream-processing scenario: with AWS Lambda we can set up a trigger that transforms real-time data from Kinesis and loads it into DynamoDB. Lambda can be used to track application activity, process transactions in order, analyze click streams, cleanse data, generate metrics, filter logs, build indexes, analyze social media, and meter IoT device telemetry.
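The Kinesis-to-DynamoDB flow just described can be sketched in Python. This is only an illustration, not AWS's code: the record decoding mirrors what Lambda actually hands you (base64-encoded Kinesis payloads), while the DynamoDB write is shown as the attribute-typed request dict you would pass to boto3's `put_item` (boto3 itself is omitted to keep the sketch self-contained, and the `user_id`/`clicks` field names are made up):

```python
import base64
import json

def decode_kinesis_records(event):
    """Decode the base64-encoded data blobs Lambda receives from a Kinesis stream."""
    return [
        json.loads(base64.b64decode(r["kinesis"]["data"]))
        for r in event["Records"]
    ]

def to_dynamodb_items(records):
    """Turn decoded click events into DynamoDB put-item requests (attribute-typed)."""
    return [
        {"user_id": {"S": rec["user_id"]}, "clicks": {"N": str(rec["clicks"])}}
        for rec in records
    ]

def handler(event, context=None):
    # In a real Lambda you would call boto3's dynamodb.put_item(...) per item here.
    items = to_dynamodb_items(decode_kinesis_records(event))
    return {"processed": len(items)}
```

The split into pure decode/transform functions keeps the trigger glue trivially small, which is typical of FaaS handlers.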


Now let's try it out on Oracle Cloud and see how Functions is actually used!

As shown below, we will use Oracle Functions to automatically load csv files that users upload to Object Storage into an Autonomous Data Warehouse (ADW); once the load succeeds, the csv file is moved out of the input bucket into a processed bucket.


OK, let's get started!


1. Configure Functions


1.1 Configure the Tenancy

Create a Group named function


Add the relevant User to the Group


Create a Compartment


Create a VCN


Using the VCN wizard is recommended


Create a Policy


The Policy statements are as follows:

Allow service FaaS to read repos in tenancy
Allow service FaaS to use virtual-network-family in tenancy


If the user is not an Administrator, the following Policies also need to be added:

Allow group <group-name> to manage repos in tenancy
Allow group <group-name> to read metrics in tenancy
Allow group <group-name> to use virtual-network-family in tenancy
Allow group <group-name> to manage functions-family in tenancy
Allow group <group-name> to use cloud-shell in tenancy


1.2 Create an Application


Select the VCN and create it


1.3 Configure the Cloud Shell environment



Configure the Fn Project CLI

fn list contexts
fn use context us-ashburn-1
fn update context oracle.compartment-id ocid1.compartment.oc1..aaaaaaaa7hr5yvis5aqlqdbbtfz3mb73db2lbq6h6pjqvwr4fnmv6ughcqma


fn update context registry iad.ocir.io/idfjhvpj966p/function


Create an Auth Token


Log in to the Registry

docker login -u 'idfjhvpj966p/oracleidentitycloudservice/alvin.jin@oracle.com' iad.ocir.io
# paste the Auth Token created above when prompted for the password


fn list apps


2. Create a Dynamic Group

Confirm the application list

fn ls apps



Create the Dynamic Group


Configure the Matching Rules

ALL {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaa7hr5yvis5aqlqdbbtfz3mb73db2lbq6h6pjqvwr4fnmv6ughcqma'}



Create a Policy

allow dynamic-group function-dynamic to inspect vcns in tenancy


3. Create Object Storage Buckets

We need two Buckets: the first stores the incoming csv files, and the second is where csv files are moved once they have been loaded into ADW.

Name the two Buckets "input-bucket" and "processed-bucket".


Create a Policy allowing the Dynamic Group to operate on Object Storage

Allow dynamic-group function-dynamic to manage objects in compartment function


4. Configure Autonomous Data Warehouse

Create an ADW Instance


Navigate to the Service Console


Copy the ORDS (Oracle REST Data Services) URL


Run the following in the Cloud Shell terminal to create a collection



export ORDS_BASE_URL=https://OMUTWVTV4XP3C16-DB202005130920.adb.us-ashburn-1.oraclecloudapps.com/ords/
curl -X PUT -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" $ORDS_BASE_URL/admin/soda/latest/regionsnumbers


Confirm the list of collections

curl -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" $ORDS_BASE_URL/admin/soda/latest/


After a csv file has been added to the input Object Storage Bucket, you can query the data in the collection:

curl -X POST -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" --data '{}' $ORDS_BASE_URL/admin/soda/latest/regionsnumbers?action=query


To empty the collection, run:

curl -X POST -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" $ORDS_BASE_URL/admin/soda/latest/regionsnumbers?action=truncate


Create a file named func.py and copy the following code into it

#
# oci-load-file-into-adw-python version 1.0.
#
# Copyright (c) 2020 Oracle, Inc.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
#

import io
import json
import oci
import csv
import requests

from fdk import response


def soda_insert(ordsbaseurl, schema, dbuser, dbpwd, document):
    auth = (dbuser, dbpwd)
    sodaurl = ordsbaseurl + schema + '/soda/latest/'
    collectionurl = sodaurl + "regionsnumbers"
    headers = {'Content-Type': 'application/json'}
    r = requests.post(collectionurl, auth=auth, headers=headers, data=json.dumps(document))
    r_json = {}
    try:
        r_json = json.loads(r.text)
    except ValueError as e:
        print(r.text, flush=True)
        raise
    return r_json


def load_data(signer, namespace, bucket_name, object_name, ordsbaseurl, schema, dbuser, dbpwd):
    client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
    try:
        print("INFO - About to read object {0} in bucket {1}...".format(object_name, bucket_name), flush=True)
        # we assume the file can fit in memory, otherwise we have to use the "range" argument and loop through the file
        csvdata = client.get_object(namespace, bucket_name, object_name)
        if csvdata.status == 200:
            print("INFO - Object {0} is read".format(object_name), flush=True)
            input_csv_text = str(csvdata.data.text)
            reader = csv.DictReader(input_csv_text.split('\n'), delimiter=',')
            for row in reader:
                print("INFO - inserting:")
                print("INFO - " + json.dumps(row), flush=True)
                insert_status = soda_insert(ordsbaseurl, schema, dbuser, dbpwd, row)
                if "id" in insert_status["items"][0]:
                    print("INFO - Successfully inserted document ID " + insert_status["items"][0]["id"], flush=True)
                else:
                    raise SystemExit("Error while inserting: " + str(insert_status))
        else:
            raise SystemExit("cannot retrieve the object " + str(object_name))
    except Exception as e:
        raise SystemExit(str(e))
    print("INFO - All documents are successfully loaded into the database", flush=True)


def move_object(signer, namespace, source_bucket, destination_bucket, object_name):
    objstore = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
    objstore_composite_ops = oci.object_storage.ObjectStorageClientCompositeOperations(objstore)
    resp = objstore_composite_ops.copy_object_and_wait_for_state(
        namespace,
        source_bucket,
        oci.object_storage.models.CopyObjectDetails(
            destination_bucket=destination_bucket,
            destination_namespace=namespace,
            destination_object_name=object_name,
            destination_region=signer.region,
            source_object_name=object_name
        ),
        wait_for_states=[
            oci.object_storage.models.WorkRequest.STATUS_COMPLETED,
            oci.object_storage.models.WorkRequest.STATUS_FAILED])
    if resp.data.status != "COMPLETED":
        raise Exception("cannot copy object {0} to bucket {1}".format(object_name, destination_bucket))
    else:
        resp = objstore.delete_object(namespace, source_bucket, object_name)
        print("INFO - Object {0} moved to Bucket {1}".format(object_name, destination_bucket), flush=True)


def handler(ctx, data: io.BytesIO=None):
    signer = oci.auth.signers.get_resource_principals_signer()
    object_name = bucket_name = namespace = ordsbaseurl = schema = dbuser = dbpwd = ""
    try:
        cfg = ctx.Config()
        input_bucket = cfg["input-bucket"]
        processed_bucket = cfg["processed-bucket"]
        ordsbaseurl = cfg["ords-base-url"]
        schema = cfg["db-schema"]
        dbuser = cfg["db-user"]
        dbpwd = cfg["dbpwd-cipher"]
    except Exception as e:
        print('Missing function parameters: bucket_name, ordsbaseurl, schema, dbuser, dbpwd', flush=True)
        raise
    try:
        body = json.loads(data.getvalue())
        print("INFO - Event ID {} received".format(body["eventID"]), flush=True)
        print("INFO - Object name: " + body["data"]["resourceName"], flush=True)
        object_name = body["data"]["resourceName"]
        print("INFO - Bucket name: " + body["data"]["additionalDetails"]["bucketName"], flush=True)
        if body["data"]["additionalDetails"]["bucketName"] != input_bucket:
            raise ValueError("Event Bucket name error")
        print("INFO - Namespace: " + body["data"]["additionalDetails"]["namespace"], flush=True)
        namespace = body["data"]["additionalDetails"]["namespace"]
    except Exception as e:
        print('ERROR: bad Event!', flush=True)
        raise
    load_data(signer, namespace, input_bucket, object_name, ordsbaseurl, schema, dbuser, dbpwd)
    move_object(signer, namespace, input_bucket, processed_bucket, object_name)

    return response.Response(
        ctx,
        response_data=json.dumps({"status": "Success"}),
        headers={"Content-Type": "application/json"}
    )
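To see what load_data does with the csv before it reaches SODA, here is a stdlib-only sketch of the same parsing step: csv.DictReader turns each row into the dict that soda_insert POSTs as a JSON document. The column names in the sample are made up for illustration; the real ones depend on your csv file.

```python
import csv
import json

def csv_to_documents(csv_text):
    """Parse csv text the same way load_data does: one JSON document per row."""
    reader = csv.DictReader(csv_text.split('\n'), delimiter=',')
    return [dict(row) for row in reader]

# Hypothetical file content; real column names depend on your csv.
sample = "region,number\nAMER,100\nEMEA,250"
for doc in csv_to_documents(sample):
    # Each document is what soda_insert would POST to the collection.
    print(json.dumps(doc))
```

Note that DictReader yields every value as a string; SODA stores them as-is, so cast numerics in the function if your queries need them typed.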


Create a requirements.txt file and copy the following into it

fdk
oci
requests


Create a func.yaml file and copy the following into it

schema_version: 20180708
name: oci-load-file-into-adw-python
version: 0.0.11
runtime: python
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256


Deploy the function

fn -v deploy --app helloworld-app


5. Configure the Function

Configure it via the fn CLI

fn config function helloworld-app oci-load-file-into-adw-python ords-base-url "https://OMUTWVTV4XP3C16-DB202005130920.adb.us-ashburn-1.oraclecloudapps.com/ords/"
fn config function helloworld-app oci-load-file-into-adw-python db-schema "admin"
fn config function helloworld-app oci-load-file-into-adw-python db-user "admin"
fn config function helloworld-app oci-load-file-into-adw-python dbpwd-cipher "QWERasdf1234"
fn config function helloworld-app oci-load-file-into-adw-python input-bucket "input-bucket"
fn config function helloworld-app oci-load-file-into-adw-python processed-bucket "processed-bucket"


6. Create an Event Rule

We need a trigger: when a csv file lands in the input Object Storage Bucket, the event rule automatically invokes the function we just deployed (the Python data-loading code).

Path: OCI console > Application Integration > Events Service > Create Rule



Once configured, the trigger fires whenever a new csv file is added and automatically runs the function, loading the csv data into the data warehouse.
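For reference, the Object Storage event delivered to the function looks roughly like the payload below (abridged, with illustrative IDs and names; the real payload carries more fields). The extraction shown is the same one handler() performs on the incoming body:

```python
import json

# Illustrative Object Storage "Object - Create" event payload (values made up).
sample_event = json.dumps({
    "eventID": "unique-event-id",
    "eventType": "com.oraclecloud.objectstorage.createobject",
    "data": {
        "resourceName": "regions.csv",
        "additionalDetails": {
            "bucketName": "input-bucket",
            "namespace": "idfjhvpj966p"
        }
    }
})

# The same field extraction handler() performs on the incoming body:
body = json.loads(sample_event)
object_name = body["data"]["resourceName"]
bucket_name = body["data"]["additionalDetails"]["bucketName"]
namespace = body["data"]["additionalDetails"]["namespace"]
print(object_name, bucket_name, namespace)
```

This is also why the handler rejects events whose bucketName does not match the configured input-bucket: the rule's filters and the function's own check are two independent guards.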


That concludes our hands-on look at Serverless with Oracle Functions.

Thanks!

