我将jdbc接收器连接器用于以下配置:
curl -s -X POST http://********:8083/connectors -H "Content-Type: application/json" --data '{
"name":"mysql-sensor-sink-connector-02",
"config": {
"tasks.max":"2",
"batch.size":"1000",
"batch.max.rows":"1000",
"poll.interval.ms":"500",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://**********:3306/device_details",
"table.name.format":"tb_sensordata",
"topics":"sensor_data_topic",
"connection.user":"*******",
"connection.password":"********",
"insert.mode":"insert",
"auto.create":"true",
"auto.evolve":"true",
"pk.mode":"record_value",
"pk.fields":"packetid",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false"
}
}' | jq '.'
我得到的错误是:
[2020-04-18 09:42:16,444] INFO ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [cpnode.local.lan:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = confluent.monitoring.interceptor.connector-consumer-mysql-sensor-sink-connector-03-1
compression.type = lz4
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 500
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 10485760
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 10
retry.backoff.ms = 500
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:347)
[2020-04-18 09:42:16,457] INFO Kafka version: 5.4.0-ce (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-04-18 09:42:16,457] INFO Kafka commitId: ca78a82127cbef3a (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-04-18 09:42:16,458] INFO Kafka startTimeMs: 1587183136457 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-04-18 09:42:16,458] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-mysql-sensor-sink-connector-03-1 created for client_id=connector-consumer-mysql-sensor-sink-connector-03-1 client_type=CONSUMER session= cluster=dy7maqKlQBysl4HTbJuIEQ group=connect-mysql-sensor-sink-connector-03 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:153)
[2020-04-18 09:42:16,470] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:87)
[2020-04-18 09:42:16,602] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:49)
[2020-04-18 09:42:16,765] ERROR WorkerSinkTask{id=mysql-sensor-sink-connector-03-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-04-18 09:42:16,766] ERROR WorkerSinkTask{id=mysql-sensor-sink-connector-03-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
[2020-04-18 09:42:16,766] ERROR WorkerSinkTask{id=mysql-sensor-sink-connector-03-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-04-18 09:42:16,766] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2020-04-18 09:42:16,767] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:113)
主题中的数据是json格式,没有模式。
运行console consumer时,我可以看到如下数据:
kafka-console-consumer --bootstrap-server*********:9092 --topic sensor_data_topic --from-beginning | jq '.'
{
"packetid": 1501,
"macaddress": "F8-27-B9-C7-AB-99",
"readingtypeid": "temperature",
"readingvalue": 70.1,
"setpoint": 77.2,
"threshold": 7,
"readingtime": "2019-05-20 02:46:30",
"logtime": "2019-09-01 19:32:06"
}
{
"packetid": 1502,
"macaddress": "2B-6A-C1-45-86-ED",
"readingtypeid": "temperature",
"readingvalue": 65.21,
"setpoint": 77.06,
"threshold": 7,
"readingtime": "2019-05-17 03:39:18",
"logtime": "2020-04-05 06:37:45"
}
mysql表如下:
+-----------------+-------------+------+-----+-------------------+-------------------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+-------------+------+-----+-------------------+-------------------+
| packetid | bigint | NO | PRI | NULL | |
| macaddress | varchar(20) | NO | MUL | NULL | |
| readingtypeid | bigint | NO | MUL | NULL | |
| readingvalue | float | YES | | NULL | |
| setpoint | float | YES | | NULL | |
| threshold | float | YES | | NULL | |
| lastupdatedtime | timestamp | NO | | NULL | |
| logtime | timestamp | YES | | CURRENT_TIMESTAMP | DEFAULT_GENERATED |
+-----------------+-------------+------+-----+-------------------+-------------------+
谁能帮我把我的数据放到mysql里。提前谢谢。
ps:我使用的版本是:confluent platform 5.4.0
1条答案
按热度按时间7uzetpgm1#
如果要使用接收器连接器,则必须定义模式。这可以通过使用avro和schema registry或者json和schema来实现。
如果你想坚持
JsonConverter
,只需确保启用了架构: