org.apache.beam.sdk.transforms.windowing.Window.accumulatingFiredPanes()方法的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(15.2k)|赞(0)|评价(0)|浏览(81)

本文整理了Java中org.apache.beam.sdk.transforms.windowing.Window.accumulatingFiredPanes()方法的一些代码示例,展示了Window.accumulatingFiredPanes()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Window.accumulatingFiredPanes()方法的具体详情如下:
包路径:org.apache.beam.sdk.transforms.windowing.Window
类名称:Window
方法名:accumulatingFiredPanes

Window.accumulatingFiredPanes介绍

[英]Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that accumulates elements in a pane after they are triggered.

Does not modify this transform. The resulting PTransform is sufficiently specified to be applied, but more properties can still be specified.
[中]返回一个新的窗口PTransform,该窗口PTransform使用已注册的WindowFn和触发行为,并在触发后在窗格中累积元素。
不会修改此转换。所产生的PTransform已被充分指定以应用,但仍可以指定更多属性。

代码示例

代码示例来源:origin: org.apache.beam/beam-examples-java

@Override
 public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
  return input
    .apply(
      "LeaderboardUserGlobalWindow",
      Window.<GameActionInfo>into(new GlobalWindows())
        // Get periodic results every ten minutes.
        .triggering(
          Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
        .accumulatingFiredPanes()
        .withAllowedLateness(allowedLateness))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"));
 }
}

代码示例来源:origin: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
   .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .withAllowedLateness(Duration.standardDays(1000))
         .accumulatingFiredPanes())
   .apply(Sum.integersPerKey())
   .apply(ParDo.of(new FormatAsStrings()));
}

代码示例来源:origin: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

代码示例来源:origin: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterProcessingTime
                       .pastFirstElementInPane()
                       .alignedTo(TWO_MINUTES, Utils.parseTime("12:05:00"))))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

代码示例来源:origin: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(TWO_MINUTES)
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

代码示例来源:origin: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(ONE_MINUTE))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

代码示例来源:origin: takidau/streamingbook

@Override
  public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
    return input
      .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
          .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.standardDays(1000))
          .accumulatingFiredPanes())
      .apply(Sum.integersPerKey())
      .apply(ParDo.of(new FormatAsStrings()));
  }
}

代码示例来源:origin: org.apache.beam/beam-examples-java

@Override
 public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
  return infos
    .apply(
      "LeaderboardTeamFixedWindows",
      Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
        // We will get early (speculative) results as well as cumulative
        // processing of late data.
        .triggering(
          AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(FIVE_MINUTES))
            .withLateFirings(
              AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
        .withAllowedLateness(allowedLateness)
        .accumulatingFiredPanes())
    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoAccumulatingLatenessNoTrigger() {
 FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       "Lateness",
       Window.<String>into(fixed)
         .withAllowedLateness(Duration.standardDays(1))
         .accumulatingFiredPanes())
     .getWindowingStrategy();
 assertThat(strategy.isTriggerSpecified(), is(false));
 assertThat(strategy.isModeSpecified(), is(true));
 assertThat(strategy.isAllowedLatenessSpecified(), is(true));
 assertThat(strategy.getMode(), equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES));
 assertThat(strategy.getAllowedLateness(), equalTo(Duration.standardDays(1)));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the {@link
 * Window} transform depends on if it actually assigns elements to windows.
 */
@Test
public void testWindowIntoNullWindowFnNoAssign() {
 pipeline
   .apply(Create.of(1, 2, 3))
   .apply(
     Window.<Integer>configure()
       .triggering(AfterWatermark.pastEndOfWindow())
       .withAllowedLateness(Duration.ZERO)
       .accumulatingFiredPanes());
 pipeline.traverseTopologically(
   new PipelineVisitor.Defaults() {
    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
    }
   });
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql

