本文整理了Java中org.apache.beam.sdk.transforms.windowing.Window.configure()
方法的一些代码示例,展示了Window.configure()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Window.configure()
方法的具体详情如下:
包路径:org.apache.beam.sdk.transforms.windowing.Window
类名称:Window
方法名:configure
[英]Returns a new builder for a Window transform for setting windowing parameters other than the windowing function.
[中]返回窗口转换的新生成器,用于设置除窗口函数以外的窗口参数。
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
/**
* Creates a {@code Window} {@code PTransform} that uses the given {@link WindowFn} to window the
* data.
*
* <p>The resulting {@code PTransform}'s types have been bound, with both the input and output
* being a {@code PCollection<T>}, inferred from the types of the argument {@code WindowFn}. It is
* ready to be applied, or further properties can be set on it first.
*/
public static <T> Window<T> into(WindowFn<? super T, ?> fn) {
try {
fn.windowCoder().verifyDeterministic();
} catch (NonDeterministicException e) {
throw new IllegalArgumentException("Window coders must be deterministic.", e);
}
return Window.<T>configure().withWindowFn(fn);
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
return input
.apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
// Perform a per-window pre-combine so that our performance does not critically depend
// on combiner lifting.
.apply(ParDo.of(new CollectWindowsFn<>()))
.apply(Sample.any(1))
.apply(View.asList());
}
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testMissingLateness() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("requires that the allowed lateness");
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply("Window", Window.into(fixed10))
.apply("Trigger", Window.<String>configure().triggering(trigger));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
Window<?> onlyHasAccumulationMode = Window.configure().discardingFiredPanes();
assertThat(
DisplayData.from(onlyHasAccumulationMode),
not(
hasDisplayItem(
hasKey(
isOneOf(
"windowFn",
"trigger",
"timestampCombiner",
"allowedLateness",
"closingBehavior")))));
Window<?> noAccumulationMode = Window.into(new GlobalWindows());
assertThat(
DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode"))));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowPropagatesEachPart() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply(
"Lateness",
Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
.apply("Trigger", Window.<String>configure().triggering(trigger))
.apply("Window", Window.into(fixed10))
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
/**
* With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the {@link
* Window} transform depends on if it actually assigns elements to windows.
*/
@Test
public void testWindowIntoNullWindowFnNoAssign() {
pipeline
.apply(Create.of(1, 2, 3))
.apply(
Window.<Integer>configure()
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
}
});
}
代码示例来源:origin: org.apache.beam/beam-runners-direct-java
Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
private void testOutputAfterCheckpoint(IsBounded bounded) {
PCollection<Integer> outputs =
p.apply(Create.of("foo"))
.apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
.apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
PAssert.thatSingleton(outputs.apply(Count.globally()))
.isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
p.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
removeTriggering =
Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
.triggering(Never.ever())
.discardingFiredPanes()
.apply(
"NeverTrigger",
Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
.triggering(Never.ever())
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testMissingModeViaLateness() {
FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
PCollection<String> input =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Window", Window.into(fixed));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("allowed lateness");
thrown.expectMessage("accumulation mode be specified");
input.apply(
"Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testMissingMode() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
PCollection<String> input =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Window", Window.into(fixed10));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("requires that the accumulation mode");
input.apply(
"Triggering",
Window.<String>configure()
.withAllowedLateness(Duration.standardDays(1))
.triggering(trigger));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testMultipleStreams() {
TestStream<String> stream =
TestStream.create(StringUtf8Coder.of())
.addElements("foo", "bar")
.advanceWatermarkToInfinity();
TestStream<Integer> other =
TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity();
PCollection<String> createStrings =
p.apply("CreateStrings", stream)
.apply(
"WindowStrings",
Window.<String>configure()
.triggering(AfterPane.elementCountAtLeast(2))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
PCollection<Integer> createInts =
p.apply("CreateInts", other)
.apply(
"WindowInts",
Window.<Integer>configure()
.triggering(AfterPane.elementCountAtLeast(4))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
p.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testProcessingTimeTrigger() {
TestStream<Long> source =
TestStream.create(VarLongCoder.of())
.addElements(
TimestampedValue.of(1L, new Instant(1000L)),
TimestampedValue.of(2L, new Instant(2000L)))
.advanceProcessingTime(Duration.standardMinutes(12))
.addElements(TimestampedValue.of(3L, new Instant(3000L)))
.advanceProcessingTime(Duration.standardMinutes(6))
.advanceWatermarkToInfinity();
PCollection<Long> sum =
p.apply(source)
.apply(
Window.<Long>configure()
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply(Sum.longsGlobally());
PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
p.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
Window.<Boolean>configure()
.triggering(Never.ever())
.withAllowedLateness(Duration.ZERO)
代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql
.apply(
"Triggering",
Window.<Row>configure()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.ZERO)
内容来源于网络,如有侵权,请联系作者删除!