Usage and code examples of the org.apache.beam.sdk.transforms.windowing.Window.withAllowedLateness() method

Reposted by x33g5p2x on 2022-02-03 in Other

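This article collects code examples for the Java method org.apache.beam.sdk.transforms.windowing.Window.withAllowedLateness() and shows how Window.withAllowedLateness() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical references. Details of the method are as follows:
Package path: org.apache.beam.sdk.transforms.windowing.Window
Class name: Window
Method name: withAllowedLateness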

Introduction to Window.withAllowedLateness

Override the amount of lateness allowed for data elements in the output PCollection and downstream PCollection until explicitly set again. Like the other properties on this Window operation, this will be applied at the next GroupByKey. Any elements that are later than this as decided by the system-maintained watermark will be dropped.

This value also determines how long state will be kept around for old windows. Once no elements will be added to a window (because this duration has passed) any state associated with the window will be cleaned up.
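The two paragraphs above come down to one timestamp comparison against the watermark. The following is a minimal sketch of that rule in plain Java, not Beam's actual implementation: `LatenessModel` and its methods are hypothetical names, `java.time` stands in for the Joda-Time types Beam uses, and boundary details (Beam works with the window's maximum timestamp rather than its end) are simplified.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified, hypothetical model of allowed lateness; not Beam's real code. */
final class LatenessModel {

  /** Point in event time after which the window accepts no more elements. */
  static Instant expiryTime(Instant windowEnd, Duration allowedLateness) {
    return windowEnd.plus(allowedLateness);
  }

  /**
   * An element is dropped when the watermark has already passed the expiry
   * time of the window the element belongs to. The same moment is when the
   * window's state becomes eligible for cleanup.
   */
  static boolean isDropped(Instant windowEnd, Duration allowedLateness, Instant watermark) {
    return watermark.isAfter(expiryTime(windowEnd, allowedLateness));
  }

  public static void main(String[] args) {
    Instant windowEnd = Instant.parse("2022-02-03T12:02:00Z");
    Duration lateness = Duration.ofMinutes(2);

    // Watermark is 1 minute past the window end: late, but within the allowance.
    System.out.println(isDropped(windowEnd, lateness, windowEnd.plus(Duration.ofMinutes(1)))); // prints false

    // Watermark is 5 minutes past the window end: beyond the allowance.
    System.out.println(isDropped(windowEnd, lateness, windowEnd.plus(Duration.ofMinutes(5)))); // prints true
  }
}
```

A very large allowance, such as the `Duration.standardDays(1000)` used in several examples on this page, effectively keeps windows open (and their state alive) for the lifetime of the pipeline.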

Depending on the trigger this may not produce a pane with PaneInfo#isLast. See ClosingBehavior#FIRE_IF_NON_EMPTY for more details.
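As a rough illustration of the closing behavior mentioned above, the sketch below models the decision to emit a final pane when a window expires. It is plain Java rather than Beam code: the enum `Closing` only mirrors the names of Beam's ClosingBehavior values, `firesFinalPane` is a hypothetical helper, and the assumed semantics are that FIRE_ALWAYS always emits a final pane while FIRE_IF_NON_EMPTY emits one only if new elements arrived since the previous firing.

```java
/** Simplified, hypothetical model of the final-pane decision; not Beam's real code. */
final class ClosingModel {

  /** Mirrors the names of Beam's ClosingBehavior values for illustration only. */
  enum Closing { FIRE_ALWAYS, FIRE_IF_NON_EMPTY }

  /** Decides whether a final pane is emitted when the window expires. */
  static boolean firesFinalPane(Closing behavior, boolean newElementsSinceLastFiring) {
    return behavior == Closing.FIRE_ALWAYS || newElementsSinceLastFiring;
  }

  public static void main(String[] args) {
    // No new data since the last firing: only FIRE_ALWAYS produces a final pane.
    System.out.println(firesFinalPane(Closing.FIRE_ALWAYS, false));       // prints true
    System.out.println(firesFinalPane(Closing.FIRE_IF_NON_EMPTY, false)); // prints false
  }
}
```

Under this model, a consumer that waits for a pane marked as last may never see one with FIRE_IF_NON_EMPTY, which is the situation the Javadoc warns about.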

Code examples

Code example source: org.apache.beam/beam-examples-java

@Override
 public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
  return input
    .apply(
      "LeaderboardUserGlobalWindow",
      Window.<GameActionInfo>into(new GlobalWindows())
        // Get periodic results every ten minutes.
        .triggering(
          Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
        .accumulatingFiredPanes()
        .withAllowedLateness(allowedLateness))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"));
 }

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayDataExcludesDefaults() {
 Window<?> window =
   Window.into(new GlobalWindows())
     .triggering(DefaultTrigger.of())
     .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 DisplayData data = DisplayData.from(window);
 assertThat(data, not(hasDisplayItem("trigger")));
 assertThat(data, not(hasDisplayItem("allowedLateness")));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterProcessingTime
                       .pastFirstElementInPane()
                       .alignedTo(TWO_MINUTES, Utils.parseTime("12:05:00"))))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(TWO_MINUTES)
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .discardingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(ONE_MINUTE))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
  public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
    return input
      .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
          .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.standardDays(1000))
          .accumulatingFiredPanes())
      .apply(Sum.integersPerKey())
      .apply(ParDo.of(new FormatAsStrings()));
  }

Code example source: org.apache.beam/beam-examples-java

@Override
 public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
  return infos
    .apply(
      "LeaderboardTeamFixedWindows",
      Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
        // We will get early (speculative) results as well as cumulative
        // processing of late data.
        .triggering(
          AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(FIVE_MINUTES))
            .withLateFirings(
              AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
        .withAllowedLateness(allowedLateness)
        .accumulatingFiredPanes())
    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
 }

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoAccumulatingLatenessNoTrigger() {
 FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       "Lateness",
       Window.<String>into(fixed)
         .withAllowedLateness(Duration.standardDays(1))
         .accumulatingFiredPanes())
     .getWindowingStrategy();
 assertThat(strategy.isTriggerSpecified(), is(false));
 assertThat(strategy.isModeSpecified(), is(true));
 assertThat(strategy.isAllowedLatenessSpecified(), is(true));
 assertThat(strategy.getMode(), equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES));
 assertThat(strategy.getAllowedLateness(), equalTo(Duration.standardDays(1)));
}

Code example source: org.apache.beam/beam-sdks-java-core

/**
 * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the {@link
 * Window} transform depends on if it actually assigns elements to windows.
 */
@Test
public void testWindowIntoNullWindowFnNoAssign() {
 pipeline
   .apply(Create.of(1, 2, 3))
   .apply(
     Window.<Integer>configure()
       .triggering(AfterWatermark.pastEndOfWindow())
       .withAllowedLateness(Duration.ZERO)
       .accumulatingFiredPanes());
 pipeline.traverseTopologically(
   new PipelineVisitor.Defaults() {
    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
    }
   });
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
 public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
  WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();

  return input
    .apply(Reify.windows())
    .apply(
      WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
        .withKeyType(new TypeDescriptor<Integer>() {}))
    .apply(
      Window.into(
          new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
            originalWindowFn.windowCoder()))
        .triggering(Never.ever())
        .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
        .discardingFiredPanes())
    // all values have the same key so they all appear as a single output element
    .apply(GroupByKey.create())
    .apply(Values.create())
    .setWindowingStrategyInternal(input.getWindowingStrategy());
 }

Code example source: gojektech/feast

public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
 PCollection<FeatureRowExtended> main = pFeatureRows.getMain();
 PCollection<FeatureRowExtended> errors = pFeatureRows.getErrors();
 if (main.isBounded().equals(IsBounded.UNBOUNDED)) {
  Window<FeatureRowExtended> minuteWindow =
    Window.<FeatureRowExtended>into(FixedWindows.of(Duration.standardMinutes(1L)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .discardingFiredPanes()
      .withAllowedLateness(Duration.standardMinutes(1));
  main = main.apply(minuteWindow);
  errors = errors.apply(minuteWindow);
 }
 main.apply("Sample success", Sample.any(limit))
   .apply("Log success sample", ParDo.of(new LoggerDoFn(Level.INFO, name + " MAIN ")));
 errors
   .apply("Sample errors", Sample.any(limit))
   .apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(ValidatesRunner.class)
public void testHotKeyCombiningWithAccumulationMode() {
 PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
 PCollection<Integer> output =
   input
     .apply(
       Window.<Integer>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .accumulatingFiredPanes()
         .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
     .apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
     .apply(ParDo.of(new GetLast()));
 PAssert.that(output)
   .satisfies(
     input1 -> {
      assertThat(input1, hasItem(15));
      return null;
     });
 pipeline.run();
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingModeViaLateness() {
 FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
 PCollection<String> input =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Window", Window.into(fixed));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("allowed lateness");
 thrown.expectMessage("accumulation mode be specified");
 input.apply(
   "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingMode() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 PCollection<String> input =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Window", Window.into(fixed10));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("requires that the accumulation mode");
 input.apply(
   "Triggering",
   Window.<String>configure()
     .withAllowedLateness(Duration.standardDays(1))
     .triggering(trigger));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoTriggersAndAccumulating() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       Window.<String>into(fixed10)
         .triggering(trigger)
         .accumulatingFiredPanes()
         .withAllowedLateness(Duration.ZERO))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoPropagatesLateness() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       "WindowInto10",
       Window.<String>into(fixed10)
         .withAllowedLateness(Duration.standardDays(1))
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
         .accumulatingFiredPanes())
     .apply("WindowInto25", Window.into(fixed25))
     .getWindowingStrategy();
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
 assertEquals(fixed25, strategy.getWindowFn());
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowPropagatesEachPart() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
     .apply(
       "Lateness",
       Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
     .apply("Trigger", Window.<String>configure().triggering(trigger))
     .apply("Window", Window.into(fixed10))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}
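
The last two tests suggest that each Window transform overrides only the parts of the windowing strategy it specifies and inherits the rest from its input. Below is a minimal plain-Java sketch of that layering. It is an assumption-laden model, not Beam's WindowingStrategy: the class `StrategyModel`, its string-valued fields, and the default values chosen in `main` are all hypothetical and exist only for illustration.

```java
import java.time.Duration;

/** Hypothetical, immutable model of a windowing strategy; not Beam's real class. */
final class StrategyModel {
  final String windowFn;
  final String trigger;
  final String mode;
  final Duration allowedLateness;

  StrategyModel(String windowFn, String trigger, String mode, Duration allowedLateness) {
    this.windowFn = windowFn;
    this.trigger = trigger;
    this.mode = mode;
    this.allowedLateness = allowedLateness;
  }

  // Each method replaces exactly one part and carries the others over unchanged.
  StrategyModel withWindowFn(String newWindowFn) {
    return new StrategyModel(newWindowFn, trigger, mode, allowedLateness);
  }

  StrategyModel withTrigger(String newTrigger) {
    return new StrategyModel(windowFn, newTrigger, mode, allowedLateness);
  }

  StrategyModel withMode(String newMode) {
    return new StrategyModel(windowFn, trigger, newMode, allowedLateness);
  }

  StrategyModel withAllowedLateness(Duration newLateness) {
    return new StrategyModel(windowFn, trigger, mode, newLateness);
  }

  public static void main(String[] args) {
    // Assumed starting point for an unwindowed collection.
    StrategyModel defaults =
        new StrategyModel("GlobalWindows", "DefaultTrigger", "DISCARDING_FIRED_PANES", Duration.ZERO);

    // Four separate steps, each setting one part, in the order used by the last test.
    StrategyModel result =
        defaults
            .withMode("ACCUMULATING_FIRED_PANES")
            .withAllowedLateness(Duration.ofDays(1))
            .withTrigger("Repeatedly(elementCountAtLeast(5))")
            .withWindowFn("FixedWindows(10 min)");

    System.out.println(
        result.windowFn + ", " + result.trigger + ", " + result.mode + ", " + result.allowedLateness);
  }
}
```

In this model, changing the window function last leaves the previously set lateness of one day in place, matching what the assertions in the two tests check.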
