pyspark: Overwriting a MySQL table with AWS Glue

Asked by js81xvg6 on 2022-12-11 in Spark · 3 answers

I have a Lambda process that occasionally polls an API for recent data. The data has unique keys, and I want to use Glue to update the table in MySQL. Is there an option to overwrite data using those keys (similar to Spark's mode=overwrite)? If not, can I truncate the table in Glue before inserting all the new data?
Thanks
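
For context (this sketch is not part of the original question): the Spark behavior the question refers to looks roughly like the following, with host, table, and credentials as placeholders. By default mode="overwrite" drops and recreates the table; setting the JDBC truncate option makes Spark truncate it instead, preserving the schema. Neither variant performs a key-based upsert.

# Sketch only: plain Spark JDBC overwrite, assuming a DataFrame df.
# "truncate"="true" issues TRUNCATE TABLE instead of DROP/CREATE.
df.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://<host>:3306/<database>") \
    .option("dbtable", "<target-table>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("truncate", "true") \
    .mode("overwrite") \
    .save()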

zbwhf8kr #1

I found a simpler way of handling this through the JDBC connection in Glue. The way the Glue team recommends truncating a table is with the following sample code when writing data to a Redshift cluster:

# The "preactions" SQL runs against the cluster before the load,
# truncating the target table ahead of the write
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = resolvechoice4,
    catalog_connection = "<connection-name>",
    connection_options = {
        "dbtable": "<target-table>",
        "database": "testdb",
        "preactions": "TRUNCATE TABLE <table-name>"},
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink5"
)

where

connection-name    your Glue connection name for your Redshift cluster
target-table       the table you're loading the data into
testdb             the name of the database
table-name         the name of the table to truncate (ideally the table you're loading into)
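
One caveat, based on the AWS Glue documentation rather than the original answer: preactions and postactions are Amazon Redshift-specific connection options, so this exact pattern does not carry over directly to a MySQL target; for MySQL, see the other answers or the plain Spark JDBC writer sketched under the question.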

qhhrdooz #2

I ran into the same issue with Redshift, and the best solution we could come up with was to create a Java class that loads the MySQL driver and issues a TRUNCATE TABLE:

package com.my.glue.utils.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

@SuppressWarnings("unused")
public class MySQLTruncateClient {
    /** Opens a JDBC connection to the given URL and truncates the named table. */
    public void truncate(String tableName, String url) throws SQLException, ClassNotFoundException {
        // Legacy Connector/J 5.x driver class; with Connector/J 8.x
        // the class is com.mysql.cj.jdbc.Driver.
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection mysqlConnection = DriverManager.getConnection(url);
             Statement statement = mysqlConnection.createStatement()) {
            statement.execute(String.format("TRUNCATE TABLE %s", tableName));
        }
    }
}

Upload that JAR, along with the MySQL driver JAR it depends on, to S3, and make your job depend on both. Then call the class from your Glue script:

from py4j.java_gateway import java_import

# Expose the uploaded Java class to the Glue job's JVM via Py4J
java_import(glue_context._jvm, "com.my.glue.utils.mysql.MySQLTruncateClient")
truncate_client = glue_context._jvm.MySQLTruncateClient()
truncate_client.truncate('my_table', 'jdbc:mysql://...')
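
As a packaging note not in the original answer: the two JARs can be supplied through the job's --extra-jars special parameter, or listed under the job's dependent JARs path in the Glue console.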

zzoitvuj #3

The workaround I've come up with, which is a little simpler than the alternative posted, is the following:

  • Create a staging table in MySQL and load your new data into it (a sketch of this load step follows the job code below).
  • Run the command: REPLACE INTO myTable SELECT * FROM myStagingTable; (REPLACE decides which rows to overwrite using the table's PRIMARY KEY or a UNIQUE index, which matches the unique keys the question mentions.)
  • Truncate the staging table.

This can be done with:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb

# Placeholders: host, user, password, and database of your MySQL instance
db = MySQLdb.connect("URL", "USERNAME", "PASSWORD", "DATABASE")
cursor = db.cursor()
# Upsert: rows whose unique key already exists in myTable are replaced
cursor.execute("REPLACE INTO myTable SELECT * FROM myStagingTable")
# Clear the staging table for the next run
cursor.execute("TRUNCATE TABLE myStagingTable")
# Commit the changes; MySQLdb does not autocommit by default
db.commit()

db.close()
job.commit()
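
For completeness, here is a minimal, hypothetical sketch of the load step from the first bullet, writing the freshly polled data into the staging table with Glue's JDBC writer; dyf, the connection name, and the database name are assumed placeholders, not part of the original answer.

# Assumed: dyf is the DynamicFrame holding the newly polled API data,
# and "mysql-connection" is a Glue connection pointing at the database
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = dyf,
    catalog_connection = "mysql-connection",
    connection_options = {
        "dbtable": "myStagingTable",
        "database": "DATABASE"},
    transformation_ctx = "load_staging"
)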
