应用得益于 Python 的灵活性和丰富的库支持,它近年来已成为后端开发中最流行的语言之一。
本文将演示如何在 Python 项目中使用 OceanBase。通过将 OceanBase 与流行的 Python 库(如 FastAPI、SQLAlchemy(ORM)和 Uvicorn(服务器))结合使用,我们将创建一个 REST API,作为无界面电商应用(Headless e-commerce application)的后端。
无界面应用(Headless application)指的是具备完整功能,但是没有图形界面的应用。在整个教程中,我们将探索 OceanBase 在 Python 项目中的能力,并展示如何对数据库中的物品和商店进行 CRUD 操作。通过将 OceanBase 集成到我们的电商应用中,我们可以确保我们的后端能够处理海量的数据和流量,同时保持最佳性能。
OceanBase 是一款高性能的分布式关系型数据库系统,旨在提供卓越的可扩展性和 ACID 能力。OceanBase 是开源的,是企业寻求强大可靠的分布式数据库解决方案的理想选择。
无界面电商应用 API
在本教程中,我们将创建一个 FastAPI 应用程序,该应用程序为无界面电商 API 提供对物品和商店资源的访问,并连接到 OceanBase 数据库。
无界面电商应用是一种创新的在线销售方法,它的设计将电商应用的前端和后端组件解耦。通过将呈现层(“头部”)与基础业务逻辑和数据管理(“主体”)分离,无界面电商应用使企业能够在其在线商店中实现更大的灵活性、可扩展性和定制性。
OceanBase 驱动的无界面电商 APP 由一个物品模块和一个商店模块组成,其中所有物品都属于其相应的商店。该 API 的特点包括:
1、物品管理
创建物品:用户可以创建具有名称、价格、描述和关联商店等详细信息的新物品。
检索物品:从商店中获取所有物品。
更新物品:通过其 ID 更新物品的详细信息。
删除物品:通过其 ID 删除物品。
2、商店管理
创建商店:用户可以创建具有名称的新商店。
检索商店:获取所有商店或按名称筛选商店。
更新单个商店:通过其 ID 更新商店。
删除商店:通过其 ID 删除商店。
3、数据库集成
该应用程序使用 SQLAlchemy 作为 ORM 与 OceanBase 数据库进行交互。它定义了模型、schemas 和存储库,以管理数据库中的物品和商店。
4、文档
该应用程序使用 FastAPI 内置的 OpenAPI 和 Swagger UI 支持进行自我文档化。用户可以通过自动生成的 Swagger UI 与 API 进行交互。
在本教程中,我们只构建一个基本的无界面 API,以展示 OceanBase 在 Python 应用程序中的能力。然而,为了提供更全面的解决方案,完整的无界面电商应用 API 通常会包括其他功能和集成。例如,本演示中缺少重要的电子商务功能,如用户身份验证和授权、购物车管理、订单管理和运输管理。
再次提醒大家,此项目的代码库已上传到 GitLab。您可以阅读本文最后一节,了解如何使用此代码库设置自己的项目。
前提
在开始之前,需要安装 Python 3 和 virtualenv。Virtualenv 是一个使用 pip 进行版本锁定和项目之间的依赖隔离等功能的包和虚拟环境管理器。
在这个项目中,我们还需要运行 OceanBase 集群。有几个选项可以实现这一点。可以在本地环境中安装 OceanBase,启动云中的虚拟机来运行它,或者使用 AWS 市场中的 OB Cloud,仅需点击几下即可设置集群。
为了简单起见,我将使用 EC2 方法和演示服务器。在实际生产使用环境中,请遵循 OceanBase 的官方指南,在 Kubernetes 集群中部署。
设置和安装
首先,在硬盘上的任何目录中创建一个名为 python-fastapi-oceanbase-api 的文件夹,用于存放该项目:
$ mkdir python-fastapi-oceanbase-api$ cd python-fastapi-oceanbase-api
导航到项目文件夹并激活虚拟环境:
python3 -m venv venvsource venv/bin/activate
1、使用 pip 安装所需的依赖项:
pip install fastapi uvicorn sqlalchemy作为初始设置,该命令安装用于构建 API 的 FastAPI, Uvicorn 作为 ASGI 服务器,SQLAlchemy 作为 ORM。
随着项目的深入,我们可能需要安装一些额外的软件包。在最终的 Gitlab repo 中,我还将包含一个 requirements.txt 文件,其中包含所有必需的包。
2、建立 OceanBase 连接
在项目根目录下创建一个名为 db.py 的新文件,并添加以下代码,使用 SQLAlchemy 配置数据库连接:
import osfrom dotenv import load_dotenvfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_baseload_dotenv()DATABASE_URL = os.environ.get('DB_URL')print(DATABASE_URL)engine = create_engine(DATABASE_URL) # type: ignoreSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)Base = declarative_base()def get_db():db = SessionLocal()try:yield dbfinally:db.close()
在这里,我们将使用一个 .env 文件来存储我们的 OceanBase 数据库连接字符串。由于 OceanBase 兼容 MySQL,而 SQLAlchemy 目前不支持 OceanBase,因此我们可以使用 MySQL 协议。以下是 .env 文件中连接字符串的样式:
DB_URL = mysql+pymysql://user:password@EC2_IP:port/database_namedb.py 文件将导入必要的包来创建 OceanBase 的 SQLAlchemy 引擎和数据库会话,创建用于 OceanBase 的 SQLAlchemy 引擎,并定义 SessionLocal 类,表示数据库会话。
最后,用 declarative_base ( ) 创建了一个名为 Base 的基类,将被每个数据库模型或 ORM 类继承。get_db ( ) 函数为每个请求创建一个独立的数据库会话。
设计数据库模型
在本节中,我们将为存储和组织数据开发数据库模型。我们的应用程序需要两个数据库模型:Item 和 Store,以及它们各自的存储库。我们将使用在上一节中创建的 db.py 文件来开发我们的 SQLAlchemy 模型。该文件包括一个名为 Base 的类,作为定义模型的声明性基础。
创建一个名为 database 的文件夹,并添加两个文件:models.py 和 repositories.py 。我们将在 models.py 中定义所有数据库实体及其对应的存储库在 repositories.py 中实现。
models.py 文件的内容应该像这样:
from sqlalchemy import Column, ForeignKey, Integer, String, Floatfrom sqlalchemy.orm import relationshipfrom db import Baseclass Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True,index=True)name = Column(String(80), nullable=False, unique=True,index=True)price = Column(Float(precision=2), nullable=False)description = Column(String(200))store_id = Column(Integer,ForeignKey('stores.id'),nullable=False)def __repr__(self):return 'ItemModel(name=%s, price=%s,store_id=%s)' % (self.name, self.price,self.store_id)class Store(Base):__tablename__ = "stores"id = Column(Integer, primary_key=True,index=True)name = Column(String(80), nullable=False, unique=True)items = relationship("Item",primaryjoin="Store.id == Item.store_id",cascade="all, delete-orphan")def __repr__(self):return 'Store(name=%s)' % self.name
models.py 代码创建并管理 Item 和 Store 数据库模型。它首先定义了 Item 类,并将其映射到我们数据库中的 'items' 表。指定了表列及其数据类型,其中 store_id 作为 Stores 的外键引用。
使用 SQLAlchemy ORM 建立了一个关系,该关系在虚拟中包含来自相关表的值。最后,添加了其他辅助方法以在运行时显示 Store 对象。
repositories.py 文件包括一组可重用的函数,以便与存储在数据库中的数据进行交互。
from sqlalchemy.orm import Sessionfrom . import models, schemasclass ItemRepo:async def create(db: Session, item: schemas.ItemCreate):db_item = models.Item(name=item.name,price=item.price,description=item.description,store_id=item.store_id)db.add(db_item)db.commit()db.refresh(db_item)return db_itemdef fetch_by_id(db: Session,_id):return db.query(models.Item).filter(models.Item.id == _id).first()def fetch_by_name(db: Session,name):return db.query(models.Item).filter(models.Item.name == name).first()def fetch_all(db: Session, skip: int = 0, limit: int = 100):return db.query(models.Item).offset(skip).limit(limit).all()async def delete(db: Session,item_id):db_item= db.query(models.Item).filter_by(id=item_id).first()db.delete(db_item)db.commit()async def update(db: Session,item_data):updated_item = db.merge(item_data)db.commit()return updated_itemclass StoreRepo:async def create(db: Session, store: schemas.StoreCreate):db_store = models.Store(name=store.name)db.add(db_store)db.commit()db.refresh(db_store)return db_storedef fetch_by_id(db: Session,_id:int):return db.query(models.Store).filter(models.Store.id == _id).first()def fetch_by_name(db: Session,name:str):return db.query(models.Store).filter(models.Store.name == name).first()def fetch_all(db: Session, skip: int = 0, limit: int = 100):return db.query(models.Store).offset(skip).limit(limit).all()async def delete(db: Session,_id:int):db_store= db.query(models.Store).filter_by(id=_id).first()db.delete(db_store)db.commit()async def update(db: Session,store_data):db.merge(store_data)db.commit()
repositories.py 文件定义了辅助方法,以便在 Item 和 Store 数据库模型上执行 CRUD 操作。这些函数使与存储在数据库中的数据进行高效交互成为可能,简化了在应用程序中管理和访问信息的过程。
创建schema
现在我们将在 database 文件夹中创建一个名为 schemas.py 的文件。该文件将包含与我们的 SQLAlchemy 模型对应的 Pydantic 模型。本质上,这些 Pydantic 模型将概述我们应用程序的schema或有效数据结构。Pydantic 会自动处理所有数据验证。
Item schema 表示商店中的单个物品。该 schema 基于 ItemBase 类,包含以下属性:
name(str):物品的名称。
price(float):物品的价格。
description(Optional [str]):物品的可选描述,如果未提供则可以为None。
store_id(int):物品所在商店的ID。
id(int):物品的唯一标识符。
from typing import List, Optionalfrom pydantic import BaseModelclass ItemBase(BaseModel):name: strprice : floatdescription: Optional[str] = Nonestore_id: intclass ItemCreate(ItemBase):passclass Item(ItemBase):id: intclass Config:orm_mode = True
Store schema 表示包含物品的商店。该 schema 基于 StoreBase 类,包含以下属性:
name(str):商店的名称。
id(int):商店的唯一标识符。
items(List [Item]):由 Item schema表示的商店中可用物品的列表。默认情况下,该列表为空。
class StoreBase(BaseModel):name: strclass StoreCreate(StoreBase):passclass Store(StoreBase):id: intitems: List[Item] = []class Config:orm_mode = True
API
在设置好 OceanBase 连接之后,接下来构建一个 FastAPI 接口来与我们的数据库交互。在根目录下,我们将创建一个名为 main.py 的文件。
该文件应提供一个 REST API 接口,用于管理 OceanBase 中的 Items 和 Stores,允许客户端使用标准 HTTP 方法与资源交互。FastAPI 会生成适当的 Swagger 文档并处理请求/响应验证。
该文件将执行以下操作:
导入必要的包和模型,然后创建 FastAPI 应用程序。
from fastapi import Depends, FastAPI, HTTPExceptionfrom fastapi.responses import JSONResponsefrom database import modelsfrom db import get_db, engineimport database.models as modelsimport database.schemas as schemasfrom database.repositories import ItemRepo, StoreRepofrom sqlalchemy.orm import Sessionimport uvicornfrom typing import List,Optionalfrom fastapi.encoders import jsonable_encoderapp = FastAPI(title="Sample FastAPI Application",description="Sample FastAPI Application with Swagger and Sqlalchemy",version="1.0.0",)models.Base.metadata.create_all(bind=engine)@app.exception_handler(Exception)def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})
定义物品的 API 端点:
创建新物品:向 /items 发出 POST 请求,接受一个 ItemCreate schema,并返回创建的 Item。
获取所有物品:向 /items 发出 GET 请求,检索所有物品的列表,可按名称进行可选过滤。
获取特定物品:向 /items/{item_id} 发出 GET 请求,通过其 ID 检索物品。
删除物品:向 /items/{item_id} 发出 DELETE 请求,从数据库中删除指定 ID 的物品。
更新物品:向 /items/{item_id} 发出 PUT 请求,使用 Item schema中提供的数据更新指定 ID 的物品。
@app.post('/items', tags=["Item"],response_model=schemas.Item,status_code=201)async def create_item(item_request: schemas.ItemCreate, db: Session = Depends(get_db)):"""Create an Item and store it in the database"""db_item = ItemRepo.fetch_by_name(db, name=item_request.name)if db_item:raise HTTPException(status_code=400, detail="Item already exists!")return await ItemRepo.create(db=db, item=item_request)@app.get('/items', tags=["Item"],response_model=List[schemas.Item])def get_all_items(name: Optional[str] = None,db: Session = Depends(get_db)):"""Get all the Items stored in database"""if name:items =[]db_item = ItemRepo.fetch_by_name(db,name)items.append(db_item)return itemselse:return ItemRepo.fetch_all(db)@app.get('/items/{item_id}', tags=["Item"],response_model=schemas.Item)def get_item(item_id: int,db: Session = Depends(get_db)):"""Get the Item with the given ID provided by User stored in database"""db_item = ItemRepo.fetch_by_id(db,item_id)if db_item is None:raise HTTPException(status_code=404, detail="Item not found with the given ID")return db_item@app.delete('/items/{item_id}', tags=["Item"])async def delete_item(item_id: int,db: Session = Depends(get_db)):"""Delete the Item with the given ID provided by User stored in database"""db_item = ItemRepo.fetch_by_id(db,item_id)if db_item is None:raise HTTPException(status_code=404, detail="Item not found with the given ID")await ItemRepo.delete(db,item_id)return "Item deleted successfully!"@app.put('/items/{item_id}', tags=["Item"],response_model=schemas.Item)async def update_item(item_id: int,item_request: schemas.Item, db: Session = Depends(get_db)):"""Update an Item stored in the database"""db_item = ItemRepo.fetch_by_id(db, item_id)if db_item:update_item_encoded = jsonable_encoder(item_request)db_item.name = update_item_encoded['name']db_item.price = update_item_encoded['price']db_item.description = update_item_encoded['description']db_item.store_id = update_item_encoded['store_id']return await ItemRepo.update(db=db, item_data=db_item)else:raise HTTPException(status_code=400, detail="Item not found with the given ID")
定义商店的 API 端点:
创建新商店:向 /stores 发出 POST 请求,接受一个 StoreCreate schema,并返回创建的商店。
获取所有商店:向 /stores 发出 GET 请求,检索所有商店的列表,可按名称进行可选过滤。
获取特定商店:向 /stores/{store_id} 发出 GET 请求,通过其 ID 检索商店。
删除商店:向 /stores/{store_id} 发出 DELETE 请求,从数据库中删除指定 ID 的商店。
@app.post('/stores', tags=["Store"],response_model=schemas.Store,status_code=201)async def create_store(store_request: schemas.StoreCreate, db: Session = Depends(get_db)):"""Create a Store and save it in the database"""db_store = StoreRepo.fetch_by_name(db, name=store_request.name)print(db_store)if db_store:raise HTTPException(status_code=400, detail="Store already exists!")return await StoreRepo.create(db=db, store=store_request)@app.get('/stores', tags=["Store"],response_model=List[schemas.Store])def get_all_stores(name: Optional[str] = None,db: Session = Depends(get_db)):"""Get all the Stores stored in database"""if name:stores =[]db_store = StoreRepo.fetch_by_name(db,name)print(db_store)stores.append(db_store)return storeselse:return StoreRepo.fetch_all(db)@app.get('/stores/{store_id}', tags=["Store"],response_model=schemas.Store)def get_store(store_id: int,db: Session = Depends(get_db)):"""Get the Store with the given ID provided by User stored in database"""db_store = StoreRepo.fetch_by_id(db,store_id)if db_store is None:raise HTTPException(status_code=404, detail="Store not found with the given ID")return db_store@app.delete('/stores/{store_id}', tags=["Store"])async def delete_store(store_id: int,db: Session = Depends(get_db)):"""Delete the Item with the given ID provided by User stored in database"""db_store = StoreRepo.fetch_by_id(db,store_id)if db_store is None:raise HTTPException(status_code=404, detail="Store not found with the given ID")await StoreRepo.delete(db,store_id)return "Store deleted successfully!"
使用带有指定端口和重载选项的 Uvicorn 运行 FastAPI 应用程序。
if __name__ == "__main__":uvicorn.run("main:app", port=9000, reload=True)
运行APP
现在我们有了一个可用的 CRUD API 来支持我们的无界面电商应用程序,现在我们尝试在终端中输入以下命令来运行该应用程序:
python main.py然后我们就可以通过 http://127.0.0.1:9000 访问正在运行的开发服务器。通过这种设置,客户端可以使用标准 HTTP 方法轻松地与这些资源进行交互。
Swagger UI 提供了一个用户友好的界面,用于与 API 进行交互并探索其端点、请求参数和响应。也可以使用它来测试 API 请求并确保您的 API 按预期运行。
为了能够在我们的电子商务应用程序中创建物品,我们首先需要创建一个商店。我们可以向 /stores 端点发出 POST 请求来创建一个商店。在这个例子中,我将创建一个名为 GearGenius 的商店,它是一个销售科技小玩意的商店。
然后,我们可以使用 GET 方法在 /stores 端点中获取电子商务 APP 中的所有商店。每个商店都有一个商店 ID,我们可以在将物品添加到商店时使用该 ID。
现在,我们可以使用 /items 端点中的 POST 方法在特定商店中创建物品。在之前的步骤中,我们创建了一个 ID 为 2 的商店,我们可以在 store_id 属性中使用此数字。
然后,我们可以使用 /items 端点中的 GET 方法检查数据库中的所有物品。
你还可以向 /items/{item_id} 方法发出 DELETE 和 PUT 请求,以删除或更新商店中的物品。
结论
现在,我们已经成功地使用 FastAPI、 SQLAlchemy 和 Uvicorn 以及 OceanBase 数据库构建了一个 RESTful API。该 API 提供了用于在数据库中创建、检索、更新和删除物品和商店的端点。我们还探讨了如何使用 Swagger UI 与 API 进行交互并测试 API 请求。
我已将此项目的代码上传到了此 GitLab 存储库中。你可以克隆该项目,并在本地环境中运行,作为 Python OceanBase 应用程序的起点。
要设置,请复制 .env.example 文件并将其重命名为 .env,然后根据自己的数据库设置替换 DB_URL 变量。
你可以运行以下命令来安装所有必要的软件包:
pip install -r requirements.txt总的来说,本教程概述了在 Python 项目中如何使用 OceanBase。未来,你可以通过添加其他功能和功能(如用户管理和身份验证、支付集成和搜索)来增强此电子商务平台。你还可以基于此模板开始创建其他基于 API 的 OceanBase 应用程序。




