bounty将在22小时后到期。回答此问题可获得+50声望奖励。user3480223正在寻找一个答案从一个有信誉的来源。
我正在使用PySpark中的from_avro函数从Kafka中读取Avro格式的数据,并使用在模式注册表中注册的模式。但是,我遇到了一个问题,模式注册表在批处理期间没有适当地考虑模式更改。因此,当流式传输作业开始时,它始终利用最新的模式,但它没有考虑流式传输作业之间可能发生的任何模式变化。在理想情况下,模式注册表应该考虑前5个字节中指定的模式ID,以确保准确的模式解析。
data_df = (
spark.readStream.format("kafka")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("kafka.security.protocol", "SSL")
.option("kafka.bootstrap.servers", servers_details)
.option("kafka.ssl.truststore.location", location)
.option("kafka.ssl.truststore.password", pwd)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", 30)
.option("subscribe", name)
.load()
)
transform_df = (
df.withColumn(
"record",
from_avro(
fn.col("value"),
schemaRegistryAddress="http://schema-registry.com",
subject=f"{topic_name}-value",
),
)
.withColumn("schema_id", function_convert(fn.expr("substring(value, 2, 4)")))
.select("schema_id", fn.col("record"))
)
display(transform_df)
字符串
我尝试了from_avro中的选项,但似乎不起作用
transform_df = df.withColumn(
"record",
from_avro(
fn.col("value"),
options={"confluent.value.schema.validation": "true"},
schemaRegistryAddress="http://schema-registry.com",
subject=f"{topic_name}-value",
),
).select(fn.col("record").alias("RECORD_CONTENT"))
型
1条答案
按热度按时间vs3odd8k1#
看起来您正在使用PySpark中的
from_avro
函数从Kafka读取Avro数据,并且在批处理过程中没有考虑模式更改。不幸的是,PySpark中的from_avro
函数并没有直接提供一个选项来使用前5个字节指定模式解析的模式ID。您尝试的confluent.value.schema.validation
选项无法解决此特定场景。但是,您可以通过使用架构注册表和Avro数据前5个字节中的架构ID手动解析Avro架构来解决此问题。这里有一个可能的方法来实现这一点:
字符串
在这种方法中,我们定义了一个UDF(
parse_avro_udf
),它接受Avro数据,使用前5个字节中的模式ID从模式注册表中获取相应的模式,然后使用解析的模式解析Avro数据。fetch_schema_from_registry
函数负责根据模式ID从模式注册表中获取模式。通过使用此UDF,您应该能够在批处理期间处理模式更改,因为它将根据Avro数据中存在的模式ID为每个记录解析正确的模式。