持久的spark流输出

vx6bjr1n  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(402)

我正在从一个消息应用程序收集数据,我目前正在使用flume,它每天发送大约5000万条记录
我希望使用kafka,使用spark streaming从kafka消费,并将其持久化到hadoop和impala查询
我尝试过的每种方法都有问题。。
方法1-将rdd另存为parquet,将外部配置单元parquet表指向parquet目录

// scala
val ssc =  new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a spark sql table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)

问题是finalparquet.saveasparquetfile输出大量文件,从kafka接收的数据流以1分钟的批量大小输出200多个文件。它输出许多文件的原因是因为计算是分布式的,正如另一篇文章中所解释的-如何使saveastextfile不将输出拆分为多个文件?
然而,建议的解决方案似乎并不适合我,例如,正如一个用户所说,只有一个输出文件是一个好主意,如果你有很少的数据。
方法2-使用 HiveContext . 将rdd数据直接插入配置单元表


# python

sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

def sendRecord(rdd):

  sql = "INSERT INTO TABLE table select * from beacon_sparktable"

  # 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
  beaconDF = sqlContext.jsonRDD(rdd,schema)

  # 2- Register the DataFrame as a spark sql table.
  beaconDF.registerTempTable("beacon_sparktable")

  # 3 - insert to hive directly from a qry on the spark sql table
  sqlContext.sql(sql);

这工作得很好,它直接插入到Parquet地板表中,但是由于处理时间超过了批间隔时间,因此批的调度会出现延迟。消费者跟不上正在生产的产品,要加工的批次开始排队。
向Hive写信似乎很慢。我尝试过调整批处理间隔大小,运行更多的使用者示例。

总而言之

如果存在多个文件的问题以及写入配置单元的潜在延迟,那么保存spark流媒体中的大数据的最佳方法是什么?其他人在干什么?
这里也有人问过类似的问题,但他有一个问题,目录中有太多的文件,如何让spark流写入其输出,以便 Impala 可以读取它?
非常感谢你的帮助

8tntrjer

8tntrjer1#

我想小文件的问题可以解决一些。您可能会得到大量基于kafka分区的文件。对我来说,我有12个分区的Kafka主题,我用 spark.write.mode("append").parquet("/location/on/hdfs") .
现在,根据您的要求,您可以添加 coalesce(1) 或更多以减少文件数量。另一个选择是增加微批量持续时间。例如,如果你可以接受每天5分钟的写作延迟,你可以有300秒的微批。
对于第二个问题,批处理排队只是因为没有启用背压。首先,您应该验证在单个批处理中可以处理的最大值是多少。一旦你能绕过那个数字,你就可以设定 spark.streaming.kafka.maxRatePerPartition 价值与 spark.streaming.backpressure.enabled=true 为每个微批启用有限数量的记录。如果您仍然不能满足需求,那么唯一的选择就是在主题上增加分区,或者在spark应用程序上增加资源。

unftdfkk

unftdfkk2#

在解决方案2中,可以通过每个rdd的分区数来控制创建的文件数。
请参见此示例:

// create a Hive table (assume it's already existing)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create a RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
 StructField("id", IntegerType, nullable = false),
 StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

现在,我想您可以使用从kafka提取数据的频率,以及每个rdd的分区数(默认情况下,kafka主题的分区,您可以通过重新分区来减少)。
我使用的是cdh5.5.1中的spark1.5,我使用其中一个得到了相同的结果 df.write.mode("append").saveAsTable("test") 或sql字符串。

相关问题