cascading.pipe.Each类的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.8k)|赞(0)|评价(0)|浏览(138)

本文整理了Java中cascading.pipe.Each类的一些代码示例,展示了Each类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Each类的具体详情如下:
包路径:cascading.pipe.Each
类名称:Each

Each介绍

[英]The Each operator applies either a Function or a Filter to each entry in the Tuplestream. Any number of Each operators can follow an Each, Splice, or Everyoperator.
[中]Each操作符对Tuplestream中的每个条目应用函数或过滤器。任何数量的每个操作符都可以跟随一个Each、Splice或Everyoperator。

代码示例

代码示例来源:origin: cwensel/cascading

@Test
public void testGeneratorAggregator() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
 pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 8 * 2 * 3, null );
 }

代码示例来源:origin: cascading/lingual-core

public Pipe addDebug( CascadingRelNode node, Pipe pipe )
 {
 if( pipe instanceof Each && ( (Each) pipe ).getOperation() instanceof Debug )
  return pipe;
 String name = makeName( node, pipe );
 return new Each( pipe, debugLevel, new Debug( name, true ) );
 }

代码示例来源:origin: cwensel/cascading

@Override
public void initialize()
 {
 super.initialize();
 filter = each.getFilter();
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testSinkUnknown() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();
 String line = iterator.next().getTuple().toString();
 assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
 iterator.close();
 }

代码示例来源:origin: cwensel/cascading

@Override
public List<Pipe> resolveTails( Context context )
 {
 Pipe pipe = new Pipe( (String) context.getFlow().getSourceNames().get( 0 ) );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 return Arrays.asList( pipe );
 }
};

代码示例来源:origin: cascading/cascading-platform

@Test
public void testSinkUnknown() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();
 String line = iterator.next().getTuple().toString();
 assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
 iterator.close();
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testGeneratorAggregator() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
 pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 8 * 2 * 3, null );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testUnGroup() throws Exception
 {
 copyFromLocal( inputFileJoined );
 Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( 1 ), new RegexSplitter( Fields.size( 3 ) ) );
 pipe = new Each( pipe, new UnGroup( Fields.size( 2 ), new Fields( 0 ), Fields.fields( new Fields( 1 ), new Fields( 2 ) ) ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10 );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testUnGroup() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );
 Tap source = getPlatform().getTextFile( inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
 pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10 );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testUnGroup() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );
 Tap source = getPlatform().getTextFile( inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
 pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10 );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testUnGroup() throws Exception
 {
 copyFromLocal( inputFileJoined );
 Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( 1 ), new RegexSplitter( Fields.size( 3 ) ) );
 pipe = new Each( pipe, new UnGroup( Fields.size( 2 ), new Fields( 0 ), Fields.fields( new Fields( 1 ), new Fields( 2 ) ) ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10 );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testSwap() throws Exception
 {
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
 pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
 pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

private Flow secondFlow( String name, Tap source )
 {
 Pipe pipe = new Pipe( name );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third", "fourth" ), "\\." ) );
 pipe = new Each( pipe, new FieldJoiner( new Fields( "mangled" ), "-" ) );
 Tap sink = getPlatform().getTabDelimitedFile( new Fields( "mangled" ), getOutputPath( name ), SinkMode.REPLACE );
 return getPlatform().getFlowConnector().connect( source, sink, pipe );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testUnGroupAnon() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );
 Tap source = getPlatform().getTextFile( inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
 pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10 );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testUnGroupAnon() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );
 Tap source = getPlatform().getTextFile( inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
 pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10 );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testLastEachNotModified() throws Exception
 {
 copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new TestFunction( new Fields( "insert" ), new Tuple( "inserted" ) ) );
 pipe = new GroupBy( pipe, new Fields( "insert" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "lasteachmodified" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10, null );
 }

代码示例来源:origin: cwensel/cascading

private Flow firstFlow( String name )
 {
 Tap source = getPlatform().getTextFile( inputFileIps );
 Pipe pipe = new Pipe( name );
 pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "ip" ) ), new Fields( "ip" ) );
 Tap sink = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( name ), SinkMode.REPLACE );
 return getPlatform().getFlowConnector().connect( source, sink, pipe );
 }

代码示例来源:origin: cwensel/cascading

private Flow firstFlow( String path )
 {
 Tap source = getPlatform().getTextFile( inputFileIps );
 Pipe pipe = new Pipe( "first" );
 pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "ip" ) ), new Fields( "ip" ) );
 Tap sink = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( path + "/first" ), SinkMode.REPLACE );
 return getPlatform().getFlowConnector().connect( source, sink, pipe );
 }

代码示例来源:origin: cascading/cascading-platform

private Flow firstFlow( String path )
 {
 Tap source = getPlatform().getTextFile( inputFileIps );
 Pipe pipe = new Pipe( "first" );
 pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "ip" ) ), new Fields( "ip" ) );
 Tap sink = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( path + "/first" ), SinkMode.REPLACE );
 return getPlatform().getFlowConnector().connect( source, sink, pipe );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testLastEachNotModified() throws Exception
 {
 copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new TestFunction( new Fields( "insert" ), new Tuple( "inserted" ) ) );
 pipe = new GroupBy( pipe, new Fields( "insert" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "lasteachmodified" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 10, null );
 }

相关文章