org.apache.beam.sdk.transforms.windowing.Window.configure()方法的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(103)

本文整理了Java中org.apache.beam.sdk.transforms.windowing.Window.configure()方法的一些代码示例,展示了Window.configure()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Window.configure()方法的具体详情如下:
包路径:org.apache.beam.sdk.transforms.windowing.Window
类名称:Window
方法名:configure

Window.configure介绍

[英]Returns a new builder for a Window transform for setting windowing parameters other than the windowing function.
[中]返回窗口转换的新生成器,用于设置除窗口函数以外的窗口参数。

代码示例

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Creates a {@code Window} {@code PTransform} that uses the given {@link WindowFn} to window the
 * data.
 *
 * <p>The resulting {@code PTransform}'s types have been bound, with both the input and output
 * being a {@code PCollection<T>}, inferred from the types of the argument {@code WindowFn}. It is
 * ready to be applied, or further properties can be set on it first.
 */
public static <T> Window<T> into(WindowFn<? super T, ?> fn) {
 try {
  fn.windowCoder().verifyDeterministic();
 } catch (NonDeterministicException e) {
  throw new IllegalArgumentException("Window coders must be deterministic.", e);
 }
 return Window.<T>configure().withWindowFn(fn);
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
  return input
    .apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
    // Perform a per-window pre-combine so that our performance does not critically depend
    // on combiner lifting.
    .apply(ParDo.of(new CollectWindowsFn<>()))
    .apply(Sample.any(1))
    .apply(View.asList());
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingLateness() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("requires that the allowed lateness");
 pipeline
   .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
   .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
   .apply("Window", Window.into(fixed10))
   .apply("Trigger", Window.<String>configure().triggering(trigger));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
 Window<?> onlyHasAccumulationMode = Window.configure().discardingFiredPanes();
 assertThat(
   DisplayData.from(onlyHasAccumulationMode),
   not(
     hasDisplayItem(
       hasKey(
         isOneOf(
           "windowFn",
           "trigger",
           "timestampCombiner",
           "allowedLateness",
           "closingBehavior")))));
 Window<?> noAccumulationMode = Window.into(new GlobalWindows());
 assertThat(
   DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode"))));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowPropagatesEachPart() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
     .apply(
       "Lateness",
       Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
     .apply("Trigger", Window.<String>configure().triggering(trigger))
     .apply("Window", Window.into(fixed10))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the {@link
 * Window} transform depends on if it actually assigns elements to windows.
 */
@Test
public void testWindowIntoNullWindowFnNoAssign() {
 pipeline
   .apply(Create.of(1, 2, 3))
   .apply(
     Window.<Integer>configure()
       .triggering(AfterWatermark.pastEndOfWindow())
       .withAllowedLateness(Duration.ZERO)
       .accumulatingFiredPanes());
 pipeline.traverseTopologically(
   new PipelineVisitor.Defaults() {
    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
    }
   });
}

代码示例来源:origin: org.apache.beam/beam-runners-direct-java

Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
  .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
  .discardingFiredPanes()

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

private void testOutputAfterCheckpoint(IsBounded bounded) {
 PCollection<Integer> outputs =
   p.apply(Create.of("foo"))
     .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
     .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
 PAssert.thatSingleton(outputs.apply(Count.globally()))
   .isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
 p.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
removeTriggering =
  Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
    .triggering(Never.ever())
    .discardingFiredPanes()
  .apply(
    "NeverTrigger",
    Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
      .triggering(Never.ever())
      .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingModeViaLateness() {
 FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
 PCollection<String> input =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Window", Window.into(fixed));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("allowed lateness");
 thrown.expectMessage("accumulation mode be specified");
 input.apply(
   "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingMode() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 PCollection<String> input =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Window", Window.into(fixed10));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("requires that the accumulation mode");
 input.apply(
   "Triggering",
   Window.<String>configure()
     .withAllowedLateness(Duration.standardDays(1))
     .triggering(trigger));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testMultipleStreams() {
 TestStream<String> stream =
   TestStream.create(StringUtf8Coder.of())
     .addElements("foo", "bar")
     .advanceWatermarkToInfinity();
 TestStream<Integer> other =
   TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity();
 PCollection<String> createStrings =
   p.apply("CreateStrings", stream)
     .apply(
       "WindowStrings",
       Window.<String>configure()
         .triggering(AfterPane.elementCountAtLeast(2))
         .withAllowedLateness(Duration.ZERO)
         .accumulatingFiredPanes());
 PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
 PCollection<Integer> createInts =
   p.apply("CreateInts", other)
     .apply(
       "WindowInts",
       Window.<Integer>configure()
         .triggering(AfterPane.elementCountAtLeast(4))
         .withAllowedLateness(Duration.ZERO)
         .accumulatingFiredPanes());
 PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
 p.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testProcessingTimeTrigger() {
 TestStream<Long> source =
   TestStream.create(VarLongCoder.of())
     .addElements(
       TimestampedValue.of(1L, new Instant(1000L)),
       TimestampedValue.of(2L, new Instant(2000L)))
     .advanceProcessingTime(Duration.standardMinutes(12))
     .addElements(TimestampedValue.of(3L, new Instant(3000L)))
     .advanceProcessingTime(Duration.standardMinutes(6))
     .advanceWatermarkToInfinity();
 PCollection<Long> sum =
   p.apply(source)
     .apply(
       Window.<Long>configure()
         .triggering(
           AfterWatermark.pastEndOfWindow()
             .withEarlyFirings(
               AfterProcessingTime.pastFirstElementInPane()
                 .plusDelayOf(Duration.standardMinutes(5))))
         .accumulatingFiredPanes()
         .withAllowedLateness(Duration.ZERO))
     .apply(Sum.longsGlobally());
 PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
 p.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

Window.<Boolean>configure()
  .triggering(Never.ever())
  .withAllowedLateness(Duration.ZERO)

代码示例来源:origin: org.apache.beam/beam-sdks-java-extensions-sql

.apply(
  "Triggering",
  Window.<Row>configure()
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.ZERO)

相关文章