我想把kafka日志文件移到hadoop日志文件。所以我做了如下hdfs连接器配置
/快速启动-hdfs.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=kafka_log_test
hdfs.url=hdfs://10.100.216.60:9000
flush.size=100000
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
/connect-avro-standalone.properties连接
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
当我运行hdfs连接器时,只需在.avro文件中写入avro模式。不是数据。
/Kafka日志测试+0+0000000018+0000000020.avro
avro.schema {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}],"connect.version":1,"connect.name":"myrecord"}
主题有大量数据,但合流hdfs连接器不会将数据移动到hdfs。
我怎样才能解决这个问题?
1条答案
按热度按时间qybjjes11#
根据定义,除非消息在偏移量18和20之间被压缩或过期,否则包含该名称的文件
0+0000000018+0000000020
将有来自分区0的2条记录。你应该使用
tojson
指挥avro-tools
而不是getmeta
.或者你可以用spark或pig来读取文件。
您可能还需要验证连接器在启动后是否继续运行,因为设置
hive.metastore.uris=thrift://localhost:9083
在不是配置单元元存储服务器的计算机上,连接任务将失败。uri应该是配置单元的实际主机,就像您对namenode所做的那样。而且,也不可能得到
.avro
文件扩展名format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
无论如何,您可能需要验证您正在寻找正确的hdfs路径。注意:将写操作连接到+tmp
在写入最终输出文件之前,请先临时放置。