How do I write to Kafka from PySpark using the ABRiS library?

yr9zkbsy · asked 2021-06-04 · in Kafka

Has anyone managed to write to Kafka from PySpark with this library?
I have been able to read successfully using the code from the README:

from pyspark.sql import Column

# Reach the ABRiS classes through the Py4J gateway of the active
# SparkContext (e.g. spark_context = spark.sparkContext).
jvm_gateway = spark_context._gateway.jvm
abris_avro = jvm_gateway.za.co.absa.abris.avro
naming_strategy = getattr(getattr(abris_avro.read.confluent.SchemaManager, "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()

schema_registry_config_dict = {"schema.registry.url": schema_registry_url,
                               "schema.registry.topic": topic,
                               "value.schema.id": "latest",
                               "value.schema.naming.strategy": naming_strategy}

# Build an immutable Scala Map from the Python dict via Py4J.
conf_map = getattr(getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
for k, v in schema_registry_config_dict.items():
    conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(k, v))

# Deserialize the Kafka "value" column and flatten the resulting struct.
deserialized_df = (data_frame
                   .select(Column(abris_avro.functions.from_confluent_avro(data_frame._jdf.col("value"), conf_map)).alias("data"))
                   .select("data.*"))

However, I am struggling to extend this to writing to a topic via the to_confluent_avro function.
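
For reference, here is a minimal sketch of how the write side might look. It assumes ABRiS 3.x, where to_confluent_avro(column, config) mirrors the from_confluent_avro call above; the config keys are copied from the read path and kafka_bootstrap_servers is a hypothetical variable, so adjust both to your registry and cluster setup:

from pyspark.sql.functions import struct, col

# Sketch only: assumed ABRiS 3.x signature to_confluent_avro(column, schemaRegistryConf).
to_avro_config_dict = {"schema.registry.url": schema_registry_url,
                       "schema.registry.topic": topic,
                       "value.schema.naming.strategy": naming_strategy}

to_avro_map = getattr(getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
for k, v in to_avro_config_dict.items():
    to_avro_map = getattr(to_avro_map, "$plus")(jvm_gateway.scala.Tuple2(k, v))

# Pack the payload columns into one struct, serialize it to Confluent-framed
# Avro, and write the resulting binary column to Kafka.
struct_df = data_frame.select(struct(*[col(c) for c in data_frame.columns]).alias("value"))
serialized_df = struct_df.select(
    Column(abris_avro.functions.to_confluent_avro(struct_df._jdf.col("value"), to_avro_map))
    .alias("value"))

(serialized_df.write
 .format("kafka")
 .option("kafka.bootstrap.servers", kafka_bootstrap_servers)  # hypothetical variable
 .option("topic", topic)
 .save())

The key point is that the Kafka sink expects a single binary (or string) column named "value", which is exactly what to_confluent_avro should produce from the struct column.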

No answers yet.
