将整行作为参数传递给flink table api中的用户定义表函数

pgky5nke  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(327)

我怎样才能通过一个完整的考试 Row 我的scalarfunction RowToTupleConverter 在下面的代码中?所有示例都只处理按名称传递单个或多个值,但我希望select语句的整个结果作为 Row . 我猜是用了*,但这不是一个有效的参数。

envT.registerFunction("toTuple", new RowToTupleConverter());
envT.createTemporaryView("t", envT.fromDataStream(ds));                     
Table result = envT.from("t").select("getAvroFieldString(f1, 'HASH_KEY') as hk,
               getAvroFieldLong(f1, 'LOAD_DATE') as ld, 'test' as NAME");
envT.toAppendStream(result.select("*").map("toTuple(*)"), new TupleTypeInfo[...]).print();

我不想处理单个字段,而是一整行,因为我正在构建所有泛型,因此scalarfunction需要一个类型为的参数 Row . 函数遍历行并创建 Tuple2<GenericRecord,GenericRecord>> 从行的值。
背景:
作业是这样构建的,因为我们需要使用合流模式注册表从kafka源中获取键和值,并且作业应该是通用的,以允许任意模式,允许在不更改代码库的情况下进行多个示例化。我们发现实现这一点的唯一方法是从 FlinkKafkaConsumer ,在哪里 Tuple2 在的示例中分别包含消息的键和值 GenericRecord ,并将其转换为flink表。自 GenericRecord 是一个黑盒到表的api,我在另一个线程中按照建议创建了简单的scalarfunctions,它提取了我需要的特定值。现在,这个部分仍然是硬编码的,但一旦一切正常,它也将是通用的。但是,我很难将结果表 Package 回 Tuple2 ,以便将转换后的记录写回另一个kafka主题,这就是为什么我引入了另一个scalarfunction来从一行Map到另一行 Tuple2<GenericRecord,GenericRecord>> .
这可能吗?如果可能,怎么可能?如果没有,我可以使用什么样的解决方法来解决这个问题?总的来说,我也很欣赏关于更优雅的方法的建议,但是从我对这个方向所做的大量研究来看,由于用例的性质,我怀疑是否有。不幸的是,移动到specificred不是一个选择。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题