jsonparser schema error

68bkxrlz posted on 2021-06-07 in Kafka

I am using the HBase sink connector for Kafka (https://github.com/mravi/kafka-connect-hbase) with its JsonEventParser, and I have run into a problem. The connector fails with the following error:

    org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

However, I am sure I have included the schema in the producer message. Here is a sample message:

{"schema":{"type": "struct","optional": false,"name": "record","fields": [{"type": "string","optional": true, "field": "id"},{"type": "string","optional":true,"field": "name"}]},"payload": {"id": "8","name": "test"}}

And here is my worker properties file:

    bootstrap.servers=xxxx.xxxx.xx.xx:xxxx

    group.id=connect-cluster

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false

    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1

    config.storage.topic=connect-configs
    config.storage.replication.factor=1

    status.storage.topic=connect-status
    status.storage.replication.factor=1

    offset.flush.interval.ms=10000

    # These are provided to inform the user about the presence of the REST host and port configs 
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    rest.host.name=xxxxx.xxx.xxx.xx
    rest.port=xxxx

    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    rest.advertised.host.name=xxxxx.xxx.xxx.xx
    rest.advertised.port=xxxx

    # authentication Kerberos

    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include 
    # any combination of: 
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples: 
    #

Does anyone know why this error is being thrown?

ioekq8ef1#

The problem is most likely the key converter type. You currently have:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

This means both the key and the value are expected to be JSON with a schema. If the key does not carry "schema" and "payload" fields, the converter throws exactly this error.
So, when the key is null or a plain string, use the StringConverter for the key instead:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
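
The sketch below illustrates why. It is an assumed, minimal example, not code from the answer: the topic name and raw key bytes are made up, and it needs the connect-api and connect-json jars on the classpath. A JsonConverter used as the key converter with schemas.enable=true rejects a bare key, while a StringConverter accepts it:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.json.JsonConverter;
    import org.apache.kafka.connect.storage.StringConverter;

    public class KeyConverterDemo {
        public static void main(String[] args) {
            byte[] rawKey = "8".getBytes(StandardCharsets.UTF_8); // a bare key with no schema/payload envelope

            // JsonConverter set up as a key converter with schemas enabled,
            // mirroring key.converter.schemas.enable=true in the worker config.
            JsonConverter jsonKeyConverter = new JsonConverter();
            jsonKeyConverter.configure(Map.of("schemas.enable", "true"), true);
            try {
                jsonKeyConverter.toConnectData("test-topic", rawKey);
            } catch (DataException e) {
                // Fails with: JsonConverter with schemas.enable requires "schema" and "payload" fields ...
                System.out.println("JsonConverter rejected the key: " + e.getMessage());
            }

            // StringConverter accepts the same bytes as a plain string key.
            StringConverter stringKeyConverter = new StringConverter();
            stringKeyConverter.configure(Map.of(), true);
            Object key = stringKeyConverter.toConnectData("test-topic", rawKey).value();
            System.out.println("StringConverter key: " + key);
        }
    }

The error message itself points at the other possible fix: if the keys really are plain JSON without the envelope, setting key.converter.schemas.enable=false also stops the envelope check on the key side.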
