RabbitMQ Pika连接重置,(-1,ConnectionResetError(104,'Connection reset by peer'))

tpgth1q7  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(3)|浏览(189)

通过stackoverflow搜索并发布此问题,因为没有解决方案对我有效,我的问题可能与其他问题不同。
我正在写一个脚本,它从rabbitMQ队列中获取一篇文章,并对文章进行处理,以计算单词并从中提取关键词,然后将其转储到db中。我的脚本工作正常,但经过一段时间的执行,我得到这个异常
(-1, "ConnectionResetError(104, 'Connection reset by peer')")
我不知道为什么我会得到这个。我已经尝试了很多解决方案,可在stackover流没有一个是为我工作。我写了我的剧本,并尝试了两种不同的方式。两者都工作正常,但过了一段时间后,同样的异常发生。
下面是我的第一个代码:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print ("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Edit 5 starting 10 threads to listen to pika 

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))


try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

下面是我的第二个代码:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print ("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e

try:
    app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))

在我的第一个代码中,我使用了线程,因为我想加快处理文章的过程。
这是我的回电功能
def on_message(ch, method, properties, message): Logger.log_message("Starting parsing new msg ") handle_message(message)

编辑:完整代码

import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
    vars = {}
    lock = None

    def __init__(self):
        self.lock = threading.Lock()

    def inc(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] += 1
            else:
                self.vars[var] = 1
        finally:
            self.lock.release()

    def dec(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')
        finally:
            self.lock.release()

    def get(self, var):

        out = None
        self.lock.acquire()
        try:
            if var in self.vars:
                out = self.vars[var]
            else:
                Logger.error_message('Cannot get ' + var + ', not tracked')
        finally:
            self.lock.release()

        return out

    def get_all(self):

        out = None
        self.lock.acquire()
        try:
            out = self.vars.copy()
        finally:
            self.lock.release()

        return out

class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    qty = cur[key] - prev[key]
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur

class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)

def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e

def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e

def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:',results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e

def ack_message(ch, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        Logger.error_message("Channel is already closed, so we can't ACK this message" + str(e))
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        #pass

def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()

    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)

    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)

    Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag)) 
    Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK    
## def on_message(ch, method, properties, message):
##    global executor
##    executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)

####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Pika Connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()

app_main()

皮卡是不是花了很多时间来处理消息仍然我面临着连接重置问题.

**处理消息的总时间:0.0005991458892822266

yfwxisqw

yfwxisqw1#

你的handle_message方法阻塞了心跳,因为你的所有代码,包括Pika I/O循环,都在同一个线程上运行。查看this example如何在Pikas I/O循环的单独线程上运行您的工作(handle_message),然后正确地确认消息。

pexxcrt2

pexxcrt22#

我也收到了同样的问题。增加心跳和连接超时配置的持续时间对我来说不起作用。我终于弄明白了,如果你已经创建了一个频道,并且你在几分钟内没有发布任何东西(在我的情况下是20分钟),在这种情况下,我们会得到这个错误。
为我工作的解决方案:
1.在发布任何消息之前立即创建通道。或
1.使用try-except,如果你得到一个异常,创建另一个频道并重新发布。ie.

try:
     channel.basic_publish(exchange='', routing_key='abcd', body=data)
 except Exception as e1:
     connection=pika.BlockingConnection(pika.ConnectionParameters(host='1.128.0.3',credentials=credentials))
     channel = connection.channel()
     channel.basic_publish(exchange='', routing_key='abcd', body=data)

这将至少保持运行的东西,并防止丢失任何数据。我不是这方面的Maven,但希望这对某人有帮助!

kpbwa7wx

kpbwa7wx3#

我也遇到了同样的问题,并通过增加心跳和连接超时配置的持续时间来解决。
非常感谢@LukeBakken,他实际上找到了根本原因。
以下是配置超时的方法:

import pika

def main():

    # NOTE: These parameters work with all Pika connection types
    params = pika.ConnectionParameters(heartbeat=600, blocked_connection_timeout=300)

    conn = pika.BlockingConnection(params)

    chan = conn.channel()

    chan.basic_publish('', 'my-alphabet-queue', "abc")

    # If publish causes the connection to become blocked, then this conn.close()
    # would hang until the connection is unblocked, if ever. However, the
    # blocked_connection_timeout connection parameter would interrupt the wait,
    # resulting in ConnectionClosed exception from BlockingConnection (or the
    # on_connection_closed callback call in an asynchronous adapter)
    conn.close()

if __name__ == '__main__':
    main()

参考:https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html

相关问题