pyspark Spark,如何向外部数据库发送语句查询(既不是写查询也不是读查询)

xv8emn3q  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(88)

我有一个AWS RDS与PostgreSQL。我有两张table:staging_testtest。我想通过使用staging_test上的数据来合并到test中。
这两个表仅在AWS RDS中。我没有把它们复制到数据砖元存储中。因此,我希望将MERGE语句从数据块发送到RDS中,就像在pgadmin中运行它一样。
然而,我所发现的只是如何使用spark使用JDBC将pyspark对象框架写入外部数据库,或者如何向RDS发送SELECT查询并将结果作为pyspark对象框架检索。这就是我所发现的:

  • 写作:df.write.format('jdbc').options(...<connection_parameters>...)
  • 阅读查询:spark.read.format('jdbc').options(...<connection_and_query_parameters>...)

但是我还没有发现如何发送一个既不意味着写数据也不意味着取数据的查询。只是想在RDS上执行一些事情。比如运行存储过程、授予特权、合并、在列上创建索引等。
例如,如果我只想运行这个GRANT SELECT ON schema.table to some_user,我如何使用spark将“查询”字符串发送到外部数据库?这个“查询”既不写也不读数据,它只是发送一条语句让RDS执行。
我希望有类似SQLAlchemy的东西,但在pyspark上,你可以将这些类型的“查询”/语句发送到你的数据库

from sqlalchmey import create_engine

<Set variables USER/HOST/PASS/PORT/DB>

str_engine="postgresql://" + USER+ ":" + PASS+ "@" + HOST+ ":" + PORT+ "/" + DB
engine=create_engine(str_engine)
conn=engine.connect()

query='INSERT INTO some_table (number1,number2) VALUES (1,2)`
conn.execute(query_update)

按照同样的sqlalchmey示例,有没有一种方法可以使用spark将INSERT INTO some_table (number1,number2) VALUES (1,2)查询发送到外部数据库(使用JDBC/ODBC)?
它不一定是INSERT查询,它可以是GRANT,MERGE,CREATE INDEX,执行存储过程等。或任何其他不打算返回一个嵌套框架或使用pyspark嵌套框架写入表的语句。

pgky5nke

pgky5nke1#

Spark是一个数据处理引擎,它有连接器来读写数据。如果您需要运行命令,如SQL、存储过程等。你只需要使用相应的SDK - Pyspark仍然是Python,你可以使用任何其他库。所以,是的,只要使用SQL Alchemy或Psycopg即可。
如果你真的需要它,你可以定义一个函数来为你执行一个可以从SELECT语句调用的函数。

CREATE OR REPLACE FUNCTION exec_ddl(ddl_text text) RETURNS void AS $$$
BEGIN
  EXECUTE ddl_text;
END;
$$ LANGUAGE plpgsql;

SELECT exec_ddl('GRANT SELECT ON schema.table to some_user');

从理论上讲,这将允许您从Spark运行它,如下所示:

spark.read.format('jdbc')
  .options(...<connection_and_query_parameters>...)
  .option("dbtable", "(SELECT exec_ddl('GRANT ...')) as dummy")
  .load()

一个真正丑陋的工作区诚实。

相关问题