我可以在Azure Databricks中使用PySpark执行简单的SQL语句,但我想执行存储过程。下面是我尝试的PySpark代码。
#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd
#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
.option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
.option("dbtable", table) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries="execute testJoin"
resultDF=spark.sql(sqlQueries)
resultDF.show(resultDF.count(),False)
这不管用-我该怎么做?
3条答案
按热度按时间7vhp5slm1#
如果有人仍然在寻找一种方法来实现这一点,可以使用spark session的内置jdbc-connector。下面的代码示例将做的伎俩:
有关更多信息和使用SQL用户凭据通过JDBC连接的类似方法,或者如何获取返回参数,我建议您查看此博客文章:
https://medium.com/delaware-pro/executing-ddl-statements-stored-procedures-on-sql-server-using-pyspark-in-databricks-2b31d9276811
k97glaaz2#
到目前为止,还不支持通过JDBC连接从Azure数据块运行存储过程。但你的选择是:
1.使用
pyodbc
库连接并执行过程。但是通过使用这个库,这意味着您将在驱动程序节点上运行代码,而您的所有工作线程都处于空闲状态。详情请参阅本文。https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark1.使用
SQL
表函数而不是过程。在某种意义上,您可以使用任何可以在SQL查询的FORM
子句中使用的内容。1.既然你在一个azure环境中,那么使用azure数据工厂(执行你的过程)和azure数据块的组合可以帮助你构建非常强大的管道。
kr98yfug3#
我相信最上面的答案显示了如何执行命令/存储过程,但没有显示如果存储过程返回一个表,如何获得结果。这是我在Google上的第一个结果,希望对大家有所帮助。
解决方案
执行存储过程失败的原因首先是spark将查询括起来并将其分配给别名(
select * from (query)
)。更多详情https://stackoverflow.com/a/75330385/19871699我有两套不同的代码,一套pyspark,一套scala。两者都使用jdbc驱动程序做同样的事情。我更喜欢斯卡拉。现在,它需要数据集适合内存,但你可以用JavaScript来解决它。
关于如何使用驱动程序的许多文档都在这里:https://learn.microsoft.com/en-us/sql/connect/jdbc/microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16
两组代码:
1.创建到服务器的连接对象
1.创建SQL查询语句并执行它
1.获取元数据和结果集
1.提取列/计数的信息
1.循环遍历结果集的每一行和其中的每个字段以提取值,填充字段的行列表,然后将其追加到行列表。
1.将列表的结果列表转换为嵌套框架
Python
日期仍然是JavaObject类型有一个问题,尽管pandas读起来很好。如果你使用了spark. dataframe(pd.DataFrame(...)),它会抱怨这个类型,但是一个中间的csv可以工作。
这是py 4j可能出现的最糟糕的情况。你要将整个表中的每个值一个接一个地序列化,然后从scala发送到python。
Scala
很长的回答。希望这能帮助到某人。