我正在尝试运行由本地flink集群支持的apachebeam,以便使用kafka主题,如readfromkafka文档中所述。
代码基本上是这个管道和梁示例中描述的一些其他设置
with beam.Pipeline() as p:
lines = p | ReadFromKafka(
consumer_config={'bootstrap.servers': bootstrap_servers},
topics=[topic],
) | beam.WindowInto(beam.window.FixedWindows(1))
output = lines | beam.FlatMap(lambda x: print(x))
output | WriteToText(output)
因为我试图在flink上运行,所以我按照这个文档在flink上运行beam并执行以下操作:
-->我下载了Flink1.10的二进制文件,并按照以下说明正确设置集群。
我检查了服务器和任务示例的日志。两者都已正确初始化。
-->Kafka开始使用docker并在端口9092中暴露它。
-->在终端执行以下操作
python example_1.py --runner FlinkRunner --topic myTopic --bootstrap_servers localhost:9092 --flink_master localhost:8081 --output output_folder
终端输出 2.23.0: Pulling from apache/beam_java_sdk Digest: sha256:3450c7953f8472c2312148a2a8324b0115fd71b3a7a01a3b017f6de69d89dfe1 Status: Image is up to date for apache/beam_java_sdk:2.23.0 docker.io/apache/beam_java_sdk:2.23.0
但是在给mytopic写了一些消息之后,终端仍然冻结,在output文件夹中我看不到任何东西。我检查了flink-conf.yml并给出了这两行
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
我假设作业的端口是6123,而不是beam文档中指定的8081,但是两个端口的行为是相同的。
我对beam/flink非常陌生,所以我不太确定它是否可以,到目前为止我有两个假设,但不太清楚如何调查它们:
与beam与flink通信以发送作业的端口相关的东西。
2.apache.beam.io.external.readfromkafka文档中提到的python sdk扩展服务 Note: To use these transforms, you need to start a Java Expansion Service. Please refer to the portability documentation on how to do that. Flink Users can use the built-in Expansion Service of the Flink Runner’s Job Server. The expansion service address has to be provided when instantiating the transforms.
但在阅读可移植性文档时,它让我又回到了同一个文档中。
有人能帮帮我吗?
编辑:我正在使用postgresql的debezium源代码连接器撰写主题,并看到了上面提到的行为。但是当我试图手动处理这个主题时,应用程序崩溃了
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
1条答案
按热度按时间disho6za1#
你做的每件事都是正确的;java扩展服务不再需要手动启动(请参阅最新文档)。另外,flink在8081处提供webui,但是也接受作业提交,所以两个端口都可以正常工作。
看起来您可能遇到了python的textio还不支持流的问题。
此外,在flink上运行python管道时,实际代码在docker映像中运行,因此如果您试图写入“本地”文件,它将是映像中的一个文件,而不是在您的计算机上。