import logging
from confluent_kafka import Producer
import os
logger = logging.getLogger("main")
BOOTSTRAP_SERVERS = os.environ['BOOTSTRAP_SERVERS']
APPLICATION_ID = os.getenv('APPLICATION_ID', default = "nke-data-source")
RECONNECT_BACKOFF_MS = os.getenv('RECONNECT_BACKOFF_MS', default = 1000)
REQUEST_TIMEOUT_MS = os.getenv('REQUEST_TIMEOUT_MS', default = 40000)
ACKS = os.getenv('ACKS', default = "all")
RETRIES = os.getenv('RETRIES', default = 15)
RETRY_BACK_OFF = os.getenv('RETRY_BACK_OFF', default = 1000)
MAX_IN_FLIGHT_REQUESTS = os.getenv('MAX_IN_FLIGHT_REQUESTS', default = 1)
topic = os.getenv('OUTBOUND_TOPIC', default = "tti-nke-raw")
p = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS,
'client.id': APPLICATION_ID,
'reconnect.backoff.ms': RECONNECT_BACKOFF_MS,
'request.timeout.ms': REQUEST_TIMEOUT_MS,
'acks': ACKS,
'retries': RETRIES,
'retry.backoff.ms': RETRY_BACK_OFF,
'max.in.flight.requests.per.connection': MAX_IN_FLIGHT_REQUESTS,
'compression.type': "lz4"})
def send(key, event):
try:
logger.info("Sending key: [{0}] value: [{1}]".format(key, event))
p.produce(topic=topic, value=event.encode('utf-8'), key=key)
except Exception:
logger.error("error sending events to kafka", exc_info=True)
错误:-
Traceback (most recent call last):
BufferError: Local: Queue full
File "/app/sender.py", line 30, in send
p.produce(topic=topic, value=event.encode('utf-8'), key=key)
有人能帮助我在这方面,因为我是新的Python
1条答案
按热度按时间wa7juj8i1#
这个
Queue
是在librdkafka
库中实现的(confluent_kafka
绑定到这个库)produce有一个内部
Queue
,它接收production交付报告并等待production处理这些报告(大多数情况下什么都不做),但是您需要触发这种遍历队列的机制,可以通过调用poll
来执行你应该在每次调用produce之后调用
producer.poll(0)
,所以改变:致:
这将触发队列清理,不要担心性能,因为这是一个非常简单的函数,并没有像librdkafka的作者所写的那样做太多:
poll()的调用很便宜,它不会对性能产生影响,所以请将它添加到您的生产者循环中。
基本上它是这样做的:
定期调用poll()来服务生产者的交付报告回调。
考虑在此Issue中阅读有关此内容