This article collects code examples for the Java class cz.seznam.euphoria.core.client.operator.FlatMap and shows how the class is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a reference. Details of the FlatMap class follow:
Package path: cz.seznam.euphoria.core.client.operator.FlatMap
Class name: FlatMap
A transformation of a dataset from one type into another allowing user code to generate zero, one, or many output elements for a given input element.
The user supplied map function is supposed to be stateless. It is fed items from the input in no specified order and the results of the map function are "flattened" to the output (equally in no specified order.)
Example:
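Dataset<String> strings = ...;
Dataset<Integer> ints =
    FlatMap.of(strings)
        .using((String s, Collector<Integer> c) -> {
          try {
            c.collect(Integer.parseInt(s));
          } catch (NumberFormatException e) {
            // ~ ignore the input if we failed to parse it
          }
        })
        .output();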
The above example tries to parse incoming strings as integers, silently skipping those which cannot be successfully converted. While Collector#collect(Object) has been used only once here, a FlatMap operator is free to invoke it multiple times or not at all to generate that many elements to the output dataset.
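The "zero, one, or many" behaviour can be tried out without Euphoria on the classpath. The following is a minimal stand-alone sketch, not Euphoria code: the nested Collector and UnaryFunctor interfaces are simplified, hypothetical stand-ins that only borrow the names of the real types, and the flatMap helper, FlatMapSketch, parseInts, and tokenize are names invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {

  /** Hypothetical, simplified stand-in for Euphoria's Collector. */
  interface Collector<T> {
    void collect(T elem);
  }

  /** Hypothetical, simplified stand-in for Euphoria's UnaryFunctor. */
  interface UnaryFunctor<IN, OUT> {
    void apply(IN elem, Collector<OUT> collector);
  }

  /** Applies the functor to every element and flattens whatever it collects. */
  public static <IN, OUT> List<OUT> flatMap(List<IN> input, UnaryFunctor<IN, OUT> f) {
    List<OUT> out = new ArrayList<>();
    for (IN elem : input) {
      f.apply(elem, out::add);
    }
    return out;
  }

  /** Zero or one output per input: unparsable strings are skipped. */
  public static List<Integer> parseInts(List<String> strings) {
    return flatMap(strings, (String s, Collector<Integer> c) -> {
      try {
        c.collect(Integer.parseInt(s));
      } catch (NumberFormatException e) {
        // ignore the input if we failed to parse it
      }
    });
  }

  /** Many outputs per input: each line is split into its words. */
  public static List<String> tokenize(List<String> lines) {
    return flatMap(lines, (String line, Collector<String> c) -> {
      for (String word : line.split("\\s+")) {
        if (!word.isEmpty()) {
          c.collect(word);
        }
      }
    });
  }

  public static void main(String[] args) {
    System.out.println(parseInts(Arrays.asList("1", "x", "3")));
    System.out.println(tokenize(Arrays.asList("a b", "", "c")));
  }
}
```

Unlike this list-based sketch, which keeps input order, the operator described above makes no ordering promise for its output.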
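#### Builders:
1. [named] ................ give a name to the operator [optional]
2. of ..................... input dataset
3. using .................. apply a UnaryFunctor to the input elements
4. [eventTimeBy] .......... change the event-time characteristic of the output elements using ExtractEventTime
5. output ................. build the output dataset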
Code example source: seznam/euphoria
public <S> Dataset<S> flatMap(UnaryFunctor<T, S> f) {
  return new Dataset<>(FlatMap.of(this.wrap).using(requireNonNull(f)).output());
}
Code example source: seznam/euphoria
@Test
public void testBuild() {
  Flow flow = Flow.create("TEST");
  Dataset<String> dataset = Util.createMockDataset(flow, 1);
  Dataset<String> mapped = FlatMap.named("FlatMap1")
      .of(dataset)
      .using((String s, Collector<String> c) -> c.collect(s))
      .output();
  assertEquals(flow, mapped.getFlow());
  assertEquals(1, flow.size());
  FlatMap map = (FlatMap) flow.operators().iterator().next();
  assertEquals(flow, map.getFlow());
  assertEquals("FlatMap1", map.getName());
  assertNotNull(map.getFunctor());
  assertEquals(mapped, map.output());
  assertNull(map.getEventTimeExtractor());
}
Code example source: seznam/euphoria
@Override
public Dataset<OUT> output(OutputHint... outputHints) {
  Flow flow = input.getFlow();
  FlatMap<IN, OUT> map = new FlatMap<>(name, flow, input, functor, evtTimeFn,
      Sets.newHashSet(outputHints));
  flow.add(map);
  return map.output();
}
Code example source: seznam/euphoria
@Override
@SuppressWarnings("unchecked")
public JavaRDD<?> translate(FlatMap operator, SparkExecutorContext context) {
  final JavaRDD<?> input = context.getSingleInput(operator);
  final UnaryFunctor<?, ?> mapper = operator.getFunctor();
  final ExtractEventTime<?> evtTimeFn = operator.getEventTimeExtractor();
  LazyAccumulatorProvider accumulators =
      new LazyAccumulatorProvider(context.getAccumulatorFactory(), context.getSettings());
  if (evtTimeFn != null) {
    return input
        .flatMap(new EventTimeAssigningUnaryFunctor(mapper, evtTimeFn, accumulators))
        .setName(operator.getName() + "::event-time-and-apply-udf");
  } else {
    return input
        .flatMap(new UnaryFunctorWrapper(mapper, accumulators))
        .setName(operator.getName() + "::apply-udf");
  }
}
Code example source: seznam/euphoria
    new KafkaSource(uri.getAuthority(),
        uri.getPath().substring(1), settings));
return FlatMap.of(input)
    .using(new UnaryFunctor<Pair<byte[], byte[]>, Pair<Long, String>>() {
      private final SearchEventsParser parser = new SearchEventsParser();
DataSource<String> datasource = new SimpleHadoopTextFileSource(uri.toString());
Dataset<String> in = flow.createInput(datasource);
return FlatMap.named("PARSE-INPUT")
    .of(in)
    .using(new UnaryFunctor<String, Pair<Long, String>>() {
Code example source: seznam/euphoria
    flatMap.getSingleParentOrNull().get(), flatMap.get());
InputProvider ret = new InputProvider();
final UnaryFunctor mapper = flatMap.get().getFunctor();
final ExtractEventTime eventTimeFn = flatMap.get().getEventTimeExtractor();
for (Supplier s : suppliers) {
  final BlockingQueue<Datum> out = new ArrayBlockingQueue(5000);
Code example source: seznam/euphoria
@Test
public void testBuild_ImplicitName() {
  Flow flow = Flow.create("TEST");
  Dataset<String> dataset = Util.createMockDataset(flow, 1);
  Dataset<String> mapped = FlatMap.of(dataset)
      .using((String s, Collector<String> c) -> c.collect(s))
      .output();
  FlatMap map = (FlatMap) flow.operators().iterator().next();
  assertEquals("FlatMap", map.getName());
}
Code example source: seznam/euphoria
/**
 * Collects Avro record as JSON string
 *
 * @param outSink
 * @param inSource
 * @throws Exception
 */
public static void runFlow(
    DataSink<String> outSink,
    DataSource<Pair<AvroKey<GenericData.Record>, NullWritable>> inSource)
    throws Exception {
  Flow flow = Flow.create("simple read avro");
  Dataset<Pair<AvroKey<GenericData.Record>, NullWritable>> input = flow.createInput(inSource);
  final Dataset<String> output =
      FlatMap.named("avro2csv").of(input).using(AvroSourceTest::apply).output();
  output.persist(outSink);
  Executor executor = new LocalExecutor();
  executor.submit(flow).get();
}
Code example source: seznam/euphoria
@Override
public DAG<Operator<?, ?>> getBasicOps() {
  return DAG.of(new FlatMap<>(
      getName(), getFlow(), input,
      (i, c) -> c.collect(i), eventTimeFn, getHints()));
}
Code example source: seznam/euphoria
return FlatMap.named("FORMAT-OUTPUT")
    .of(aggregated)
    .using(((Pair<String, Long> elem, Collector<String> context) -> {
Code example source: seznam/euphoria
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  return FlatMap.of(input)
      .using((Integer e, Collector<Integer> c) -> {
        for (int i = 1; i <= e; i++) {
          c.collect(i);
        }
      })
      .output();
}
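To see what the expansion above produces, the same per-element logic can be approximated with plain java.util.stream. This is only an analogy under stated assumptions, not Euphoria code: ExpandSketch and expand are hypothetical names, and a sequential stream over a list keeps encounter order, which the FlatMap operator does not promise.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ExpandSketch {

  /** For each e, emits 1..e; values of e below 1 emit nothing. */
  public static List<Integer> expand(List<Integer> input) {
    return input.stream()
        .flatMap(e -> IntStream.rangeClosed(1, e).boxed())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // 3 contributes 1, 2, 3; 0 contributes nothing; 2 contributes 1, 2
    System.out.println(expand(Arrays.asList(3, 0, 2))); // prints [1, 2, 3, 1, 2]
  }
}
```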
Code example source: seznam/euphoria
@Test
public void testBuild_EventTimeExtractor() {
  Flow flow = Flow.create("TEST");
  Dataset<String> dataset = Util.createMockDataset(flow, 1);
  Dataset<BigDecimal> mapped = FlatMap.named("FlatMap2")
      .of(dataset)
      .using((String s, Collector<BigDecimal> c) -> c.collect(null))
      .eventTimeBy(Long::parseLong) // ~ consuming the original input elements
      .output();
  assertEquals(flow, mapped.getFlow());
  assertEquals(1, flow.size());
  FlatMap map = (FlatMap) flow.operators().iterator().next();
  assertEquals(flow, map.getFlow());
  assertEquals("FlatMap2", map.getName());
  assertNotNull(map.getFunctor());
  assertEquals(mapped, map.output());
  assertNotNull(map.getEventTimeExtractor());
}
Code example source: seznam/euphoria
@Override
@SuppressWarnings("unchecked")
public DataStream<?> translate(FlinkOperator<FlatMap> operator,
    StreamingExecutorContext context) {
  Settings settings = context.getSettings();
  FlinkAccumulatorFactory accumulatorFactory = context.getAccumulatorFactory();
  DataStream input = context.getSingleInputStream(operator);
  UnaryFunctor mapper = operator.getOriginalOperator().getFunctor();
  ExtractEventTime evtTimeFn = operator.getOriginalOperator().getEventTimeExtractor();
  if (evtTimeFn != null) {
    input = input.assignTimestampsAndWatermarks(
        new EventTimeAssigner(context.getAllowedLateness(), evtTimeFn))
        .returns((Class) StreamingElement.class);
  }
  return input
      .flatMap(new StreamingUnaryFunctorWrapper(mapper, accumulatorFactory, settings))
      .returns((Class) StreamingElement.class)
      .name(operator.getName())
      .setParallelism(operator.getParallelism());
}
Code example source: seznam/euphoria
final Dataset<String> words = FlatMap.named("TOKENIZER")
    .of(lines)
    .using((String line, Collector<String> c) ->
Code example source: seznam/euphoria
/** This operator can be implemented using FlatMap. */
@Override
public DAG<Operator<?, ?>> getBasicOps() {
  return DAG.of(new FlatMap<>(getName(), getFlow(), input,
      (elem, collector) -> {
        if (predicate.apply(elem)) {
          collector.collect(elem);
        }
      },
      null,
      getHints()));
}
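The idea that a filter is just a flat map emitting each element zero or one time can be sketched in plain Java. This is a hedged illustration, not Euphoria code: FilterAsFlatMapSketch and its flatMap and filter helpers are hypothetical names, and the standard BiConsumer, Consumer, and Predicate interfaces stand in for Euphoria's functor, collector, and predicate types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class FilterAsFlatMapSketch {

  /** Generic flat map: the functor may emit any number of elements through the Consumer. */
  public static <IN, OUT> List<OUT> flatMap(
      List<IN> input, BiConsumer<IN, Consumer<OUT>> functor) {
    List<OUT> out = new ArrayList<>();
    for (IN elem : input) {
      functor.accept(elem, out::add);
    }
    return out;
  }

  /** Filter expressed as a flat map: emit the element once if it passes, otherwise nothing. */
  public static <T> List<T> filter(List<T> input, Predicate<T> predicate) {
    return flatMap(input, (T elem, Consumer<T> collector) -> {
      if (predicate.test(elem)) {
        collector.accept(elem);
      }
    });
  }

  public static void main(String[] args) {
    System.out.println(filter(Arrays.asList(1, 2, 3, 4), n -> n % 2 == 0)); // prints [2, 4]
  }
}
```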
Code example source: seznam/euphoria
@Test
public void testBuild_WithCounters() {
  Flow flow = Flow.create("TEST");
  Dataset<String> dataset = Util.createMockDataset(flow, 1);
  Dataset<String> mapped = FlatMap.named("FlatMap1")
      .of(dataset)
      .using((String s, Collector<String> c) -> {
        c.getCounter("my-counter").increment();
        c.collect(s);
      })
      .output();
  assertEquals(flow, mapped.getFlow());
  assertEquals(1, flow.size());
  FlatMap map = (FlatMap) flow.operators().iterator().next();
  assertEquals(flow, map.getFlow());
  assertEquals("FlatMap1", map.getName());
  assertNotNull(map.getFunctor());
  assertEquals(mapped, map.output());
}