polyglot处理器与本地数据流服务器

gv8xihay  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(317)

我一直在尝试使用polyglot并构建一个简单的python处理器。我遵循了多语言的配方,但我无法部署流。我最初部署的处理器与示例中使用的处理器相同,但出现以下错误:
请求的未知命令行参数:spring.cloud.stream.bindings.input.destination请求的未知环境变量:spring\u cloud\u stream\u kafka\u binder\u brokers

Traceback (most recent call last):
File "/processor/python_processor.py", line 10, in
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 353, in init
self._client = KafkaClient(metrics=self._metrics,**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 203, in init
self.cluster = ClusterMetadata(**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 67, in init
self._bootstrap_brokers = self._generate_bootstrap_brokers()
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 71, in _generate_bootstrap_brokers
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1336, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1289, in get_ip_port_afi
host_and_port_str = host_and_port_str.strip()
AttributeError: 'NoneType' object has no attribute 'strip'
Exception AttributeError: "'KafkaClient' object has no attribute '_closed'" in <bound method KafkaClient.del of <kafka.client_async.KafkaClient object at 0x7f8b7024cf10>> ignored

然后,我尝试通过部署流传递环境和绑定参数,但没有成功。当我将spring\u cloud\u stream\u kafka\u binder\u brokers和spring.cloud.stream.bindings.input.destination参数手动插入kafka的使用者时,我能够将流部署为一种解决方法。我不完全确定是什么导致了这个问题,在kubernetes上部署这个会有什么不同吗?或者这是polyglot和dataflow的问题吗?任何帮助都将不胜感激。
复制步骤:尝试在本地数据流服务器上部署来自polyglot recipe的polyglot处理器流。我还使用与示例中相同的流定义:http--server.port=32123 | python processor--reversestring=true | log。
附加上下文:我试图在本地安装的spdf和kafka上部署流,因为我在使用docker部署自定义python应用程序时遇到了一些问题。

8fq7wneg

8fq7wneg1#

你在上面发布的菜谱期望 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS 作为服务器配置的一部分存在的环境变量(由于流是通过skipper server管理的,因此需要在skipper server配置中设置此环境变量)。
您可以查看此文档,了解如何设置 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS 作为skipper服务器部署中的环境属性。
在部署时,也可以将此属性作为deployer属性传递 python-processor 流应用程序。您可以参考此文档,了解如何在流部署时传递部署属性以设置spring云流属性(这里是binder配置属性)。

相关问题