假设我们有这样的工作:
class MRjob(JobTask):
def output(self):
return ...
def requires(self):
return ...
def mapper(self, line):
# some line process
yield key, (...information, stored in hashable type...)
def reducer(self,key,values):
# some reduce logic... for example this
unique = set(values)
for elem in unique:
yield key, elem[0], elem[1]
在output方法中,我应该如何将数据插入现有的表分区(表也是以orc格式存储的)?我想跳过将数据转换成orc的过程,所以我试着
return HivePartitionTarget(self.insert_table, database=self.database_name, partition=partition)
但这不管用。我还发现luigi试图将输出传递给某个文件。使用hivepartitiontarget时,luigi返回错误,如“object has no attribute write”,因此我的假设是hivepartitiontarget不包含write方法。因此,我认为我做错了什么,应该使用另一种方法,但没有找到一个单一的例子
1条答案
按热度按时间6yjfywim1#
我不太清楚如何在未来实现这一目标
luigi
. 我的建议是使用一种简单的方法来编写输出luigi
以普通分隔格式编写脚本(比如逗号分隔格式)。在上面创建一个外部配置单元表:
使用简单配置单元将数据插入原始表(具有orc格式的数据)
insert-into-select
查询。您的目标表将具有orc格式的数据,而hive将为您处理转换。
有关详细语法,请参阅https://cwiki.apache.org/confluence/display/hive/languagemanual+dml#languagemanualdml-将数据从查询插入HiveTables