如何在flink中使用具有泛型类型的类?我遇到了错误:
函数“main(streamingjob)”的返回类型。java:63)'由于类型擦除,无法自动确定。通过对转换调用的结果使用returns(…)方法,或者让函数实现“resulttypequeryable”接口,可以给出类型信息提示。
我使用的类的形式如下:
class MaybeProcessable<T> {
private final T value;
public MaybeProcessable(T value) {
this.value = value;
}
public T get() {
return value;
}
}
我以Flink的工作为例:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new PubSubSource(PROJECT_ID, SUBSCRIPTION_NAME))
.map(MaybeProcessable::new)
.map(MaybeProcessable::get)
.writeAsText("/tmp/flink-output", FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
现在我可以使用 .returns()
功能:
.map(MaybeProcessable::new).returns(new MyCustomTypeInformationClass(String.class))
但这需要我编写自己的序列化程序。难道没有更简单的方法来实现这一点吗?
1条答案
按热度按时间dwbf0jvd1#
你可以用
.returns(TypeInformation.of(new TypeHint<MaybeProcessing<#CONCRETE_TYPE_HERE>>{})
对于每次重复使用泛型mapfunction来设置返回类型而不创建任何自定义类。