提供的maven坐标必须为'groupid:artifactid:Pypark和kafka版本

dgiusagp  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(563)

将kafka消息转换为dataframe时,将包作为参数传递时出错。

from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0.jar: org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:jar:2.1.1 pyspark-shell'pyspark-shell'

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

错误××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××××

::          UNRESOLVED DEPENDENCIES         ::

    ::::::::::::::::::::::::::::::::::::::::::::::

    :: org.apache.spark#spark-sql-kafka-0-10_2.11;2.2.0.jar: not found

    ::::::::::::::::::::::::::::::::::::::::::::::
ef1yzkbh

ef1yzkbh1#

例外情况表明,您的一个依赖项中有一个输入错误。

org.apache.spark:spark-sql-kafka-0-10_2.12:jar:

缺少版本(并且它还有一个不必要的 : ). 下面应该可以做到这一点:

org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

完全依赖关系将变为:

'--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.1.1'

相关问题