kafka connect-extracttopic转换与hdfs sink connector抛出nullpointerexception

j8ag8udp  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(840)

我使用汇合hdfs接收器连接器5.0.0与Kafka2.0.0,我需要使用extracttopic转换(https://docs.confluent.io/current/connect/transforms/extracttopic.html). 我的连接器工作得很好,但是当我添加这个转换时,我得到了nullpointerexception,即使在只有2个属性的简单数据样本上也是如此。

ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

以下是连接器的配置:

name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
schema.compatibility=BACKWARD

# HDFS configuration

# Use store.url instead of hdfs.url (deprecated) in later versions. Property store.url does not work, yet

hdfs.url=${env.HDFS_URL}
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=${env.HDFS_TOPICS_DIR}

# Connector configuration

format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000

# Hive integration

hive.integration=true
hive.metastore.uris=${env.HIVE_METASTORE_URIS}
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect

# Transformations

transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset

transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true

我使用的是模式注册表,数据是avro格式的,我确信给定的属性 name 不为空。有什么建议吗?我需要的基本上是提取给定字段的内容并将其用作主题名。
编辑:
甚至在avro格式的简单json上也会发生这种情况:

{
   "attr": "tmp",
   "name": "topic1"
}
dpiehjr4

dpiehjr41#

简单的回答是,您在转换中更改了主题的名称。
每个主题分区的hdfs连接器都有单独的 TopicPartitionWriter . 在中创建负责处理消息的sinktask时 open(...) 每个分区的方法 TopicPartitionWriter 已创建。
当它处理记录时,基于它查找的主题名和分区号 TopicPartitionWriter 并尝试将记录附加到其缓冲区。在您的情况下,它找不到任何write-for消息。主题名称已通过转换更改,对于该对(主题、分区),可以 TopicPartitionWriter 未创建。
记录,传递给 HdfsSinkTask::put(Collection<SinkRecord> records) ,已经设置了分区和主题,因此不必应用任何转换。
我想 io.confluent.connect.transforms.ExtractTopic 应该用来 SourceConnector .

相关问题