我有以下Kafka连接器配置:
{
"name": "some-topic-connector",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "some-topic",
"hdfs.url": "hdfs://hadoopams1",
"logs.dir": "apps/kafka-connect-preview/some-topic.logs",
"topics.dir": "apps/kafka-connect-preview/some-topic.db",
"hadoop.conf.dir": "/etc/hadoop/conf",
"flush.size": "1000000",
"rotate.interval.ms": "3600000",
"rotate.schedule.interval.ms": "86400000",
"hive.integration": "true",
"hive.metastore.uris": "thrift://metastore-1.hadoop-1.foobar.com:9083",
"hive.database": "preview",
"locale": "en_GB",
"timezone": "Europe/Berlin",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.preview.foobar.com",
"schema.compatibility": "BACKWARD",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "86400000",
"path.format": "'dt'=YYYYMMdd",
"partition.field.name": "dt"
}
}
我已验证数据已成功写入hdfs,但由于某些原因,配置单元中的表未被创建。从日志中,我看不到Kafka连接中有任何错误。
我做错什么了?有什么配置或要求我遗漏了吗?
1条答案
按热度按时间kg7wmglp1#
有一个已知的问题(功能?)是
HdfsSinkConnector
不在配置单元中创建表,以防logs.dir
以及topics.dir
已经存在。例如,如果在连接器创建之后的某个时间点决定启用配置单元集成,则可能会发生这种情况。还有一个pull请求修复了这个问题,但是它还没有被合并。
所以不是吗
你自己造
HdfsSinkConnector
基于上面链接的拉取请求重命名目录,重新创建连接器,直到创建了配置单元表,然后将表移回(当然,在生产环境中很困难)
或者手动创建表