使用kafkaconsumer在python中从kafka获取特定记录,方法是使用sessionid归档事务

cnh2zyt3  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(194)

上下文:
我们有一个需求,我需要使用kafkaconsumer库捕获一个特定的事务。我使用一个唯一的会话作为键,我从另一个工具(我们称之为feeder工具)获取它来捕获事务。
一旦从feeder获得会话,我立即运行代码。
问题
我可以从Kafka那里获取多个记录,但是我没有看到我试图使用会话筛选的记录。
代码

from kafka import KafkaConsumer
import json
SESSION = 'sessionID'
def consumeRecords(topic,group,bootstrapserver,auto_offset_reset,mySession,auto_commit_boolean):
    consumer = KafkaConsumer(topic,group_id=group,bootstrap_servers=bootstrapserver,auto_offset_reset=auto_offset_reset,enable_auto_commit=auto_commit_boolean)
    consumer.topics()
    consumer.seek_to_beginning()
    try:
        while True:
            print("CALLING POLL")
            records = consumer.poll(timeout_ms=1000)
            print("RETURNED FROM POLL")
            if records:
                for consumedRecord in records.values():
                    for msg in consumedRecord:
                        json_data = json.loads(msg.value.decode('utf-8'))
                        #print(json_data)
                        if 'alias' in json_data.keys() and json_data['alias']=='myServer':

                            current_session = json_data[SESSION]
                            print("SESSION is :" , current_session)
                            if mySession == current_session :
                                print('My record is ', json_data)
    except Exception as e:
        print("Unable to find any related sessions")
        print(e)

if __name__ == '__main__':
    KAFKA_TOPIC = 'e-commerce.request'
    KAFKA_GROUP = 'test'
    KAFKA_BROKERS = ['ABC.net:9092', 'DEF:9092']
    auto_commit = False
    consumeRecords(KAFKA_TOPIC,KAFKA_GROUP,KAFKA_BROKERS,'earliest','38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF',auto_commit)

我应该打印从kafka使用的以下json数据,但是我的代码没有获取这个记录,因此没有打印任何内容,并且运行了无限长的时间

{'Type': 'request', 'requestID': '2018100819564659-5', 'payload': {'timing': {'startTime': '20181008195624322', 'total': '0.063', 'totalActions': '0', 'jsp': '0.063'}, 'user': {'orgID': '', 'userID': '', 'newComer': 'FALSE', 'helpdeskUserID': '', 'helpdeskUserOrgID': '', 'travelerID': ''}, 'version': '1.0.0', 'client': {'referrer': '', 'ip': ''}, 'url': {'parameters': {'JSESSIONID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF!430553578!-1652153437'}, 'baseUrl': 'http://server_url', 'path': 'DUMMY', 'method': 'POST'}, 'actions': [{'cumulTiming': '0', 'name': 'OverrideServlet', 'isChained': 'FALSE', 'features': '[GlobalTimeSpent = 0][PRE_RULES = 0][POST_RULES = 0]', 'chainParent': ''}], 'context': {'sessionSize': '12|12', 'fatalError': 'FALSE', 'requestType': 'XML', 'error': [], 'requestedSessionID': '', 'templateName': ''}}, 'Hostname': 'DummyAgain', 'sessionID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF', 'Site': 'ABCDEFGH', 'ClientId': 1234551515439, 'Epoch': 1539028584353, 'IP': 'A.B.C.D', 'alias': 'myServer', 'SeqNb': 21845, 'Source': 'eCommerce'}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题