使用Kafka Sink Connector的Oracle DB问题

qpgpyjmq  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(128)

我不熟悉接收器连接器。当我使用接收器连接器接收数据时,我收到以下错误。

https://docs.oracle.com/error-help/db/ora-01400/
Error : 1400, Position : 0, SQL = INSERT INTO "RENTED_PRODUCT"("PRODUCT_NO","RENT_START_DATE","RENT_END_DATE","OWNER_NICKNAME","BORROWER_NICKNAME","STATUS","REVIEW_STATUS") VALUES(:1 ,:2 ,:3 ,:4 ,:5 ,:6 ,:7 ), Original SQL = INSERT INTO "RENTED_PRODUCT"("PRODUCT_NO","RENT_START_DATE","RENT_END_DATE","OWNER_NICKNAME","BORROWER_NICKNAME","STATUS","REVIEW_STATUS") VALUES(?,?,?,?,?,?,?), Error Message = ORA-01400: cannot insert NULL into ("TEST1"."RENTED_PRODUCT"."RENTED_PRODUCT_NO")

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:165)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:111)
        ... 12 more
[2023-12-07 10:20:17,979] ERROR [rentedproduct2-sink-connect|task-0] WorkerSinkTask{id=rentedproduct2-sink-connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

字符串
连接器具有以下配置

{
    "name": "rentedproduct2-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "RENTED_PRODUCT",
        "connection.url": "jdbc:oracle:thin:@localhost:1521/xe",
        "connection.user": "test1",
        "connection.password": "test1",
        "insert.mode": "insert",
        "pk.mode": "none",
        "auto.create": "false",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable":"true"
    }
}


方案和负载中不存在PK对应的RENTED_PRODUCT_NO
发送到主题的数据格式如下

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int64",
                "optional": true,
                "field": "PRODUCT_NO"
            },
            {
                "type": "string",
                "optional": true,
                "field": "RENT_START_DATE"
            },
            {
                "type": "string",
                "optional": true,
                "field": "RENT_END_DATE"
            },
            {
                "type": "string",
                "optional": true,
                "field": "OWNER_NICKNAME"
            },
            {
                "type": "string",
                "optional": true,
                "field": "BORROWER_NICKNAME"
            },
            {
                "type": "string",
                "optional": true,
                "field": "STATUS"
            },
            {
                "type": "string",
                "optional": true,
                "field": "REVIEW_STATUS"
            }
        ],
        "optional": false,
        "name": "RENTED_PRODUCT"
    },
    "payload": {
        "product_no": 1,
        "rent_start_date": "2023-12-12 11:00",
        "rent_end_date": "2023-12-15 12:00",
        "owner_nickname": "Owener",
        "borrower_nickname": "Borrower",
        "status": "Using",
        "review_status": "None"
    }
}


请帮帮我,我一天都找不到原因
使用JPA创建表,

create table rented_product 
(rented_product_no number(19,0) not null, 
borrower_nickname varchar2(255 char) not null, 
owner_nickname varchar2(255 char) not null, 
product_no number(19,0) not null, 
rent_end_date timestamp not null, 
rent_start_date timestamp not null, 
review_status varchar2(255 char), 
status varchar2(255 char), 
primary key (rented_product_no))


实体的一部分

@Id
    @SequenceGenerator(
            name = "SEQ_GENERATOR",
            sequenceName = "RENTEDPRODUCT_SEQ",
            allocationSize = 1
    )
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "SEQ_GENERATOR")
    private Long rentedProductNo;

    @Column(nullable = false)
    private Long productNo;

    @Column(nullable = false)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm", timezone = "Asia/Seoul")
    private LocalDateTime rentStartDate;

    @Column(nullable = false)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm", timezone = "Asia/Seoul")
    private LocalDateTime rentEndDate;

    @Column(nullable = false)
    private String ownerNickname;

    @Column(nullable = false)
    private String borrowerNickname;

    @Enumerated(EnumType.STRING)
    private Status status;

    @Enumerated(EnumType.STRING)
    private ReviewStatus reviewStatus;


当我使用MySQL时,它运行得很好,但是当我们继续到Oracle时,使用序列的PK一直在进入MySQL。

exdqitrt

exdqitrt1#

你使用的SQL是:

INSERT INTO "RENTED_PRODUCT"("PRODUCT_NO","RENT_START_DATE","RENT_END_DATE","OWNER_NICKNAME","BORROWER_NICKNAME","STATUS","REVIEW_STATUS") VALUES(:1 ,:2 ,:3 ,:4 ,:5 ,:6 ,:7 )

字符串
你得到的错误是:

ORA-01400: cannot insert NULL into ("TEST1"."RENTED_PRODUCT"."RENTED_PRODUCT_NO")


这是因为您没有在要插入的列中包含RENTED_PRODUCT_NO,因此使用了该列的默认值,即NULL
您声明该实体包含:

@Id
@SequenceGenerator(
  name = "SEQ_GENERATOR",
  sequenceName = "RENTEDPRODUCT_SEQ",
  allocationSize = 1
)


在这种情况下,查询(假设您手动提供)应该包括主键列:

INSERT INTO "RENTED_PRODUCT (
  "RENTED_PRODUCT_NO",
  "PRODUCT_NO",
  "RENT_START_DATE",
  "RENT_END_DATE",
  "OWNER_NICKNAME",
  "BORROWER_NICKNAME",
  "STATUS",
  "REVIEW_STATUS"
) VALUES(
  "RENTEDPRODUCT_SEQ".NEXTVAL,
  :1,
  :2,
  :3,
  :4,
  :5,
  :6,
  :7
)


或者,在Oracle 12中,您可以在表中使用IDENTITY列:

CREATE TABLE rented_product (
  rented_product_no number(19,0)
                    GENERATED ALWAYS AS IDENTITY
                    not null, 
  borrower_nickname varchar2(255 char) not null, 
  owner_nickname    varchar2(255 char) not null, 
  product_no        number(19,0) not null, 
  rent_end_date     timestamp not null, 
  rent_start_date   timestamp not null, 
  review_status     varchar2(255 char), 
  status            varchar2(255 char), 
  primary key (rented_product_no)
)


那么你的查询应该起作用。

相关问题