我有一个抽象类,它的抽象方法创建 SourceFunction
,因此派生类可以返回简单或更复杂的源(例如。 KafkaConsumers
等等)。 ChangeMe
是一个简单的自动生成类,由avroschema编译创建。
public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {
FromElementsFunction<ChangeMe> dataSource = null;
List<ChangeMe> changeMeList = Arrays.asList(
ChangeMe.newBuilder().setSomeField("Some field 1").build(),
ChangeMe.newBuilder().setSomeField("Some field 2").build(),
ChangeMe.newBuilder().setSomeField("Some field 3").build()
);
try {
dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);
}
catch (IOException ex){
}
return dataSource;
}
在我的flink工作中,我基本上有:
SourceFunction<ChangeMe> source = createSourceFunction(params);
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);
DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream(); // gets sourceDataStream above
changeMeEventsStream.print();
当我运行作业时,在调用print()时出现以下错误:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
……
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
我使用的是eclipse编译器,所以我认为应该包含类型信息(尽管我认为这只是针对lambdas的,上面没有)。我需要做什么才能让它正常运行?
1条答案
按热度按时间ddarikpa1#
如果要直接示例化
FromElementsFunction
,则必须手动提供TypeInformation
示例ChangeMe
调用时的类addSource
. 这是flink了解元素类型所必需的。下面的代码段应该可以做到这一点: