Kafka: how do I correctly map messages to objects with `schema` and `payload` in Spark Structured Streaming?

cl25kdpy · asked 2023-04-29 · Apache

I want to map each message to an object that contains `schema` and `payload` during Spark Structured Streaming.
Here is my original code:

import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

This writes messages to Kafka in the following format:

{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}

However, I want to add a `schema` field and move the existing fields into `payload`, so that the data can later be sunk into Postgres by a JDBC sink connector such as Aiven's JDBC Sink and Source Connectors.
Based on that documentation, I thought I should use "decimal" as the type for each field.
This is the Kafka message format I want to generate:

{
  "schema":{
    "type": "struct",
    "fields":[
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload":{
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
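
For context, the reason for this envelope is that Kafka Connect's JsonConverter only accepts messages wrapped in schema/payload when value.converter.schemas.enable is true. Below is a minimal sketch of the sink side in properties form; the connector name, connection URL, and auto.create setting are illustrative assumptions, not taken from my actual setup:

# Hypothetical JDBC sink config (name, URL, and auto.create are placeholders)
name=my-jdbc-sink
connector.class=io.aiven.connect.jdbc.JdbcSinkConnector
topics=my-topic
connection.url=jdbc:postgresql://postgres:5432/my-db
auto.create=true
value.converter=org.apache.kafka.connect.json.JsonConverter
# This setting is what makes the converter require the schema/payload envelope:
value.converter.schemas.enable=true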

So I tried updating my Spark code:

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

But when I spark-submit it, I get this error:

Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
    at org.apache.spark.sql.functions$.lit(functions.scala:125)
    at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My feeling is that StructType causes it to resolve to DecimalType(10,0), and in that case I shouldn't be using StructType here.
I'm not sure how to generate exactly "decimal" in the output message. Any guidance would be appreciated, thanks!

Answer 1 (dddzy1tm):

I think my original guess was correct: StructType is special and shouldn't be used here. Using "decimal" is the same as using DecimalType(10,0), which is why it throws that error.
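
A quick way to see this in spark-shell (a minimal sketch of my understanding of why lit(field.dataType) fails; not the only way to verify it):

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DecimalType, StructType}

// The type string "decimal" resolves to DecimalType(10,0)...
val s = new StructType().add("timestamp", "decimal")
s("timestamp").dataType  // DecimalType(10,0)

// ...and lit() cannot build a literal column from a DataType object,
// which is exactly what lit(field.dataType) attempted in the question:
lit(DecimalType(10, 0))  // SparkRuntimeException: literal for 'DecimalType(10,0)' ...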

In my case, I should simply build the schema description with plain struct columns.
Also, I found that the documentation for the downstream Aiven JDBC sink connector may be wrong:

I tried decimal, Decimal, and DECIMAL, and all of them failed when running the connector.
Since the default value.converter is org.apache.kafka.connect.json.JsonConverter, I should use double (or float) in my case, based on the JsonSchema source code (JsonConverter's primitive type names are int8, int16, int32, int64, float, double, boolean, string, and bytes).
So the final working version is:

import org.apache.spark.sql.functions.{array, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    // Build the Kafka Connect envelope: "schema" describes the field types,
    // "payload" carries the actual values.
    struct(
      lit("struct").alias("type"),
      array(
        struct(
          lit("double").alias("type"),
          lit(false).alias("optional"),
          lit("timestamp").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("current").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("voltage").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("temperature").alias("field")
        )
      ).alias("fields")
    ).alias("schema"),
    struct("*").alias("payload")
  )
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

This produces messages in the format:

{
  "schema":{
    "type": "struct",
    "fields":[
      {
        "type": "double",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "double",
        "optional": true,
        "field": "current"
      },
      {
        "type": "double",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "double",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload":{
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
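
As a side note, the hand-written array(...) of field descriptors can also be derived from the input schema programmatically. This is a sketch under the assumption that Spark's DataType.typeName happens to match the Connect type name (it does for DoubleType, whose typeName is "double", but not necessarily for every Spark type):

import org.apache.spark.sql.functions.{array, lit, struct}

// Derive the Connect-style "fields" array from the StructType itself,
// instead of spelling out one struct() per column.
val fieldsArray = array(input_schema.fields.map { field =>
  struct(
    lit(field.dataType.typeName).alias("type"),  // e.g. "double"
    lit(field.nullable).alias("optional"),
    lit(field.name).alias("field")
  )
}: _*)

// fieldsArray can then replace the hand-written array(...) above:
//   struct(lit("struct").alias("type"), fieldsArray.alias("fields")).alias("schema")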
