上下文:
我们有一个需求,我需要使用kafkaconsumer库捕获一个特定的事务。我使用一个唯一的会话作为键,我从另一个工具(我们称之为feeder工具)获取它来捕获事务。
一旦从feeder获得会话,我立即运行代码。
问题
我可以从Kafka那里获取多个记录,但是我没有看到我试图使用会话筛选的记录。
代码
from kafka import KafkaConsumer
import json
SESSION = 'sessionID'
def consumeRecords(topic,group,bootstrapserver,auto_offset_reset,mySession,auto_commit_boolean):
consumer = KafkaConsumer(topic,group_id=group,bootstrap_servers=bootstrapserver,auto_offset_reset=auto_offset_reset,enable_auto_commit=auto_commit_boolean)
consumer.topics()
consumer.seek_to_beginning()
try:
while True:
print("CALLING POLL")
records = consumer.poll(timeout_ms=1000)
print("RETURNED FROM POLL")
if records:
for consumedRecord in records.values():
for msg in consumedRecord:
json_data = json.loads(msg.value.decode('utf-8'))
#print(json_data)
if 'alias' in json_data.keys() and json_data['alias']=='myServer':
current_session = json_data[SESSION]
print("SESSION is :" , current_session)
if mySession == current_session :
print('My record is ', json_data)
except Exception as e:
print("Unable to find any related sessions")
print(e)
if __name__ == '__main__':
KAFKA_TOPIC = 'e-commerce.request'
KAFKA_GROUP = 'test'
KAFKA_BROKERS = ['ABC.net:9092', 'DEF:9092']
auto_commit = False
consumeRecords(KAFKA_TOPIC,KAFKA_GROUP,KAFKA_BROKERS,'earliest','38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF',auto_commit)
我应该打印从kafka使用的以下json数据,但是我的代码没有获取这个记录,因此没有打印任何内容,并且运行了无限长的时间
{'Type': 'request', 'requestID': '2018100819564659-5', 'payload': {'timing': {'startTime': '20181008195624322', 'total': '0.063', 'totalActions': '0', 'jsp': '0.063'}, 'user': {'orgID': '', 'userID': '', 'newComer': 'FALSE', 'helpdeskUserID': '', 'helpdeskUserOrgID': '', 'travelerID': ''}, 'version': '1.0.0', 'client': {'referrer': '', 'ip': ''}, 'url': {'parameters': {'JSESSIONID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF!430553578!-1652153437'}, 'baseUrl': 'http://server_url', 'path': 'DUMMY', 'method': 'POST'}, 'actions': [{'cumulTiming': '0', 'name': 'OverrideServlet', 'isChained': 'FALSE', 'features': '[GlobalTimeSpent = 0][PRE_RULES = 0][POST_RULES = 0]', 'chainParent': ''}], 'context': {'sessionSize': '12|12', 'fatalError': 'FALSE', 'requestType': 'XML', 'error': [], 'requestedSessionID': '', 'templateName': ''}}, 'Hostname': 'DummyAgain', 'sessionID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF', 'Site': 'ABCDEFGH', 'ClientId': 1234551515439, 'Epoch': 1539028584353, 'IP': 'A.B.C.D', 'alias': 'myServer', 'SeqNb': 21845, 'Source': 'eCommerce'}
暂无答案!
目前还没有任何答案,快来回答吧!