将kafka消息转换为dataframe时,将包作为参数传递时出错。
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0.jar: org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:jar:2.1.1 pyspark-shell'pyspark-shell'
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "Jim_Topic") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
错误××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××
:: UNRESOLVED DEPENDENCIES ::
::::::::::::::::::::::::::::::::::::::::::::::
:: org.apache.spark#spark-sql-kafka-0-10_2.11;2.2.0.jar: not found
::::::::::::::::::::::::::::::::::::::::::::::
1条答案
按热度按时间ef1yzkbh1#
例外情况表明,您的一个依赖项中有一个输入错误。
缺少版本(并且它还有一个不必要的
:
). 下面应该可以做到这一点:完全依赖关系将变为: