CREATE TABLE IF NOT EXISTS input_raw(line STRING);
LOAD DATA LOCAL INPATH '${hiveconf:input}' OVERWRITE INTO TABLE input_raw;
CREATE TABLE IF NOT EXISTS processed_data(
field1 STRING,
field2 STRING
field3 STRING);
INSERT INTO TABLE processed_data
SELECT
substr(line,1,4) as field1,
substr(line,5,4) as field2,
substr(line,9,4) as field3
FROM input_raw;
DROP TABLE input_raw;
import sys
for line in sys.stdin:
# line will have a newline on the end you don't want
line = line.strip()
output = []
output.append(line[:4])
output.append(line[4:8])
output.append(line[8:12])
print '\t'.join(output)
您的配置单元脚本将如下所示:
CREATE TABLE IF NOT EXISTS input_raw(line STRING);
LOAD DATA LOCAL INPATH '${hiveconf:input}' OVERWRITE INTO TABLE input_raw;
CREATE TABLE IF NOT EXISTS processed_data(
field1 STRING,
field2 STRING
field3 STRING);
delete FILE processing.py;
add FILE processing.py;
INSERT INTO TABLE processed_data
SELECT
TRANSFORM (line)
USING 'python processing.py'
AS(field1, field2, field3)
FROM input_raw;
DROP TABLE input_raw;
2条答案
按热度按时间lmyy7pcs1#
您还可以按照您的方法避免流式处理和python以及所有这些,但是使用hivesubstr()
kyvafyod2#
我用这种方法解决了类似的问题:
给定换行分隔行、固定偏移列的输入,
首先将数据输入到一个包含一列的表中,一个字符串
然后通过流式hadoop传递java类或python模块,该模块接收一行并返回多个字段:
您的配置单元脚本将如下所示: