Why is my Kafka consumer so much slower than my Kafka producer?

doinxwow · posted 2021-06-07 · in Kafka

I have a stream of data that I crawl continuously. All of the data is fed into Kafka and then written to Cassandra. Right now the Kafka consumer is very slow, much slower than the producer; I want them to keep pace with each other. What can I do to achieve that, or is there something wrong with my code?
Here is my Kafka consumer code, written in Python:

import json
import logging
from datetime import datetime, timedelta

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from dateutil.parser import parse
from kafka.client import KafkaClient
from kafka.consumer.multiprocess import MultiProcessConsumer

# Placeholders -- set these to the real keyspace and topic names.
keyspace = 'mykeyspace'
kafkatopic = b'mytopic'
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]
    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
    cluster = Cluster(['localhost'])
    session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    # 16 worker processes fetch from the topic; max_buffer_size=None removes the buffer cap.
    consumer = MultiProcessConsumer(kafka, b'default', kafkatopic, num_procs=16, max_buffer_size=None)
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
    axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
    while True:
        # Pull up to 16 messages per iteration, spread across the worker processes.
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist)>0:
                    article_rows = session.execute(article_lookup_stmt,[idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if not article['id'] in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
                                session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
                                session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
                                log.debug('%s %s' % (article['id'],article['created_at']))
                            session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:',e
                            continue
            except Exception as e:
                print 'error is:', e
                log.exception(e)
except Exception as e:
    log.exception(e)

Edit:
I have also profiled the consumer (results below; a sketch of how to reproduce such a profile follows the listing). The slow line appears to be:

article_rows = session.execute(article_lookup_stmt,[idlist])

Sun Feb 14 16:01:01 2016    consumer.out

         395793 function calls (394232 primitive calls) in 23.074 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      141   10.695    0.076   10.695    0.076 {select.select}
     7564   10.144    0.001   10.144    0.001 {method 'acquire' of 'thread.lock' objects}
        1    0.542    0.542   23.097   23.097 consumer.py:5(<module>)
     1510    0.281    0.000    0.281    0.000 {method 'recv' of '_socket.socket' objects}
       38    0.195    0.005    0.195    0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
       13    0.078    0.006    0.078    0.006 {time.sleep}
     2423    0.073    0.000    0.137    0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
    22112    0.063    0.000    0.095    0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
        3    0.052    0.017    0.162    0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005    0.047    0.000    0.055    0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
     1270    0.032    0.000    0.034    0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
        3    0.024    0.008    0.226    0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
       33    0.024    0.001    0.031    0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
    15374    0.024    0.000    0.024    0.000 {built-in method new of type object at 0x788ee0}
      141    0.023    0.000   11.394    0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
      288    0.020    0.000    0.522    0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
     2423    0.018    0.000    0.029    0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
      115    0.018    0.000   11.372    0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
     2423    0.018    0.000    0.059    0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
    24548    0.017    0.000    0.017    0.000 {_struct.unpack}
44228/43959    0.016    0.000    0.016    0.000 {len}
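
For reference, a listing like the one above can be produced with the standard-library profiler. A minimal sketch, assuming the consumer script is consumer.py and the stats are dumped to consumer.out:

# Run the script under cProfile (shell command, not Python):
#   python -m cProfile -o consumer.out consumer.py
import pstats

# Load the dump and print the 20 costliest functions by internal time,
# which matches the "Ordered by: internal time" ordering above.
stats = pstats.Stats('consumer.out')
stats.sort_stats('time').print_stats(20)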

Thanks for your replies.

Answer 1, by bqf10yzr:

You could try running the consumer without saving to C* (Cassandra), so you can see how much of a difference that makes.
If saving to C* turns out to be the bottleneck (and I assume it is), you could hand the writes off to a pool of workers, larger than the 16 processes your consumer spawns, whose sole responsibility is writing to C*.
That way you offload the slow part of the code and leave only a trivial amount of work in the consumer itself.
You will need from multiprocessing import Pool.
More here.
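
A minimal sketch of that pattern, assuming the same cassandra-driver and a local cluster (the names init_worker, write_axes, the pool size of 32, and the keyspace are illustrative, not from the answer). Driver sessions cannot be shared across process boundaries, so each pool worker opens its own session in an initializer:

from datetime import datetime
from multiprocessing import Pool

from cassandra.cluster import Cluster

# Per-process globals: driver sessions are not picklable, so every
# worker builds its own connection when the pool starts it.
session = None
axes_insert_stmt = None

def init_worker():
    global session, axes_insert_stmt
    cluster = Cluster(['localhost'])
    session = cluster.connect('mykeyspace')  # placeholder keyspace
    axes_insert_stmt = session.prepare(
        "INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")

def write_axes(args):
    # args is a plain tuple so it pickles cleanly across the process boundary.
    session.execute(axes_insert_stmt, args)

pool = Pool(processes=32, initializer=init_worker)

# Inside the consumer loop, queue the write and move on instead of blocking:
# pool.apply_async(write_axes, ((article['id'], datetime.utcnow(),
#                                article['axes']['comments'],
#                                article['axes']['likes'], 0,
#                                article['axes']['shares']),))

An alternative that avoids extra processes entirely is the driver's own session.execute_async(), which returns a future immediately and lets the loop issue many statements before waiting on any of them.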
