Apache Flink: class not resolvable through given classloader

Posted by eit6fx6z on 2021-06-24 in Flink

I am trying to consume some Kafka topics with Flink streams. Below is the code for the stream.

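// Flink/Cassandra imports for the types used below. VehicleFuelEvent, EventSerialiserRepo,
// FlinkStreamProcessorConfigs and VehicleFuelEventTimestampExtractor are application classes,
// and Date/SortedMap are imported elsewhere in the project.
import com.datastax.driver.core.Cluster
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.cassandra.{CassandraSink, ClusterBuilder}

object VehicleFuelEventStream {

  // Writes every element of `ds` to Cassandra through a prepared INSERT statement.
  // Note: the statement binds 6 values, while the window function in dataStream emits
  // 5-element tuples; the two have to agree for the insert to succeed.
  def sink[IN](hosts: String, table: String, ds: DataStream[IN]): CassandraSink[IN] = {
    CassandraSink.addSink(ds)
      // One VALUES clause only; "from" and "to" are quoted because they are reserved words in CQL.
      .setQuery(s"""INSERT INTO global.$table (id, vehicle_id, consumption, "from", "to", version) VALUES (?, ?, ?, ?, ?, ?);""")
      .setClusterBuilder(new ClusterBuilder() {
        override def buildCluster(builder: Cluster.Builder): Cluster = {
          builder.addContactPoints(hosts.split(","): _*).build()
        }
      }).build()
  }

  // Parses JSON strings into VehicleFuelEvents, drops records that fail to parse, and applies
  // `windowFunction` per vehicle over tumbling event-time windows.
  def dataStream[T: TypeInformation](env: StreamExecutionEnvironment,
                                     source: SourceFunction[String],
                                     flinkConfigs: FlinkStreamProcessorConfigs,
                                     windowFunction: WindowFunction[VehicleFuelEvent, (String, Double, SortedMap[Date, Double], Long, Long), String, TimeWindow]): DataStream[(String, Double, SortedMap[Date, Double], Long, Long)] = {

    val fuelEventStream = env.addSource(source)
      .map(e => EventSerialiserRepo.fromJsonStr[VehicleFuelEvent](e))
      .filter(_.isSuccess) // keep only successfully parsed events
      .map(_.get)
      .assignTimestampsAndWatermarks(VehicleFuelEventTimestampExtractor.withWatermarkDelay(flinkConfigs.windowWatermarkDelay))
      .keyBy(_.vehicleId) // key by VehicleId
      .timeWindow(Time.minutes(flinkConfigs.windowTimeWidth.toMinutes))
      .apply(windowFunction)

    fuelEventStream
  }
}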
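On an event-time stream, `keyBy(_.vehicleId)` followed by `timeWindow(...)` puts each event into exactly one fixed-size tumbling window per vehicle (the log further down shows `TumblingEventTimeWindows(1800000)`, i.e. 30-minute windows). The plain-Scala sketch below models only that grouping in memory, with a summing aggregate standing in for the real window function. `FuelEvent`, `windowStart` and `totals` are hypothetical names, and watermarks, late events and Flink state are ignored entirely.

```scala
object TumblingWindowSketch {

  // Hypothetical, simplified stand-in for the project's VehicleFuelEvent.
  final case class FuelEvent(vehicleId: String, consumption: Double, timestampMs: Long)

  // Start of the tumbling window of size `sizeMs` that contains `ts` (windows aligned to the epoch).
  def windowStart(ts: Long, sizeMs: Long): Long =
    ts - java.lang.Math.floorMod(ts, sizeMs)

  // Groups events by (vehicleId, window start) and sums consumption per group,
  // roughly what keyBy + timeWindow + a summing window function would emit.
  def totals(events: Seq[FuelEvent], sizeMs: Long): Map[(String, Long), Double] =
    events
      .groupBy(e => (e.vehicleId, windowStart(e.timestampMs, sizeMs)))
      .map { case (key, group) => key -> group.map(_.consumption).sum }
}
```

Windows here are aligned to the epoch (no offset), which matches Flink's default alignment for tumbling windows, so an event at 1,800,001 ms falls into the window starting at 1,800,000 ms.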

The stream is started when Play Framework creates its dependencies at startup through Google Guice, as shown below.

@Singleton
class VehicleEventKafkaConsumer @Inject()(conf: Configuration,
                                          lifecycle: ApplicationLifecycle,
                                          repoFactory: StorageFactory,
                                          cache: CacheApi,
                                          cassandra: Cassandra,
                                          fleetConfigs: FleetConfigManager) {

  private val kafkaConfigs = KafkaConsumerConfigs(conf)
  private val flinkConfigs = FlinkStreamProcessorConfigs(conf)
  private val topicsWithClassTags = getClassTagsForTopics
  private val cassandraConfigs = CassandraConfigs(conf)

  private val repoCache = mutable.HashMap.empty[String, CachedSubjectStatePersistor]

  private val props = new Properties()
  // add props

  private val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.enableCheckpointing(flinkConfigs.checkpointingInterval.toMillis)

  if (flinkConfigs.enabled) {
    // foreach instead of map: each case runs only for its side effect of adding a source/sink to `env`
    topicsWithClassTags.toList
      .foreach {
        case (topic, tag) if tag.runtimeClass.isAssignableFrom(classOf[VehicleFuelEvent]) =>
          Logger.info(s"starting for - $topic and $tag")
          val source = new FlinkKafkaConsumer09[String](topic, new SimpleStringSchema(), props)
          val fuelEventStream = VehicleFuelEventStream.dataStream[String](env, source, flinkConfigs, new VehicleFuelEventWindowFunction)
          VehicleFuelEventStream.sink(cassandraConfigs.hosts, flinkConfigs.cassandraTable, fuelEventStream)

        case (topic, _) =>
          Logger.info(s"no stream processor found for topic $topic")
      }

    Logger.info("starting flink stream processors")
    // execute() blocks until the job terminates, so this constructor (and therefore
    // Guice's creation of this singleton) does not return while the job is running.
    env.execute("flink vehicle event processors")

  } else
    Logger.info("Flink stream processor is disabled!")
}

I get the following error when the application starts:

03/13/2018 05:47:23 TriggerWindow(TumblingEventTimeWindows(1800000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e899e41f}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.scala:582)) -> Sink: Cassandra Sink(4/4) switched to RUNNING 
2018-03-13 05:47:23,262 - [info] o.a.f.r.t.Task - TriggerWindow(TumblingEventTimeWindows(1800000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e899e41f}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.scala:582)) -> Sink: Cassandra Sink (3/4) (d5124be9bcef94bd0e305c4b4546b055) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: streams.fuelevent.VehicleFuelEventWindowFunction
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
2018-03-13 05:47:23,262 - [info] o.a.f.r.t.Task - TriggerWindow(TumblingEventTimeWindows(1800000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e899e41f}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.scala:582)) -> Sink: Cassandra Sink (2/4) (6b3de15a4f6

.....

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:394)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: streams.fuelevent.VehicleFuelEventStream$$anonfun$4
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
2018-03-13 05:47:23,336 - [info] o.a.f.r.t.Task - Source: Custom Source -> Map -> Filter -> Map -> Timestamps/Watermarks (4/4) (97b2313c985592fdec0ac4f7fba8062f) switched from RUNNING to FAILED.

Dependencies

// kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.0.0"

// flink (keep every Flink module, including connectors, on the same version)
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.4.2"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.4.2"
libraryDependencies += "org.apache.flink" %% "flink-connector-cassandra" % "1.4.2"
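Flink connectors are built against a specific Flink release, so mixing module versions (as with a 1.4.0 connector next to 1.4.2 core modules) is easy to do by accident. A common sbt idiom is to keep the version in a single `val`; the fragment below is a sketch that assumes 1.4.2 is the intended version, and `flinkVersion` is just a conventional name, not something sbt requires.

```scala
// build.sbt sketch: one shared version for all Flink modules (1.4.2 assumed as the target)
val flinkVersion = "1.4.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"     % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion,
  "org.apache.flink" %% "flink-connector-cassandra" % flinkVersion
)
```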

Any help resolving this issue would be appreciated.

Answer #1 by tkqqtvp1

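It looks like Flink cannot find classes from the streams.fuelevent package: streams.fuelevent.VehicleFuelEventWindowFunction and streams.fuelevent.VehicleFuelEventStream$$anonfun$4 (a function literal compiled inside VehicleFuelEventStream).
Are these classes on Flink's classpath, or packaged in the job jar?
The Flink documentation on debugging classloading may give some clues: https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html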
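One way to answer the classpath question is to ask a specific classloader directly whether it can resolve the class. The plain-Scala sketch below does this with `Class.forName(name, false, loader)`; it is not Flink's own code, and `ClassLoaderCheck`, `isResolvable` and `emptyLoader` are hypothetical names. The empty loader imitates a user-code classloader with no jar URLs (the log above appears to list nothing after `URL ClassLoader:`), which can then only see what its parent sees.

```scala
import java.net.{URL, URLClassLoader}

object ClassLoaderCheck {

  /** True if `className` can be loaded through `loader`; the class is not initialized. */
  def isResolvable(className: String, loader: ClassLoader): Boolean =
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: NoClassDefFoundError   => false
    }

  /** A URLClassLoader with no URLs that delegates to `parent` (null means the bootstrap loader). */
  def emptyLoader(parent: ClassLoader): URLClassLoader =
    new URLClassLoader(Array.empty[URL], parent)
}
```

From inside the Play app, one could compare `isResolvable("streams.fuelevent.VehicleFuelEventWindowFunction", getClass.getClassLoader)` with the same call on the loader of a Flink class, e.g. `classOf[org.apache.flink.streaming.api.scala.StreamExecutionEnvironment].getClassLoader`. If the first returns true and the second false, a Flink classloader that only delegates to the latter will not see the application classes either.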
