During Spark Structured Streaming, I want to map each message to an object that contains a schema field and a payload field.
Here is my original code:
val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))
val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
When it writes to Kafka, this outputs messages in the following format:
{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}
However, I would like to add a schema field and move the data under a payload field, so that the messages can later be sunk into Postgres by a JDBC sink connector such as Aiven's JDBC Sink and Source Connectors.
Based on that documentation, I think I should use "decimal" as the type of each field.
This is the Kafka message format I hope to generate:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload": {
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
I tried updating my Spark code:
val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
But when I spark-submit, I get this error:
Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
at org.apache.spark.sql.functions$.lit(functions.scala:125)
at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
My feeling is that StructType is causing it to return DecimalType(10,0), so perhaps I should not use StructType in this case. However, I am not sure how to generate exactly "decimal" in the output message.
Any guidance would be appreciated, thanks!
1 Answer
I think my original guess was correct: StructType is special and should not be used here. Using "decimal" is the same as using DecimalType(10,0), which is why that error shows up (see the StructType and DecimalType documentation). In my case I should use a plain struct instead.
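A quick way to see this (my own illustration, not part of the original answer): the string "decimal" passed to StructType.add is parsed into DecimalType(10,0), and lit() has no literal representation for a DataType object, which is exactly the SparkRuntimeException in the stack trace above.

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

// The string overload of add() parses "decimal" into DecimalType(10,0)
val parsed = new StructType().add("timestamp", "decimal")("timestamp").dataType
println(parsed)          // DecimalType(10,0)

// lit(parsed)           // throws: The feature is not supported: literal for 'DecimalType(10,0)'
val ok = lit("double")   // fine: the type name is written as a plain string literal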
Also, I found that the documentation of the downstream Aiven JDBC sink connector may have an issue: I tried decimal, Decimal, and DECIMAL, and all of them failed when running the connector. Since the default value.converter is org.apache.kafka.connect.json.JsonConverter, based on the JsonSchema source code I should use double (or float) in my case. So the final working version builds the envelope with a plain struct and "double" as the field type, and it prints messages in the schema/payload layout shown in the question.
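A minimal sketch of that approach, assuming the same input schema, bucket path, and Kafka settings as in the question; the connectField helper is my own naming, and the schema section is built only from struct, lit, and array calls so that every literal is a string or a boolean:

import org.apache.spark.sql.functions.{array, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

// Hypothetical helper (not from the original answer) describing one field of the
// Kafka Connect envelope; all values are plain string/boolean literals.
def connectField(name: String, optional: Boolean) =
  struct(
    lit("double").alias("type"),
    lit(optional).alias("optional"),
    lit(name).alias("field")
  )

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(
      struct(
        // "schema" section of the Kafka Connect JSON envelope
        struct(
          lit("struct").alias("type"),
          array(
            connectField("timestamp", optional = false),
            connectField("current", optional = true),
            connectField("voltage", optional = true),
            connectField("temperature", optional = true)
          ).alias("fields")
        ).alias("schema"),
        // "payload" section: the original columns read from the Parquet files
        struct("*").alias("payload")
      )
    ).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092")
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

With the default JsonConverter on the connector side, messages produced this way should deserialize into the schema/payload envelope that the JDBC sink connector expects.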