
Flink Troubleshooting (1)

BigData Scholar 2021-08-09

Error 1: Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog

Error 2: No ExecutorFactory found to execute the application

A simple top-N hot-items job, implemented with the Table API & SQL, broke after I bumped the Flink version from 1.10.1 to 1.13.0:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog

The pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile and test-compile phases -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

The Table API Scala code is as follows:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.api._
import org.apache.flink.types.Row

object HotItemTableApi {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val path = this.getClass.getClassLoader.getResource("UserBehavior.csv").getPath
    val dataStream: DataStream[UserBehavior] = env
      .readTextFile(path)
      .map(line => {
        val userBehaviorArray: Array[String] = line.split(",")
        UserBehavior(userBehaviorArray(0).toLong, userBehaviorArray(1).toLong, userBehaviorArray(2).toLong, userBehaviorArray(3), userBehaviorArray(4).toLong)
      })
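      // timestamps in the CSV appear to be in seconds, hence the *1000 to get milliseconds for event time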
      .assignAscendingTimestamps(_.timestamp*1000)

    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)



    val dataTable: Table = tableEnv.fromDataStream(dataStream, $"itemId", $"behavior", $"timestamp".rowtime() as "ts")


    val aggTable: Table = dataTable
      .filter($"behavior" === "pv")
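      // 1-hour window sliding every 5 minutes over the event-time attribute ts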
      .window(Slide over 1.hours every 5.minutes on $"ts" as "sw")
      .groupBy($"itemId", $"sw")
      .select($"itemId", $"sw".end as "windowEnd", $"itemId".count as "cnt")

    tableEnv.createTemporaryView("aggTable", aggTable)

    tableEnv.sqlQuery(
      """
        |select
        | *
        |from
        |(
        |select
        |   *,
        |   row_number()over(partition by windowEnd order by cnt desc) as rn
        |from aggTable
        |) t where rn <= 3
        |"
"".stripMargin).toRetractStream[Row].print()

    env.execute("sql")
  }
}


The code itself has no compile errors, but at runtime this line fails:

val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

The error message is:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog

I searched everywhere and found nothing remotely like this error.

Absolutely maddening!!!

So I set the Table API approach aside for the moment and re-ran the DataStream API code I had written earlier.

The code is as follows:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

case class UserBehavior(userId: Long, itemId: Long, category: Long, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val path = this.getClass.getClassLoader.getResource("UserBehavior.csv").getPath
    env
      .readTextFile(path)
      .map(line => {
        val userBehaviorArray: Array[String] = line.split(",")
        UserBehavior(userBehaviorArray(0).toLong, userBehaviorArray(1).toLong, userBehaviorArray(2).toLong, userBehaviorArray(3), userBehaviorArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .keyBy("itemId")
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg(), new ResultWindowFunc())
      .keyBy("windowEnd")
      .process(new HotTopItems(3))
      .print()


    env.execute("test4")
  }

  class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: UserBehavior, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  class ResultWindowFunc() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
      val itemId = key.asInstanceOf[Tuple1[Long]].f0
      out.collect(ItemViewCount(itemId = itemId, windowEnd = window.getEnd, count = input.iterator.next))
    }
  }

  class HotTopItems(size: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {

    private var itemState: ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]))
    }
    override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      itemState.add(i)
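      // fire 1 ms after the window end, i.e. once every result for this window has arrived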
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      super.onTimer(timestamp, ctx, out)
      val listBuffer = new ListBuffer[ItemViewCount]()
      import scala.collection.JavaConversions._
      for(item <- itemState.get){
        listBuffer += item
      }

      itemState.clear()

      val topList: ListBuffer[ItemViewCount] = listBuffer.sortBy(_.count)(Ordering.Long.reverse).take(size)

      val sb = new StringBuilder()
      sb.append("=====================\n")
      sb.append("time: ").append(new Timestamp(timestamp - 1)).append("\n")
      for(i <- topList.indices){
        val currentItem = topList(i)
        sb.append("No").append(i)
          .append(": itemID = ").append(currentItem.itemId)
          .append("  count = ").append(currentItem.count).append("\n")
      }
      sb.append("=====================\n\n")
      Thread.sleep(1000)
      out.collect(sb.toString)
    }
  }

}


The code that had previously run fine now failed too:

No ExecutorFactory found to execute the application

So it was back to Google-oriented programming, and I finally found the problem.

https://programmerah.com/flink-1-1-error-no-executorfactory-found-to-execute-the-application-24628/

After upgrading to v1.13.0, the pom.xml needs an explicit flink-clients dependency. Remember to remove the <scope> tag if the snippet you copy carries one.
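Concretely, for this project's pom.xml the added block would look like the sketch below, reusing the version properties already defined above (in 1.13 the artifact still carries the Scala suffix):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

The background, as far as I can tell, is that since Flink 1.11 the streaming modules no longer pull in flink-clients transitively, so no ExecutorFactory implementation is on the classpath unless the dependency is declared explicitly.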

https://stackoverflow.com/questions/63600971/no-executorfactory-found-to-execute-the-application-in-flink-1-11-1

After making that change, the DataStream API code ran successfully. Then, just on the off chance, I ran the Table API code again, and it succeeded as well. So what was that ClassNotFoundException actually telling me? Judging by the fix, it has nothing to do with catalogs. I still can't make sense of it; if anyone understands this, please enlighten me~
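For anyone who hits the same pair of errors: a quick way to check whether flink-clients actually made it onto the classpath is Maven's standard dependency tree, e.g.

mvn dependency:tree -Dincludes=org.apache.flink

If flink-clients is missing from the output, the "No ExecutorFactory found" error above is exactly what you should expect.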



