pykafka元数据(字节而不是字符串)

ukdjmx9f  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(351)

我看到pykafka有一个不寻常的行为,我最近才开始使用这个客户。
错误如下:

Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}

错误的来源如下:

self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
        consumer_group=bytes(self.consumer_group,"UTF-8"),
        auto_commit_enable=True
    )

调试时,我看到客户机使用正确的字符串ip连接到种子代理,但是当检索代理列表时,它们的ip是二进制的,当pykafka再次尝试连接以创建使用者时,这些ip显然不起作用。
另一个可能与此相关的问题是,我需要自己将主题名和使用者组名转换为字节(与其他客户端一样),但文档中的所有示例都显示了字符串的用法。
kafka broker版本:0.10.1.0 pykafka版本:2.7.0

cxfofazt

cxfofazt1#

检查你经纪人的账户 advertised.listeners config-它定义了在pykafka运行期间将发送给zookeeper并转发给pykafka客户端的主机名 Cluster 初始化。docker可能正在破坏此信息,因此您需要使用 advertised.listeners . 根据文件:
如果不同于 listeners 配置属性。在iaas环境中,这可能需要与代理绑定到的接口不同。
至于bytes/string问题,pykafka的最新开发版本接受字符串或bytes作为主题名和使用者组名,以方便程序员使用。对于旧版本,您需要使用以下技术将字符串参数转换为字节:

topic_name = str_topic_name.encode('ascii')
o3imoua4

o3imoua42#

好吧,我完全被误导了:那不是ip而是base64中的主机名(由docker生成)。

相关问题