本文整理了Java中org.apache.beam.sdk.transforms.windowing.Window.triggering()
方法的一些代码示例,展示了Window.triggering()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Window.triggering()
方法的具体详情如下:
包路径:org.apache.beam.sdk.transforms.windowing.Window
类名称:Window
方法名:triggering
[英]Sets a non-default trigger for this Window PTransform. Elements that are assigned to a specific window will be output when the trigger fires.
org.apache.beam.sdk.transforms.windowing.Trigger has more details on the available triggers.
Must also specify allowed lateness using #withAllowedLateness and accumulation mode using either #discardingFiredPanes() or #accumulatingFiredPanes().
[中]为此窗口PTransform设置非默认触发器。触发器触发时,将输出指定给特定窗口的元素。
组织。阿帕奇。梁sdk。转变。开窗。Trigger提供了有关可用触发器的更多详细信息。
还必须使用#withAllowedLateness指定允许的延迟,并使用#discardingFiredPanes()或#AccumatingFiredPanes()指定累积模式。
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
return input
.apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
// Perform a per-window pre-combine so that our performance does not critically depend
// on combiner lifting.
.apply(ParDo.of(new CollectWindowsFn<>()))
.apply(Sample.any(1))
.apply(View.asList());
}
}
代码示例来源:origin: org.apache.beam/beam-examples-java
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
return input
.apply(
"LeaderboardUserGlobalWindow",
Window.<GameActionInfo>into(new GlobalWindows())
// Get periodic results every ten minutes.
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"));
}
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testDisplayDataExcludesDefaults() {
Window<?> window =
Window.into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
DisplayData data = DisplayData.from(window);
assertThat(data, not(hasDisplayItem("trigger")));
assertThat(data, not(hasDisplayItem("allowedLateness")));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.alignedTo(TWO_MINUTES, Utils.parseTime("12:05:00"))))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(TWO_MINUTES)
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.discardingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(ONE_MINUTE))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql
@Test
public void testUnsupportedGlobalWindowWithDefaultTrigger() {
exceptions.expect(UnsupportedOperationException.class);
pipeline.enableAbandonedNodeEnforcement(false);
PCollection<Row> input =
unboundedInput1.apply(
"unboundedInput1.globalWindow",
Window.<Row>into(new GlobalWindows()).triggering(DefaultTrigger.of()));
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
input.apply("testUnsupportedGlobalWindows", SqlTransform.query(sql));
}
代码示例来源:origin: gojektech/feast
public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
PCollection<FeatureRowExtended> main = pFeatureRows.getMain();
PCollection<FeatureRowExtended> errors = pFeatureRows.getErrors();
if (main.isBounded().equals(IsBounded.UNBOUNDED)) {
Window<FeatureRowExtended> minuteWindow =
Window.<FeatureRowExtended>into(FixedWindows.of(Duration.standardMinutes(1L)))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(1));
main = main.apply(minuteWindow);
errors = errors.apply(minuteWindow);
}
main.apply("Sample success", Sample.any(limit))
.apply("Log success sample", ParDo.of(new LoggerDoFn(Level.INFO, name + " MAIN ")));
errors
.apply("Sample errors", Sample.any(limit))
.apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
.apply(Reify.windows())
.apply(
WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
.withKeyType(new TypeDescriptor<Integer>() {}))
.apply(
Window.into(
new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
originalWindowFn.windowCoder()))
.triggering(Never.ever())
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.discardingFiredPanes())
// all values have the same key so they all appear as a single output element
.apply(GroupByKey.create())
.apply(Values.create())
.setWindowingStrategyInternal(input.getWindowingStrategy());
}
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
private void testOutputAfterCheckpoint(IsBounded bounded) {
PCollection<Integer> outputs =
p.apply(Create.of("foo"))
.apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
.apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
PAssert.thatSingleton(outputs.apply(Count.globally()))
.isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
p.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testHotKeyCombiningWithAccumulationMode() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output =
input
.apply(
Window.<Integer>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
.apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
.apply(ParDo.of(new GetLast()));
PAssert.that(output)
.satisfies(
input1 -> {
assertThat(input1, hasItem(15));
return null;
});
pipeline.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testMissingLateness() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("requires that the allowed lateness");
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply("Window", Window.into(fixed10))
.apply("Trigger", Window.<String>configure().triggering(trigger));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testMissingMode() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
PCollection<String> input =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Window", Window.into(fixed10));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("requires that the accumulation mode");
input.apply(
"Triggering",
Window.<String>configure()
.withAllowedLateness(Duration.standardDays(1))
.triggering(trigger));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowIntoTriggersAndAccumulating() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply(
Window.<String>into(fixed10)
.triggering(trigger)
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowIntoPropagatesLateness() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply(
"WindowInto10",
Window.<String>into(fixed10)
.withAllowedLateness(Duration.standardDays(1))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
.accumulatingFiredPanes())
.apply("WindowInto25", Window.into(fixed25))
.getWindowingStrategy();
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
assertEquals(fixed25, strategy.getWindowFn());
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowPropagatesEachPart() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply(
"Lateness",
Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
.apply("Trigger", Window.<String>configure().triggering(trigger))
.apply("Window", Window.into(fixed10))
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}
内容来源于网络,如有侵权,请联系作者删除!