Flink鲁纳身上的Apache光束不读Kafka的作品

8xiog9wr  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(295)

我正在尝试运行由本地flink集群支持的apachebeam,以便使用kafka主题,如readfromkafka文档中所述。
代码基本上是这个管道和梁示例中描述的一些其他设置

with beam.Pipeline() as p:

  lines = p | ReadFromKafka(
      consumer_config={'bootstrap.servers': bootstrap_servers},
    topics=[topic],
          ) | beam.WindowInto(beam.window.FixedWindows(1))

  output = lines | beam.FlatMap(lambda x: print(x))

  output | WriteToText(output)

因为我试图在flink上运行,所以我按照这个文档在flink上运行beam并执行以下操作:
-->我下载了Flink1.10的二进制文件,并按照以下说明正确设置集群。
我检查了服务器和任务示例的日志。两者都已正确初始化。
-->Kafka开始使用docker并在端口9092中暴露它。
-->在终端执行以下操作

python example_1.py --runner FlinkRunner --topic myTopic --bootstrap_servers localhost:9092  --flink_master localhost:8081 --output output_folder

终端输出 2.23.0: Pulling from apache/beam_java_sdk Digest: sha256:3450c7953f8472c2312148a2a8324b0115fd71b3a7a01a3b017f6de69d89dfe1 Status: Image is up to date for apache/beam_java_sdk:2.23.0 docker.io/apache/beam_java_sdk:2.23.0 但是在给mytopic写了一些消息之后,终端仍然冻结,在output文件夹中我看不到任何东西。我检查了flink-conf.yml并给出了这两行

jobmanager.rpc.address: localhost 
jobmanager.rpc.port: 6123

我假设作业的端口是6123,而不是beam文档中指定的8081,但是两个端口的行为是相同的。
我对beam/flink非常陌生,所以我不太确定它是否可以,到目前为止我有两个假设,但不太清楚如何调查它们:
与beam与flink通信以发送作业的端口相关的东西。
2.apache.beam.io.external.readfromkafka文档中提到的python sdk扩展服务 Note: To use these transforms, you need to start a Java Expansion Service. Please refer to the portability documentation on how to do that. Flink Users can use the built-in Expansion Service of the Flink Runner’s Job Server. The expansion service address has to be provided when instantiating the transforms. 但在阅读可移植性文档时,它让我又回到了同一个文档中。
有人能帮帮我吗?
编辑:我正在使用postgresql的debezium源代码连接器撰写主题,并看到了上面提到的行为。但是当我试图手动处理这个主题时,应用程序崩溃了

RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
disho6za

disho6za1#

你做的每件事都是正确的;java扩展服务不再需要手动启动(请参阅最新文档)。另外,flink在8081处提供webui,但是也接受作业提交,所以两个端口都可以正常工作。
看起来您可能遇到了python的textio还不支持流的问题。
此外,在flink上运行python管道时,实际代码在docker映像中运行,因此如果您试图写入“本地”文件,它将是映像中的一个文件,而不是在您的计算机上。

相关问题