@Test
public void testRejectsGlobalWindowsWithEndOfWindowTrigger() throws Exception {
 String sql =
   "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
     + "FROM ORDER_DETAILS1 o1"
     + " JOIN ORDER_DETAILS2 o2"
     + " on "
     + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
 PCollection<Row> orders =
   ordersUnbounded()
     .apply(
       "window",
       Window.<Row>into(new GlobalWindows())
         .triggering(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.ZERO)
         .accumulatingFiredPanes());
 PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
 thrown.expect(UnsupportedOperationException.class);
 thrown.expectMessage(
   stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
 inputs.apply("sql", SqlTransform.query(sql));
 pipeline.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
@Category(ValidatesRunner.class)
public void testGlobalCombineWithDefaultsAndTriggers() {
 PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
 PCollection<String> output =
   input
     .apply(
       Window.<Integer>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .accumulatingFiredPanes()
         .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
     .apply(Sum.integersGlobally())
     .apply(ParDo.of(new FormatPaneInfo()));
 // The actual elements produced are nondeterministic. Could be one, could be two.
 // But it should certainly have a final element with the correct final sum.
 PAssert.that(output)
   .satisfies(
     input1 -> {
      assertThat(input1, hasItem("2: true"));
      return null;
     });
 pipeline.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql

@Test
public void testRejectsUnboundedWithinWindowsWithEndOfWindowTrigger() throws Exception {
 String sql =
   "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
     + "FROM ORDER_DETAILS1 o1"
     + " JOIN ORDER_DETAILS2 o2"
     + " on "
     + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
 PCollection<Row> orders =
   ordersUnbounded()
     .apply(
       "window",
       Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
         .triggering(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.ZERO)
         .accumulatingFiredPanes());
 PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
 thrown.expect(UnsupportedOperationException.class);
 thrown.expectMessage(
   stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
 inputs.apply("sql", SqlTransform.query(sql));
 pipeline.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
@Category(ValidatesRunner.class)
public void testHotKeyCombiningWithAccumulationMode() {
 PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
 PCollection<Integer> output =
   input
     .apply(
       Window.<Integer>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .accumulatingFiredPanes()
         .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
     .apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
     .apply(ParDo.of(new GetLast()));
 PAssert.that(output)
   .satisfies(
     input1 -> {
      assertThat(input1, hasItem(15));
      return null;
     });
 pipeline.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql

@Test
public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
 String sql =
   "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
     + "FROM ORDER_DETAILS1 o1"
     + " JOIN ORDER_DETAILS2 o2"
     + " on "
     + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
 PCollection<Row> orders =
   ordersUnbounded()
     .apply(
       "window",
       Window.<Row>into(FixedWindows.of(Duration.standardSeconds(203)))
         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
         .withAllowedLateness(Duration.standardMinutes(2))
         .accumulatingFiredPanes());
 PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
 thrown.expect(UnsupportedOperationException.class);
 thrown.expectMessage(
   stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
 inputs.apply("sql", SqlTransform.query(sql));
 pipeline.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingLateness() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("requires that the allowed lateness");
 pipeline
   .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
   .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
   .apply("Window", Window.into(fixed10))
   .apply("Trigger", Window.<String>configure().triggering(trigger));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoTriggersAndAccumulating() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       Window.<String>into(fixed10)
         .triggering(trigger)
         .accumulatingFiredPanes()
         .withAllowedLateness(Duration.ZERO))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayData() {
 FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5));
 AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
 Duration allowedLateness = Duration.standardMinutes(10);
 Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
 TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
 Window<?> window =
   Window.into(windowFn)
     .triggering(triggerBuilder)
     .accumulatingFiredPanes()
     .withAllowedLateness(allowedLateness, closingBehavior)
     .withTimestampCombiner(timestampCombiner);
 DisplayData displayData = DisplayData.from(window);
 assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass()));
 assertThat(displayData, includesDisplayDataFor("windowFn", windowFn));
 assertThat(displayData, hasDisplayItem("trigger", triggerBuilder.toString()));
 assertThat(
   displayData,
   hasDisplayItem("accumulationMode", AccumulationMode.ACCUMULATING_FIRED_PANES.toString()));
 assertThat(displayData, hasDisplayItem("allowedLateness", allowedLateness));
 assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
 assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoPropagatesLateness() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       "WindowInto10",
       Window.<String>into(fixed10)
         .withAllowedLateness(Duration.standardDays(1))
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
         .accumulatingFiredPanes())
     .apply("WindowInto25", Window.into(fixed25))
     .getWindowingStrategy();
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
 assertEquals(fixed25, strategy.getWindowFn());
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowPropagatesEachPart() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
     .apply(
       "Lateness",
       Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
     .apply("Trigger", Window.<String>configure().triggering(trigger))
     .apply("Window", Window.into(fixed10))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}

相关文章