如何在aws glue中使用snowflake jdbc连接驱动程序运行pyspark

ztmd8pv5  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(589)
I am trying to run the below code in AWS glue:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from py4j.java_gateway import java_import
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

## @params: [JOB_NAME, URL, ACCOUNT, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD]

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
java_import(spark._jvm, "net.snowflake.spark.snowflake")

## uj = sc._jvm.net.snowflake.spark.snowflake

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

options = {
"sfURL" : args['URL'],
"sfAccount" : args['ACCOUNT'],
"sfUser" : args['USERNAME'],
"sfPassword" : args['PASSWORD'],
"sfDatabase" : args['DB'],
"sfSchema" : args['SCHEMA'],
"sfWarehouse" : args['WAREHOUSE'],
}

df = spark.read \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "STORE") \
  .load()

display(df)

## Perform any kind of transformations on your data and save as a new Data Frame: “df1”

## df1 = [Insert any filter, transformation, etc]

## Write the Data Frame contents back to Snowflake in a new table

## df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "[new_table_name]").mode("overwrite").save()

job.commit()

并得到一个错误。

Traceback (most recent call last): File "/tmp/spark_snowflake", line 35, in <module> 
.option("dbtable", "STORE") \ File 
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load return 
self._df(self._jreader.load()) File "/opt/amazon/spark/python/lib/py4j-0.10.7-

src.zip/py4j/java\u gateway.py”,第1257行,在call answer,self.gateway\u client,self.target\u id,self.name)file“/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第63行,在deco return f(*a,**kw)file“/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”,第328行,get\u return value格式(target\u id,“.”,name),value)py4j.protocol.py4jjavaerror:调用o78.load时出错:java.lang.classnotfoundexception:找不到数据源:snowflake。请在以下地址查找包裹http://spark.apache.org/third-party-projects.html 在org.apache.spark.sql.execution.datasources.datasource$.lookupdateasource(datasource。scala:657)位于org.apache.spark.sql.dataframereader.load(dataframereader。scala:194)位于org.apache.spark.sql.dataframereader.load(dataframereader。scala:167)在sun.reflect.nativemethodaccessorimpl.invoke0(本机方法)位于sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl)。java:62)在sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.lang.reflect.method.invoke(方法。java:498)在py4j.reflection.methodinvoker.invoke(methodinvoker。java:244)在py4j.reflection.reflectionengine.invoke(reflectionengine。java:357)在py4j.gateway.invoke(gateway。java:282)在py4j.commands.abstractcommand.invokemethod(abstractcommand。java:132)在py4j.commands.callcommand.execute(callcommand。java:79)在py4j.gatewayconnection.run(网关连接。java:238)在java.lang.thread.run(线程。java:748)原因:java.lang.classnotfoundexception:snowflake.defaultsource位于java.net.urlclassloader.findclass(urlclassloader)。java:382)在java.lang.classloader.loadclass(classloader。java:418)在sun.misc.launcher$appclassloader.loadclass(launcher。java:352)在java.lang.classloader.loadclass(classloader。java:351)在
org.apache.spark.sql.execution.datasources.datasource$$anonfun$20$$anonfun$apply$12.apply(datasource.scala:634)位于

ef1yzkbh

ef1yzkbh1#

错误消息显示“java.lang.classnotfoundexception:找不到数据源:snowflake”。创建作业时是否使用了合适的jar并将其传递给胶水?这里有一些例子
在pyspark中运行自定义java类

相关问题