I have a simple Spark application that produces Kafka messages like this:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquet_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val df = spark.readStream
      .schema(parquet_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*")).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}
I have an Avro schema in Apicurio Registry, created via:
curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot' \
--data '{
  "type": "record",
  "namespace": "com.hongbomiao",
  "name": "hm.motor",
  "fields": [
    {
      "name": "timestamp",
      "type": "long"
    },
    {
      "name": "current",
      "type": "double"
    },
    {
      "name": "voltage",
      "type": "double"
    },
    {
      "name": "temperature",
      "type": "double"
    }
  ]
}'
I am trying to use Apicurio Registry's Confluent-compatible REST API endpoints. Retrieving the schema with content ID 26 currently works:
curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/26' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot'
which prints
{
"schema": "{\n \"type\": \"record\",\n \"namespace\": \"com.hongbomiao\",\n \"name\": \"hm.motor\",\n \"fields\": [\n {\n \"name\": \"timestamp\",\n \"type\": \"long\"\n },\n {\n \"name\": \"current\",\n \"type\": \"double\"\n },\n {\n \"name\": \"voltage\",\n \"type\": \"double\"\n },\n {\n \"name\": \"temperature\",\n \"type\": \"double\"\n }\n ]\n}",
"references": []
}
This looks good.
Based on Aiven's JDBC connector documentation, I wrote this JDBC sink connector config:
{
  "name": "hm-motor-jdbc-sink-kafka-connector",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "topics": "hm.motor",
    "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
    "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
    "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
    "insert.mode": "upsert",
    "table.name.format": "motor",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",
    "transforms": "convertTimestamp",
    "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertTimestamp.field": "timestamp",
    "transforms.convertTimestamp.target.type": "Timestamp"
  }
}
However, I get this error in my Kafka Connect logs:
2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
... 21 more
It is trying to fetch content ID -1330532454, but obviously I don't have that one; mine is 26. How does the JDBC connector look up the corresponding Avro schema?

I don't understand how the mapping works. I assumed it would look for a schema named hm.motor based on the Kafka topic, but that turned out not to be the case.

Thanks!
Update 1

Thanks @Ftisiot!

I found the documentation on Kafka serializers and deserializers:

Kafka serializers and deserializers default to using <topicName>-key and <topicName>-value as the corresponding subject names when registering or retrieving schemas.

Also, for value.converter.value.subject.name.strategy, the default is io.confluent.kafka.serializers.subject.TopicNameStrategy.

I have updated my Avro schema name to hm.motor-value, but I still get the same error.
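One way to sanity-check what the registry serves for that subject, assuming Apicurio's ccompat API mirrors Confluent's GET /subjects/{subject}/versions/latest endpoint:

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/subjects/hm.motor-value/versions/latest'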
3 Answers

Answer 1
I believe the default schema name should be the concatenation of the topic name and -value or -key, depending on which part of the message you are decoding. So, in your case, I would try the schema name hm.motor-value.

In this video you can check the schema names that are automatically generated when encoding from JSON to Avro with Flink.

Disclaimer: I work for Aiven, and we should update the docs to reflect this.
Answer 2
Forget about Connect for now. You should first debug your topic with kafka-avro-console-consumer. You will get the same error there, because your producer needs to encode the data correctly, and Spark's to_avro does not do that. See the toConfluentAvro function of this library: https://github.com/AbsaOSS/ABRiS. More details on the internals are at https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
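To illustrate the wire format linked above: a Confluent-encoded value starts with a magic byte and a 4-byte schema id before the Avro body. A minimal sketch, where confluentFrame is a hypothetical helper (not a library function):

import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0x0][4-byte big-endian schema id][Avro binary payload].
// A plain payload from Spark's to_avro has no such header, so a deserializer that
// expects it reads the first bytes of the Avro data itself as a schema id, which is
// how a nonsense id like -1330532454 can show up.
def confluentFrame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
  ByteBuffer
    .allocate(1 + 4 + avroPayload.length)
    .put(0x0.toByte)   // magic byte
    .putInt(schemaId)  // schema id from the registry
    .put(avroPayload)  // Avro-encoded record body
    .array()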
As for the schema question: name refers to the fully qualified Java class name, as defined by the Avro specification, and has no relation to the Registry subject when TopicNameStrategy is used. So what is this subject name? It is the path parameter of the API call POST /subjects/:name/versions/, used internally by the serializer's and deserializer's HTTP client.

As mentioned before, Kafka Connect is not strictly necessary here: Spark can write to a JDBC database directly, and the source can be either Parquet or Kafka.
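A rough sketch of that Connect-free route with foreachBatch, assuming df is the streaming DataFrame before the to_avro step; the table name and the credential lookup via environment variables are placeholders:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: write each micro-batch straight to TimescaleDB/Postgres over JDBC,
// skipping Kafka and Connect entirely. Requires the PostgreSQL JDBC driver on
// the classpath; TIMESCALEDB_USER / TIMESCALEDB_PASSWORD are placeholder env vars.
val query = df.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .mode(SaveMode.Append)
      .format("jdbc")
      .option("url", "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db")
      .option("dbtable", "motor")
      .option("user", sys.env("TIMESCALEDB_USER"))
      .option("password", sys.env("TIMESCALEDB_PASSWORD"))
      .save()
  }
  .option("checkpointLocation", "/tmp/checkpoint-jdbc")
  .start()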
Answer 3
Thanks everyone for the help, I finally figured it out! Here is a summary of what I learned.
1. Producing Kafka messages in Avro format

There are actually two main flavors of Avro data:
1.1 [Succeeded] Producing "standard" / "vanilla" Apache Avro data in Spark

First, I generated the Avro schema as follows:

In Spark, using the native org.apache.spark.sql.avro.functions.to_avro is very straightforward. build.sbt:
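A minimal build.sbt sketch for this native to_avro setup; the versions below are assumptions and should match your Spark installation:

ThisBuild / scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.4.0" % Provided,  // assumed Spark version
  "org.apache.spark" %% "spark-avro"           % "3.4.0",             // provides to_avro
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.4.0",             // Kafka sink
  "org.apache.hadoop" % "hadoop-aws"           % "3.3.4"              // s3a:// support
)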
I got a lot of ideas from this article.
1.2 Producing Confluent Avro data in Spark

Confluent Avro is not "standard" / "vanilla" Avro, which causes some inconvenience for Spark and other tools.

There is a library, ABRiS, that helps produce Kafka messages in Confluent Avro format (toConfluentAvro). However, running sbt assembly with ABRiS was painful, because I had to deal with assemblyMergeStrategy. 🥲 (I did not go further in this direction.)
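For completeness, based on the ABRiS README the usage would roughly look like this; treat it as a sketch, with the builder method names taken from the ABRiS docs and the registry URL being the ccompat endpoint from above:

import org.apache.spark.sql.functions.struct
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.AbrisConfig

// Unlike Spark's built-in to_avro, ABRiS's to_avro adds the Confluent wire-format
// header and resolves the schema id from the registry.
val abrisConfig = AbrisConfig
  .toConfluentAvro
  .downloadSchemaByLatestVersion
  .andTopicNameStrategy("hm.motor")
  .usingSchemaRegistry(
    "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6"
  )

val confluentDf = df.select(to_avro(struct("*"), abrisConfig).as("value"))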
2. Consuming Avro-format Kafka messages in the JDBC Kafka connector and sinking them to the database

2.1 [Succeeded] Kafka messages in "standard" / "vanilla" Apache Avro

Very straightforward, just use io.apicurio.registry.utils.converter.AvroConverter. My JDBC connector config:
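The rest of the config matches the one in the question; the converter-related part is roughly as follows (the apicurio.registry.* property names are assumptions based on Apicurio's serde configuration, and the group/artifact ids are the ones registered above):

"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2",
"value.converter.apicurio.registry.fallback.group-id": "hm-group",
"value.converter.apicurio.registry.fallback.artifact-id": "hm-iot"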
Maybe in the future I can find a way to get rid of the value.converter.apicurio.registry.fallback-related fields. More information about io.apicurio.registry.utils.converter.AvroConverter can be found here.

2.2 Kafka messages in Confluent Avro
2.2.1 Using io.confluent.connect.avro.AvroConverter with Apicurio Registry

Here we use Apicurio Registry's Confluent-compatible REST API:
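These are the same converter settings already shown in the question's config, pointed at the ccompat endpoint:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6"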
(I did not go further in this direction.)
2.2.2 Using io.apicurio.registry.utils.converter.AvroConverter with Confluent Schema Registry

Here we use the Confluent Schema Registry REST API:
(I did not go further in this direction.)