如何在Python中将JSON数据从WebSocket解析到变量

0yg35tkg  于 2023-08-05  发布在  Python
关注(0)|答案(1)|浏览(86)

我正在使用 Asterisk AGI 进行语音识别。脚本通过 WebSocket 建立连接并接收 JSON 响应，然后收集其中的单词并将它们拼接成短语。到这一步为止一切正常。
我需要进一步使用收集到的短语(先发送到 Telegram，然后通过 API 发送到 helpdesk 系统)，但如果我在函数外直接把 text 当作变量使用，它就不起作用了。如何让 *text* 成为一个可以在后续代码中使用的变量?

#!/usr/bin/python3
from asterisk.agi import *
import os
from websocket import create_connection
import json
import traceback
import requests

# File descriptor that startAGI() reads raw chunks from with os.read().
# NOTE(review): presumably the EAGI audio stream Asterisk exposes on fd 3 - confirm.
AUDIO_FD = 3
# Audio format descriptors; not referenced by any code visible in this script.
CONTENT_TYPE = 'audio/l16; rate=8000; channels=1'
ACCEPT = 'audio/pcm'

def telegram_bot_sendtext(text):
    """Post *text* to the configured Telegram chat via the Bot API.

    The message and the other fields are passed as query parameters so that
    ``requests`` percent-encodes them; concatenating the raw text into the
    URL breaks as soon as it contains ``&``, ``#``, ``+`` or ``%``.

    Args:
        text: Message body to send.

    Returns:
        The decoded JSON body of the Telegram API response.

    Raises:
        requests.RequestException: On a network failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    bot_token = '6069wxts_nWcA'
    bot_chatID = '-10300'
    url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage'
    params = {
        'chat_id': bot_chatID,
        'reply_to_message_id': '2',
        'parse_mode': 'Markdown',
        'text': text,
    }
    # A timeout keeps a stalled HTTP request from blocking the call forever.
    response = requests.get(url, params=params, timeout=10)
    return response.json()

def process_chunk(agi, ws, buf):
    """Send one audio chunk to the recognizer and play back any recognized phrase.

    Args:
        agi: AGI session object; used for verbose logging and file playback.
        ws: Open WebSocket connection to the recognition server.
        buf: Raw audio bytes to forward to the server.

    Returns:
        The recognized phrase (the 'word' fields of the reply's 'result' list
        joined by single spaces) when the reply contains 'result', otherwise
        None. Returning the phrase is what makes it usable by the caller.
    """
    import subprocess  # local import so this block does not rely on the file header

    agi.verbose("Processing chunk")
    ws.send_binary(buf)
    res = json.loads(ws.recv())
    agi.verbose("Result: " + str(res))
    if 'result' not in res:
        return None

    text = " ".join(w['word'] for w in res['result'])
    # Argument lists (no shell) so recognized speech can never be interpreted
    # as shell syntax; text stays a str, which is what Python 3 expects here.
    subprocess.run(["espeak", "-w", "/tmp/response22.wav", text])
    subprocess.run(["sox", "/tmp/response22.wav", "-r", "8000", "/tmp/response.wav"])
    agi.stream_file("/tmp/response")
    os.remove("/tmp/response.wav")
    return text


def startAGI():
    """Run the EAGI call: stream audio to the recognizer, then report the phrases.

    Reads audio from AUDIO_FD in 8000-byte chunks until end of stream, passes
    each chunk to process_chunk(), collects every non-empty phrase it returns,
    and finally sends all phrases (joined by spaces) to Telegram in one message.
    """
    agi = AGI()
    agi.verbose("EAGI script started...")
    ani = agi.env['agi_callerid']
    did = agi.env['agi_extension']
    agi.verbose("Call answered from: %s to %s" % (ani, did))
    ws = create_connection("ws://localhost:2700")
    phrases = []
    try:
        ws.send('{ "config" : { "sample_rate" : 8000 } }')
        agi.verbose("Connection created")
        try:
            while True:
                data = os.read(AUDIO_FD, 8000)
                if not data:
                    break
                phrase = process_chunk(agi, ws, data)
                if phrase:
                    phrases.append(phrase)
        except Exception as err:
            # Flatten the traceback onto one line for the verbose log.
            agi.verbose(''.join(traceback.format_exception(type(err), err, err.__traceback__)).replace('\n', ' '))
    finally:
        # Close the socket on every exit path, including a failed config send.
        ws.close()

    text = " ".join(phrases)
    if text:
        try:
            telegram_bot_sendtext(text)
        except Exception as exc:
            # stdout is the AGI command channel to Asterisk, so report through
            # agi.verbose() instead of print().
            agi.verbose("Post_Auth_Script_Telega : Error : " + str(exc))

# Start the call handler only when executed as a script, not when imported.
if __name__ == "__main__":
    startAGI()

字符串
收到的 JSON 响应看起来像这样:

Result: {
'result': [
{'conf': 1.0, 'end': 2.07, 'start': 1.71, 'word': 'One'}, 
{'conf': 1.0, 'end': 2.34, 'start': 2.07, 'word': 'Two'}, 
], 
'text': 'One Two'}

gopyfrb3

gopyfrb31#

最终工作版本

#!/usr/bin/python3

from asterisk.agi import *
import os
from websocket import create_connection
import json
import traceback
import requests

# File descriptor that startAGI() reads raw chunks from with os.read().
# NOTE(review): presumably the EAGI audio stream Asterisk exposes on fd 3 - confirm.
AUDIO_FD = 3
# Audio format descriptors; not referenced by any code visible in this script.
CONTENT_TYPE = 'audio/l16; rate=8000; channels=1'
ACCEPT = 'audio/pcm'

def telegram_bot_sendtext(text):
    """Post *text* to the configured Telegram chat via the Bot API.

    The message and the other fields are passed as query parameters so that
    ``requests`` percent-encodes them; concatenating the raw text into the
    URL breaks as soon as it contains ``&``, ``#``, ``+`` or ``%``.

    Args:
        text: Message body to send.

    Returns:
        The decoded JSON body of the Telegram API response.

    Raises:
        requests.RequestException: On a network failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    bot_token = '606922aNwxts_nWcA'
    bot_chatID = '-100100'
    url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage'
    params = {
        'chat_id': bot_chatID,
        'reply_to_message_id': '2',
        'parse_mode': 'Markdown',
        'text': text,
    }
    # A timeout keeps a stalled HTTP request from blocking the call forever.
    response = requests.get(url, params=params, timeout=10)
    return response.json()

