pyspark: Impala JDBC driver does not support this optional feature

uqjltbpv · posted 2021-06-26 in Impala

I am using Spark Streaming with PySpark. I am able to stream and build the DataFrame correctly without any problem. I am also able to insert data into an Impala table that was created with only a few (5) sample columns out of all the columns (72) in the Kafka message. However, when I create a new table with the proper data types and all of the columns, so that the DataFrame now contains every column present in the Kafka stream message, I get the following exception.
java.sql.SQLFeatureNotSupportedException: [Cloudera]JDBC Driver does not support this optional feature.
    at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)
    at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
    at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:627)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
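From the trace, the write appears to fail when Spark's JdbcUtils calls PreparedStatement.setNull for a null value in one of the 72 columns (the 5-column table may simply never have received nulls). One workaround I am considering, but have not verified, is to fill nulls before the JDBC write; a rough sketch (the fill values and type handling are placeholders):

# Untested workaround sketch: avoid PreparedStatement.setNull altogether by
# filling nulls before df.write.jdbc(). The fill values below are placeholders.
from pyspark.sql.types import StringType, IntegerType, DoubleType

def fill_nulls(df):
    string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
    numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, (IntegerType, DoubleType))]
    # Replace nulls per type so the JDBC writer never has to call setNull.
    return df.na.fill("", subset=string_cols).na.fill(0, subset=numeric_cols)

# This would be used inside createDFToParquet (in the full code below),
# before the write:
#     df = fill_nulls(spark.createDataFrame(rdd, schema))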
I have searched a lot about this but could not find any solution. I also enabled debug logging, but it does not say which feature the driver does not support. Any help or proper guidance would be greatly appreciated. Thank you.

Version details:

PySpark: 2.2.0, Kafka: 0.10.2, Cloudera: 5.15.0, Cloudera Impala: 2.12.0-cdh5.15.0, Cloudera Impala JDBC driver: 2.6.4
The code I am using (a stripped-down repro of just the failing write follows after the full job):

import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import lit
from pyspark.sql.types import *

conf = SparkConf().setAppName("testkafkarecvstream")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
spark = SparkSession.builder.appName("testkafkarecvstream").getOrCreate()
jdbcUrl = "jdbc:impala://hostname:21050/dbName;AuthMech=0;"

fields = [
                 StructField("column_name01", StringType(), True),
                 StructField("column_name02", StringType(), True),
                 StructField("column_name03", DoubleType(), True),
                 StructField("column_name04", StringType(), True),
                 StructField("column_name05", IntegerType(), True),
                 StructField("column_name06", StringType(), True),
                  .....................
                 StructField("column_name72", StringType(), True),
]

schema = StructType(fields)

def make_rows(parts):
    customRow = Row(column_name01=datatype(parts['column_name01']),
                    .....,
                    column_name72=datatype(parts['column_name72']))
    return customRow

def createDFToParquet(rdd):
    try:
        df = spark.createDataFrame(rdd, schema)
        df.show()
        df.write.jdbc(jdbcUrl,
                      table="table_name",
                      mode="append")
    except Exception as e:
        print(str(e))

zkNode = "zkNode_name:2181"
topic = "topic_name"

# Receiver method

kvs = KafkaUtils.createStream(ssc,
                              zkNode,
                              "consumer-group-id",
                              {topic:5},
                              {"auto.offset.reset" : "smallest"})

lines = kvs.map(lambda x: x[1])
conv = lines.map(lambda x: json.loads(x))
table = conv.map(make_rows)
table.foreachRDD(createDFToParquet)

table.pprint()

ssc.start()
ssc.awaitTermination()
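
For completeness, here is the stripped-down version of just the failing write that I would expect to hit the same setNull call with a single row of nulls (the table name is a placeholder):

# Standalone sketch, not the full job: a single-row DataFrame containing nulls
# pushed through the same df.write.jdbc() path. JdbcUtils.savePartition calls
# PreparedStatement.setNull for every null value, which is where the exception
# above is raised. The table name is a placeholder.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("impala_jdbc_null_check").getOrCreate()

repro_schema = StructType([
    StructField("column_name01", StringType(), True),
    StructField("column_name03", DoubleType(), True),
    StructField("column_name05", IntegerType(), True),
])

repro_df = spark.createDataFrame([(None, None, None)], repro_schema)

repro_df.write.jdbc("jdbc:impala://hostname:21050/dbName;AuthMech=0;",
                    table="table_name",
                    mode="append")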

No answers yet.
