
IDC:盖楼、装修、进货,物品摆放及销售,全部工作都由我们自己完成。
IaaS:楼不用盖了,我们改租赁店铺,然后需要自己装修,进货,物品摆放及销售
PaaS: 租赁的店铺房东已装修好,我们只需要进货、物品摆放及销售
FaaS:类似于连锁超市加盟,店铺已装修好,物品摆放及销售方案已确定,我们只需要进货就好。
这就是Serverless的概念。最早推出Serverless服务的是亚马逊,是在2014年率先推出了AWS Lambda 服务。紧接着众多IaaS及Pass厂商也争相入市,Google Cloud Functions, Azure Funcions, Oracle Functions,IBM OpenWhisk,阿里云函数计算,短短数年时间Serverless产品已遍地开花!

Serverless架构在Web应用、IoT实时数据处理、视频音频转码等多个领域均有广泛应用。下图为AWS实时数据流处理的场景。通过AWS Lamda我们可以设置触发器,将Kinesis中实时数据进行加工并加载到DynamoDB中。通过Lamda可以跟踪应用程序活动、顺序事务处理顺序、分析单击数据流、整理数据、生成指标、筛选日志、建立索引、分析社交媒体以及遥测和计量IoT设备数据。

下面我们在Oracle Cloud上做一个实践!看看Funtions具体是怎么使用的!
如下图,我们通过Oracle Functions将用户上传到Object Storage的csv文件,自动下载到Autonomous Data Warehouse(自治数仓),下载成功后删除Object Storage中的csv文件。

OK 咱们开始!
1. 配置Functions
1.1 配置Tenancy
创建名为function的Group

将对应的User添加到Group中

创建Compartment

创建VCN

建议使用智能向导进行创建

创建Policy

Policy如下
Allow service FaaS to read repos in tenancyAllow service FaaS to use virtual-network-family in tenancy

如果使用的User不是Administor用户时,需要添加如下Policy
Allow group <group-name> to manage repos in tenancyAllow group <group-name> to read metrics in tenancyAllow group <group-name> to use virtual-network-family in tenancyAllow group <group-name> to manage functions-family in tenancyAllow group <group-name> to use cloud-shell in tenancy
1.2 创建Application

选择VCN并创建

1.3 Cloud Shell环境配置

Fn Project CLI 配置
fn list contextfn use context us-ashburn-1fn update context oracle.compartment-id ocid1.compartment.oc1..aaaaaaaa7hr5yvis5aqlqdbbtfz3mb73db2lbq6h6pjqvwr4fnmv6ughcqmafn update context registry iad.ocir.io/idfjhvpj966p/function
创建Token

登录到Registry
docker login -u 'idfjhvpj966p/oracleidentitycloudservice/alvin.jin@oracle.com' iad.ocir.io[a_HKyxw].3}zrLI<ZUdfn list apps
2. 创建Dynamic Group
确认application列表
fn ls apps

创建Dynamic Group