def process_chunk(agi, ws, buf):
    """Send one audio chunk to the recognizer, report and play back the phrase.

    When the server's reply contains a 'result' list, the 'word' fields are
    joined into a phrase, the phrase is sent to Telegram (best effort), and
    it is synthesized with espeak, resampled with sox and played to the call.

    Args:
        agi: AGI session object; used for verbose logging and file playback.
        ws: Open WebSocket connection to the recognition server.
        buf: Raw audio bytes to forward to the server.
    """
    import subprocess  # local import so this block does not rely on the file header

    agi.verbose("Processing chunk")
    ws.send_binary(buf)
    res = json.loads(ws.recv())
    agi.verbose("Result: " + str(res))
    if 'result' not in res:
        return

    text = " ".join(w['word'] for w in res['result'])
    try:
        telegram_bot_sendtext(text)
    except Exception as exc:
        # stdout is the AGI command channel to Asterisk, so report through
        # agi.verbose() instead of print(); a failed notification must not
        # stop the playback below.
        agi.verbose("Error : " + str(exc))
    # Argument lists (no shell) so recognized speech can never be interpreted
    # as shell syntax; text stays a str, which is what Python 3 expects here.
    subprocess.run(["espeak", "-w", "/tmp/response22.wav", text])
    subprocess.run(["sox", "/tmp/response22.wav", "-r", "8000", "/tmp/response.wav"])
    agi.stream_file("/tmp/response")
    os.remove("/tmp/response.wav")

def startAGI():
    """Handle one call: forward audio from AUDIO_FD to the recognizer chunk by chunk.

    Opens the WebSocket, sends the sample-rate configuration, then feeds
    8000-byte reads to process_chunk() until os.read() returns no data.
    Any exception raised while streaming is logged through the AGI verbose
    channel on a single line; the socket is closed in every case.
    """
    session = AGI()
    session.verbose("EAGI script started...")
    caller = session.env['agi_callerid']
    extension = session.env['agi_extension']
    session.verbose("Call answered from: %s to %s" % (caller, extension))

    sock = create_connection("ws://localhost:2700")
    sock.send('{ "config" : { "sample_rate" : 8000 } }')
    session.verbose("Connection created")

    try:
        chunk = os.read(AUDIO_FD, 8000)
        # An empty read marks the end of the audio stream.
        while chunk:
            process_chunk(session, sock, chunk)
            chunk = os.read(AUDIO_FD, 8000)
    except Exception as failure:
        trace_parts = traceback.format_exception(
            type(failure), failure, failure.__traceback__
        )
        one_line = ' '.join(''.join(trace_parts).split('\n'))
        session.verbose(one_line)
    finally:
        sock.close()

# Start the call handler only when executed as a script, not when imported.
if __name__ == "__main__":
    startAGI()

字符串

相关问题