I have a Lambda process that occasionally polls an API for recent data. The data has unique keys, and I want to use Glue to update a table in MySQL. Is there an option to overwrite data using this key (something like Spark's mode=overwrite)? If not, can I truncate the table in Glue before inserting all the new data? Thanks.
zbwhf8kr1#
I found a simpler way of handling JDBC connections in Glue. The way the Glue team recommends truncating a table is with the following sample code when writing data to a Redshift cluster:
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = resolvechoice4,
    catalog_connection = "<connection-name>",
    connection_options = {
        "dbtable": "<target-table>",
        "database": "testdb",
        "preactions": "TRUNCATE TABLE <table-name>"  # SQL executed before the load runs
    },
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink5")
where:

- connection-name: your Glue connection name for your Redshift cluster
- target-table: the table you're loading the data into
- testdb: the name of the database
- table-name: the name of the table to truncate (ideally the table you're loading into)
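Since the original question targets MySQL and asks about Spark's mode=overwrite, it may also be worth noting that inside a Glue job you can drop down to the plain Spark DataFrame writer. Below is a minimal sketch, assuming a DynamicFrame named dyf; the host, credentials, and table name are placeholders. With the Spark JDBC writer, mode("overwrite") combined with option("truncate", "true") truncates and reloads the table rather than dropping and recreating it:

# Hedged sketch: convert the DynamicFrame to a Spark DataFrame and use the
# plain Spark JDBC writer. All connection details below are placeholders.
df = dyf.toDF()
(df.write
   .format("jdbc")
   .option("url", "jdbc:mysql://<host>:3306/testdb")
   .option("dbtable", "<target-table>")
   .option("user", "<username>")
   .option("password", "<password>")
   .option("truncate", "true")  # truncate instead of drop/recreate on overwrite
   .mode("overwrite")
   .save())

Note that this still replaces the whole table rather than overwriting row-by-row on the key; the REPLACE INTO approach in the last answer below gives a true key-based overwrite.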
qhhrdooz2#
I ran into the same problem with Redshift, and the best solution we could come up with was to create a Java class that loads the MySQL driver and issues a TRUNCATE TABLE statement:
package com.my.glue.utils.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

@SuppressWarnings("unused")
public class MySQLTruncateClient {
    public void truncate(String tableName, String url) throws SQLException, ClassNotFoundException {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection mysqlConnection = DriverManager.getConnection(url);
             Statement statement = mysqlConnection.createStatement()) {
            statement.execute(String.format("TRUNCATE TABLE %s", tableName));
        }
    }
}
Upload that JAR to S3 along with the MySQL driver JAR it depends on, and make your Glue job depend on those JARs. You can then call the class from your job script:
from py4j.java_gateway import java_import

java_import(glue_context._jvm, "com.my.glue.utils.mysql.MySQLTruncateClient")
truncate_client = glue_context._jvm.MySQLTruncateClient()
truncate_client.truncate('my_table', 'jdbc:mysql://...')
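Since the truncate method only receives a URL, any credentials have to travel inside the JDBC URL itself; MySQL Connector/J accepts them as query parameters. A sketch of such a call, with purely hypothetical host and credential values:

# Hypothetical example: credentials embedded in the JDBC URL as query parameters.
truncate_client.truncate(
    'my_table',
    'jdbc:mysql://my-host:3306/testdb?user=my_user&password=my_password')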
zzoitvuj3#
The workaround I've come up with, which is a little simpler than the alternative posted, is the following:
REPLACE INTO myTable SELECT * FROM myStagingTable;
REPLACE INTO works like INSERT, except that when an incoming row collides with an existing primary or unique key, the old row is deleted first, which gives exactly the key-based overwrite the question asks for. This can be done with:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb

# Placeholders: host, username, password, and database name.
db = MySQLdb.connect("URL", "USERNAME", "PASSWORD", "DATABASE")
cursor = db.cursor()
cursor.execute("REPLACE INTO myTable SELECT * FROM myStagingTable")
db.commit()  # MySQLdb does not autocommit by default, so commit explicitly
db.close()

job.commit()
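For completeness, the staging table in that script has to be populated with the new batch before the REPLACE runs. One way to do that, sketched here with hypothetical frame and connection names, is to let Glue write the polled API data into the staging table first:

# Hedged sketch with hypothetical names: load the fresh batch into the staging
# table, then the REPLACE INTO above folds it into myTable, overwriting rows
# that share a primary/unique key.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = new_data_dyf,                       # DynamicFrame holding the polled API data
    catalog_connection = "<mysql-connection>",  # a Glue connection pointing at the MySQL instance
    connection_options = {
        "dbtable": "myStagingTable",
        "database": "DATABASE"
    })

You would typically also clear the staging table at the start of each run so that only the new batch gets replaced in.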