I have a multi-node Confluent Kafka setup with 3 Kafka brokers, with ksqlDB and Schema Registry on separate nodes, and the connector running on yet another node. All of the brokers can communicate with each other successfully.
I am trying to load Avro-format data from the ksqlDB stream topic "twitter-csv_avro" into a PostgreSQL table using the Confluent JDBC (PostgreSQL) sink connector. Here is my postgres-sink.properties file:
# key.converter=org.apache.kafka.connect.storage.StringConverter
# key.converter.schema.registry.url=http://host:8081
# value.converter=io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url=http://host:8081
# schema.registry.url=http://host:8081
# Postgres sink connector id name which must be unique for each connector
name=twitter-streaming-postgres-sink
# Name of the connector class to be run
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Kafka topic name which is input data to postgresql table
topics=TWITTER_CSV_AVRO
# Postgresql Database connection details
connection.url=jdbc:postgresql://host:5432/d2insights?user=postgres&password=*******
key.converter.schemas.enable=false
value.converter.schemas.enable=true
auto.create=false
auto.evolve=true
key.ignore=true
# Postgresql Table name
table.name.format=twitter
# Primary key configuration (currently commented out)
# pk.mode=none
# record_key=??
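For primary-key handling, my understanding from the docs is that the JDBC sink is driven by pk.mode (key.ignore appears to be an Elasticsearch sink option, so the JDBC sink presumably just ignores it). A minimal key-less sketch, assuming plain inserts are acceptable:

# Sketch (assumption): no primary key columns, rows written as plain inserts
pk.mode=none
insert.mode=insert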
Here is my connect-avro-standalone.properties file:
bootstrap.servers=192.168.39.17:9092,192.168.39.18:9092,192.168.39.19:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://192.168.39.27:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.39.27:8081
value.converter.enhanced.avro.schema.support=true
offset.storage.file.filename=/tmp/connect.offsets
rest.port=9080
plugin.path=/usr/share/java/kafka-connect-jdbc
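Since the worker discovers the connector through plugin.path, the PostgreSQL driver jar should sit in that same directory. A quick sanity check, with the path taken from my install:

# List the driver jar the JDBC plugin directory is expected to contain
ls -l /usr/share/java/kafka-connect-jdbc/postgresql-*.jar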
I am using Confluent 5.2.1, with the default postgresql-42.2.10.jar that ships in /usr/share/java/kafka-connect-jdbc, and the database is PostgreSQL 10.12.
Here is the command I use to run the connector:
connect-standalone /etc/schema-registry/connect-avro-standalone-postgres.properties /etc/kafka/connect-postgresql-sink1.properties
Although the PostgreSQL JAR is present, I get the following exception:
[2020-08-25 15:42:29,378] INFO Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:99)
java.sql.SQLException: No suitable driver found for jdbc:postgresql://192.168.39.20:5432/d2insights
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:224)
at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:93)
at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:56)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
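"No suitable driver found" generally means java.sql.DriverManager cannot see the PostgreSQL driver class at runtime, even though the jar exists on disk. One workaround I have seen suggested (jar path assumed from my install) is to put the driver on the worker's classpath explicitly, and separately to confirm the database itself is reachable:

# Workaround sketch: expose the driver jar to DriverManager, then restart the worker
export CLASSPATH=/usr/share/java/kafka-connect-jdbc/postgresql-42.2.10.jar
connect-standalone /etc/schema-registry/connect-avro-standalone-postgres.properties /etc/kafka/connect-postgresql-sink1.properties

# Verify plain connectivity to the database (prompts for the password)
psql -h 192.168.39.20 -p 5432 -U postgres -d d2insights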
Can someone tell me what I am doing wrong?