昨天公众号收到一个私信,说希望在Superset 开箱即用中使用警报和报告功能。
警报和报告需要 Celery 和 Redis 的支持。而我的这个开箱即用版本主要是为了体验 Superset, 并没有考虑添加 Reids 和 Celery 的支持,但既然有人提出需求了,我就顺便研究下 Windows 下部署 Celery,Redis,并用 Flask 做个简单测试吧。
有兴趣的可以先看下之前的一篇文章以点麦当劳为例,说说为啥我们需要任务队列,就是以麦当劳排队点餐为例,介绍了为啥需要Celery、Redis和Flask(当然你可以选择别的框架)。
下面就开始Windows的实现,算是为Superset 开箱即用添加更多功能做铺垫。
环境:
Windows 64 (无管理员权限) python-3.8.10-embed-amd64(和 Superset 开箱即用一样的) Redis Server 7.0.5 Celery 5.2.7 flower 1.2.0 Flask 2.2.2

Redis 下载与安装
这里测试的是非官方的免安装版本[1]下载解压,双击运行redis-server.exe
即可。
如果 redis server 没有设置密码,建议 redis 地址使用127.0.0.1
。
网上说 Celery 不支持localhost
,其实是因为没有设置密码或者没有设置保护模式为 no。
如果想使用 localhost,需要修改redis.conf
的protected-mode yes
为protected-mode no
或者在客户端执行
CONFIG SET protected-mode no
Celery 与 Flower 的安装
Celery 是一个 基于 python 开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理 Flower 是Celery[2] 分布式任务队列的实时监控和 web 管理 
Flower UI
Celery 的执行池支持 prefork,eventlet,gevent ,solo, processes ,threads,custom。如果不指定,则默认使用 prefork, 但该执行池不支持 Windows,可以考虑eventlet
或者gevent
。
安装
python -m pip install celery eventlet flower redis flask
测试代码
目录结构如下,
celery_demo
│ celery.py
│ tasks.py
flask_demo.py
flower.bat
worker.bat
test.py

相关代码如下,
celery.py
from celery import Celery
# 若redis设置了密码,URI应为 redis://:password@127.0.0.1:6379/0
app = Celery('celery_demo',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0',
include=['celery_demo.tasks'])
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
tasks.py
from time import sleep
from .celery import app
@app.task
def add(x, y):
sleep(5)
return x + y
@app.task
def mul(x, y):
sleep(10)
return x * y
test.py
#测试代码
from celery_demo import tasks
res1 = tasks.add.delay(12, 22)
print(res1.get(timeout=50))
# 输出 34 测试完成
flask_demo.py
from flask import Flask
from celery_demo.celery import app as celery
import time
app = Flask(__name__)
@app.route('/add')
def add():
from celery_demo import tasks
results = tasks.add.delay(1234,6789)
print('---------1234 + 6789 ------------')
return {"msg": "success", "result": results.get()}
@app.route('/mul')
def mul():
results = celery.send_task("celery_demo.tasks.mul",kwargs={"x":5,"y":6789})
print('---------5 x 6789 ------------')
result = results.get()
return {"msg": "success", "result": result}
if __name__ == '__main__':
app.run(debug=True)
worker.bat
python -m celery -A celery_demo worker -l info -P eventlet
flower.bat
python -m celery -A celery_demo flower --loglevel=info
运行顺序
worker.batflower.bat
(可选)python test.py
或者python flask_demo.py
资料
Flower 中文文档[3] 源代码[4]
参考资料
免安装版本: https://github.com/zkteco-home/redis-windows
[2]Celery: http://www.celeryproject.org/
[3]Flower中文文档: https://flower-docs-cn.readthedocs.io/zh/latest/
[4]源代码: https://github.com/alitrack/celery




