将kafka消息转换为dataframe时,将包作为参数传递时出错。
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-sql-kafka-0-10_2.11-2.0.2.jar,spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar pyspark-shell'
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "Jim_Topic") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
py4j.protocol.py4jjavaerror:调用o28.load时出错:java.util.serviceconfigurationerror:org.apache.spark.sql.sources.datasourceregister:无法示例化提供程序org.apache.spark.sql.kafka010.kafkasourceprovider
2条答案
按热度按时间ezykj2lf1#
这是因为
spark-sql-kafka
与当前运行的spark版本不匹配。例如,您当前使用的依赖项适用于spark 2.4.1:
要解决这个问题,只需在依赖项字符串(replace)的末尾使用spark的版本
x.y.z
):ioekq8ef2#
用下面的配置定义jar对我很有帮助,