我在flink scala repl中尝试了这段代码,并在java中构建了richmapfunction,但scala无法工作。有人能给我关于scala方法重载策略的文档链接吗?
java富Map函数
public class RichMapFunctions {
public static class CountRichMapFunction<E, T> extends RichMapFunction<E, T> {
private static final long serialVersionUID = 1L;
private transient Counter counter;
@Override
public void open(Configuration parameters) throws Exception {
this.counter = getRuntimeContext().getMetricGroup().counter("demo");
}
@Override
public T map(E value) throws Exception {
this.counter.inc();
return (T) value;
}
}
}
java flinkimain扩展imain导入
private static String[] PACKAGE_IMPORTS = new String[] { //
"org.apache.flink.core.fs._", //
"org.apache.flink.core.fs.local._", //
"org.apache.flink.api.common.io._", //
"org.apache.flink.api.common.aggregators._", //
"org.apache.flink.api.common.accumulators._", //
"org.apache.flink.api.common.distributions._", //
"org.apache.flink.api.common.operators._", //
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint", //
"org.apache.flink.api.common.functions._", //
"org.apache.flink.api.java.io._", //
"org.apache.flink.api.java.aggregation._", //
"org.apache.flink.api.java.functions._", //
"org.apache.flink.api.java.operators._", //
"org.apache.flink.api.java.sampling._", //
"org.apache.flink.api.scala._", //
"org.apache.flink.api.scala.utils._", //
"org.apache.flink.streaming.api.scala._", //
"org.apache.flink.streaming.api.windowing.time._", //
"com.gaff.sappo.dag.streaming.interpreter.RichMapFunctions.CountRichMapFunction" //
};
repl run的scala脚本
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.map(new CountRichMapFunction()) // this code is error
.keyBy(0)
val window = counts.timeWindow(Time.seconds(5)).sum(1)
window.print()
env.execute("Window Stream WordCount")
错误json消息
{"data":"<console>:69: error: overloaded method value map with alternatives:"}
{"data":" [R](mapper: org.apache.flink.api.common.functions.MapFunction[(String, Int),R])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>"}
{"data":" [R](fun: ((String, Int)) => R)(implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]"}
{"data":" cannot be applied to (com.gaff.sappo.dag.streaming.interpreter.RichMapFunctions.CountRichMapFunction[Nothing,Nothing])"}
{"data":" .map(new CountRichMapFunction())"}
{"data":" ^"}
暂无答案!
目前还没有任何答案,快来回答吧!