本文整理了Java中cz.seznam.euphoria.core.client.operator.FlatMap.of()方法的一些代码示例，展示了FlatMap.of()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。FlatMap.of()方法的具体详情如下：
包路径:cz.seznam.euphoria.core.client.operator.FlatMap
类名称:FlatMap
方法名:of
[英]Starts building a nameless FlatMap operator to transform the given input dataset.
[中]开始构建一个无名的FlatMap操作符来转换给定的输入数据集。
代码示例来源:origin: seznam/euphoria
/**
 * Builds a {@code FlatMap} operator over the wrapped dataset using functor {@code f}.
 *
 * @param f functor applied to each element of the wrapped dataset
 * @param <S> type of the elements the functor emits
 * @return a new fluent {@code Dataset} wrapping the operator's output
 * @throws NullPointerException if {@code f} is null
 */
public <S> Dataset<S> flatMap(UnaryFunctor<T, S> f) {
  return new Dataset<>(
      FlatMap.of(this.wrap)
          .using(requireNonNull(f))
          .output());
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  // For each input value n, emit 1, 2, ..., n; values below 1 emit nothing.
  return FlatMap.of(input)
      .using((Integer bound, Collector<Integer> out) -> {
        int next = 1;
        while (next <= bound) {
          out.collect(next);
          next++;
        }
      })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  // Expand every element n into the inclusive run 1..n (empty when n < 1).
  return FlatMap.of(input)
      .using((Integer upTo, Collector<Integer> collector) -> {
        for (int value = 1; value <= upTo; ++value) {
          collector.collect(value);
        }
      })
      .output();
}
代码示例来源:origin: seznam/euphoria
/**
 * Wires up and executes the flow.
 *
 * <p>The flow reads pairs from the HBase source built by {@code Utils.getHBaseSource}. It
 * hands the second component ({@code Result}) of each pair to {@code writeCellsAsBytes},
 * which emits {@code byte[]} records. Those records are persisted to the sink from
 * {@code Utils.getSink}. Finally, the flow is submitted and {@code join()} is called on
 * the submission result.
 */
private void run() {
  final Flow flow = Flow.create();
  final Dataset<Pair<ImmutableBytesWritable, Result>> rows =
      flow.createInput(Utils.getHBaseSource(input, conf.get()));

  FlatMap.of(rows)
      .using((Pair<ImmutableBytesWritable, Result> row, Collector<byte[]> sink) -> {
        // The row key (first component) is not forwarded; only the Result is.
        writeCellsAsBytes(row.getSecond(), sink);
      })
      .output()
      .persist(Utils.getSink(output, conf.get()));

  LOG.info("Starting flow reading from {} and persisting to {}", input, output);
  executor.submit(flow).join();
}
代码示例来源:origin: seznam/euphoria
/**
 * Pairs every element of {@code input} with the window reported by the operator context.
 *
 * @param input dataset whose elements are to be tagged with their window
 * @param expectedWindowType class that every non-null window must be an instance of
 * @param <W> window type
 * @param <T> element type
 * @return dataset of {@code (window, element)} pairs; the window is null when the
 *     context reports none
 * @throws IllegalStateException (when the flow executes) if a non-null window is not an
 *     instance of {@code expectedWindowType}
 */
