Kafka Debezium - Oracle connector - service not starting

6tdlim6h  posted on 2023-04-05 in Apache

DebeziumEngine is looking for a Kafka topic even though I have not specified KafkaOffsetBackingStore for offset.storage.
Reference: DebeziumEngine Config
Configuration:

Configuration config = Configuration.create()
      .with("name", "oracle_debezium_connector")
      .with("connector.class", "io.debezium.connector.oracle.OracleConnector")
      .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
      .with("offset.storage.file.filename", "/Users/dk/Documents/work/ACET/offset.dat") 
      .with("offset.flush.interval.ms", 2000)
      .with("database.hostname", "localhost") 
      .with("database.port", "1521")
      .with("database.user", "pravin")
      .with("database.password", "*****")
      .with("database.sid", "ORCLCDB")
      .with("database.server.name", "mServer")
      .with("database.out.server.name", "dbzxout")
      .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
      .with("database.history.file.filename", "/Users/dk/Documents/work/ACET/dbhistory.dat")          
      .with("topic.prefix","cycowner")
      .with("database.dbname", "ORCLCDB")
      .build();

Debezium engine:

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
      .using(config.asProperties())
      .using(connectorCallback)
      .using(completionCallback)
      .notifying(record -> {
            System.out.println(record);
        })
      .build();

Error:

2022-10-29T16:06:16,457 ERROR [pool-2-thread-1] i.d.c.Configuration: The 'schema.history.internal.kafka.topic' value is invalid: A value is required
2022-10-29T16:06:16,457 ERROR [pool-2-thread-1] i.d.c.Configuration: The 'schema.history.internal.kafka.bootstrap.servers' value is invalid: A value is required
2022-10-29T16:06:16,458 INFO  [pool-2-thread-1] i.d.c.c.BaseSourceTask: Stopping down connector
2022-10-29T16:06:16,463 INFO  [pool-3-thread-1] i.d.j.JdbcConnection: Connection gracefully closed
2022-10-29T16:06:16,465 INFO  [pool-2-thread-1] o.a.k.c.s.FileOffsetBackingStore: Stopped FileOffsetBackingStore
connector stopped successfully
---------------------------------------------------
success status: false, message : Unable to initialize and start connector's task class 'io.debezium.connector.oracle.OracleConnectorTask' with config: {connector.class=io.debezium.connector.oracle.OracleConnector, database.history.file.filename=/Users/dkuma416/Documents/work/ACET/dbhistory.dat, database.user=pravin, database.dbname=ORCLCDB, offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore, database.server.name=mServer, offset.flush.timeout.ms=5000, errors.retry.delay.max.ms=10000, database.port=1521, database.sid=ORCLCDB, offset.flush.interval.ms=2000, topic.prefix=cycowner, offset.storage.file.filename=/Users/dkuma416/Documents/work/ACET/offset.dat, errors.max.retries=-1, database.hostname=localhost, database.password=********, name=oracle_debezium_connector, database.out.server.name=dbzxout, errors.retry.delay.initial.ms=300, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.history=io.debezium.relational.history.MemoryDatabaseHistory}, **Error: Error configuring an instance of KafkaSchemaHistory; check the logs for details**

Answer 1 (dwbf0jvd)

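You need to provide the topic.prefix, schema.history.internal, and schema.history.internal.file.filename properties, as described in the Debezium documentation. The database.history and database.history.file.filename keys in the question are the pre-2.0 names; Debezium 2.x renamed them to schema.history.internal and schema.history.internal.file.filename. The engine therefore appears to ignore the old keys and fall back to its default, KafkaSchemaHistory, which is why it asks for schema.history.internal.kafka.topic and schema.history.internal.kafka.bootstrap.servers.
For example: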

.with("topic.prefix", "topic")
.with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
.with("schema.history.internal.file.filename", "/Users/dk/Documents/work/ACET/schistory.dat")
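If the configuration is assembled as plain java.util.Properties before being handed to the engine, the rename can be checked and applied in one place. The following is a minimal sketch under that assumption: SchemaHistoryProps, legacyKeys, and withFileSchemaHistory are hypothetical names invented for illustration and are not part of Debezium. The only Debezium-specific strings are the property keys and the FileSchemaHistory class name quoted above. As far as I know that class ships in the separate debezium-storage-file module, so treat the dependency as something to verify for your version.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not a Debezium API); uses java.util.Properties only. */
final class SchemaHistoryProps {

    // Pre-2.0 prefix used in the question, and the 2.x prefix used in the answer.
    static final String LEGACY_PREFIX = "database.history";
    static final String CURRENT_PREFIX = "schema.history.internal";

    /** Returns, in sorted order, the keys that still use the legacy prefix. */
    static List<String> legacyKeys(Properties props) {
        List<String> found = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (key.equals(LEGACY_PREFIX) || key.startsWith(LEGACY_PREFIX + ".")) {
                found.add(key);
            }
        }
        Collections.sort(found);
        return found;
    }

    /**
     * Returns a copy of base with the legacy keys dropped and file-based
     * schema history configured under the 2.x property names.
     */
    static Properties withFileSchemaHistory(Properties base, String historyFile) {
        Properties out = new Properties();
        out.putAll(base);
        for (String key : legacyKeys(base)) {
            out.remove(key);
        }
        // Class name taken from the answer above.
        out.setProperty(CURRENT_PREFIX, "io.debezium.storage.file.history.FileSchemaHistory");
        out.setProperty(CURRENT_PREFIX + ".file.filename", historyFile);
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topic.prefix", "cycowner");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/Users/dk/Documents/work/ACET/dbhistory.dat");

        System.out.println("Legacy keys found: " + legacyKeys(props));
        Properties fixed = withFileSchemaHistory(props, "/Users/dk/Documents/work/ACET/schistory.dat");
        fixed.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + fixed.getProperty(k)));
    }
}
```

The resulting Properties object can be passed to the engine the same way the question already does, via .using(...) in place of config.asProperties(). Dropping the old keys is a design choice made here to keep the configuration unambiguous; leaving them in place should be harmless if the engine ignores them.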
