我怎样才能通过一个完整的考试 Row
我的scalarfunction RowToTupleConverter
在下面的代码中?所有示例都只处理按名称传递单个或多个值,但我希望select语句的整个结果作为 Row
. 我猜是用了*,但这不是一个有效的参数。
envT.registerFunction("toTuple", new RowToTupleConverter());
envT.createTemporaryView("t", envT.fromDataStream(ds));
Table result = envT.from("t").select("getAvroFieldString(f1, 'HASH_KEY') as hk,
getAvroFieldLong(f1, 'LOAD_DATE') as ld, 'test' as NAME");
envT.toAppendStream(result.select("*").map("toTuple(*)"), new TupleTypeInfo[...]).print();
我不想处理单个字段,而是一整行,因为我正在构建所有泛型,因此scalarfunction需要一个类型为的参数 Row
. 函数遍历行并创建 Tuple2<GenericRecord,GenericRecord>>
从行的值。
背景:
作业是这样构建的,因为我们需要使用合流模式注册表从kafka源中获取键和值,并且作业应该是通用的,以允许任意模式,允许在不更改代码库的情况下进行多个示例化。我们发现实现这一点的唯一方法是从 FlinkKafkaConsumer
,在哪里 Tuple2
在的示例中分别包含消息的键和值 GenericRecord
,并将其转换为flink表。自 GenericRecord
是一个黑盒到表的api,我在另一个线程中按照建议创建了简单的scalarfunctions,它提取了我需要的特定值。现在,这个部分仍然是硬编码的,但一旦一切正常,它也将是通用的。但是,我很难将结果表 Package 回 Tuple2
,以便将转换后的记录写回另一个kafka主题,这就是为什么我引入了另一个scalarfunction来从一行Map到另一行 Tuple2<GenericRecord,GenericRecord>>
.
这可能吗?如果可能,怎么可能?如果没有,我可以使用什么样的解决方法来解决这个问题?总的来说,我也很欣赏关于更优雅的方法的建议,但是从我对这个方向所做的大量研究来看,由于用例的性质,我怀疑是否有。不幸的是,移动到specificred不是一个选择。
暂无答案!
目前还没有任何答案,快来回答吧!