暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

自动更新选股模型,实时监控,基于 Apache DolphinSchedule 打造机器学习智能选股系统

海豚调度 2022-07-05
1209
作者 | 周捷光 白鲸开源高级算法工程师

摘要:Apache DolphinScheduler 已经在DataOps领域提供了强大的分布式可视化工作流调度能力。2022年,我们为其新增了机器学习任务调度的能力,逐步开箱即用式地支持主流的MLops项目/服务商的功能。


Apache DolphinScheduler 目前已经支持的MLOps工具包括MLflow,DVC,Jupyter,OpenMLDB等任务组件,可以让用户低成本,更容易地编排机器学习系统。


本文为大家介绍如何使用Apache DolphinScheduler来打造一个机器学习选股系统,每日自动更新选股模型,智能选股,在交易时间持续监控模型选股效果。





Apache DolphinScheduler

1

系统介绍概况


01

概况



系统展示
系统展示如下,为2022年6月24的情况:
  • Machine learning stock picking daily dashboard
    为2022-06-23晚上选出的股票,在2022年6月24的实时表现,包括涨跌幅,涨速等信息

  • The real-time average returns of the top 10 stocks are as follows
    实时展示了选出的10个股票在日内平均的收益走势情况

  • The distribution of the ups and downs of stocks selected by the system
    实时展示选出的10个股票的涨跌幅分布

  • The distribution of gains and losses across the stock market
    实时展示了整个市场的股票的涨跌幅分布

系统运行原理

选股逻辑

计算整个股票市场中符合五日均线高于10日均线信号的股票作为标的池,对于上述股票池中的每个股票构建以下特征用于训练模型:

1. 股价与布林带三条轨道的相对值
2. 股价与多条均线的相对值;多条均线的多个间隔的斜率
3. 当前K线的形态,用talib pattern计算,如是否为三只乌鸦,是否为十字星等,详情可见K线模式识别(https://www.jianshu.com/p/fd5c7f49db33)

备注:你也可以加上任何你认为有用的信息作为特征。

模型训练

以第二天是否上涨为准进行二分类,使用AutoML工具flaml进行5分钟训练,每天构建120天的数据集,其中后7天用于评估。

胜率与盈亏比得到的效果如下:

• 单纯使用五日均线高于10日均线买卖时:胜率0.46,盈亏比 1.15,期望约为-0.011;
• 以五日均线高于10日均线买卖为信号加上机器学习模型后,胜率为0.58,盈亏比为1.35,期望约为0.363。

以上为同时期不加模型策略与加模型策略的对比,期望从 -0.011 提升到 0.363。

备注:以上对比为2022-06-23当日模型训练(训练集22万条,测试集1.7万条)完后的评估指标,其中模型每天选择置信度前10(可以认为是模型认为上涨概率最大的前10)的股票;

实盘的表现,可见系统展示中2022-06-24的实时表现。

任务调度

系统中所有的任务调度均使用Apache DolphinScheduler完成,包括数据处理,特征工程,模型训练,模型评估,模型上线,批量实时预测,效果监控等。

Apache DolphinScheduler可以定时每天晚上自动更新模型,并上线进行预测。在当天模型表现不好时,可以调参调特征一键重新训练新的模型并评估上线。

还有其对任务的容错机制,可以保证系统能够稳定运行。

前端展示

前端展示使用Observable实现,得益于其Notebook丰富和易用的可视化数据分析特性,构建出监控实时选股系统的效果监控。

技术栈

主要涉及的技术能力如下:

关于量化知识,因用户水平不同可以进行不同的选股逻辑和特征工程。

该系统旨在帮助用户构建符合自己认知的选股系统(当你有很多的选股参考信息,又不知道如何组合才能更好的做决策时)。

项目中涉及到的工作流任务的实现可以在这里找到 GitHub - Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis)

02

模块介绍



模块介绍章节涉及到整个系统每个模块具体的实现代码与细节,你也可以根据阅读习惯,先跳到文章最后总结章节看完再回来这里继续阅读。

后端模块介绍

下图所示是一个整体的工作流 run_system,该工作流每天晚上定时启动,进行数据更新,模型训练,模型评估,模型部署和推理,然后推荐每天的股票池。

run_system 工作流包含4个子工作流:

