我正在尝试使用这里提供的指南构建一个定制的transformer应用程序https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#streams-开发人员指南
我已经在我的windows机器上启动了kafka。我有一个运行在windows机器上的http源,它将数据写入目标。命令: java -Dserver.port=8123 -Dhttp.path-pattern=/data -Dspring.cloud.stream.bindings.output.destination=transformData -jar http-source-kafka-10-1.3.1.RELEASE.jar
我已经运行了transform应用程序,它从transformdata读取输入并输出到目标transformeddata命令 java -Dserver.port=8090 -Dspring.cloud.stream.bindings.input.destination=transformData -Dspring.cloud.stream.bindings.output.destination=transformedData -jar transformer-0.0.1-SNAPSHOT.jar
我运行了从目标transformeddata命令读取的日志接收器 java -Dserver.port=8888 -Dspring.cloud.stream.bindings.input.destination=transformedData -jar log-sink-kafka-10-1.3.1.RELEASE.jar
问题:当我尝试发送此curl请求时: curl -H "Content-Type: application/json" -X POST -d '{"id":"1", "temp":"400"}' http://172.20.24.47:8123/data
在custom transformer控制台上,我看到错误:
原因:com.fasterxml.jackson.core.jsonparseexception:无法识别的令牌'▒': 在[source:(byte[])”处应为('true'、'false'或'null')?contenttype“text/plain”originalcontenttype“应用程序/json;字符集=utf-8“{”id“:”1“,”temp“:”400“}”;行:1,列:4]位于com.fasterxml.jackson.core.jsonparser.\u constructerror(jsonparser)。java:1804)~[jackson-core-2.9.6.jar!/:2.9.6]在com.fasterxml.jackson.core.base.parserminimalbase.\u reporterror(parserminimalbase。java:679)~[jackson-core-2.9.6.jar!/:2.9.6]位于com.fasterxml.jackson.core.json.utf8streamjsonparser.\u reportinvalidtoken(utf8streamjsonparser)。java:3526)~[jackson-core-2.9.6.jar!/:2.9.6]位于com.fasterxml.jackson.core.json.utf8streamjsonparser.\u handleunexpectedvalue(utf8streamjsonparser)。java:2621)~[jackson-core-2.9.6.jar!/:2.9.6]位于com.fasterxml.jackson.core.json.utf8streamjsonparser.\u nexttokennotinobject(utf8streamjsonparser)。java:826)~[jackson-core-2.9.6.jar!/:2.9.6]位于com.fasterxml.jackson.core.json.utf8streamjsonparser.nexttoken(utf8streamjsonparser。java:723)~[jackson-core-2.9.6.jar!/:2.9.6]在com.fasterxml.jackson.databind.objectmapper.\u initforreading(objectmapper。java:4141)~[jackson-databind-2.9.6.jar!/:2.9.6]在com.fasterxml.jackson.databind.objectmapper.\u readmapandclose(objectmapper。java:4000)~[jackson-databind-2.9.6.jar!/:2.9.6]在com.fasterxml.jackson.databind.objectmapper.readvalue(objectmapper。java:3121)~[jackson-databind-2.9.6.jar!/:2.9.6]位于org.springframework.cloud.stream.converter.applicationjsonmessagemarshallingconverter.convertparameterizedtype(applicationjsonmessagemarshallingconverter)。java:114)~[spring-cloud-stream-2.0.0.release.jar!/:2.0.0.版本]。。。省略37个公共框架
有人能帮忙吗?
2条答案
按热度按时间monwx1rj1#
看起来您正在使用SpringCloudStream2.0.0.release,但您的应用程序是1.3.x。你能设定吗
spring.cloud.stream.bindings.input.consumer.headerMode
至embeddedHeaders
在处理器应用程序中出现故障?在2.0中,默认情况下不嵌入头,因为kafka支持开箱即用。但是,在2.0之前的版本(1.3.x应用程序使用)中,默认是嵌入头。在使用这个组合时,需要显式地设置它。ubby3x7f2#
我终于成功了。当使用spring初始化器而不是选择2.0.4release作为starter构建自定义应用程序时,我恢复到1.5.15release。现在我不再需要在订阅者端传递属性,订阅者端是自定义应用程序和使用headermodes设置为embeddedheaders的logger sink应用程序。