暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

每隔5分钟输出最近一小时内点击量最多的前N个商品(SQL实现版)

原创 ♠K 2023-08-31
87

package com.zjc.flow_analysis.hotitems_analysis

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.sql.Timestamp
import java.util.Properties

object HotItemsSQL {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop103:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotItems", new SimpleStringSchema(), properties))
val dataStream = inputStream.map(data => {
val arrayData = data.split(",")
UserBehavior(arrayData(0).toLong, arrayData(1).toLong, arrayData(2).toLong, arrayData(3).toString,arrayData(4).toLong)
}).assignAscendingTimestamps(_.timestamp * 1000L)

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// 将dataStream转为表
tableEnv.createTemporaryView("dataTable",dataStream, 'itemId, 'behavior, 'timestamp.rowtime as 'ts)
val resultTalbe = tableEnv.sqlQuery(
"""
|select *
|from (
| select *,
| row_number() over(partition by windowEnd order by cnt desc) as row_num
| from (
| select itemId, count(itemId) as cnt,
| hop_end(ts, interval '5' minute, interval '1' hour) as windowEnd
| from dataTable
| where behavior='pv'
| group by itemId, hop(ts, interval '5' minute, interval '1' hour)
| )
|)
|where row_num <= 5
|""".stripMargin
)
resultTalbe.toRetractStream[(Long, Long,Timestamp, Long)].print("result")
env.execute("商品热门统计(sql版实现)")

}
}

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论