将csv文件(包含空字符串和副本)导入到dynamodb中

wj8zmpe1  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(490)

我有一个csv文件,我正试图导入亚马逊dynamodb。所以我把它上传到s3,建立一个emr集群,然后创建一个如下的外部表:

hive> CREATE EXTERNAL TABLE s3_table_myitems (colA BIGINT, colB STRING, colC STRING, colD DOUBLE, colE DOUBLE, colF STRING, colG STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES ('serialization.null.format'='""')
    STORED AS TEXTFILE
    LOCATION 's3://bucketname/dirname/'
    TBLPROPERTIES ('skip.header.line.count'='1');

csv中的任何列都可能为空,但dynamodb不能处理空字符串(“ com.amazonaws.AmazonServiceException: One or more parameter values were invalid: An AttributeValue may not contain an empty string ").
亚马逊如是说:
我们将在将来的版本中考虑这个可选的“忽略空字符串”行为…作为一种解决方法,您可以…将空属性值转换为null。例如,您可以…使用更复杂的select表达式将空字符串转换为其他内容,包括将它们设置为null。
这就是我想出来的,但看起来很难看:

hive> INSERT INTO TABLE ddb_tbl_ingredients
    SELECT
    regexp_replace(colA, '^$', 'NULL'),
    regexp_replace(colB, '^$', 'NULL'),
    regexp_replace(colC, '^$', 'NULL'),
    regexp_replace(colD, '^$', 'NULL'),
    regexp_replace(colE, '^$', 'NULL'),
    regexp_replace(colF, '^$', 'NULL'),
    regexp_replace(colG, '^$', 'NULL')
    FROM s3_table_ingredients;

是否有更好的解决方案来解决整个问题(缺少对csv的预处理),或者至少有更好的解决方案 SELECT 语法?
编辑:我最后也不得不处理重复的问题(“ com.amazonaws.AmazonServiceException: Provided list of item keys contains duplicates ").
为了子孙后代,这里是我的完整流程。我很想听到一个更好的方法来做到这一点,无论是美学还是性能。这项任务看起来很简单(“将csv文件导入dynamodb”),但到目前为止,这项工作已经花了几个小时:p


# source

hive> CREATE EXTERNAL TABLE s3_table_myitems (colA STRING, colB STRING, colC DOUBLE, colD DOUBLE, colE STRING, colF STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES ('serialization.null.format'='""')
    STORED AS TEXTFILE
    LOCATION 's3://bucketname/dirname/'
    TBLPROPERTIES ('skip.header.line.count'='1');

# destination

hive> CREATE EXTERNAL TABLE ddb_tbl_myitems (colA STRING, colB STRING, colC DOUBLE, colD DOUBLE, colE STRING, colF STRING)
    STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
    TBLPROPERTIES ("dynamodb.table.name" = "myitems",
        "dynamodb.column.mapping" = "colA:colA,colB:colB,colC:colC,colD:colD,colE:colE,colF:colF");

# remove dupes - http://stackoverflow.com/a/34165762/594211

hive> CREATE TABLE tbl_myitems_deduped AS
    SELECT colA, min(colB) AS colB, min(colC) AS colC, min(colD) AS colD, min(colE) AS colE, min(colF) AS colF
    FROM (SELECT colA, colB, colC, colD, unit, colF, rank() OVER
        (PARTITION BY colA ORDER BY colB, colC, colD, colE, colF)
        AS col_rank FROM s3_table_myitems) t
    WHERE t.col_rank = 1
    GROUP BY colA;

# replace empty strings with placeholder 'NULL'

hive> CREATE TABLE tbl_myitems_noempty AS
    SELECT colA,
    regexp_replace(colB, '^$', 'NULL') AS colB,
    regexp_replace(colC, '^$', 'NULL') AS colC,
    regexp_replace(colD, '^$', 'NULL') AS colD,
    regexp_replace(colE, '^$', 'NULL') AS colE,
    regexp_replace(colF, '^$', 'NULL') AS colF
    FROM tbl_myitems_deduped
    WHERE LENGTH(colA) > 0;

# ...other preprocessing here...

# insert to DB

hive> INSERT INTO TABLE ddb_tbl_myitems
    SELECT * FROM tbl_myitems_noempty;

注: colA 是分区键。

rjee0c15

rjee0c151#

可以向create table语句中添加其他表属性,该语句将任何指定的字符视为空值。

TBLPROPERTIES('serialization.null.format'='');

相关问题