将数据作为parquet从sqlserver加载到s3-aws emr

kokeuurv  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(373)

目前,我们的数据在sqlserver中,我们正在尝试将它们作为Parquet文件移动到s3存储桶中。目的是分析aws emr中的s3数据(主要是spark、hive和presto)。我们不想将数据存储在hdfs中。
这里有什么选择?据我们所知,似乎我们可以使用spark或sqoop进行导入。虽然由于并行性(parallel db connections),在这种情况下sqoop比spark快,但似乎不可能将parquet文件从sqoop写入s3,因为sqoop+s3+parquet会导致错误的fs错误。解决方法是先转到hdfs,然后转到s3。然而,这似乎是无效的。如何使用sparksql从sqlserver中提取这些数据并在s3中作为parquet写入?
一旦我们以这种格式将这些数据加载为Parquet

s3://mybucket/table_a/day_1/(parquet files 1 ... n).
s3://mybucket/table_a/day_2/(parquet files 1 ... n).
s3://mybucket/table_a/day_3/(parquet files 1 ... n).

如何使用配置单元将它们组合为单个表和查询。我知道我们可以创建指向s3的hive外部表,但是我们可以指向多个文件吗?
谢谢。
编辑:按要求添加。
org.apache.hive.service.cli.hivesqlexception:处理语句时出错:失败:执行错误,从org.apache.hive.service.cli.operation.operation.tosqlexception(操作)中的org.apache.hadoop.hive.ql.exec.ddltask返回代码1。java:380)在org.apache.hive.service.cli.operation.sqloperation.runquery(sqloperation。java:257)在org.apache.hive.service.cli.operation.sqloperation.access$800(sqloperation。java:91)在org.apache.hive.service.cli.operation.sqloperation$backgroundwork$1.run(sqloperation。java:348)位于javax.security.auth.subject.doas(subject)的java.security.accesscontroller.doprivileged(本机方法)。java:422)在org.apache.hadoop.security.usergroupinformation.doas(usergroupinformation。java:1698)在org.apache.hive.service.cli.operation.sqloperation$backgroundwork.run(sqloperation。java:362)在java.util.concurrent.executors$runnableadapter.call(executors。java:511)在java.util.concurrent.futuretask.run(futuretask。java:266)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1149)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:624)在java.lang.thread.run(线程。java:748)

fwzugrvs

fwzugrvs1#

spark read jdbc使用多个连接来拉取数据。这是链接

def
jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): 

Construct a DataFrame representing the database table accessible via JDBC URL url named table. Partitions of the table will be retrieved in parallel based on the parameters passed to this function.

Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

url
JDBC database url of the form jdbc:subprotocol:subname.

table
Name of the table in the external database.

columnName
the name of a column of integral type that will be used for partitioning.

lowerBound
the minimum value of columnName used to decide partition stride.

upperBound
the maximum value of columnName used to decide partition stride.

numPartitions
the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly. When the input is less than 1, the number is set to 1.

connectionProperties
JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least a "user" and "password" property should be included. "fetchsize" can be used to control the number of rows per fetch.DataFrame

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.dataframereader
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-到其他数据库
创建分区列为日期的配置单元表,保存并指定以下位置

create table table_name (
  id                int,
  dtDontQuery       string,
  name              string
)
partitioned by (date string) Location s3://s3://mybucket/table_name/

在数据中添加一个名为date的列,并用sysdate填充它。如果不需要,您无需添加列,我们只需填充位置即可。但它也可以是你分析的审计专栏。使用Spark dataframe.partitionBy(date).write.parquet.location(s3://mybucket/table_name/) 每日执行 MSCK repair on the hive table 因此新分区被添加到表中。
对非数字列应用numpartitions是通过将该列的哈希函数创建为所需的连接数并使用该列

m4pnthwp

m4pnthwp2#

spark是一个很好的实用工具。您可以轻松地连接到jdbc数据源,并且可以通过指定凭据和s3路径(例如pyspark save dataframe to s3)来写入s3。
如果您使用的是aws,那么spark、presto和hive的最佳选择就是使用aws胶水metastore。这是一个数据目录,它将s3对象注册为数据库中的表,并提供了一个用于定位这些对象的api。
问题2的答案是肯定的,您可以有一个引用多个文件的表。如果已经对数据进行了分区,则通常需要这样做。

wfauudbj

wfauudbj3#

您可以按如下方式创建配置单元外部表

create external table table_a (
 siteid                    string,
 nodeid                    string,
 aggregation_type          string
 )
 PARTITIONED BY (day string)
 STORED AS PARQUET
 LOCATION 's3://mybucket/table_a';

然后可以运行以下命令将存储在each days目录下的分区文件注册到hivematastore中

MSCK REPAIR TABLE table_a;

现在您可以通过配置单元查询访问文件。我们在项目中使用了这种方法,效果很好。执行上述命令后,可以运行查询

select * from table_a where day='day_1';

希望这有帮助。
-拉维

ig9co6j1

ig9co6j14#

虽然我有点晚了,但是为了将来的参考。在我们的项目中,我们正是这样做的,我更喜欢sqoop而不是spark。
原因:我使用glue将mysql中的数据读取到s3,但是读取不是并行的(aws支持这样做,glue(使用pyspark)就是这样工作的,但是一旦读取完成,就会写入s3,这是并行的)。这样做效率低,速度慢。100gb的数据读写到s3需要1.5小时。
所以我在emr上使用了sqoop,并打开了glue catalog(所以hive metastore在aws上),我可以直接从sqoop写入s3,这比读取100gb的数据要快得多,需要20分钟。
您必须设置hive.metastore.warehouse.dir=s3://并且如果您执行配置单元导入或直接写入,您应该看到数据正在写入s3。

相关问题