Kafka to pandas DataFrame without Spark

ruyhziif  posted 2021-06-06 in Kafka

I am reading streaming data from a Kafka topic and would like to store parts of it in a DataFrame.

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': "###",
    'group.id': '###',
    'default.topic.config': {
        'auto.offset.reset': 'latest'
    }
})

c.subscribe(['scorestore'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

The received message is JSON:

{
  "messageHeader" : {
    "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
    "timestamp" : 152520000,
    "sourceHost" : "test",
    "sourceLocation" : "test",
    "tags" : [ ],
    "version" : "1.0"
  },
  "id_value" : {
    "id" : "1234",
    "value" : "333.0"
  }
}

For example, I am trying to create a DataFrame with timestamp, id and value columns:

timestamp   id  value
0   152520000   1234    333.0

Is there a way to do this without having to parse the JSON message and append the values I need to the DataFrame row by row?
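(For reference, pandas can flatten nested JSON directly with `pd.json_normalize`, available in pandas ≥ 1.0, which avoids manual row-by-row appending; a minimal sketch using the message above — the column selection is one possible choice:)

```python
import json
import pandas as pd

msg_str = '{"messageHeader": {"messageId": "4b604b33-7256-47b6-89d6-eb1d92a282e6", "timestamp": 152520000, "sourceHost": "test", "sourceLocation": "test", "tags": [], "version": "1.0"}, "id_value": {"id": "1234", "value": "333.0"}}'

# json_normalize flattens the nested dicts into dotted column names
# such as 'messageHeader.timestamp' and 'id_value.id'
df = pd.json_normalize(json.loads(msg_str))

# keep only the fields of interest and shorten the column names
df = df[['messageHeader.timestamp', 'id_value.id', 'id_value.value']]
df.columns = ['timestamp', 'id', 'value']
print(df)
```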


ffvjumwh1#

The solution I came up with may be a bit hacky. Assuming your JSON message is in a string called "msg_str":

import pandas as pd

msg_str = '{  "messageHeader" : { "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",    "timestamp" : 152520000,    "sourceHost" : "test",    "sourceLocation" : "test",    "tags" : [ ],    "version" : "1.0"  },  "id_value" : {    "id" : "1234",    "value" : "333.0"  }}'

# First create a dataframe with read_json
p = pd.read_json(msg_str)

# Now you have a dataframe with two columns. Where one column has a value,
# the other has a NaN. Create a new column with only the non-NaN values
p['fussion'] = p['id_value'].fillna(p['messageHeader'])

# Drop 'id_value' and 'messageHeader' as you don't need them anymore
p = p[['fussion']].reset_index()

# Create a temporary column only to serve as the index for a pivot
p['tmp'] = 0

# Pivot to convert rows into columns
p = p.pivot(index='tmp', values='fussion', columns='index')

# Finally select the columns that you are interested in
p = p.reset_index()[['timestamp', 'id', 'value']]

print(p)

Result:

index  timestamp    id value
0      152520000  1234   333

You can then append this DataFrame to another DataFrame in which you accumulate your results.
Perhaps there is a simpler solution, but if not, I hope this helps.
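(A note on the accumulation step: collecting each message as a plain dict and building the DataFrame once at the end is usually cheaper than appending to a DataFrame inside the consumer loop; a sketch with illustrative sample values — the second row is made up for the example:)

```python
import pandas as pd

rows = []  # one dict per consumed message

# inside the consumer loop you would append the extracted fields, e.g.:
rows.append({'timestamp': 152520000, 'id': '1234', 'value': '333.0'})
rows.append({'timestamp': 152520001, 'id': '5678', 'value': '12.5'})

# build the accumulated DataFrame once, after (or periodically during) the loop
df = pd.DataFrame(rows)
print(df)
```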
