我试图使用汇合的过滤器smt与debezium的例子展开smt。
我将以下配置添加到源连接器(debezium mysql)配置中:
"transforms": "route,csFilter",
...
...
"transforms.csFilter.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.csFilter.filter.condition": "$.payload.after.source == 2",
"transforms.csFilter.filter.type": "exclude",
"transforms.csFilter.missing.or.null.behavior": "fail"
因为这个过滤器smt是由confluent提供的,所以我下载了jar文件并将(connect transforms、connect utils、json path)jar文件复制到 path-to-kafka/connect/debezium-connector-mysql
目录。
当我试图注册debezium mysql源代码连接器时,
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json"
localhost:8083/connectors/ -d @source_connector_config.json
我有个错误:
{"error_code":400,
"message":"Connector configuration is invalid and contains the following 1 error(s):\n
Invalid value $.payload.after.source == 2 for configuration filter.condition: Invalid json path defined.
Please refer to https://github.com/json-path/JsonPath README for correct use of json path.\n
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
我用本指南中提供的示例检查了json路径表达式。好像没事。
你能给我指一下正确的方向吗?我错过了什么?谢谢。
1条答案
按热度按时间xeufq47z1#
请尝试使用此条件:
$.payload.after[?(@.source == 2)]