在一个任务中,我序列化了一个dict(将dict转换为字符串)并将其推送到XCOMresult[data] = json.dumps({"agents": ["john.doe@example.com"], "houses": ["jane.doe@example.com"]})
在Airflow的UI中,它看起来像一个字符串,而DAG级别,我得到的是值XCOM_DATA = "{{ task_instance.xcom_pull(task_ids='task_name', key='return_value')['data']}}"
但是当一个k8s pod操作符,由于某种原因,它删除了双引号(“)
KubernetesPodOperator(
cmds=["python", "src/send_notification.py"],
arguments=[
"--data",
XCOM_DATA,
],
task_id="notify",
name="notify",
dag=dag,
**COMMON_TASKS_ARGS,
)
UPDATE:这是传递给K8S的命令
['. /secrets/env; python /home/app/src/send_notification.py '
'"--pr" '
'"https://gitlab.com/30675" "--data" '
'"{"agents": ["john.doe@example.com", '
'"jane.doe@example.com"]}" '
当我在脚本中反序列化它时,我得到这个错误json.loads(args.data)
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.9/json/decoder.py", line 353, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
我打印了值,由于某种原因出现了错误airflow删除了字符串的双引号,这就是出现错误的原因。有人能帮我纠正一下吗?
- 我用了像安全或逃跑之类的忍者方法
- 我在Jinja中转换为json
1条答案
按热度按时间pcww981p1#
您正在向CLI发送字符串。在这个字符串中,由于JSON格式,你需要添加双引号。
在这种情况下,您需要使用转义文字发送它