How to return a collection from a Flink ProcessAllWindowFunction

Posted by 6ie5vjzr on 2021-06-21 in Flink

I am trying to return a list of elements (I have simplified the problem), but I get a strange exception:

class AggregateData extends ProcessAllWindowFunction[String, SimpleAggregate, TimeWindow] {
  override def process(context: Context, elements: Iterable[String], out: Collector[SimpleAggregate]): Unit = {
    out.collect(elements.toList)
  }
}

env.readTextFile(...)
  .timeWindowAll(Time.minutes(1))
  .process(new AggregateData)
  .print()
env.execute("foo")

Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler due to java.lang.BootstrapMethodError: java.lang.NoSuchMethodError: scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
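
One thing visible in the snippet above is that `out` is declared as `Collector[SimpleAggregate]` while `elements.toList` is a `List[String]`, so the declared output type and the emitted value do not match. The following is a minimal sketch of the two usual ways to line them up. It runs without Flink: `Collector` here is a stand-in trait that mirrors only the `collect` method of `org.apache.flink.util.Collector`, and `BufferingCollector`, `WindowOutputSketch`, `emitAsList` and `emitEach` are hypothetical names introduced for illustration.

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for org.apache.flink.util.Collector (assumption: only `collect` is mirrored),
// so that this sketch runs without Flink on the classpath.
trait Collector[T] {
  def collect(record: T): Unit
}

// Hypothetical collector that simply buffers whatever is emitted, for demonstration.
class BufferingCollector[T] extends Collector[T] {
  val records: ListBuffer[T] = ListBuffer.empty[T]
  override def collect(record: T): Unit = {
    records += record
    ()
  }
}

object WindowOutputSketch {
  // Option 1: make the output type the collection itself,
  // so exactly one List is emitted per window.
  def emitAsList(elements: Iterable[String], out: Collector[List[String]]): Unit =
    out.collect(elements.toList)

  // Option 2: keep a single-element output type and emit each element separately.
  def emitEach(elements: Iterable[String], out: Collector[String]): Unit =
    elements.foreach(e => out.collect(e))

  def main(args: Array[String]): Unit = {
    val window = Seq("a", "b", "c") // pretend these are the elements of one window

    val asList = new BufferingCollector[List[String]]
    emitAsList(window, asList)
    println(asList.records.toList)

    val each = new BufferingCollector[String]
    emitEach(window, each)
    println(each.records.toList)
  }
}
```

In the actual job, the first option would correspond to declaring the function as `ProcessAllWindowFunction[String, List[String], TimeWindow]` with `out: Collector[List[String]]`. This only addresses the type mismatch. The `NoSuchMethodError` on `scala.collection.immutable.List` in the stack trace may instead point to mixed Scala binary versions on the classpath, which would need to be checked separately in the build.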
