spark作业读取Dataframe中已排序的avro文件,但未按顺序写入kafka

wlwcrazw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(318)

我用id对avro文件进行排序,每个id都有一个名为“id=234”的文件夹,文件夹中的数据是avro格式的,并根据日期进行排序。我正在运行spark作业,它获取输入路径并读取Dataframe中的avro。这个Dataframe然后用5个分区写入kafka主题。

val properties: Properties = getProperties(args)

val spark = SparkSession.builder().master(properties.getProperty("master"))
  .appName(properties.getProperty("appName")).getOrCreate()
val sqlContext = spark.sqlContext

val sourcePath = properties.getProperty("sourcePath")

val dataDF = sqlContext.read.avro(sourcePath).as("data")
val count = dataDF.count();
val schemaRegAdd = properties.getProperty("schemaRegistry")

val schemaRegistryConfs = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> schemaRegAdd,
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME
)
val start = Instant.now

dataDF.select(functions.struct(properties.getProperty("message.key.name")).alias("key"), functions.struct("*").alias("value"))
  .toConfluentAvroWithPlainKey(properties.getProperty("topic"), properties.getProperty("schemaName"),
  properties.getProperty("schemaNamespace"))(schemaRegistryConfs)
  .write.format("kafka")
  .option("kafka.bootstrap.servers",properties.getProperty("kafka.brokers"))
  .option("topic",properties.getProperty("topic")).save()

}
我的用例是按顺序写入来自每个id(按日期排序)的所有消息,例如来自一个id 1的所有排序数据应该首先添加,然后从id 2添加,依此类推。Kafka消息的密钥id为。

dz6r00yl

dz6r00yl1#

不要忘了,当您进行转换时,rdd/数据集中的数据是无序的,因此您会丢失顺序。
实现这一点的最佳方法是逐个读取文件并将其发送给kafka,而不是读取文件中的完整目录 val sourcePath = properties.getProperty("sourcePath")

相关问题