kafka由于docker延迟而超时

798qvoo8  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(416)

我对Kafka和多克完全陌生,有一个问题要解决。我们针对kafka(apache)队列的持续集成测试在本地计算机上运行良好,但在jenkins ci服务器上时,偶尔会出现以下错误:

%3|1508247800.270|FAIL|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down

工作原理是docker的形象需要时间来开始,到那时Kafka制作人已经放弃了。违规代码是

producer_properties = {
        'bootstrap.servers': self._job_queue.bootstrap_server,
        'client.id': self._job_queue.client_id
    }
    try:
        self._producer = kafka.Producer(**producer_properties)
    except:
        print("Bang!")

上面的错误行出现在生产者的创建中。但是,不会引发异常,并且调用会返回一个外观有效的生产者,因此我无法以编程方式测试代理端点的存在性。是否有用于检查代理状态的api?

jgwigjjp

jgwigjjp1#

这是似乎对我有用的代码。如果它看起来有点像弗兰肯斯坦,那么你是对的,它是!如果有一个干净的解决方案,我期待着看到它:

import time
import uuid
from threading import Event
from typing import Dict

import confluent_kafka as kafka

# pylint: disable=no-name-in-module

from confluent_kafka.cimpl import KafkaError

# more imports...

LOG = # ...

# Default number of times to retry connection to Kafka Broker

_DEFAULT_RETRIES = 3

# Default time in seconds to wait between connection attempts

_DEFAULT_RETRY_DELAY_S = 5.0

# Number of times to scan for an error after initiating the connection. It appears that calling

# flush() once on a producer after construction isn't sufficient to catch the 'broker not available'

# # error. At least twice seems to work.

_NUM_ERROR_SCANS = 2

class JobProducer(object):
    def __init__(self, connection_retries: int=_DEFAULT_RETRIES,
                 retry_delay_s: float=_DEFAULT_RETRY_DELAY_S) -> None:
        """
        Constructs a producer.
        :param connection_retries: how many times to retry the connection before raising a
        RuntimeError. If 0, retry forever.
        :param retry_delay_s: how long to wait between retries in seconds.
        """
        self.__error_event = Event()
        self._job_queue = JobQueue()
        self._producer = self.__wait_for_broker(connection_retries, retry_delay_s)
        self._topic = self._job_queue.topic

    def produce_job(self, job_definition: Dict) -> None:
        """
        Produce a job definition on the queue
        :param job_definition: definition of the job to be executed
        """
        value = ... # Conversion to JSON
        key = str(uuid.uuid4())
        LOG.info('Produced message: %s', value)

        self.__error_event.clear()
        self._producer.produce(self._topic,
                               value=value,
                               key=key,
                               on_delivery=self._on_delivery)
        self._producer.flush(self._job_queue.flush_timeout)

    @staticmethod
    def _on_delivery(error, message):
        if error:
            LOG.error('Failed to produce job %s, with error: %s', message.key(), error)

    def __create_producer(self) -> kafka.Producer:
        producer_properties = {
            'bootstrap.servers': self._job_queue.bootstrap_server,
            'error_cb': self.__on_error,
            'client.id': self._job_queue.client_id,
        }
        return kafka.Producer(**producer_properties)

    def __wait_for_broker(self, retries: int, delay: float) -> kafka.Producer:
        retry_count = 0
        while True:
            self.__error_event.clear()
            producer = self.__create_producer()
            # Need to call flush() several times with a delay between to ensure errors are caught.
            if not self.__error_event.is_set():
                for _ in range(_NUM_ERROR_SCANS):
                    producer.flush(0.1)
                    if self.__error_event.is_set():
                        break
                    time.sleep(0.1)
                else:
                    # Success: no errors.
                    return producer

            # If we get to here, the error callback was invoked.
            retry_count += 1
            if retries == 0:
                msg = '({})'.format(retry_count)
            else:
                if retry_count <= retries:
                    msg = '({}/{})'.format(retry_count, retries)
                else:
                    raise RuntimeError('JobProducer timed out')

            LOG.warn('JobProducer: could not connect to broker, will retry %s', msg)
            time.sleep(delay)

    def __on_error(self, error: KafkaError) -> None:
        LOG.error('KafkaError: %s', error.str())
        self.__error_event.set()
bn31dyow

bn31dyow2#

如果与代理的连接失败,客户端似乎不会抛出异常。实际上,当生产者第一次尝试发送消息时,它会尝试连接到引导服务器。如果连接失败,它会反复尝试连接到引导列表中传递的任何代理。最后,如果代理出现,send就会发生(我们可以在回调函数中检查状态)。合流kafka python库正在使用librdkafka库,而此客户端似乎没有正确的文档。某些由kafka协议指定的kafka生产者选项似乎不受librdkafka支持。
下面是我使用的回调示例代码:

from confluent_kafka import Producer

def notifyme(err, msg):
    print err, msg.key(), msg.value()

p = Producer({'bootstrap.servers': '127.0.0.1:9092', 'retry.backoff.ms' : 100,
        'message.send.max.retries' : 20,
        "reconnect.backoff.jitter.ms" :  2000})
try:
    p.produce(topic='sometopic', value='this is data', on_delivery=notifyme)
except Exception as e:
    print e
p.flush()

另外,检查代理是否存在,您可以只通过telnet连接到代理端口上的代理ip(在本例中是9092)。在kafka集群使用的zookeeper上,您可以检查/brokers/ids下znode的内容

相关问题