汇合hdfs连接器

hujrc8aj  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(368)

我想把kafka日志文件移到hadoop日志文件。所以我做了如下hdfs连接器配置
/快速启动-hdfs.properties

name=hdfs-sink 
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1 
topics=kafka_log_test 
hdfs.url=hdfs://10.100.216.60:9000 
flush.size=100000 
hive.integration=true 
hive.metastore.uris=thrift://localhost:9083 
schema.compatibility=BACKWARD 
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat 
partitioner.class=io.confluent.connect.hdfs.partitioner.Hour‌​lyPartitioner

/connect-avro-standalone.properties连接

bootstrap.servers=localhost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

当我运行hdfs连接器时,只需在.avro文件中写入avro模式。不是数据。
/Kafka日志测试+0+0000000018+0000000020.avro

avro.schema {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}],"connect.version":1,"connect.name":"myrecord"}

主题有大量数据,但合流hdfs连接器不会将数据移动到hdfs。
我怎样才能解决这个问题?

qybjjes1

qybjjes11#

根据定义,除非消息在偏移量18和20之间被压缩或过期,否则包含该名称的文件 0+0000000018+0000000020 将有来自分区0的2条记录。
你应该使用 tojson 指挥 avro-tools 而不是 getmeta .
或者你可以用spark或pig来读取文件。
您可能还需要验证连接器在启动后是否继续运行,因为设置 hive.metastore.uris=thrift://localhost:9083 在不是配置单元元存储服务器的计算机上,连接任务将失败。uri应该是配置单元的实际主机,就像您对namenode所做的那样。
而且,也不可能得到 .avro 文件扩展名 format.class=io.confluent.connect.hdfs.parquet.ParquetFormat 无论如何,您可能需要验证您正在寻找正确的hdfs路径。注意:将写操作连接到 +tmp 在写入最终输出文件之前,请先临时放置。

相关问题