• prepare_datas : 每日数据下载,信号计算,特征(在量化交易中成为因子)计算
• training_model:生成训练数据,训练模型,对模型进行评估
• deployment:部署模型
• batch_inference:生成要批量算法预测的股票,并进行预测。


prepare_datas

下图所示是数据准备的工作流 prepare_datas,该工作流会下载股票数据并进行信号计算和特征计算:

• download_data : 下载全市场的股票日线数据
calc_signals:进行信号计算(计算每天符合信号条件的股票,如每天5日均线与10日均线金叉的股票)
 calc_features: 特征(量化交易中称为因子)计算(计算每个股票每天的特征值,如收盘价与5日均线的相对值,股票是否是十字星形态等)


download_data

任务类型为Shell 任务类型,用于下载股票市场的日线数据。

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便Python直接在别的目录运行该项目的脚本
    export PYTHONPATH=${project}

    # 激活Python环境
      source ${project}/env/bin/activate
      # 设置股票数据下载的路径
      data_path=${project}/data/daily

      # 下载数据到指定路径
        python -m dmsa.data.download ${data_path}
        复制

        其中自定义参数中的设置的意思为启动工作流时传入project参数的值,即可替换shell中的${project},如传入/home/user/Apache DolphinScheduler-MLOps-Stock-Analysis,则第一行会变成 export PYTHONPATH=/home/user/Apache DolphinScheduler-MLOps-Stock-Analysis,下文如无特殊,将不再介绍该Apache DolphinScheduler的特性。

        calc_signals

        任务类型为Shell 任务类型,根据前一个任务中下载完的数据计算所有的信号。

        # 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便python直接在别的目录运行该项目的脚本
          export PYTHONPATH=${project}
          # 激活python环境
            source ${project}/env/bin/activate
            # 设置股票数据下载的路径
              data_path=${project}/data/daily
              # 根据feature_signal.txt的配置,计算信号,详情可以见项目中的实现
                python -m dmsa.data_processing.calc_signals \
                --data_path ${data_path} \
                --name_file ${project}/feature_signal.txt
                复制
                计算好的信号会自动存在Mysql中,Mysql的配置可以参考Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis),进行简单配置即可。

                calc_features

                任务类型为Shell 任务类型,根据前一个任务中下载完的数据计算所有的特征。

                # 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便python直接在别的目录运行该项目的脚本
                  export PYTHONPATH=${project}
                  # 激活python环境
                    source ${project}/env/bin/activate
                    # 设置股票数据下载的路径
                      data_path=${project}/data/daily
                      # 根据feature_signal.txt的配置,计算特征,详情可以见项目中的实现
                        python -m dmsa.data_processing.calc_features \
                        --data_path ${data_path} \
                        --name_file ${project}/feature_signal.txt
                        复制
                        计算好的特征会自动存在Mysql中,Mysql的配置可以参考Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis),进行简单配置即可。

                        如果需要更强大的特征计算与存储能力,可以使用Apache DolphinScheduler中的OpenMLDB组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/openmldb.html进行修改即可(可参考Apache DolphinScheduler OpenMLDB Task:打造端到端MLOps工作流 https://openmldb.ai/docs/zh/main/use_case/dolphinscheduler_task_demo.html)。

                        training_model

                        下图为工作流 training_model 的各个任务的DAG图,该工作流主要执行以下任务:

                        • prepare_data : 准备模型训练数据和模型评估数据
                        • training:训练模型
                        • Deployment: 部署刚刚训练的模型用户推理评估数据
                        • evaluate:评估模型,因为该评估的指标为胜率以及盈亏比,所以进行另外的计算评估。
                        • close_service:评估完成后关闭刚刚部署的服务


                        prepare_data

                        任务类型为Shell 任务类型,准备训练数据用于训练模型,准备测试数据用于评估。

                          export PYTHONPATH=${project}
                          source ${project}/env/bin/activate
                          save_data_path=${project}/data/training

                          # 生成数据集
                            python -m dmsa.data_processing.build_datas \
                            --task_type train \
                            --config ${project}/feature_signal.txt \
                            --save_path ${save_data_path} \
                            --data_path ${project}/data/daily

                            # 把生成的数据集所对应的路径 save_data_path 赋值到变量 data_path 中,并通过自定义参数设为OUT,传递给下游任务
                              echo "#{setValue(data_path=${save_data_path})}"
                              复制

                              training

                              该任务类型为MLflow任务类型,由Apache DolphinScheduler内置提供,可以用于输入一份数据集,自动进行模型训练。

                              上图表示,使用AutoML根据传入的数据(csv格式)进行120秒训练,AutoML的工具使用flaml。设置模型实验名称为baseline,模型将上传至MLflow服务中心,并注册模型名字为baseline。

                              更多的特性,可以查看Apache DolphinScheduler MLflow 组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/mlflow.html)。

                              Deployment

                              该任务类型为MLflow任务类型,由Apache DolphinScheduler内置提供,可以部署MLflow服务中心的模型。

                              上图表示,使用将模型名字为baseline,版本号为Production的模型,以Docker形式部署并保留7070端口。

                              更多的特性,可以查看Apache DolphinScheduler MLflow 组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/mlflow.html)。

                              evaluate

                              任务类型为Shell 任务类型,用户请求服务获取评估数据的结果并进行评估。

                                export PYTHONPATH=${project}
                                source ${project}/env/bin/activate

                                # 表示每天选择分数最高的3个股票,测试效果
                                  python -m dmsa.evaluate.calc_evaluate \
                                  --source_data_path ${data_path}/source.csv \
                                  --evaluate_data ${data_path}/test.csv \
                                  --api 'http://127.0.0.1:7070/invocations' \
                                  --top_n 3
                                  复制

                                  close_service

                                  任务类型为Shell任务类型,用与关闭刚才启动的模型服务
                                  # 刚才的模型服务会运行一个docker容器提供服务,容器名字会统一命名为 ds-mlflow-{model_name}-{model_version}
                                  # 因此可以简单通过一行命令关闭服务
                                    docker rm -f ds-mlflow-baseline-Production
                                    复制

                                    Deployment

                                    与上面的Deployment几乎一样,另外配一个7000端口用于推理服务。

                                    batch_inference

                                    工作流 batch_inference 用于对需要在线跑的任务进行推理,包含两个任务:
                                    • build_inference_data:生成需要推理的数据
                                    • inference: 调用接口进行推理

                                    build_inference_data

                                    任务类型为Shell 任务类型,用于生成需要推理的数据。

                                      export PYTHONPATH=${project}
                                      source ${project}/env/bin/activate
                                      save_data_path=${project}/data/inference.csv

                                      # 用于生成需要推理的数据
                                        python -m dmsa.data_processing.build_datas \
                                        --task_type inference \
                                        --config ${project}/feature_signal.txt \
                                        --save_path $save_data_path


                                        echo "#{setValue(data_path=${save_data_path})}"
                                        复制

                                        inference

                                        任务类型为Shell 任务类型,根据上一个任务中生成的推理数据进行推理,并将分数前10的股票存到数据库中。

                                          export PYTHONPATH=${project}
                                          source ${project}/env/bin/activate
                                          # 推理,服务地址为 http://127.0.0.1:7000/invocations, 7000是刚才配的端口,invocations是MLflow服务默认的路径
                                            python -m dmsa.evaluate.inference \
                                            --evaluate_data ${data_path} \
                                            --api 'http://127.0.0.1:7000/invocations' \
                                            --top_n 10
                                            复制

                                            Monitoring

                                            另外还有一个实时监控的工作流 monitoring ,用于实时计算监控数据,并存入数据库中,实时监控模型选出股票的效果。

                                            目前包含两个任务,通过 Apache DolphinScheduler 每隔10秒调度启动:
                                            • spot:持续更新市场全貌数据,包括涨跌幅,涨速等
                                            • kline: 从K线层面持续对模型选择的股票的表现进行收益跟踪


                                            spot

                                            定时监控全貌的整体数据,包括获取全部股票当下时间点的涨跌幅,涨速等。

                                            Kline

                                            定时监控昨天选出来的股票在最新交易日的实时收益走势。

                                            前端模块介绍

                                            前端目前用Observable来进行展示,也可以使用Juptyer来替代。

                                            Observable

                                            Observable是一个以JavaScript基础进行拓展来做计算的Notebook,能够让很多不懂代码的数据分析,可视化工作人员简单、低成本地使用强大可视化工具的产品,个人版免费。

                                            目前主要展示了三个图示来监控模型效果:
                                            • 每日推荐股票的实时情况
                                            • 每日推荐股票的实时涨跌分布
                                            • 市场全貌的实时涨跌分布

                                            每日推荐股票的实时情况

                                            通过join 每日推荐股票标,以及市场全貌标来展示,每5秒刷新一次,下面两段代码可以直接copy到Observable的Notebook中即可:
                                            // 获取数据
                                              stockDatas = {
                                              let i = 0;
                                              while (true) {
                                              try {
                                              const data = await db.query("SELECT a.date, a.y_pred, a.score, b.* FROM candidate a LEFT JOIN spot b ON a.code = b.code WHERE a.y_pred=1;")
                                              yield Promises.delay(5000, data);
                                              } catch(e){
                                              console.log(e.name + ":" + e.message);
                                              }
                                              }
                                              }
                                              复制
                                              // 转成表格
                                                stocks = Inputs.table(stockDatas)
                                                复制


                                                每日推荐股票的实时收益曲线

                                                用过Observable其中自带的曲线图来表示
                                                  Plot.plot({
                                                  marks: [
                                                  Plot.ruleY([0]),
                                                  Plot.lineY(changesDatas, {x: "time", y: "changes"})
                                                  ]
                                                  })
                                                  复制

                                                  每日推荐股票的实时涨跌分布

                                                  用过Observable其中自带的柱状图来表示:
                                                    Plot.plot({
                                                    marks: [
                                                    Plot.rectY(stockDatas, Plot.binX({y: "count"}, {x: "涨跌幅"})),
                                                    Plot.ruleY([0])
                                                    ],
                                                    })
                                                    复制

                                                    市场全貌的实时涨跌分布

                                                    用过Observable其中自带的柱状图来表示:

                                                      // 从数据库中读取数据
                                                      spotDatas = db.query("SELECT * FROM spot;")
                                                      复制
                                                      // 画图
                                                      Plot.plot({
                                                      marks: [
                                                      Plot.rectY(spotDatas, Plot.binX({y: "count"}, {x: "涨跌幅"})),
                                                      Plot.ruleY([0])
                                                      ],
                                                      x: {ticks: 20}
                                                      })
                                                      复制

                                                      2

                                                      总结


                                                      本文展示了使用Apache DolphinScheduler调度机器学习系统中的各类模块,Observable作为前端展示与可视化能力的智能股票选股系统,希望能给大家带来以下收获:

                                                      1. 了解如何使用Apache DolphinScheduler构建一个选股系统。
                                                      2. 了解Apache DolphinScheduler灵活简单编排MLOps场景下各个模块的方法。
                                                      3. 了解Apache DolphinScheduler MLflow组件赋予用户的0成本使用机器学习能力。
                                                      4. 了解Observable这个强大的可视化产品的能力,以及与Apache DolphinScheduler的结合使用。

                                                      之后,我将持续为大家带来Apache DolphinScheduler在MLOps领域的更多尝试与介绍,以下为接下来会推出的文章:

                                                      • 《续:Apache DolphinSchedule的机器学习选股系统 手把手教学》
                                                      • 《基于Apache DolphinScheduler Jupyter组件的机器学习模型训练与可视化实践》

                                                      PS:

                                                      • 如果你对Apache DolphinScheduler构建机器学习系统感兴趣可以添加小助手微信(Leonard-ds),加入社区一起参与交流;
                                                      • 欢迎有Apache DolphinScheduler构建机器学习系统或者调度机器学习任务经验的同学投稿(添加小助手微信Leonard-ds)。

                                                      参与贡献


                                                      随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


                                                      参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


                                                      贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


                                                      社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


                                                      非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


                                                      如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/docs/development/contribute.html


                                                      来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


                                                      参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。


                                                      添加小助手微信时请说明想参与贡献。


                                                      来吧,开源社区非常期待您的参与。



                                                      更多精彩推荐

                                                      开源大数据 Studio 应用开发: Apache Dolphinscheduler + Notebook

                                                      ☞当 Apache DolphinScheduler 遇上 MLOps,机器学习模型部署到生产环境更快、更安全

                                                      ☞日均 6000+ 实例,TB 级数据流量,Apache DolphinScheduler 如何做联通医疗大数据平台的“顶梁柱”?

                                                      ☞中国联通改造 Apache DolphinScheduler 资源中心,实现计费环境跨集群调用与数据脚本一站式访问

                                                      ☞又是一年开源之夏,八大课题项目奖金等你来拿!


                                                      我知道你在看

                                                      文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                      评论