我有一个用python编写的kafka客户机,在终端(macos)上运行良好。我将客户端代码部署到docker。现在,当客户端运行时,它似乎连接到kafka ok,但在我轮询数据时挂起。在这段代码中,对consumer.poll()的调用永远不会返回。
def process_video_stream():
global recv_img, first_frame
while keep_polling:
app.logger.info("polling")
msg_pack = consumer.poll(timeout_ms=500)
app.logger.info("back")
for tp, messages in msg_pack.items():
for message in messages:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
buf = numpy.array(message.value)
jpg_as_np = numpy.frombuffer(buf, dtype=numpy.uint8)
# mat = cv2.UMat(buf)
img = cv2.imdecode(jpg_as_np, cv2.IMREAD_COLOR)
if first_frame:
open_vid_file(img)
first_frame = False
writer.write(img)
app.logger.debug("%s:%d:%d" % (tp.topic, tp.partition,
message.offset))
从容器内部,我可以远程连接到Kafka服务器,所以我不认为这是一个网络问题。我能找到的所有答案都是关于在容器中运行Kafka的人,而不是生产者/消费者。
1条答案
按热度按时间cidc1ykv1#
结果我遇到了和Kafka在集装箱里时一样的问题。advised.listeners需要设置为主机的主机名/ip,而不是默认值。