如何在apachebeam中使用mongodb/documentdb/redis作为侧输入?

inn6fuwd  于 2021-06-07  发布在  Redis
关注(0)|答案(1)|浏览(485)

我有一个用例,我曾经通过流(kinesis)获取数据,并希望对其执行一些转换。在转换过程中,我需要查找mongodb/documentdb/redis,其中存储了我们的参考数据。我正在通过apachebeam实现这个用例。
我需要使用这些数据库(mongodb/documentdb/redis)中的集合/表作为辅助输入,这样我就可以加载它一次并从那里进行查找。我想一次加载所有记录,并使其可用于侧输入。
我试过了,但是我的错误率越来越低了-
样本代码-

final PCollectionView<Map<String, String>> mongoInput = pipeline.apply("Read from MongoDB", MongoDbIO.read().withUri("mongodb://URL")
                .withDatabase("dbname").withCollection("collection_name"))
                .apply("Document to String", ParDo.of(new MongoToKeyValueDoFn()))
                .apply("create a view for side input", View.<String, String>asMap());

在mongotokeyvaluedofn类中,我将键和值放入hashmap并发出它。
错误-

The method apply(String, PTransform<? super PCollection<Map<String,String>>,OutputT>) in the type PCollection<Map<String,String>> is not applicable for the arguments (String, View.AsMap<String,String>)

有人能告诉我在输入端加载数据吗?

7uhlpewt

7uhlpewt1#

从查看的文档中:
如果已知kv<k,v>的pcollection对每个键的每个窗口都有一个值,则使用asmap()将其作为Map查看<k,v>
这里的问题是你用 MongoToKeyValueDoFn 输出a PCollection<Map<String, String>> ,即包含Map的pcollection,而asmap期望 PCollection<String, String> ,即字符串键和字符串值的p集合。
这里的解决办法是调整 MongoToKeyValueDoFn 为了遵循预期的格式,它应该输出键值对,而不是将它们存储在hashmap中。

相关问题