static <W, T> Dataset<Pair<W, T>>
extractWindowsToPair(Dataset<T> input, Class<W> expectedWindowType) {
  return FlatMap.of(input)
      .using((UnaryFunctor<T, Pair<W, T>>) (element, ctx) -> {
        final Object window = ctx.getWindow();
        final boolean unexpectedType =
            window != null && !expectedWindowType.isInstance(window);
        if (unexpectedType) {
          throw new IllegalStateException(
              "Encountered window of type " + window.getClass()
                  + " but expected only " + expectedWindowType);
        }
        // Safe: the check above guarantees window is null or an instance of W.
        @SuppressWarnings("unchecked")
        final W typedWindow = (W) window;
        ctx.collect(Pair.of(typedWindow, element));
      })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  // Squares each element. The "input" counter is incremented once per element; the
  // "sum" counter is incremented by each element's value.
  final UnaryFunctor<Integer, Integer> squareAndCount = (value, ctx) -> {
    ctx.getCounter("input").increment();
    ctx.getCounter("sum").increment(value);
    ctx.collect(value * value);
  };
  return FlatMap.of(input).using(squareAndCount).output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  return FlatMap.of(input)
      .using((UnaryFunctor<Integer, Integer>) (n, context) -> {
        // Bookkeeping first: count the element and add its value to "sum".
        context.getCounter("input").increment();
        context.getCounter("sum").increment(n);
        // Then emit the square of the element.
        final Integer square = n * n;
        context.collect(square);
      })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, Set<String>>> getOutput
    (Dataset<Pair<String, Integer>> input) {
  // The pair's second component is assigned as the event time.
  final Dataset<Pair<String, Integer>> timestamped =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  // Key by (first character - '0'), i.e. the digit value for '0'..'9'. The strings of
  // each key are collected into a Set per Session.of(5 ms) window.
  final Dataset<Pair<Integer, Set<String>>> reduced =
      ReduceByKey.of(timestamped)
          .keyBy(pair -> pair.getFirst().charAt(0) - '0')
          .valueBy(Pair::getFirst)
          .reduceBy(values -> values.collect(Collectors.toSet()))
          .windowBy(Session.of(Duration.ofMillis(5)))
          .output();

  // Prefix each (key, set) result with the window it belongs to.
  return FlatMap.of(reduced)
      .using((UnaryFunctor<Pair<Integer, Set<String>>,
          Triple<TimeInterval, Integer, Set<String>>>) (kv, ctx) -> {
        final TimeInterval window = (TimeInterval) ctx.getWindow();
        ctx.collect(Triple.of(window, kv.getFirst(), kv.getSecond()));
      })
      .output();
}
代码示例来源:origin: seznam/euphoria
/**
 * Flattens every {@code (first, second)} pair into a {@code (window, first, second)} triple,
 * where the window is the one reported by the operator context.
 *
 * @param input dataset of pairs to tag
 * @param expectedWindowType class that every non-null window must be an instance of
 * @param <W> window type
 * @param <F> type of the pair's first component
 * @param <S> type of the pair's second component
 * @return dataset of triples; the window slot is null when the context reports none
 * @throws IllegalStateException (when the flow executes) if a non-null window is not an
 *     instance of {@code expectedWindowType}
 */
static <W, F, S> Dataset<Triple<W, F, S>>
extractWindows(Dataset<Pair<F, S>> input, Class<W> expectedWindowType) {
  final UnaryFunctor<Pair<F, S>, Triple<W, F, S>> tagWithWindow = (pair, ctx) -> {
    final Object window = ctx.getWindow();
    final boolean acceptable = window == null || expectedWindowType.isInstance(window);
    if (!acceptable) {
      throw new IllegalStateException(
          "Encountered window of type " + window.getClass()
              + " but expected only " + expectedWindowType);
    }
    // Safe: window is either null or an instance of W at this point.
    @SuppressWarnings("unchecked")
    final W typed = (W) window;
    ctx.collect(Triple.of(typed, pair.getFirst(), pair.getSecond()));
  };
  return FlatMap.of(input).using(tagWithWindow).output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, Set<String>>> getOutput
    (Dataset<Pair<String, Integer>> input) {
  // Final step: prefix each (key, set) result with its window.
  final UnaryFunctor<Pair<Integer, Set<String>>, Triple<TimeInterval, Integer, Set<String>>>
      withWindow = (kv, ctx) -> ctx.collect(
          Triple.of((TimeInterval) ctx.getWindow(), kv.getFirst(), kv.getSecond()));

  // Event time comes from the pair's second component. Keys are (first char - '0'), and
  // each key's strings are gathered into a Set per Session.of(5 ms) window.
  final Dataset<Pair<Integer, Set<String>>> perKeySessions =
      ReduceByKey.of(AssignEventTime.of(input).using(Pair::getSecond).output())
          .keyBy(pair -> pair.getFirst().charAt(0) - '0')
          .valueBy(Pair::getFirst)
          .reduceBy(strings -> strings.collect(Collectors.toSet()))
          .windowBy(Session.of(Duration.ofMillis(5)))
          .output();

  return FlatMap.of(perKeySessions).using(withWindow).output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, String>> getOutput(Dataset<Pair<String, Integer>> input) {
  // The pair's second component is assigned as the event time.
  final Dataset<Pair<String, Integer>> withEventTime =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  // Key is (first char - '0'); value is the string from index 2 onward. Values are
  // accumulated in AccState per TimeSliding.of(10 ms, 5 ms) window.
  final Dataset<Pair<Integer, String>> reduced =
      ReduceStateByKey.of(withEventTime)
          .keyBy(p -> p.getFirst().charAt(0) - '0')
          .valueBy(p -> p.getFirst().substring(2))
          .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
          .mergeStatesBy(AccState::combine)
          .windowBy(TimeSliding.of(Duration.ofMillis(10), Duration.ofMillis(5)))
          .output();

  return FlatMap.of(reduced)
      .using((UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>>)
          (kv, ctx) -> {
            final TimeInterval window = (TimeInterval) ctx.getWindow();
            ctx.collect(Triple.of(window, kv.getFirst(), kv.getSecond()));
          })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, String>> getOutput(Dataset<Pair<String, Integer>> input) {
  // Final step: prefix each (key, value) result with its window.
  final UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> attachWindow =
      (kv, ctx) -> ctx.collect(
          Triple.of((TimeInterval) ctx.getWindow(), kv.getFirst(), kv.getSecond()));

  // Event time is the pair's second component. Key is (first char - '0'); value is the
  // string from index 2 onward. Values are accumulated in AccState per
  // TimeSliding.of(10 ms, 5 ms) window.
  final Dataset<Pair<Integer, String>> accumulated =
      ReduceStateByKey.of(AssignEventTime.of(input).using(e -> e.getSecond()).output())
          .keyBy(p -> p.getFirst().charAt(0) - '0')
          .valueBy(p -> p.getFirst().substring(2))
          .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
          .mergeStatesBy(AccState::combine)
          .windowBy(TimeSliding.of(Duration.ofMillis(10), Duration.ofMillis(5)))
          .output();

  return FlatMap.of(accumulated).using(attachWindow).output();
}
代码示例来源:origin: seznam/euphoria
/**
 * Verifies that a FlatMap built without an explicit name reports the default name
 * "FlatMap".
 */
@Test
public void testBuild_ImplicitName() {
  Flow flow = Flow.create("TEST");
  Dataset<String> dataset = Util.createMockDataset(flow, 1);

  // Building the operator adds it to the flow; the resulting dataset itself is not needed,
  // so it is not kept in an (unused) local variable.
  FlatMap.of(dataset)
      .using((String s, Collector<String> c) -> c.collect(s))
      .output();

  // The FlatMap built above is expected to be the first operator the flow reports.
  // Wildcard type arguments avoid the raw-type usage of the original.
  FlatMap<?, ?> map = (FlatMap<?, ?>) flow.operators().iterator().next();
  assertEquals("FlatMap", map.getName());
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  // The pair's second component is assigned as the event time.
  final Dataset<Pair<String, Integer>> timed =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  // Key is (first char - '0'); the whole string is the value, accumulated in AccState
  // per Time.of(5 ms) window.
  final Dataset<Pair<Integer, String>> reduced = ReduceStateByKey.of(timed)
      .keyBy(p -> p.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .stateFactory(AccState<String>::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(Time.of(Duration.ofMillis(5)))
      .output();

  return FlatMap.of(reduced)
      .using((UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>>)
          (kv, ctx) -> {
            final TimeInterval window = (TimeInterval) ctx.getWindow();
            ctx.collect(Triple.of(window, kv.getFirst(), kv.getSecond()));
          })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  // The pair's second component is assigned as the event time.
  final Dataset<Pair<String, Integer>> timed =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  // Key is (first char - '0'); the whole string is the value, accumulated in AccState
  // per Session.of(5 ms) window.
  final Dataset<Pair<Integer, String>> sessions = ReduceStateByKey.of(timed)
      .keyBy(p -> p.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(Session.of(Duration.ofMillis(5)))
      .output();

  return FlatMap.of(sessions)
      .using((UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>>)
          (kv, ctx) -> {
            final TimeInterval window = (TimeInterval) ctx.getWindow();
            ctx.collect(Triple.of(window, kv.getFirst(), kv.getSecond()));
          })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  // Final step: prefix each (key, value) result with its window.
  final UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> tag =
      (kv, ctx) -> ctx.collect(
          Triple.of((TimeInterval) ctx.getWindow(), kv.getFirst(), kv.getSecond()));

  // Event time is the pair's second component. Key is (first char - '0'); the whole
  // string is the value, accumulated in AccState per Session.of(5 ms) window.
  final Dataset<Pair<Integer, String>> sessions =
      ReduceStateByKey.of(AssignEventTime.of(input).using(e -> e.getSecond()).output())
          .keyBy(p -> p.getFirst().charAt(0) - '0')
          .valueBy(Pair::getFirst)
          .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
          .mergeStatesBy(AccState::combine)
          .windowBy(Session.of(Duration.ofMillis(5)))
          .output();

  return FlatMap.of(sessions).using(tag).output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  // Final step: prefix each (key, value) result with its window.
  final UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> tag =
      (kv, ctx) -> ctx.collect(
          Triple.of((TimeInterval) ctx.getWindow(), kv.getFirst(), kv.getSecond()));

  // Event time is the pair's second component. Key is (first char - '0'); the whole
  // string is the value, accumulated in AccState per Time.of(5 ms) window.
  final Dataset<Pair<Integer, String>> windowed =
      ReduceStateByKey.of(AssignEventTime.of(input).using(e -> e.getSecond()).output())
          .keyBy(p -> p.getFirst().charAt(0) - '0')
          .valueBy(Pair::getFirst)
          .stateFactory(AccState<String>::new)
          .mergeStatesBy(AccState::combine)
          .windowBy(Time.of(Duration.ofMillis(5)))
          .output();

  return FlatMap.of(windowed).using(tag).output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<Integer, Integer, Integer>> getOutput(Dataset<Integer> input) {
  // Group by remainder modulo 3 under ReduceByKeyTest.TestWindowing, holding values in
  // SortState.
  final Dataset<Pair<Integer, Integer>> grouped = ReduceStateByKey.of(input)
      .keyBy(n -> n % 3)
      .valueBy(n -> n)
      .stateFactory(SortState::new)
      .mergeStatesBy(SortState::combine)
      .windowBy(new ReduceByKeyTest.TestWindowing())
      .output();

  // Prefix each (key, value) with the value of its IntWindow.
  return FlatMap.of(grouped)
      .using((UnaryFunctor<Pair<Integer, Integer>, Triple<Integer, Integer, Integer>>)
          (kv, ctx) -> {
            final IntWindow window = (IntWindow) ctx.getWindow();
            ctx.collect(Triple.of(window.getValue(), kv.getFirst(), kv.getSecond()));
          })
      .output();
}
代码示例来源:origin: seznam/euphoria
@Override
protected Dataset<Triple<Integer, Integer, Integer>> getOutput(Dataset<Integer> input) {
  // Final step: prefix each (key, value) result with the value of its IntWindow.
  final UnaryFunctor<Pair<Integer, Integer>, Triple<Integer, Integer, Integer>> labelWithWindow =
      (kv, ctx) -> ctx.collect(
          Triple.of(((IntWindow) ctx.getWindow()).getValue(), kv.getFirst(), kv.getSecond()));

  // Group by remainder modulo 3 under ReduceByKeyTest.TestWindowing, holding values in
  // SortState.
  final Dataset<Pair<Integer, Integer>> byRemainder = ReduceStateByKey.of(input)
      .keyBy(x -> x % 3)
      .valueBy(x -> x)
      .stateFactory(SortState::new)
      .mergeStatesBy(SortState::combine)
      .windowBy(new ReduceByKeyTest.TestWindowing())
      .output();

  return FlatMap.of(byRemainder).using(labelWithWindow).output();
}
代码示例来源:origin: seznam/euphoria
/**
 * Splits a small bounded input into words, runs Distinct over them without any explicit
 * windowing, and checks the collected output against the four distinct words.
 */
@Test
public void testDistinctOnBatchWithoutWindowingLabels() throws Exception {
  final Flow flow = Flow.create("Test");
  final Dataset<String> sentences = flow.createInput(
      ListDataSource.bounded(asList(
          "one two three four",
          "one two three",
          "one two",
          "one")));

  // Expand each line into words, then keep only distinct words.
  final Dataset<String> distinctWords =
      Distinct.of(FlatMap.of(sentences).using(toWords(w -> w)).output()).output();

  final ListDataSink<String> sink = ListDataSink.get();
  distinctWords.persist(sink);
  executor.submit(flow).get();

  // The collected output must match these words, in any order.
  DatasetAssert.unorderedEquals(sink.getOutputs(), "four", "one", "three", "two");
}
内容来源于网络,如有侵权,请联系作者删除!