目前我有一个python脚本,它从一个对象中获取数据并修改它,然后将其生成到一个Kafka主题,它看起来像这样(没有逻辑函数):
def produce_data(data):
record_value = json.dumps(data)
print("Producing record: {}\t{}".format("timeseries data", record_value))
time.sleep(0)
producer.produce(topic, key="timeseries data", value=record_value, on_delivery=ack)
producer.poll(0)
producer.flush()
print("{} messages were produced to topic {}!".format(delivered_records, topic))
def generate_data():
update_rover_samples()
for rover_data in rovers:
print("thread finished...exiting")
produce_data(rover_data)
if __name__ == '__main__':
conf = {'bootstrap.servers': '127.0.0.2:9092'}
producer = Producer(conf)
delivered_records = 0
create_topic()
def ack(err, msg):
global delivered_records
"""
Message Delivered Successfully!!!
"""
if err is not None:
print("Failed to deliver message: {}".format(err))
else:
delivered_records += 1
print("Produced record to topic {} partition [{}] @ offset {}"
.format(msg.topic(), msg.partition(), msg.offset()))
generate_data()
字符串
现在我可以调用generate_data()4次,它会更新它4次,但是在生产者完成并再次调用之后,脚本会生成新的数据。我知道我可以将它链接到数据库,但是我只有一个对象需要更新,所以我想知道是否有什么可以使用环境变量,或者是否有任何特定的方法来存储它,这将是最好的。
对Kafka来说很新鲜,所以任何建议都会有帮助!
2条答案
按热度按时间bwitn5fc1#
我不知道Kafka,但如果它只是一个值,你想存储和读取,只需创建一个文件,没有扩展名(或.txt)和读/写它。
https://www.w3schools.com/python/python_file_write.asp
如果你想以有组织的方式存储更多的数据,请阅读有关.json文件的信息。
cbjzeqam2#
一个很好的从Python打开、修改和写出文件的shell应该是这样的。在这里,你可以修改文件而不需要修改原始文件。你可以根据你保存的对象的结构来修改它。另外,ALL CAPS在这里只是占位符。
字符串