How to extract nested values from an SQS JSON message in Python?

qmelpv7a  posted on 2023-01-01 in Python

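My AWS chain: Lambda -> SNS -> SQS -> Python script on EC2.
The Python script raises an error because it cannot extract the values I need. I think the structure is wrong, but I cannot see the mistake.
I cannot get the value of the "vote" field from the SQS message. How can I do that?
Test event structure (raw message delivery enabled):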

{
  "body": {
    "MessageAttributes": {
      "vote": {
        "Type": "Number",
        "Value": "90"
      },
      "voter": {
        "Type": "String",
        "Value": "default_voter"
      }
    }
  }
}

The Python script that processes the SQS message:

#!/usr/bin/env python3

import boto3
import json
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

queue = boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName="erjan")
table = boto3.resource('dynamodb', region_name='us-east-1').Table('Votes')

def process_message(message):
    try:
        payload = json.loads(message.body) #unable to parse sqs json msg here
        #payload = message.body
        #payload = message['body']
        voter = payload['MessageAttributes']['voter']['Value'] #here the exception raised!
        vote  = payload['MessageAttributes']['vote']['Value']
        logging.info("Voter: %s, Vote: %s", voter, vote)
        store_vote(voter, vote)
        update_count(vote)
        message.delete()
    except Exception as e:
        print('x = msg.body')
        x = (message.body)
        print(x)
        print('-------')
        print('message.body')
        print(message.body)

        try:
            vote = x['MessageAttributes']['vote']['Value']
            logging.error("Failed to process message")
            logging.error('------- here: ' + str(e))
            logging.error('vote %d' % vote)
        except TypeError:
            logging.error("error catched")

def store_vote(voter, vote):
    try:
        logging.info('table put item.......')
        print('table put item......')
        response = table.put_item(
           Item={'voter': voter, 'vote': vote}
        )
    except:
        logging.error("Failed to store message")
        raise

def update_count(vote):
    logging.info('update count....')
    print('update count....')
    table.update_item(
        Key={'voter': 'count'},
        UpdateExpression="set #vote = #vote + :incr",
            ExpressionAttributeNames={'#vote': vote},
            ExpressionAttributeValues={':incr': 1}
    )

if __name__ == "__main__":
    while True:
        try:
            messages = queue.receive_messages()
        except KeyboardInterrupt:
           logging.info("Stopping...")
           break
        except:
            logging.error(sys.exc_info()[0])
            logging.info('here error - we continue')
            continue
        for message in messages:
            process_message(message)

I get this error message:

payload = json.loads(message)
  File "/usr/lib64/python3.7/json/__init__.py", line 341, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not sqs.Message

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./processor.py", line 90, in <module>
    process_message(message)
  File "./processor.py", line 40, in process_message
    vote = x['MessageAttributes']['vote']['Value']
TypeError: string indices must be integers

I understand that it is an SQS message and that its structure differs from plain JSON. I tried

json.loads(message.body)

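but the SQS message arrives with an empty body. Even though the test event above has a "body" field, it is not the same thing. When I simply debug with print(message), it gives the following output: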

sqs.Message(queue_url='https://sqs.us-east-1.amazonaws.com/025416187662/erjan', receipt_handle='AQEBAz3JiGRwss1ROOr2R8GkpBWwr7tJ1tUDUa7JurX7H6SxoF6gyj7YOxoLuU1/KcHpBIowon12Vle97mJ/cZFUIjzJon78zIIcVSVLrZbKPBABztUeE/Db0ALCMncVXpHWXk76hZVLCC+LHMsi8E5TveZ7+DbTdyDX
U6djTI1VcKpUjEoKLV9seN6JIEZV35r3fgbipHsX897IqTVvjhb0YADt6vTxYQTM1kVMEPBo5oNdTWqn6PfmoYJfZbT1GHMqphTluEwVuqBzux2kPSMtluFk3yk4XXwPJS304URJ7srMksUdoVTemA56OsksVZzXT4AcS8sm8Y3SO2PLLjZSV+7Vdc6JZlX7gslvVSADBlXw5BJCP/Rb9mA2xI9FOyW4')

I think the auto-generated SQS message hides the values somewhere.

Answer #1, by fhg3lkii

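The solution comes from this example, which shows at the end how to receive and process messages: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html
In short, I thought the queue was empty because I had not specified the message attribute names when retrieving the messages from the queue.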

if __name__ == "__main__":
    while True:
        try:
            # messages = queue.receive_messages() #this was empty (kept commented out so messages are not fetched twice)
            messages = queue.receive_messages(MessageAttributeNames=['vote','voter']) #this is the solution - we have to specify the msg attr names
        except KeyboardInterrupt:
           logging.info("Stopping...")
           break
        except:
            logging.error(sys.exc_info()[0])
            continue
        for message in messages:
            process_message(message)
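
The receive call above can be wrapped in a small helper. This is only a sketch: the helper name `receive_with_attributes` and its defaults are made up for illustration, and it assumes `queue` is a boto3 SQS `Queue` resource like the one created in the question. `MessageAttributeNames`, `MaxNumberOfMessages` and `WaitTimeSeconds` are existing parameters of `receive_messages`. A non-zero wait time turns on long polling, so the `while True` loop does not spin on empty responses.

```python
def receive_with_attributes(queue, attribute_names, wait_seconds=10):
    """Fetch one batch of messages and ask SQS to include the named attributes.

    `queue` is assumed to be a boto3 SQS Queue resource, e.g.
    boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName='erjan')
    """
    return queue.receive_messages(
        # Without this list, message.message_attributes comes back empty.
        MessageAttributeNames=list(attribute_names),
        MaxNumberOfMessages=10,        # SQS allows 1 to 10 per call
        WaitTimeSeconds=wait_seconds,  # long polling, SQS allows 0 to 20
    )
```

In the main loop this would be called as `messages = receive_with_attributes(queue, ['vote', 'voter'])`.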

Now the debug output does show the message attributes:

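def process_message(message):
    try:
        payload = json.loads(message.body)
        print(type(payload))
        print(payload)
        print('-------------------MSG ATTR----------------------')
        print(message.message_attributes) #shows actual values!
    except Exception:
        # log the traceback instead of leaving the try block unterminated
        logging.exception("Failed to process message")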
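
Once the attributes arrive, the values can be read from `message.message_attributes` instead of from the body. Below is a minimal sketch with a made-up helper name, `extract_attribute`. It assumes that with raw message delivery enabled, boto3 exposes each attribute as a dict with `DataType` and `StringValue` keys. It also assumes that without raw delivery, SNS wraps the message in a JSON envelope whose `MessageAttributes` entries use `Type` and `Value`, which is the shape shown in the question's test event.

```python
import json


def extract_attribute(message_attributes, body, name):
    """Return the string value of attribute `name`, or None if it is absent."""
    # Raw message delivery: attributes look like
    # {'vote': {'DataType': 'Number', 'StringValue': '90'}, ...}
    attrs = message_attributes or {}  # boto3 gives None when nothing was requested
    if name in attrs:
        return attrs[name].get("StringValue")

    # Fallback (assumption): SNS JSON envelope carried in the message body,
    # {'MessageAttributes': {'vote': {'Type': 'Number', 'Value': '90'}}}
    try:
        envelope = json.loads(body) if body else {}
    except (TypeError, ValueError):
        return None  # body is not JSON, nothing to extract
    if not isinstance(envelope, dict):
        return None
    entry = (envelope.get("MessageAttributes") or {}).get(name)
    return entry.get("Value") if isinstance(entry, dict) else None
```

Inside `process_message` this could be used as `vote = extract_attribute(message.message_attributes, message.body, 'vote')`. The value is returned as a string even for the `Number` type, so convert it with `int(vote)` if a number is needed.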
