如何在配置单元表中的kafka主题中快速插入数据?

ev7lccsx  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(292)

我有一个Kafka主题,其中我收到了大约50万事件。
目前,我需要将这些事件插入到配置单元表中。由于事件是时间驱动的,我决定使用以下策略:
1) 在hdfs中定义一个路由,我称之为用户。在这条路线里面,会有几个Parquet文件,每个文件对应一个特定的日期。e、 例如:20180412、20180413、20180414等(格式yyyymmdd)。2) 创建一个配置单元表并使用yyyymmdd格式的日期作为分区。其思想是使用users hdfs目录中的每个文件作为表的分区,只需通过以下命令添加相应的parquet文件:

ALTER TABLE users DROP IF EXISTS PARTITION 
(fecha='20180412') ;
ALTER TABLE users ADD PARTITION
(fecha='20180412') LOCATION '/users/20180412';

3) 通过迭代从最早的事件读取kafka主题中的数据,获取事件中的日期值(在参数dateclient内),并给定该日期值,将该值插入相应的parque文件中。4) 为了完成第3点,我读取每个事件并将其保存在一个临时hdfs文件中,我使用spark从中读取该文件。之后,我使用spark将临时文件内容转换为Dataframe。5) 使用spark,我成功地将dataframe值插入到parquet文件中。
代码遵循以下方法:

val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
val spark = SparkSession
    .builder()
    .master("yarn")
    .appName("ParaTiUserXY")
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "UserXYZ2")

//Schema para transformar los valores del topico de Kafka a JSON
val my_schema = new StructType()
    .add("longitudCliente", StringType)
    .add("latitudCliente", StringType)
    .add("dni", StringType)
    .add("alias", StringType)
    .add("segmentoCliente", StringType)
    .add("timestampCliente", StringType)
    .add("dateCliente", StringType)
    .add("timeCliente", StringType)
    .add("tokenCliente", StringType)
    .add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )

val fs = {
    val conf = new Configuration()
    FileSystem.get(conf)
}

val temp_path:Path = new Path("hdfs:///tmp/tmpstgtopics")
    if( fs.exists(temp_path)){
        fs.delete(temp_path, true)
}

while(true)
{
    val records=consumer.poll(100)
    for (record<-records.asScala){
        val data = record.value.toString
        val dataos: FSDataOutputStream = fs.create(temp_path)
        val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))
        bw.append(data)
        bw.close
        val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/tmpstgtopics")
        val fechaCliente = data_schema.select("dateCliente").first.getString(0)

        if( fechaCliente < date){
            data_schema.select("longitudCliente", "latitudCliente","dni", "alias", 
            "segmentoCliente", "timestampCliente", "dateCliente", "timeCliente", 
            "tokenCliente", "telefonoCliente").coalesce(1).write.mode(SaveMode.Append)
            .parquet("/desa/landing/parati/xyusers/" + fechaCliente)
          }
          else{
              break
          }
        }
    }

  consumer.close()

但是,此方法处理集群中的每条记录大约需要1秒。到目前为止,这将意味着我将需要6天左右的时间来处理所有的事件。
这是将Kafka主题中的所有事件插入配置单元表的最佳方式吗?
还有哪些替代方案,或者我可以对我的代码进行哪些升级以加快速度?

0ejtzxu1

0ejtzxu11#

除了您没有正确使用spark streaming从kafka(您编写了一个带有while循环的vanilla scala kafka消费者)进行投票之外 coalesce(1) 这将永远是一个瓶颈,因为它迫使一个执行者收集记录,我只能说,你真的在重新发明轮子这里。
还有什么其他的选择
我所知道的都是开源的
linkedin的gobblin(取代了加缪)
kafka connect w/hdfs sink connector(内置于confluent平台,但也从github上的源代码构建)
流线型
Apache尼菲
pinterest的秘书
从上面列出的那些消息中,最好使用json或avro编码的kafka消息,而不是一个简单的字符串。这样,您就可以将文件按原样放入配置单元serde中,并且在使用它们时不解析它们。如果您无法编辑生产者代码,则使用原始字符串数据制作一个单独的kafka streams作业,对其进行解析,然后写入avro或json的新主题。
如果选择avro(对于配置单元支持,您确实应该选择avro),那么可以使用合流模式注册表。或者如果你在经营hortonworks,他们也提供类似的注册。
avro上的hive比文本或json运行得好得多。avro可以很容易地转换成Parquet地板,我相信以上每个选项至少提供Parquet地板的支持,而其他也可以做兽人(Kafka连接不做兽人在这个时候)。
以上每一个都支持基于kafka记录时间的某种级别的自动hive分区生成。

cyej8jka

cyej8jka2#

您可以通过增加kafka主题的分区,并让一个或多个使用者组与多个使用者对每个分区进行一对一的消费,来提高并行性。
正如cricket\u007所提到的,您可以使用一个开源框架,也可以让更多的消费群体使用相同的主题来卸载数据。

相关问题