本文整理了Java中org.apache.beam.sdk.transforms.windowing.Window.accumulatingFiredPanes()
方法的一些代码示例,展示了Window.accumulatingFiredPanes()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Window.accumulatingFiredPanes()
方法的具体详情如下:
包路径:org.apache.beam.sdk.transforms.windowing.Window
类名称:Window
方法名:accumulatingFiredPanes
[英]Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that accumulates elements in a pane after they are triggered.
Does not modify this transform. The resulting PTransform is sufficiently specified to be applied, but more properties can still be specified.
[中]返回一个新的窗口PTransform,该窗口PTransform使用已注册的WindowFn和触发行为,并在触发后在窗格中累积元素。
不会修改此转换。所产生的PTransform已被充分指定以应用,但仍可以指定更多属性。
代码示例来源:origin: org.apache.beam/beam-examples-java
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
return input
.apply(
"LeaderboardUserGlobalWindow",
Window.<GameActionInfo>into(new GlobalWindows())
// Get periodic results every ten minutes.
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"));
}
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.alignedTo(TWO_MINUTES, Utils.parseTime("12:05:00"))))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(TWO_MINUTES)
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(ONE_MINUTE))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
代码示例来源:origin: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
return input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(1000))
.accumulatingFiredPanes())
.apply(Sum.integersPerKey())
.apply(ParDo.of(new FormatAsStrings()));
}
}
代码示例来源:origin: org.apache.beam/beam-examples-java
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
return infos
.apply(
"LeaderboardTeamFixedWindows",
Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
// We will get early (speculative) results as well as cumulative
// processing of late data.
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TEN_MINUTES)))
.withAllowedLateness(allowedLateness)
.accumulatingFiredPanes())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowIntoAccumulatingLatenessNoTrigger() {
FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply(
"Lateness",
Window.<String>into(fixed)
.withAllowedLateness(Duration.standardDays(1))
.accumulatingFiredPanes())
.getWindowingStrategy();
assertThat(strategy.isTriggerSpecified(), is(false));
assertThat(strategy.isModeSpecified(), is(true));
assertThat(strategy.isAllowedLatenessSpecified(), is(true));
assertThat(strategy.getMode(), equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES));
assertThat(strategy.getAllowedLateness(), equalTo(Duration.standardDays(1)));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
/**
* With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the {@link
* Window} transform depends on if it actually assigns elements to windows.
*/
@Test
public void testWindowIntoNullWindowFnNoAssign() {
pipeline
.apply(Create.of(1, 2, 3))
.apply(
Window.<Integer>configure()
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
}
});
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql
@Test
public void testRejectsGlobalWindowsWithEndOfWindowTrigger() throws Exception {
String sql =
"SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id "
+ "FROM ORDER_DETAILS1 o1"
+ " JOIN ORDER_DETAILS2 o2"
+ " on "
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id";
PCollection<Row> orders =
ordersUnbounded()
.apply(
"window",
Window.<Row>into(new GlobalWindows())
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(
stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
inputs.apply("sql", SqlTransform.query(sql));
pipeline.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testGlobalCombineWithDefaultsAndTriggers() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
PCollection<String> output =
input
.apply(
Window.<Integer>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
.apply(Sum.integersGlobally())
.apply(ParDo.of(new FormatPaneInfo()));
// The actual elements produced are nondeterministic. Could be one, could be two.
// But it should certainly have a final element with the correct final sum.
PAssert.that(output)
.satisfies(
input1 -> {
assertThat(input1, hasItem("2: true"));
return null;
});
pipeline.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql
@Test
public void testRejectsUnboundedWithinWindowsWithEndOfWindowTrigger() throws Exception {
String sql =
"SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id "
+ "FROM ORDER_DETAILS1 o1"
+ " JOIN ORDER_DETAILS2 o2"
+ " on "
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id";
PCollection<Row> orders =
ordersUnbounded()
.apply(
"window",
Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(
stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
inputs.apply("sql", SqlTransform.query(sql));
pipeline.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testHotKeyCombiningWithAccumulationMode() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output =
input
.apply(
Window.<Integer>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
.apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
.apply(ParDo.of(new GetLast()));
PAssert.that(output)
.satisfies(
input1 -> {
assertThat(input1, hasItem(15));
return null;
});
pipeline.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql
@Test
public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
String sql =
"SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id "
+ "FROM ORDER_DETAILS1 o1"
+ " JOIN ORDER_DETAILS2 o2"
+ " on "
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id";
PCollection<Row> orders =
ordersUnbounded()
.apply(
"window",
Window.<Row>into(FixedWindows.of(Duration.standardSeconds(203)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.standardMinutes(2))
.accumulatingFiredPanes());
PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(
stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
inputs.apply("sql", SqlTransform.query(sql));
pipeline.run();
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testMissingLateness() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("requires that the allowed lateness");
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply("Window", Window.into(fixed10))
.apply("Trigger", Window.<String>configure().triggering(trigger));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowIntoTriggersAndAccumulating() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply(
Window.<String>into(fixed10)
.triggering(trigger)
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testDisplayData() {
FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5));
AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
Duration allowedLateness = Duration.standardMinutes(10);
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
Window<?> window =
Window.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness, closingBehavior)
.withTimestampCombiner(timestampCombiner);
DisplayData displayData = DisplayData.from(window);
assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass()));
assertThat(displayData, includesDisplayDataFor("windowFn", windowFn));
assertThat(displayData, hasDisplayItem("trigger", triggerBuilder.toString()));
assertThat(
displayData,
hasDisplayItem("accumulationMode", AccumulationMode.ACCUMULATING_FIRED_PANES.toString()));
assertThat(displayData, hasDisplayItem("allowedLateness", allowedLateness));
assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowIntoPropagatesLateness() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply(
"WindowInto10",
Window.<String>into(fixed10)
.withAllowedLateness(Duration.standardDays(1))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
.accumulatingFiredPanes())
.apply("WindowInto25", Window.into(fixed25))
.getWindowingStrategy();
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
assertEquals(fixed25, strategy.getWindowFn());
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-core
@Test
public void testWindowPropagatesEachPart() {
FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
WindowingStrategy<?, ?> strategy =
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
.apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply(
"Lateness",
Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
.apply("Trigger", Window.<String>configure().triggering(trigger))
.apply("Window", Window.into(fixed10))
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}
内容来源于网络,如有侵权,请联系作者删除!