我正在尝试使用aws lambda读取kafka主题(aws msk)中的值。
从lambda打印的事件记录如下所示:
{'eventsource':'aws:kafka','eventsourcearn':'arn:aws:kafka:ap-northeast-1:987654321:cluster/mskcluster/79y80c66-813a-4f-af0e-4ea47ba107e6','records':{'transactions-0':[{'topic':'transactions','partition':0,'offset':4798,'timestamp':1603565835915,'timestamptype':'create\u time','value':'eyjfdmvudfrpbwuioiiaimjaymc0xmc0yncaxodo1nzoxns45mtuzmjqilcaisvaiaiaiaimtgwlji0ms4xntkumje4iiwgikfjy291bnrodw1izxiioiiwimtq2oda4odyilcaivxnlck5hbwuioi67iqw1izxigum9tyxjviiwgikftb3vudci6ici1ntyiwgilyyyyw5zwn0aw9usuqioiaitzi4qlg3tlbjbwzmxexwcidb3vuthj5ijogik9tyw4ifq='}]}
如何提取“topic”和“value”字段?值1是base64编码的。我得到以下错误:
名称错误:未定义名称“record”
我正在尝试以下代码:
import json
import base64
def lambda_handler(event, context):
print(event)
message = event['records']
payload=base64.b64decode(record["message"]["value"])
print("Decoded payload: " + str(payload))
msk事件结构示例
1条答案
按热度按时间6tr1vspr1#
在代码片段中
record
试图传递给解码函数的变量不存在。迭代记录的示例如下:每个函数调用都包含每个主题的多个记录。如果你有多个类似的
Transactions-1
,...