暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

五分钟快速体验streampark提交Flink SQL/JAR作业

大数据从业者 2023-03-26
4155

项目背景  

StreamPark是一个流应用程序开发管理平台,提供了使用Apache Flink和Apache Spark编写流处理应用程序的开发框架,未来将支持更多其他引擎。StreamPark项目初衷就是:让流处理更简单。该项目降低了学习成本和开发门槛, 开发者只关心业务逻辑即可; 规范了项目的配置,鼓励函数式编程;定义了最佳的编程方式,提供一系列开箱即用的Connectors;标准化了配置、开发、测试、部署、监控、运维的整个过程;提供了scala和java两套api。       

源码编译  

下载源码
    git clone -b v2.0.0  https://github.com/apache/incubator-streampark.git
    执行编译脚本,选择编译模式、scala版本。
      sh build.sh
      编译完成,安装包路径:
        /home/mySourceCode/incubator-streampark/dist

        安装部署  

        解压安装包
          tarxvf apache-streampark_2.12-2.0.0-incubating-bin.tar.gz  
          修改配置
            vim conf/application.yml
            spring:
            profiles.active: mysql
            vim conf/application-mysql.yml
            mysql用户名、密码、IP
            初始化数据库文件,登录mysql执行
              source /home/myHadoopCluster/apache-streampark_2.12-2.0.0-incubating-bin/script/schema/mysql-schema.sql
              source /home/myHadoopCluster/apache-streampark_2.12-2.0.0-incubating-bin/script/data/mysql-data.sql
                
              启动服务
                添加export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop到bin/startup.sh。
                sh startup.sh
                默认用户名密码登录(admin/streampark)管理界面: http://felixzh:10000     

                设置FlinkHome  

                左侧目录StreamPark子目录Setting中设置Flink Home(/home/myHadoopCluster/flink-1.16.1/):

                SQL实践案例  

                左侧目录StreamPark子目录Application自带测试用例Flink SQL Demo:
                  CREATE TABLE datagen (
                  f_sequence INT,
                  f_random INT,
                  f_random_str STRING,
                  ts AS localtimestamp,
                  WATERMARK FOR ts AS ts
                  ) WITH (
                  'connector' = 'datagen',
                  -- optional options --
                  'rows-per-second'='5',
                  'fields.f_sequence.kind'='sequence',
                  'fields.f_sequence.start'='1',
                  'fields.f_sequence.end'='500',
                  'fields.f_random.min'='1',
                  'fields.f_random.max'='500',
                  'fields.f_random_str.length'='10'
                    );  
                  CREATE TABLE print_table (
                  f_sequence INT,
                  f_random INT,
                  f_random_str STRING
                  ) WITH (
                  'connector' = 'print'
                    );    
                  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
                  通过Operation->Edit Application选择Flink Version(即Setting中设置的Flink Home的别名) 和Execution Mode(即作业运行模式):
                  保存提交。
                  通过Operation->Launch Application按钮进行上线。
                  通过Operation->Start Application启动该测试用例的SQL作业。
                  支持从savepoint恢复启动,这里选OFF:
                  启动成功,如图所示:
                  吐槽一点:不支持select。

                  JAR实践案例  

                  通过Add New按钮新建Application,如下:
                  保存提交。
                  同样的方式发布上线、启动。
                  吐槽一点:上传Jar以后不会自动填充ProgramMainClass。Flink原生UI已经支持,我抽离出来对应类进行过单独测试。
                    https://github.com/felixzh2020/felixzh-learning-flink/tree/master/JarManifestParserDemo

                    文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论