我使用kafka来维护一个python服务,该服务应该是并行工作的,以便有效地处理每条消息的缓慢api请求。
我使用了python和kafkapython上的多处理模块。
zookeeper和kafka 2.11在同一个ubuntu服务器上运行,配置大多不正确。
这个主题是由另一个kafkapython生产者自动创建的,设置为有10个分区,以便同时使用10个使用者。
当我检查时,我发现队列很长,所以生产者发送了很多请求:
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic usrReq --time -1
usrReq:8:1157
usrReq:2:1185
usrReq:5:1167
usrReq:4:1115
usrReq:7:1164
usrReq:10:1150
usrReq:1:1149
usrReq:9:1138
usrReq:3:1186
usrReq:6:1220
usrReq:0:6264
然而;虽然并行处理10个内核,但消费者需要很长时间(下面的示例日志为117秒)才能从队列中获取下一条消息。
thread 7, consumer: 117.485 sec
api1:0.412 sec
api2:0.752 sec
db_insert:0.132 sec
这是每个进程创建自己的使用者、获取消息和对代码运行分析的方式:
consumer = KafkaConsumer(group_id='my-group',
bootstrap_servers='localhost',
value_deserializer=lambda m: json.loads(m.decode('ascii'))
consumer.subscribe(topics='usrReq')
while True:
msg = next(consumer).value['id']
method(msg)
此设置中的问题可能在哪里?
暂无答案!
目前还没有任何答案,快来回答吧!