How do I execute a stored procedure in Azure Databricks with PySpark?

tzdcorbm asked 12 months ago in Spark

I can execute simple SQL statements with PySpark in Azure Databricks, but I want to execute a stored procedure. Below is the PySpark code I tried.

#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
    .option("dbtable", table) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries="execute testJoin"
resultDF=spark.sql(sqlQueries)
resultDF.show(resultDF.count(),False)

This doesn't work - how do I do it?


7vhp5slm1#

In case anyone is still looking for a way to do this: it can be done using the built-in JDBC connector of your Spark session. The following code sample will do the trick:

import msal

# Set url & credentials
jdbc_url = ...
tenant_id = ...
sp_client_id = ...
sp_client_secret = ...

# Write your SQL statement as a string
name = "Some passed value"

statement = f"""
EXEC Staging.SPR_InsertDummy
  @Name = '{name}'
"""

# Generate an OAuth2 access token for service principal
authority = f"https://login.windows.net/{tenant_id}"
app = msal.ConfidentialClientApplication(sp_client_id, sp_client_secret, authority)
token = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])["access_token"]

# Create a spark properties object and pass the access token
properties = spark._sc._gateway.jvm.java.util.Properties()
properties.setProperty("accessToken", token)

# Fetch the driver manager from your spark context
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager

# Create a connection object and pass the properties object
con = driver_manager.getConnection(jdbc_url, properties)

# Create callable statement and execute it
exec_statement = con.prepareCall(statement)
exec_statement.execute()

# Close connections
exec_statement.close()
con.close()

For more information, for a similar approach that connects through JDBC with SQL user credentials, or for how to get return parameters back out, I suggest taking a look at this blog post:
https://medium.com/delaware-pro/executing-ddl-statements-stored-procedures-on-sql-server-using-pyspark-in-databricks-2b31d9276811
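
As an illustration of the return-parameter part (a sketch only, not from the answer above, based on the standard java.sql.CallableStatement API): you can register an output parameter with the JDBC return-value escape syntax before executing. It reuses `con` and `name` from the code above, assumes the procedure returns an integer RETURN value, and must run before the connection is closed.

# Sketch: read the procedure's integer return value via the same py4j connection
# {? = call ...} is standard JDBC escape syntax for a return parameter
jvm = spark._sc._gateway.jvm
return_statement = con.prepareCall("{? = call Staging.SPR_InsertDummy(?)}")
return_statement.registerOutParameter(1, jvm.java.sql.Types.INTEGER)
return_statement.setString(2, name)
return_statement.execute()
return_value = return_statement.getInt(1)
return_statement.close()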


k97glaaz2#

Running a stored procedure through a JDBC connection from Azure Databricks is not supported as of now. But your options are:
1. Use the pyodbc library to connect and execute your procedure (a minimal sketch follows this list). By using this library, though, you will be running your code on the driver node while all your workers sit idle. See this article for details: https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark
1. Use a SQL table function rather than a procedure. In a sense, you can use anything that you can use in the FROM clause of a SQL query.
1. Since you are in an Azure environment, combining Azure Data Factory (to execute your procedure) with Azure Databricks can help you build pretty powerful pipelines.
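
For reference, a minimal sketch of option 1 with pyodbc, calling the procedure from the question. It assumes the Microsoft ODBC driver is installed on the cluster; the server name, database, and credentials below are placeholders.

import pyodbc

# runs on the driver node only; the worker nodes stay idle
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=myserver.database.windows.net,1433;"
    "DATABASE=Demo;UID=myuser;PWD=mypassword"
)
cursor = conn.cursor()
cursor.execute("EXEC dbo.testJoin")   # the procedure from the question
conn.commit()                         # needed if the procedure modifies data
cursor.close()
conn.close()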


kr98yfug3#

I believe the top answer shows how to execute a command / stored procedure, but not how to get the result back when the stored procedure returns a table. This was my first result on Google, so hopefully this helps someone.

Solution

The reason executing the stored procedure fails in the first place is that Spark wraps the query in parentheses and assigns it an alias (select * from (query) alias). More details: https://stackoverflow.com/a/75330385/19871699
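
To illustrate the failure mode (a sketch only, with a placeholder jdbc_url): handing an EXEC to the JDBC reader gets it wrapped in a subquery for schema inference, which SQL Server rejects.

# Spark rewrites the query roughly as
#   SELECT * FROM (EXEC dbo.testJoin) SPARK_GEN_SUBQ_0
# which is not valid T-SQL, so this read fails
df = (spark.read.format("jdbc")
      .option("url", jdbc_url)                 # placeholder connection url
      .option("query", "EXEC dbo.testJoin")    # gets wrapped in a subquery
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .load())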
I have two different sets of code, one for PySpark and one for Scala. Both do the same thing using the JDBC driver; I prefer the Scala one. Right now it requires the dataset to fit in memory, but you can work around that.
Much of the documentation on how to use the driver is here: https://learn.microsoft.com/en-us/sql/connect/jdbc/microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16
Both sets of code:
1. Create a connection object to the server
1. Create the SQL query statement and execute it
1. Get the metadata and the result set
1. Extract the column information and count
1. Loop through each row of the result set and each field within it to extract the values, building a list of fields per row, and append each row to a list of rows
1. Convert the resulting list of lists into a dataframe

Python

There is an issue with dates still coming back as the JavaObject type, even though pandas reads them fine. If you use spark.createDataFrame(pd.DataFrame(...)) it complains about the type, but going through an intermediate CSV works (a possible alternative is sketched after the Python code below).

Note: I use getObject because it extracts any kind of result. There are other methods, such as getString, getFloat, ..., that would be better suited to specific column types.

This is just about the worst-case scenario for py4j: every value in the whole table is serialized one at a time and sent from the JVM over to Python.

%python
import pandas as pd

# parameters
username = ""
password = ""
host = ""
port = ""
database = ""

query = """
exec ...
"""

# construct jdbc url
sqlsUrl = f"jdbc:sqlserver://{host}:{port};database={database}"

# get the gateway/connection to py4j
gateway = sc._gateway
jvm = gateway.jvm

# connection to the server
con = jvm.java.sql.DriverManager.getConnection(sqlsUrl, username,password)

# create a statement, execute it, get result set and metadata
statement = con.prepareCall(query)
statement.execute()

metadata = statement.getMetaData()
resultset = statement.getResultSet()

# extract column names from metadata
columns = [metadata.getColumnName(i+1) for i in range(metadata.getColumnCount())]

# loop through the result set and make into a list of lists
rows = []
while resultset.next():
    row = []
    for i in range(len(columns)):
        row.append(resultset.getObject(i+1))
    rows.append(row)

# close the connection
con.close()

# make into a pandas dataframe and write temporarily to a csv (bug with date)
pd.DataFrame(rows, columns=columns).to_csv("tmp.csv",index=False)

df = spark.createDataFrame(pd.read_csv("tmp.csv"))

# voila
display(df)
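
A possible alternative to the temporary CSV, as a sketch only: coerce whatever py4j hands back as a JavaObject (java.sql.Date, Timestamp, BigDecimal, ...) to its string form inside the extraction loop, reusing resultset, columns, and pd from the code above.

from py4j.java_gateway import JavaObject

def to_python(value):
    # java.sql.Date / Timestamp come back as JavaObject; toString() gives e.g. "2024-01-31"
    return value.toString() if isinstance(value, JavaObject) else value

rows = []
while resultset.next():
    rows.append([to_python(resultset.getObject(i + 1)) for i in range(len(columns))])

df = spark.createDataFrame(pd.DataFrame(rows, columns=columns))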

Scala

%scala
import org.apache.spark.sql.types._
import java.sql.DriverManager
import java.sql.ResultSet

// connection parameters
val username = ""
val password = ""
val host = ""
val port = ""
val database = ""

val sqlsUrl = s"jdbc:sqlserver://$host:$port;databaseName=$database"

// query
val query = """
exec ..
"""

// get connection
val connection = DriverManager.getConnection(sqlsUrl, username,password)

// prepare statement and execute it
val statement = connection.prepareCall(query)
statement.executeQuery()

// fetch results and the structure of the results
val metaData = statement.getMetaData()
val resultSet = statement.getResultSet()
val indices = (1 to metaData.getColumnCount).toList

// translation of java types to spark types
val columnTypesSpark = Map(
    "java.lang.String"-> StringType,
    "java.lang.Short"-> ShortType,
    "java.sql.Date"-> DateType,
    "java.sql.Timestamp"-> TimestampType,
    "java.math.BigDecimal"-> DecimalType(10,1), // whatever precision you want
    "java.lang.Float" -> FloatType,
    "java.lang.Integer" -> IntegerType,
    "java.lang.Boolean" -> BooleanType)

// list out the column types in the returned data
val columnTypes = indices.map(i => columnTypesSpark(metaData.getColumnClassName(i)) )

// list out the column names in the returned data
val columnNames = indices.map(i => metaData.getColumnLabel(i))

// define the schema
val schema = StructType(indices.map(i => StructField(columnNames(i-1),columnTypes(i-1)) ))

// loop through the results dataset
val results: List[Row] = Iterator
  .continually {
    if (resultSet.next()) Some(Row(indices.map(o => resultSet.getObject(o)).toList:_*))
    else None
  }
  .takeWhile(_.isDefined)
  .map(_.get)
  .toList

// close connection
connection.close()

// convert results rowset into an RDD and then assign results into a dataframe
val df = spark.createDataFrame(sc.parallelize(results),schema)

display(df)

Long answer - hope it helps someone.
