I am using Spark Streaming with PySpark. I can stream the data and create a DataFrame correctly without any problem. I am also able to insert data into an Impala table that was created from only a few (5) sampled columns out of all the columns (72) in the Kafka message. However, when I similarly create a new table with the appropriate data types and columns, and the DataFrame now holds all the columns mentioned in the Kafka stream message, I get the following exception.
java.sql.SQLFeatureNotSupportedException: [Cloudera]JDBC Driver does not support this optional feature.
    at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)
    at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
    at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:627)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
I have searched a lot on this but could not find any solution. I also enabled debug logging, but it does not mention which feature the driver does not support. Any help or proper guidance would be greatly appreciated. Thank you.
Version details:
PySpark: 2.2.0, Kafka: 0.10.2, Cloudera: 5.15.0, Cloudera Impala: 2.12.0-cdh5.15.0, Cloudera Impala JDBC driver: 2.6.4
The code I am using:
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import lit
from pyspark.sql.types import *
conf = SparkConf().setAppName("testkafkarecvstream")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
spark = SparkSession.builder.appName("testkafkarecvstream").getOrCreate()
jdbcUrl = "jdbc:impala://hostname:21050/dbName;AuthMech=0;"
fields = [
    StructField("column_name01", StringType(), True),
    StructField("column_name02", StringType(), True),
    StructField("column_name03", DoubleType(), True),
    StructField("column_name04", StringType(), True),
    StructField("column_name05", IntegerType(), True),
    StructField("column_name06", StringType(), True),
    .....................
    StructField("column_name72", StringType(), True),
]
schema = StructType(fields)
def make_rows(parts):
    customRow = Row(column_name01=datatype(parts['column_name01']),
                    .....,
                    column_name72=datatype(parts['column_name72'])
                    )
    return customRow
def createDFToParquet(rdd):
    try:
        df = spark.createDataFrame(rdd, schema)
        df.show()
        df.write.jdbc(jdbcUrl,
                      table="table_name",
                      mode="append")
    except Exception as e:
        print(str(e))
zkNode = "zkNode_name:2181"
topic = "topic_name"
# Reciever method
kvs = KafkaUtils.createStream(ssc,
                              zkNode,
                              "consumer-group-id",
                              {topic: 5},
                              {"auto.offset.reset": "smallest"})
lines = kvs.map(lambda x: x[1])
conv = lines.map(lambda x: json.loads(x))
table = conv.map(make_rows)
table.foreachRDD(createDFToParquet)
table.pprint()
ssc.start()
ssc.awaitTermination()