无法将kafka流接收到jdbc:不可恢复的异常

gr8qqesn  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(189)

我正在尝试从启用了模式的mqtt源接收一个数据流到microsoftsqlserver数据库。
我关注了很多关于这个问题的帖子,尽管我收到了以下错误:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

mqtt源配置为:

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
key.converter.schemas.enable=true
value.converter.schemas.enable=true
name=schema-mqtt-source
connect.mqtt.kcql=INSERT INTO schema_IoT SELECT * FROM machine/sensor/mytopic/test WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` 
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.service.quality=0
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://host:1884

Kafka的数据样本如下:

{
        "timestamp": 1526912884265,
        "partition": 0,
        "key": {
            "schema": {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "topic"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    }
                ],
                "optional": false,
                "name": "com.datamountaineer.streamreactor.connect.converters.MsgKey"
            },
            "payload": {
                "topic": "machine/sensor/mytopic/test",
                "id": "0"
            }
        },
        "offset": 0,
        "topic": "schema_IoT",
        "value": {
            "schema": {
                "type": "struct",
                "fields": [
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "sentOn"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "fan"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "buzzer"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "light"
                            },
                            {
                                "type": "double",
                                "optional": false,
                                "field": "temperature"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "assetName"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "led"
                            },
                            {
                                "type": "boolean",
                                "optional": false,
                                "field": "water"
                            }
                        ],
                        "optional": false,
                        "name": "metrics",
                        "field": "metrics"
                    }
                ],
                "optional": false,
                "name": "machine_sensor_mytopic_test"
            },
            "payload": {
                "sentOn": 1526913070679,
                "metrics": {
                    "fan": 1,
                    "buzzer": 0,
                    "light": 255,
                    "temperature": 22.296352538102642,
                    "assetName": "SIMopcua",
                    "led": 0,
                    "water": false
                }
            }
        }
    }

最后是jdbc接收器连接器配置的属性。文件是:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=password
topics=schema_IoT
batch.size=10
key.converter.schemas.enable=true
auto.evolve=true
connection.user=username
name=sink-mssql
value.converter.schemas.enable=true
auto.create=true
connection.url=jdbc:sqlserver://hostname:port;databaseName=mydb;user=username;password=mypsd;
value.converter=org.apache.kafka.connect.json.JsonConverter
insert.mode=insert
key.converter=org.apache.kafka.connect.json.JsonConverter

我做错什么了?感谢您的帮助。
法比奥

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题