我正在读取Kafka主题的流数据,我想将其中的一些部分存储在一个Dataframe中。
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': "###",
'group.id': '###',
'default.topic.config': {
'auto.offset.reset': 'latest' }
})
c.subscribe(['scorestore'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
收到的消息是json
{
"messageHeader" : {
"messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
"timestamp" : 152520000,
"sourceHost" : "test",
"sourceLocation" : "test",
"tags" : [ ],
"version" : "1.0"
},
"id_value" : {
"id" : "1234",
"value" : "333.0"
}
}
例如,我尝试创建一个包含timestamp、id和value列的Dataframe
timestamp id value
0 152520000 1234 333.0
有没有一种方法可以做到这一点,而不必解析json消息并将我需要的值逐行附加到dataframe?
1条答案
按热度按时间ffvjumwh1#
我提出的解决方案可能有点棘手。假设您的json消息位于一个名为“msg\u str”的字符串中:
结果:
然后您可以将这个Dataframe附加到一个Dataframe中,在这个Dataframe中您正在累积您的结果。
也许有一个最简单的解决办法,但如果不是这样的话,我希望它能帮助你。