I'm trying to process a few Kafka topics through Flink streams. The stream code is below.
object VehicleFuelEventStream {

  def sink[IN](hosts: String, table: String, ds: DataStream[IN]): CassandraSink[IN] = {
    CassandraSink.addSink(ds)
      // Column list first, then a single VALUES clause with one placeholder per column.
      .setQuery(s"INSERT INTO global.$table (id, vehicle_id, consumption, from, to, version) VALUES (?, ?, ?, ?, ?, ?);")
      .setClusterBuilder(new ClusterBuilder() {
        override def buildCluster(builder: Cluster.Builder): Cluster = {
          builder.addContactPoints(hosts.split(","): _*).build()
        }
      }).build()
  }

  def dataStream[T: TypeInformation](env: StreamExecutionEnvironment,
                                     source: SourceFunction[String],
                                     flinkConfigs: FlinkStreamProcessorConfigs,
                                     windowFunction: WindowFunction[VehicleFuelEvent, (String, Double, SortedMap[Date, Double], Long, Long), String, TimeWindow]): DataStream[(String, Double, SortedMap[Date, Double], Long, Long)] = {
    val fuelEventStream = env.addSource(source)
      .map(e => EventSerialiserRepo.fromJsonStr[VehicleFuelEvent](e)) // parse JSON into a Try-like result
      .filter(_.isSuccess)                                            // drop events that failed to parse
      .map(_.get)
      .assignTimestampsAndWatermarks(VehicleFuelEventTimestampExtractor.withWatermarkDelay(flinkConfigs.windowWatermarkDelay))
      .keyBy(_.vehicleId) // key by VehicleId
      .timeWindow(Time.minutes(flinkConfigs.windowTimeWidth.toMinutes))
      .apply(windowFunction)
    fuelEventStream
  }
}
The stream is triggered when Play Framework creates its dependencies at startup via Google Guice, as shown below.
@Singleton
class VehicleEventKafkaConsumer @Inject()(conf: Configuration,
                                          lifecycle: ApplicationLifecycle,
                                          repoFactory: StorageFactory,
                                          cache: CacheApi,
                                          cassandra: Cassandra,
                                          fleetConfigs: FleetConfigManager) {
  private val kafkaConfigs = KafkaConsumerConfigs(conf)
  private val flinkConfigs = FlinkStreamProcessorConfigs(conf)
  private val topicsWithClassTags = getClassTagsForTopics
  private val cassandraConfigs = CassandraConfigs(conf)
  private val repoCache = mutable.HashMap.empty[String, CachedSubjectStatePersistor]
  private val props = new Properties()
  // add props
  private val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.enableCheckpointing(flinkConfigs.checkpointingInterval.toMillis)

  if (flinkConfigs.enabled) {
    topicsWithClassTags.toList
      .map {
        case (topic, tag) if tag.runtimeClass.isAssignableFrom(classOf[VehicleFuelEvent]) =>
          Logger.info(s"starting for - $topic and $tag")
          val source = new FlinkKafkaConsumer09[String](topic, new SimpleStringSchema(), props)
          val fuelEventStream = VehicleFuelEventStream.dataStream[String](env, source, flinkConfigs, new VehicleFuelEventWindowFunction)
          VehicleFuelEventStream.sink(cassandraConfigs.hosts, flinkConfigs.cassandraTable, fuelEventStream)
        case (topic, _) =>
          Logger.info(s"no stream processor found for topic $topic")
      }
    Logger.info("starting flink stream processors")
    env.execute("flink vehicle event processors")
  } else
    Logger.info("Flink stream processor is disabled!")
}
I get the following error when the application starts.
03/13/2018 05:47:23 TriggerWindow(TumblingEventTimeWindows(1800000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e899e41f}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.scala:582)) -> Sink: Cassandra Sink(4/4) switched to RUNNING
2018-03-13 05:47:23,262 - [info] o.a.f.r.t.Task - TriggerWindow(TumblingEventTimeWindows(1800000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e899e41f}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.scala:582)) -> Sink: Cassandra Sink (3/4) (d5124be9bcef94bd0e305c4b4546b055) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: streams.fuelevent.VehicleFuelEventWindowFunction
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
2018-03-13 05:47:23,262 - [info] o.a.f.r.t.Task - TriggerWindow(TumblingEventTimeWindows(1800000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e899e41f}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.scala:582)) -> Sink: Cassandra Sink (2/4) (6b3de15a4f6
.....
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:394)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: streams.fuelevent.VehicleFuelEventStream$$anonfun$4
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
2018-03-13 05:47:23,336 - [info] o.a.f.r.t.Task - Source: Custom Source -> Map -> Filter -> Map -> Timestamps/Watermarks (4/4) (97b2313c985592fdec0ac4f7fba8062f) switched from RUNNING to FAILED.
Dependencies
// kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.0.0"
//flink
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.4.2"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.4.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-cassandra" % "1.4.2"
Any help in resolving this issue would be appreciated.
1 Answer
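It looks like Flink cannot find the class streams.fuelevent.VehicleFuelEventStream (the stack trace reports a ClassNotFoundException for streams.fuelevent.VehicleFuelEventStream$$anonfun$4 and cannot load streams.fuelevent.VehicleFuelEventWindowFunction).
Is that class on Flink's classpath, or in the jar file?
The Flink docs may give some clues: https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html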
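One way to check where a class actually comes from is to ask the JVM directly. The sketch below is a plain-Scala diagnostic and not part of Flink's API: `ClassLoadingCheck` and `locate` are hypothetical names, and it assumes you call it from the same process that builds the job (for example, from inside the Play application). It reports the jar or directory each class was loaded from, or that the class could not be resolved through the given classloader.

```scala
import scala.util.Try

// Hypothetical diagnostic helper; not part of Flink or Play.
object ClassLoadingCheck {

  /** Returns the jar/directory a class is loaded from, or None if the
    * given classloader cannot resolve it. The class is not initialized. */
  def locate(className: String, loader: ClassLoader): Option[String] =
    Try(Class.forName(className, false, loader)).toOption.map { cls =>
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("<bootstrap or unknown source>")
    }

  def main(args: Array[String]): Unit = {
    // Assumption: the context classloader is the one used when the job is built.
    val loader = Thread.currentThread().getContextClassLoader
    Seq(
      "streams.fuelevent.VehicleFuelEventWindowFunction",
      "streams.fuelevent.VehicleFuelEventStream$"
    ).foreach { name =>
      println(s"$name -> ${locate(name, loader).getOrElse("NOT FOUND")}")
    }
  }
}
```

If both classes resolve here but the Flink task still fails, the classloader used by the task probably cannot see the application's classes, which is the kind of situation the linked classloading page covers.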