aiokafka:以前的工作代码现在在send\u和\u wait失败

ftf50wuq  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(524)

我已经用了一段时间了,我没有问题,直到今天。
奇怪的 TypeError 当我尝试使用发送消息时显示 AIOKafkaProducer.send_and_wait . 我还将这个问题作为问题发布在aiokafka的github存储库上,但看起来它们是不活跃的。也许有人能帮我。
代码如下:

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers="localhost:9092")

async def _initialize(prod, future):
    await prod.start()
    await prod.send_and_wait("main_topic", str.encode("hello!!"))

future = asyncio.Future()
task = asyncio.ensure_future(_initialize(producer, future))
loop.run_until_complete(task)
print("loop ended!")
loop.close()

下面是我收到的错误消息:

yilmazali@yilmazali:~$ python3 aiokafkatest.py
Unexpected error in sender routine
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
    task.result()
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 418, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/client.py", line 415, in send
    request, expect_response=expect_response)
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/conn.py", line 165, in send
    message = header.encode() + request.encode()
  File "/usr/local/lib/python3.6/dist-packages/kafka/util.py", line 159, in __call__
    return self.method()(self.target(), *args,**kwargs)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/struct.py", line 42, in _encode_self
    [self.__dict__[name] for name in self.SCHEMA.names]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 93, in encode
    return Int32.encode(len(value)) + value
TypeError: object of type '_io.BytesIO' has no len()

我没有改变我的Kafka结构或图书馆。我的Kafka经纪人看起来不错。我可以使用shell脚本生成/使用消息。
在过去的2-3个月里,我没有遇到任何问题,上面的代码运行良好。不知怎的,这个错误出现了,我想知道是什么问题。
任何帮助都将不胜感激。
致以最诚挚的问候,
阿里

更新:我们在朋友的电脑上运行了这个代码段,效果很好。我在外面宣传我的Kafka,她用上面的代码成功地给我当地的Kafka主题写信。两台机器上的aiokafka库版本都是0.4.0。两台机器上的asyncio版本都是3.4.3。简言之,问题不在于我的Kafka或图书馆。我的机器出了点问题,但天知道具体是什么原因造成的。

3bygqnnd

3bygqnnd1#

终于在我的机器上工作了。我只是卸载和安装 aiokafka 模块。
虽然我不满意这个解决方案,我想冒险更深入的问题的核心,我很高兴我现在可以继续做我的工作。
希望这对有同样问题的陌生人有所帮助。

twh00eeo

twh00eeo2#

可能您已经将kafka python更新到了1.3.5以上的版本。我这么做了,爱奥Kafka开始失败了。我在kafkapython1.3.5上回来了,看起来还不错

相关问题