我需要关于Kafka主题的帮助,我想把Parquet格式的hdfs(与每日partitionner)。
我在一个Kafka主题中有很多数据,基本上都是这样的json数据:
{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
此主题的名称为:test
我想把这些数据以Parquet格式放到我的hdfs集群中。但我很难适应Flume连接器的配置。我使用汇合hdfs接收器连接器。
到目前为止,我做到了以下几点:
{
"name": "hdfs-sink",
"config": {
"name": "hdfs-sink",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "test",
"hdfs.url": "hdfs://hdfs-IP:8020",
"hadoop.home": "/user/test-user/TEST",
"flush.size": "3",
"locale": "fr-fr",
"timezone": "UTC",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
"consumer.auto.offset.reset": "earliest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
关于我为什么这样配置连接器的一些解释:
我每天都有很多这样的数据
最后的目标是每天在我的hdfs中有一个关于这个主题的Parquet文件
我知道可能我必须使用schema注册表来将数据格式化为parquet,但我不知道怎么做。有必要吗?
你能帮我一下吗?
谢谢您
1条答案
按热度按时间jtjikinw1#
我个人没有使用过
ParquetFormat
,但您的数据必须具有架构,这意味着您的数据是使用合流avro序列化程序生成的
您的数据作为protobuf生成,并将protobuf转换器添加到connect worker中
您使用kafkaconnect的特殊json格式,该格式在记录中包含一个模式。
基本上,它不能是“纯json”。i、 e.你目前有
"value.converter.schemas.enable": "true"
,我猜您的连接器不工作,因为您的记录不是上述格式。基本上,如果没有模式,json解析器就不可能知道parquet需要写什么“列”。
而且daily partitioner每天不创建一个文件,只创建一个目录。你将得到一个文件每
flush.size
还有一个配置,用于安排刷新文件的旋转间隔。此外,每个kafka分区将有一个文件。也,
"consumer.auto.offset.reset": "earliest",
只适用于connect-distribtued.properties
文件,不是在每个连接器上,好的。因为我没有亲自用过
ParquetFormat
,这就是我能给出的所有建议,但是我已经使用了其他工具,比如nifi来实现类似的目标,这将允许您不更改现有的kafka生产者代码。或者,使用
JSONFormat
但是,配置单元集成将不会自动工作,并且表必须是预定义的(这将要求您为主题创建一个模式)。另一个选择是配置hive直接从kafka读取