pyspark 无法使用架构注册表获取正确的架构

icnyk63a  于 2023-08-02  发布在  Spark
关注(0)|答案(1)|浏览(150)

bounty将在22小时后到期。回答此问题可获得+50声望奖励。user3480223正在寻找一个答案从一个有信誉的来源

我正在使用PySpark中的from_avro函数从Kafka中读取Avro格式的数据,并使用在模式注册表中注册的模式。但是,我遇到了一个问题,模式注册表在批处理期间没有适当地考虑模式更改。因此,当流式传输作业开始时,它始终利用最新的模式,但它没有考虑流式传输作业之间可能发生的任何模式变化。在理想情况下,模式注册表应该考虑前5个字节中指定的模式ID,以确保准确的模式解析。

data_df = (
    spark.readStream.format("kafka")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.bootstrap.servers", servers_details)
    .option("kafka.ssl.truststore.location", location)
    .option("kafka.ssl.truststore.password", pwd)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 30)
    .option("subscribe", name)
    .load()
)

transform_df = (
    df.withColumn(
        "record",
        from_avro(
            fn.col("value"),
            schemaRegistryAddress="http://schema-registry.com",
            subject=f"{topic_name}-value",
        ),
    )
    .withColumn("schema_id", function_convert(fn.expr("substring(value, 2, 4)")))
    .select("schema_id", fn.col("record"))
)
display(transform_df)

字符串
我尝试了from_avro中的选项,但似乎不起作用

transform_df = df.withColumn(
    "record",
    from_avro(
        fn.col("value"),
        options={"confluent.value.schema.validation": "true"},
        schemaRegistryAddress="http://schema-registry.com",
        subject=f"{topic_name}-value",
    ),
).select(fn.col("record").alias("RECORD_CONTENT"))

vs3odd8k

vs3odd8k1#

看起来您正在使用PySpark中的from_avro函数从Kafka读取Avro数据,并且在批处理过程中没有考虑模式更改。不幸的是,PySpark中的from_avro函数并没有直接提供一个选项来使用前5个字节指定模式解析的模式ID。您尝试的confluent.value.schema.validation选项无法解决此特定场景。
但是,您可以通过使用架构注册表和Avro数据前5个字节中的架构ID手动解析Avro架构来解决此问题。这里有一个可能的方法来实现这一点:

import io
import requests
from avro.schema import AvroSchema
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Function to fetch Avro schema from schema registry based on schema ID
def fetch_schema_from_registry(schema_registry_url, subject, schema_id):
    url = f"{schema_registry_url}/subjects/{subject}/versions/{schema_id}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()["schema"]

# Function to parse Avro data using the schema from the schema registry
def parse_avro_data(avro_data, schema_registry_url, subject):
    schema_id = int.from_bytes(avro_data[:5], "big")
    avro_schema_str = fetch_schema_from_registry(schema_registry_url, subject, schema_id)
    avro_schema = AvroSchema(avro_schema_str)
    return avro_schema.decode(io.BytesIO(avro_data[5:]))

# UDF to parse Avro data and apply schema resolution
parse_avro_udf = fn.udf(lambda value: parse_avro_data(value, "http://schema-registry.com", f"{topic_name}-value"))

data_df = (
    spark.readStream.format("kafka")
    # ... other Kafka options ...
    .load()
)

transform_df = data_df.withColumn("record", parse_avro_udf(fn.col("value")))

# Continue with your processing on `transform_df` as needed

字符串
在这种方法中,我们定义了一个UDF(parse_avro_udf),它接受Avro数据,使用前5个字节中的模式ID从模式注册表中获取相应的模式,然后使用解析的模式解析Avro数据。fetch_schema_from_registry函数负责根据模式ID从模式注册表中获取模式。
通过使用此UDF,您应该能够在批处理期间处理模式更改,因为它将根据Avro数据中存在的模式ID为每个记录解析正确的模式。

相关问题