配置Matching Rules
ALL {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaa7hr5yvis5aqlqdbbtfz3mb73db2lbq6h6pjqvwr4fnmv6ughcqma'}

创建Policy
allow dynamic-group function-dynamic to inspect vcns in tenancy

3. 创建Object Storage Bucket
需要创建两个Bucket,第一个Bucket是存储csv文件,第二个Bucket是将导入到ADW完毕后的csv文件同步到那里。
两个Bucket分别命名为"input-bucket"和 "processed-bucket"


创建一个Policy允许Dynamic Group操作Object Storage
Allow dynamic-group function-dynamic to manage objects in compartment function

4. 配置Autonomous Data Warehouse
创建ADW Instance

移动到 Service Console

复制ORDS(Oracle Rest Data Services) URL

在Cloud Terminal执行如下代码,创建collection
export ORDS_BASE_URL=https://OMUTWVTV4XP3C16-DB202005130920.adb.us-ashburn-1.oraclecloudapps.com/ords/curl -X PUT -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" $ORDS_BASE_URL/admin/soda/latest/regionsnumbers
确认collection清单
curl -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" $ORDS_BASE_URL/admin/soda/latest/
csv文件添加到Input Object Storage Bucket之后,可以查询 collection里的数据
curl -X POST -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" --data '{}' $ORDS_BASE_URL/admin/soda/latest/regionsnumbers?action=query
如需要清空collection时,执行如下代码
curl -X POST -u 'ADMIN:QWERasdf1234' -H "Content-Type: application/json" $ORDS_BASE_URL/admin/soda/latest/regionsnumbers?action=truncate
创建一个func.py 文件,并将如下代码复制到文件中
## oci-load-file-into-adw-python version 1.0.## Copyright (c) 2020 Oracle, Inc.# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.#import ioimport jsonimport ociimport csvimport requestsfrom fdk import responsedef soda_insert(ordsbaseurl, schema, dbuser, dbpwd, document):auth=(dbuser, dbpwd)sodaurl = ordsbaseurl + schema + '/soda/latest/'collectionurl = sodaurl + "regionsnumbers"headers = {'Content-Type': 'application/json'}r = requests.post(collectionurl, auth=auth, headers=headers, data=json.dumps(document))r_json = {}try:r_json = json.loads(r.text)except ValueError as e:print(r.text, flush=True)raisereturn r_jsondef load_data(signer, namespace, bucket_name, object_name, ordsbaseurl, schema, dbuser, dbpwd):client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)try:print("INFO - About to read object {0} in bucket {1}...".format(object_name, bucket_name), flush=True)# we assume the file can fit in memory, otherwise we have to use the "range" argument and loop through the filecsvdata = client.get_object(namespace, bucket_name, object_name)if csvdata.status == 200:print("INFO - Object {0} is read".format(object_name), flush=True)input_csv_text = str(csvdata.data.text)reader = csv.DictReader(input_csv_text.split('\n'), delimiter=',')for row in reader:print("INFO - inserting:")print("INFO - " + json.dumps(row), flush=True)insert_status = soda_insert(ordsbaseurl, schema, dbuser, dbpwd, row)if "id" in insert_status["items"][0]:print("INFO - Successfully inserted document ID " + insert_status["items"][0]["id"], flush=True)else:raise SystemExit("Error while inserting: " + insert_status)else:raise SystemExit("cannot retrieve the object" + str(object_name))except Exception as e:raise SystemExit(str(e))print("INFO - All documents are successfully loaded into the database", flush=True)def move_object(signer, namespace, source_bucket, destination_bucket, object_name):objstore = oci.object_storage.ObjectStorageClient(config={}, signer=signer)objstore_composite_ops = oci.object_storage.ObjectStorageClientCompositeOperations(objstore)resp = objstore_composite_ops.copy_object_and_wait_for_state(namespace,source_bucket,oci.object_storage.models.CopyObjectDetails(destination_bucket=destination_bucket,destination_namespace=namespace,destination_object_name=object_name,destination_region=signer.region,source_object_name=object_name),wait_for_states=[oci.object_storage.models.WorkRequest.STATUS_COMPLETED,oci.object_storage.models.WorkRequest.STATUS_FAILED])if resp.data.status != "COMPLETED":raise Exception("cannot copy object {0} to bucket {1}".format(object_name,destination_bucket))else:resp = objstore.delete_object(namespace, source_bucket, object_name)print("INFO - Object {0} moved to Bucket {1}".format(object_name,destination_bucket), flush=True)def handler(ctx, data: io.BytesIO=None):signer = oci.auth.signers.get_resource_principals_signer()object_name = bucket_name = namespace = ordsbaseurl = schema = dbuser = dbpwd = ""try:cfg = ctx.Config()input_bucket = cfg["input-bucket"]processed_bucket = cfg["processed-bucket"]ordsbaseurl = cfg["ords-base-url"]schema = cfg["db-schema"]dbuser = cfg["db-user"]dbpwd = cfg["dbpwd-cipher"]except Exception as e:print('Missing function parameters: bucket_name, ordsbaseurl, schema, dbuser, dbpwd', flush=True)raisetry:body = json.loads(data.getvalue())print("INFO - Event ID {} received".format(body["eventID"]), flush=True)print("INFO - Object name: " + body["data"]["resourceName"], flush=True)object_name = body["data"]["resourceName"]print("INFO - Bucket name: " + body["data"]["additionalDetails"]["bucketName"], flush=True)if body["data"]["additionalDetails"]["bucketName"] != input_bucket:raise ValueError("Event Bucket name error")print("INFO - Namespace: " + body["data"]["additionalDetails"]["namespace"], flush=True)namespace = body["data"]["additionalDetails"]["namespace"]except Exception as e:print('ERROR: bad Event!', flush=True)raiseload_data(signer, namespace, input_bucket, object_name, ordsbaseurl, schema, dbuser, dbpwd)move_object(signer, namespace, input_bucket, processed_bucket, object_name)return response.Response(ctx,response_data=json.dumps({"status": "Success"}),headers={"Content-Type": "application/json"})
创建一个requirements.txt文件,并将如下代码复制进去
fdkocirequests
创建一个func.yaml文件,并将如下代码复制进去
schema_version: 20180708name: oci-load-file-into-adw-pythonversion: 0.0.11runtime: pythonentrypoint: /python/bin/fdk /function/func.py handlermemory: 256
部署Functions
fn -v deploy --app helloworld-app
5. 配置Functions
通过fn CLI进行配置
fn config function helloworld-app oci-load-file-into-adw-python ords-base-url "https://OMUTWVTV4XP3C16-DB202005130920.adb.us-ashburn-1.oraclecloudapps.com/ords/"fn config function helloworld-app oci-load-file-into-adw-python db-schema "admin"fn config function helloworld-app oci-load-file-into-adw-python db-user "admin"fn config function helloworld-app oci-load-file-into-adw-python dbpwd-cipher "QWERasdf1234"fn config function helloworld-app oci-load-file-into-adw-python input-bucket "input-bucket"fn config function helloworld-app oci-load-file-into-adw-python processed-bucket "processed-bucket"
6. 创建Event Rule
需要创建一个触发器,当在Input Object Storage Bucket里存在csv文件时,通过触发器会自动运行之前创建好的func.yaml(数据加载的Python脚本)
路径如下:OCI console > Application Integration > Events Service Create Rule


配置完毕后,当新的csv文件被添加时触发器会感知,并自动执行Python脚本将csv文件数据加载到数据仓库中。
以上就是Serverless之Oracle Functions的实战内容。
谢谢!




