kafka主题数据到hdfsParquet文件使用hdfs接收器连接器配置问题

fykwrbwg  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(340)

我需要关于Kafka主题的帮助,我想把Parquet格式的hdfs(与每日partitionner)。
我在一个Kafka主题中有很多数据,基本上都是这样的json数据:

{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}

此主题的名称为:test
我想把这些数据以Parquet格式放到我的hdfs集群中。但我很难适应Flume连接器的配置。我使用汇合hdfs接收器连接器。
到目前为止,我做到了以下几点:

{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter":  "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"

  }
}

关于我为什么这样配置连接器的一些解释:
我每天都有很多这样的数据
最后的目标是每天在我的hdfs中有一个关于这个主题的Parquet文件
我知道可能我必须使用schema注册表来将数据格式化为parquet,但我不知道怎么做。有必要吗?
你能帮我一下吗?
谢谢您

jtjikinw

jtjikinw1#

我个人没有使用过 ParquetFormat ,但您的数据必须具有架构,这意味着
您的数据是使用合流avro序列化程序生成的
您的数据作为protobuf生成,并将protobuf转换器添加到connect worker中
您使用kafkaconnect的特殊json格式,该格式在记录中包含一个模式。
基本上,它不能是“纯json”。i、 e.你目前有 "value.converter.schemas.enable": "true" ,我猜您的连接器不工作,因为您的记录不是上述格式。
基本上,如果没有模式,json解析器就不可能知道parquet需要写什么“列”。
而且daily partitioner每天不创建一个文件,只创建一个目录。你将得到一个文件每 flush.size 还有一个配置,用于安排刷新文件的旋转间隔。此外,每个kafka分区将有一个文件。
也, "consumer.auto.offset.reset": "earliest", 只适用于 connect-distribtued.properties 文件,不是在每个连接器上,好的。
因为我没有亲自用过 ParquetFormat ,这就是我能给出的所有建议,但是我已经使用了其他工具,比如nifi来实现类似的目标,这将允许您不更改现有的kafka生产者代码。
或者,使用 JSONFormat 但是,配置单元集成将不会自动工作,并且表必须是预定义的(这将要求您为主题创建一个模式)。
另一个选择是配置hive直接从kafka读取

相关问题