Loading data into an Oracle table with the Kafka JDBC sink connector

hgb9j2n6 · asked 2021-06-07 · Kafka

I am trying to use the JDBC sink connector to load data from Kafka into Oracle, replicating the example from the Confluent documentation:
https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# The topics to consume from - required for sink connectors like this one
topics=orders

# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and
# auto-create tables.
#connection.url=jdbc:sqlite:test.db
connection.url=jdbc:oracle:thin:@XXXX:XXXX/XXXXX
connection.user=XXXX
connection.password=XXXXX
auto.create=true
auto.evolve=true
pk.mode=record_value
insert.mode=insert
pk.fields=id
#fields.whitelist=product,quantity,price
batch.size=0
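For reference, the Confluent quickstart that this configuration replicates produces Avro records onto the `orders` topic with a value schema along these lines (a sketch based on the docs example; the field names match the commented-out `fields.whitelist` above, and note that none of the fields are declared optional or given a default):

```json
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "id",       "type": "int"},
    {"name": "product",  "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "price",    "type": "float"}
  ]
}
```

With `pk.mode=record_value` and `pk.fields=id`, the connector takes the primary key from the `id` field of the record value.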

Error:


# ./confluent status jdbc-sink

    {"name":"jdbc-sink","connector":{"state":"RUNNING","worker_id":"10.87.40.165:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='product', isPrimaryKey=false}, as it is not optional and does not have a default value
            at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:137)
            at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
            at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
            at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
            at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
            ... 10 more
    ","id":0,"worker_id":"10.87.40.165:8083"}],"type":"sink"}

I can see the data when I consume it with a console consumer, but it is not being loaded into the Oracle table. I changed the topic name to uppercase and tried again, but that did not work either. I also added the `auto.evolve=true` option, but it made no difference. There are similar posts here; I went through them without much success.
I am only relying on auto-creation of the table. I can see the table created in Oracle, but it contains no data.

erhoui1w · answer #1

Looking at the stack trace, this is the error:

Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, 
name='product', isPrimaryKey=false}, as it is not optional and does not have 
a default value

So the data you are loading has a `product` field in it with no default value, and the target table does not have that column. Because the field is not optional and has no default, the connector refuses to `ALTER` the existing table to add it.
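One way out, assuming you control the producer's schema: declare `product` (and the other non-key fields) as nullable unions with a default, so the column the connector adds is optional. Per the error message, the JDBC sink will only `ALTER` a table to add a column when the corresponding field is optional or has a default value. A sketch of such an Avro value schema (field names taken from the quickstart example; the nullable unions are my assumption about your data):

```json
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "id",       "type": "int"},
    {"name": "product",  "type": ["null", "string"], "default": null},
    {"name": "quantity", "type": ["null", "int"],    "default": null},
    {"name": "price",    "type": ["null", "float"],  "default": null}
  ]
}
```

Alternatively, since `auto.create=true` already created the table, you can drop the partially created table in Oracle and let the connector recreate it from the full record schema, or add the missing column manually on the Oracle side before restarting the failed task.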
