Databricks / Apache Spark PySpark refactoring question

7jmck4yq · asked 2022-12-11 · in Spark

I am using PySpark on Databricks to write data to the OCC and dbo schemas of a SQL database, like this:

l_schemas = ["OCC"]   # list of schemas to write to
l_tables = [table]    # matching table name for each indexed schema
for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    my_dfone.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

l_schemas = ["dbo"]   # list of schemas to write to
l_tables = [table]    # matching table name for each indexed schema
for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    my_dftwo.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

Someone suggested I use the code below instead (which is much cleaner):

l_schemas = ["OCC", "dbo", "one"]   # list of schemas to write to
l_tables = [table]                  # matching table name for each indexed schema
for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

The above works fine, but it forces me to assign everything to a single df variable, whereas I need to write both my_dfone and my_dftwo.
My attempt is below:

l_schemas = ["OCC", "dbo"]   # list of schemas to write to
l_tables = [table]           # matching table name for each indexed schema

for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    my_dfone.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save() \
        .my_dftwo.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

But it fails.
So, can someone show me how to write both dataframes to the SQL database without duplicating the write code? Basically, something cleaner.
P.S. table is a variable.
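One way to avoid duplicating the write block (a sketch, not the poster's code; it assumes jdbcUrl, table, my_dfone and my_dftwo are defined as above, and `jdbc_targets` is a hypothetical helper name) is to pair each dataframe with its target schema and loop once:

```python
def jdbc_targets(schemas, table):
    # hypothetical helper: build the fully qualified "schema.table" names
    return [schema + "." + table for schema in schemas]

# Pair each dataframe with its schema and reuse one write block
# (shown as comments because it needs a live Spark session):
# for df, target in zip([my_dfone, my_dftwo], jdbc_targets(["OCC", "dbo"], table)):
#     df.write.mode("overwrite") \
#         .format("jdbc") \
#         .option("url", jdbcUrl) \
#         .option("dbtable", target) \
#         .save()

# The pairing itself, with a placeholder table name:
targets = jdbc_targets(["OCC", "dbo"], "mytable")
```

`zip` keeps each dataframe aligned with exactly one schema, so there is only one write block to maintain.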
I also tried the following:

l_schemas = ["HR", "FPP"]   # list of schemas to write to
l_tables = str(table)       # note: this is a string, not a list

for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df_fpp.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

    df_hr.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

But that fails too.
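The likely culprit in the attempt above: `l_tables = str(table)` makes `l_tables` a string, not a list, so `l_tables[i]` picks out single characters rather than a table name. A minimal sketch of the effect, using a placeholder table name:

```python
table = "Persons"          # placeholder table name
l_tables = str(table)      # a string, NOT a one-element list
l_schemas = ["HR", "FPP"]

# Indexing a string yields single characters, so the built names are wrong:
s0 = l_schemas[0] + "." + l_tables[0]   # "HR.P",  not "HR.Persons"
s1 = l_schemas[1] + "." + l_tables[1]   # "FPP.e", not "FPP.Persons"
```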
I am now trying the following, and it still does not work:

l_schemas = ["FPP", "HR"]    # list of schemas to write to
l_tables = [table, table]    # same table name for each indexed schema

for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df_fpp.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df_hr.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()
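The version above has a different problem: each loop iterates over *both* schemas, so df_fpp is written to both FPP and HR targets, and then the second loop overwrites both with df_hr. A sketch of the target list each loop walks, with a placeholder table name:

```python
l_schemas = ["FPP", "HR"]
l_tables = ["mytable", "mytable"]   # placeholder table names

# Each of the two loops walks this SAME list of targets, so in overwrite
# mode whichever dataframe is written last wins in both tables.
targets = [l_schemas[i] + "." + l_tables[i] for i in range(len(l_schemas))]
```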

Updated question: unfortunately, the code provided below does not work either.

table = Ancestor['ancestorPath'].split("/")[7]
l_tables = [table]
l_schemas = ['dbo']
for i in range(len(l_tables)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    my_dfone.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

    my_dftwo.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

I tried modifying the code as follows:

table = Ancestor['ancestorPath'].split("/")[7]
l_tables = [table]
l_schemas = ['FPP', 'HR']
for i in range(len(l_tables)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df_fpp.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

    df_hr.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

However, it only ever shows the result of df_hr.write.mode("overwrite").
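A probable reason: `l_tables = [table]` has length 1, so `range(len(l_tables))` runs the loop body exactly once, building only the "FPP" target; both writes inside the body then hit that same table, and df_hr's overwrite replaces df_fpp's data. A minimal sketch with a placeholder table name:

```python
l_tables = ["mytable"]       # one element -> one loop iteration
l_schemas = ["FPP", "HR"]    # "HR" is never reached

iterations = list(range(len(l_tables)))
s = l_schemas[iterations[0]] + "." + l_tables[iterations[0]]
# Both df_fpp and df_hr are written (in overwrite mode) to this single
# target, so only the second write's data survives.
```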


sgtfey8w1#

I tried to reproduce this in my environment and got the same error.

Note: remove the dot (.) before the second dataframe.
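Why that dot matters: `save()` returns None, so chaining `.my_dftwo...` after it raises an AttributeError on NoneType. A stand-in sketch (FakeWriter is hypothetical, mimicking only the fact that PySpark's DataFrameWriter.save() returns nothing):

```python
class FakeWriter:
    # stand-in for Spark's DataFrameWriter: save() returns None, as in PySpark
    def save(self):
        return None

chain_failed = False
try:
    FakeWriter().save().my_dftwo   # same shape as the broken chained call
except AttributeError:
    chain_failed = True            # NoneType has no attribute "my_dftwo"
```

The two writes must therefore be separate statements, not one chained expression.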

As you can see, both dataframes were written successfully.

Code:

l_schemas = ["dbo", "dbo"]           # list of schemas to write to
l_tables = ["Persons", "db123"]      # matching table name for each indexed schema

for i in range(len(l_schemas)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df_one.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

    df_two.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

Update:

I adopted the approach saideep suggested, and as you can check, it works fine.

Try this approach:

table = Ancestor['ancestorPath'].split("/")[7]
l_tables = [table]
l_schemas = ['dbo']
for i in range(len(l_tables)):
    s = l_schemas[i] + "." + l_tables[i]  # build "schema.table_name"
    df_one.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()

    df_two.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", s) \
        .save()
