I'm new to the Spark/Scala world. I'm trying to replicate an ETL logic dynamically: essentially, I want to pull data from every table in which a specific column exists, filter on that column, and store the result in Azure Blob storage.
import org.apache.spark.sql.SaveMode  // needed for SaveMode.Overwrite below

val url = "<Host Address>"
val user = "<Username>"
val pw = "<Password>"
val driver = "org.postgresql.Driver"
val sslfactory = "org.postgresql.ssl.NonValidatingFactory"
val sql_lookup = """
  SELECT * FROM information_schema.tables AS inf_schema
  LEFT JOIN (
    SELECT table_schema AS country_table_schema,
           table_name   AS country_table_name,
           column_name  AS country_table_column_name
    FROM information_schema.columns
    WHERE table_schema = '<Schemaname>' AND column_name = 'Column_A'
  ) AS country
    ON inf_schema.table_schema = country.country_table_schema
   AND inf_schema.table_name = country.country_table_name
  WHERE inf_schema.table_schema = '<Schemaname>'"""
val dfTbl = (spark.read
.format("jdbc")
.option("url", url)
.option("ssl","true")
.option("sslfactory",sslfactory)
.option("user", user)
.option("password", pw)
.option("driver", driver)
.option("query",sql_lookup)
.load())
val dfTbl_withCountry = dfTbl
  .filter(dfTbl.col("country_table_column_name").isNotNull)
  .select("table_name")
// Use map, not foreach: foreach returns Unit, so the original assignment
// left dfTbl_wc with nothing to iterate over in the loop below.
val dfTbl_wc = dfTbl_withCountry.collect().map(row => row.getString(0))
for (table <- dfTbl_wc) {
  val sql = s"select * from <Schemaname>.$table where <Column_name> = '<Value>'"
  val df = (spark.read
.format("jdbc")
.option("url", url)
.option("ssl","true")
.option("sslfactory",sslfactory)
.option("user", user)
.option("password", pw)
.option("driver", driver)
.option("query",sql)
.load())
  // .write returns Unit, so there is no point assigning it to a variable;
  // .csv() already selects the CSV data source.
  df
    .coalesce(1)
    .write
    .option("header", "true")
    .option("delimiter", "~")
    .option("encoding", "UTF-8")
    .mode(SaveMode.Overwrite)
    .csv(s"wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/$table")
  // Spark writes a directory of part files; copy the single part file
  // out to a stable <table>.csv name, then remove the directory artifact.
  val partition_path = dbutils.fs.ls(s"wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/$table")
    .filter(file => file.name.startsWith("part"))(0).path
  dbutils.fs.cp(partition_path, s"wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/${table}.csv")
  dbutils.fs.rm(partition_path, recurse = true)
}
Below is the inner subquery of the lookup query:
SELECT table_schema as country_table_schema ,table_name as country_table_name, column_name as country_table_column_name FROM information_schema.columns WHERE table_schema = '<Schema_name>' AND columns.column_name = 'Column_A'
I want to extract every table name that appears in the `country_table_name` column of the `sql_lookup` output, which I store in the dataframe `dfTbl`. In `dfTbl_wc` I iterate over each row of `dfTbl`, and then use the for loop to select the full data for each table name held in `dfTbl_wc`.
But for some reason this code does not work properly in the for-loop part. Please help!
2 Answers
vxqlmq5t1#
You can create a new column in the dataframe that contains the query to run for each table. Then select that query column, convert it to an array, and loop over it to build the final dataframe, after which you can do whatever you want with it, such as saving it as a table, a Parquet file, a CSV file, etc. If you want to save each table's data separately, you have to write that code inside the for loop.
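A sketch of that approach, assuming the `dfTbl_withCountry`, `url`, `user`, `pw`, and `driver` values from the question, and assuming the per-table results share a compatible schema (otherwise skip the union and handle each dataframe inside the loop):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat, lit}

// Build a "query" column holding the SQL statement to run for each table
val dfWithQuery = dfTbl_withCountry.withColumn(
  "query",
  concat(lit("select * from <Schemaname>."), col("table_name"),
         lit(" where <Column_name> = '<Value>'")))

// Convert the query column to a local array
val queries: Array[String] =
  dfWithQuery.select("query").collect().map(_.getString(0))

// Loop over the queries; union the per-query results into one dataframe
// (this step assumes all tables have the same columns)
val combined: DataFrame = queries.map { q =>
  spark.read.format("jdbc")
    .option("url", url)
    .option("user", user)
    .option("password", pw)
    .option("driver", driver)
    .option("query", q)
    .load()
}.reduce(_ union _)
```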
Now you can save the combined dataframe however you need.
pgky5nke2#
I made a few tweaks to the code (basically combining a couple of lines from the code I was using earlier with the code shared by @nikunkakadiya), and it works for me. Sharing the code for reference -
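The exact snippet shared in this answer is not preserved in this excerpt. As a hedged reconstruction based on the question's code, the key change is collecting the table names with `map` instead of `foreach` (which returns `Unit`, leaving the loop with nothing to iterate):

```scala
// Collect the table names as Array[String]; the original
// collect().foreach(...) assignment evaluated to Unit.
val dfTbl_wc: Array[String] =
  dfTbl_withCountry.collect().map(row => row.getString(0))

for (table <- dfTbl_wc) {
  val sql = s"select * from <Schemaname>.$table where <Column_name> = '<Value>'"
  // ... then read via JDBC and write to blob storage as in the question
}
```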
Let me know if you have any questions.
Thanks for your support, guys. Really appreciate it. Cheers!