我得到一个数据流,我可以完全爬行。所有的数据都被输入Kafka,然后发送给Cassandra。现在Kafka的消费速度非常慢,比生产商慢得多。我希望他们完全一样。我能做些什么来获得这个结果,或者我的代码有什么问题?
这是我用python编写的kafka消费代码:
import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
def __init__(self, *whitelist):
self.whitelist = [logging.Filter(name) for name in whitelist]
def filter(self, record):
return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
kafka = KafkaClient('localhost')
consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
while True:
messages = consumer.get_messages(count=16)
if len(messages) == 0:
print 'IDLE'
continue
for message in messages:
try:
response = json.loads(message.value)
data = json.loads(response['body'])
print response['body']
articles = data['articles']
idlist = [r['id'] for r in articles]
if len(idlist)>0:
article_rows = session.execute(article_lookup_stmt,[idlist])
rows = [r.id for r in article_rows]
for article in articles:
try:
if not article['id'] in rows:
article['created_at'] = parse(article['created_at'])
scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
log.debug('%s %s' % (article['id'],article['created_at']))
session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
except Exception as e:
print 'error==============:',e
continue
except Exception as e:
print 'error is:',e
log.exception(e.message)
except Exception as e:
log.exception(e.message)
编辑:
我还添加了我的个人资料结果和慢行代码似乎是
article_rows = session.execute(article_lookup_stmt,[idlist])
Sun Feb 14 16:01:01 2016 consumer.out
395793 function calls (394232 primitive calls) in 23.074 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
141 10.695 0.076 10.695 0.076 {select.select}
7564 10.144 0.001 10.144 0.001 {method 'acquire' of 'thread.lock' objects}
1 0.542 0.542 23.097 23.097 consumer.py:5(<module>)
1510 0.281 0.000 0.281 0.000 {method 'recv' of '_socket.socket' objects}
38 0.195 0.005 0.195 0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
13 0.078 0.006 0.078 0.006 {time.sleep}
2423 0.073 0.000 0.137 0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
22112 0.063 0.000 0.095 0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
3 0.052 0.017 0.162 0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005 0.047 0.000 0.055 0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
1270 0.032 0.000 0.034 0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
3 0.024 0.008 0.226 0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
33 0.024 0.001 0.031 0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
15374 0.024 0.000 0.024 0.000 {built-in method new of type object at 0x788ee0}
141 0.023 0.000 11.394 0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
288 0.020 0.000 0.522 0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
2423 0.018 0.000 0.029 0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
115 0.018 0.000 11.372 0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
2423 0.018 0.000 0.059 0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
24548 0.017 0.000 0.017 0.000 {_struct.unpack}
44228/43959 0.016 0.000 0.016 0.000 {len}
谢谢你的回复。
1条答案
按热度按时间bqf10yzr1#
您可以尝试运行consumer而不保存到c*,这样您就可以观察它有多大的不同。
如果事实证明保存到c是一个瓶颈(我假设是这样),那么您可以拥有一个线程池(大于您的使用者生成的16个线程),其唯一职责就是写入c。
这样,您就可以卸载代码中缓慢的部分,这将只在用户代码中留下微不足道的部分。
你需要一个
from multiprocessing import Pool
.这里有更多。