我有一个python脚本 stream.py
,从stdin读取json行,对其进行处理,然后将json行写入stdout。
示例输入行来自 data.json
:
{"user_id":3217,"description":"some text"}
输出线示例:
{"user_id":3217,"description":"some text PROCESSED","rating":1.78}
在pig中,我尝试以这种方式流式处理数据:
data = LOAD 'data.json';
DEFINE my_stream `./stream.py` output (stdout USING JsonLoader('user_id:int, description:chararray, rating:float'));
data_streamed = STREAM data THROUGH my_stream;
ratings = FOREACH data_streamed GENERATE rating;
ratings_unique = DISTINCT ratings;
ratings_test = LIMIT ratings_unique 10;
DUMP ratings_test;
当我尝试执行时,出现以下错误:
pig script failed to validate: java.lang.ClassCastException: class org.apache.pig.builtin.JsonLoader does not implement interface org.apache.pig.StreamToPig
到目前为止,我只看到两种解决方案(如果可能的话,我希望避免):
将流数据存储到一个“临时”文件中,并使用jsonload加载它。
修改 stream.py
写tsv行而不是json行,这样我就可以用默认的pigstorage加载它。
有没有可能用jsonloader让pig流媒体工作?
暂无答案!
目前还没有任何答案,快来回答吧!