Usage and code examples of the org.apache.beam.sdk.transforms.windowing.Window.discardingFiredPanes() method

x33g5p2x, reposted on 2022-02-03 under Other

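This article collects code examples for the Java method org.apache.beam.sdk.transforms.windowing.Window.discardingFiredPanes() and shows how Window.discardingFiredPanes() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, and are intended as reference material. Details of the Window.discardingFiredPanes() method are as follows:
Package path: org.apache.beam.sdk.transforms.windowing.Window
Class name: Window
Method name: discardingFiredPanes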

About Window.discardingFiredPanes

Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that discards elements in a pane after they are triggered.

Does not modify this transform. The resulting PTransform is sufficiently specified to be applied, but more properties can still be specified.
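
To make the description concrete, the following is a minimal plain-Java sketch of what "discards elements in a pane after they are triggered" means for the contents of successive panes in one window. It is a model only and does not use the Beam API: the class name PaneModel, the method panes, and the idea of passing in pre-grouped "batches between firings" are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of pane contents for a single window; this is not Beam code. */
public class PaneModel {

    /**
     * Returns the contents of each pane emitted for one window.
     *
     * @param batches    elements arriving between consecutive trigger firings (hypothetical input shape)
     * @param discarding true models discarding mode, false models accumulating mode
     */
    public static List<List<Integer>> panes(List<List<Integer>> batches, boolean discarding) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (List<Integer> batch : batches) {
            buffer.addAll(batch);
            // The trigger fires: emit a snapshot of the buffered elements as one pane.
            fired.add(new ArrayList<>(buffer));
            if (discarding) {
                // Discarding mode: elements already emitted are dropped from the buffer.
                buffer.clear();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = List.of(List.of(5, 7), List.of(3), List.of(4, 1));
        System.out.println("discarding:   " + panes(batches, true));
        System.out.println("accumulating: " + panes(batches, false));
    }
}
```

In this model the discarding run yields the panes [5, 7], [3], [4, 1], while the accumulating run yields [5, 7], [5, 7, 3], [5, 7, 3, 4, 1]. A downstream per-pane sum would therefore see 12, 3, 5 in discarding mode (the consumer has to add the panes together to get 20) and 12, 15, 20 in accumulating mode.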

Code examples

Code example source: origin: org.apache.beam/beam-sdks-java-core

private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
  return input
    .apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
    // Perform a per-window pre-combine so that our performance does not critically depend
    // on combiner lifting.
    .apply(ParDo.of(new CollectWindowsFn<>()))
    .apply(Sample.any(1))
    .apply(View.asList());
}

Code example source: origin: GoogleCloudPlatform/DataflowTemplates

@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {
 int numShards = spec.getNumShards();
 if (numShards <= 0) {
  try (Consumer<?, ?> consumer = openConsumer(spec)) {
   numShards = consumer.partitionsFor(spec.getTopic()).size();
   LOG.info(
     "Using {} shards for exactly-once writer, matching number of partitions "
       + "for topic '{}'",
     numShards,
     spec.getTopic());
  }
 }
 checkState(numShards > 0, "Could not set number of shards");
 return input
   .apply(
     Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
       .discardingFiredPanes())
   .apply(
     String.format("Shuffle across %d shards", numShards),
     ParDo.of(new Reshard<>(numShards)))
   .apply("Persist sharding", GroupByKey.create())
   .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
   .apply("Persist ids", GroupByKey.create())
   .apply(
     String.format("Write to Kafka topic '%s'", spec.getTopic()),
     ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}

Code example source: origin: org.apache.beam/beam-sdks-java-io-kafka

@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {
 int numShards = spec.getNumShards();
 if (numShards <= 0) {
  try (Consumer<?, ?> consumer = openConsumer(spec)) {
   numShards = consumer.partitionsFor(spec.getTopic()).size();
   LOG.info(
     "Using {} shards for exactly-once writer, matching number of partitions "
       + "for topic '{}'",
     numShards,
     spec.getTopic());
  }
 }
 checkState(numShards > 0, "Could not set number of shards");
 return input
   .apply(
     Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
       .discardingFiredPanes())
   .apply(
     String.format("Shuffle across %d shards", numShards),
     ParDo.of(new Reshard<>(numShards)))
   .apply("Persist sharding", GroupByKey.create())
   .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
   .apply("Persist ids", GroupByKey.create())
   .apply(
     String.format("Write to Kafka topic '%s'", spec.getTopic()),
     ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}

Code example source: origin: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .discardingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: origin: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
 Window<?> onlyHasAccumulationMode = Window.configure().discardingFiredPanes();
 assertThat(
   DisplayData.from(onlyHasAccumulationMode),
   not(
     hasDisplayItem(
       hasKey(
         isOneOf(
           "windowFn",
           "trigger",
           "timestampCombiner",
           "allowedLateness",
           "closingBehavior")))));
 Window<?> noAccumulationMode = Window.into(new GlobalWindows());
 assertThat(
   DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode"))));
}
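
The test above depends on two properties stated in the method description: calling discardingFiredPanes() returns a new transform rather than modifying the receiver, and the result carries only the properties that were explicitly set. The sketch below models that behavior in plain Java. WindowSpec and its fields are hypothetical stand-ins, not the Beam Window class; only the constant name DISCARDING_FIRED_PANES mirrors Beam's AccumulationMode enum.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for a windowing configuration; not the Beam Window class. */
public final class WindowSpec {
    private final String windowFn;         // null means "not specified"
    private final String accumulationMode; // null means "not specified"

    private WindowSpec(String windowFn, String accumulationMode) {
        this.windowFn = windowFn;
        this.accumulationMode = accumulationMode;
    }

    /** Starts a spec with nothing specified. */
    public static WindowSpec configure() {
        return new WindowSpec(null, null);
    }

    /** Starts a spec with only the window function specified. */
    public static WindowSpec into(String windowFn) {
        return new WindowSpec(windowFn, null);
    }

    /** Returns a copy with the accumulation mode set; the receiver is left unchanged. */
    public WindowSpec discardingFiredPanes() {
        return new WindowSpec(windowFn, "DISCARDING_FIRED_PANES");
    }

    public Optional<String> windowFn() {
        return Optional.ofNullable(windowFn);
    }

    public Optional<String> accumulationMode() {
        return Optional.ofNullable(accumulationMode);
    }

    public static void main(String[] args) {
        WindowSpec base = WindowSpec.configure();
        WindowSpec discarding = base.discardingFiredPanes();
        System.out.println(base.accumulationMode().isPresent());       // false
        System.out.println(discarding.accumulationMode().isPresent()); // true
        System.out.println(discarding.windowFn().isPresent());         // false
    }
}
```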

Code example source: origin: org.apache.beam/beam-sdks-java-core

@Test
 @Category(NeedsRunner.class)
 public void testReadWatchForNewFiles() throws IOException, InterruptedException {
  final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch");
  basePath.toFile().mkdir();
  p.apply(GenerateSequence.from(0).to(10).withRate(1, Duration.millis(100)))
    .apply(
      Window.<Long>into(FixedWindows.of(Duration.millis(150)))
        .withAllowedLateness(Duration.ZERO)
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes())
    .apply(ToString.elements())
    .apply(
      TextIO.write()
        .to(basePath.resolve("data").toString())
        .withNumShards(1)
        .withWindowedWrites());
  PCollection<String> lines =
    p.apply(
      TextIO.read()
        .from(basePath.resolve("*").toString())
        .watchForNewFiles(
          Duration.millis(100),
          Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))));
  PAssert.that(lines).containsInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
  p.run();
 }

Code example source: origin: gojektech/feast

public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
 PCollection<FeatureRowExtended> main = pFeatureRows.getMain();
 PCollection<FeatureRowExtended> errors = pFeatureRows.getErrors();
 if (main.isBounded().equals(IsBounded.UNBOUNDED)) {
  Window<FeatureRowExtended> minuteWindow =
    Window.<FeatureRowExtended>into(FixedWindows.of(Duration.standardMinutes(1L)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .discardingFiredPanes()
      .withAllowedLateness(Duration.standardMinutes(1));
  main = main.apply(minuteWindow);
  errors = errors.apply(minuteWindow);
 }
 main.apply("Sample success", Sample.any(limit))
   .apply("Log success sample", ParDo.of(new LoggerDoFn(Level.INFO, name + " MAIN ")));
 errors
   .apply("Sample errors", Sample.any(limit))
   .apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}

Code example source: origin: org.apache.beam/beam-sdks-java-core

@Override
 public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
  WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();

  return input
    .apply(Reify.windows())
    .apply(
      WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
        .withKeyType(new TypeDescriptor<Integer>() {}))
    .apply(
      Window.into(
          new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
            originalWindowFn.windowCoder()))
        .triggering(Never.ever())
        .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
        .discardingFiredPanes())
    // all values have the same key so they all appear as a single output element
    .apply(GroupByKey.create())
    .apply(Values.create())
    .setWindowingStrategyInternal(input.getWindowingStrategy());
 }

Code example source: origin: jbonofre/beam-samples

public static final void main(String[] args) throws Exception {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
  ClientDataSource dataSource = new ClientDataSource();
  dataSource.setPortNumber(1527);
  dataSource.setServerName("localhost");
  dataSource.setDatabaseName("test");
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
      .apply(ParDo.of(new DoFn<JmsRecord, String>() {
        @ProcessElement
        public void processElement(ProcessContext processContext) {
          JmsRecord element = processContext.element();
          processContext.output(element.getPayload());
        }
      }))
      .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))
          .triggering(AfterWatermark.pastEndOfWindow())
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(JdbcIO.<String>write().withDataSourceConfiguration(
          JdbcIO.DataSourceConfiguration.create(dataSource))
          .withStatement("insert into test values(?)")
          .withPreparedStatementSetter((element, statement) -> {
            statement.setString(1, element);
          }));
  pipeline.run();
}

Code example source: origin: org.apache.beam/beam-sdks-java-core

private void testOutputAfterCheckpoint(IsBounded bounded) {
 PCollection<Integer> outputs =
   p.apply(Create.of("foo"))
     .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
     .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
 PAssert.thatSingleton(outputs.apply(Count.globally()))
   .isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
 p.run();
}

Code example source: origin: org.apache.beam/beam-sdks-java-io-google-cloud-platform

@Override
 public PDone expand(PCollection<PubsubMessage> input) {
  input
    .apply(
      "PubsubUnboundedSink.Window",
      Window.<PubsubMessage>into(new GlobalWindows())
        .triggering(
          Repeatedly.forever(
            AfterFirst.of(
              AfterPane.elementCountAtLeast(publishBatchSize),
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
        .discardingFiredPanes())
    .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
    .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
    .apply(GroupByKey.create())
    .apply(
      "PubsubUnboundedSink.Writer",
      ParDo.of(
        new WriterFn(
          pubsubFactory,
          topic,
          timestampAttribute,
          idAttribute,
          publishBatchSize,
          publishBatchBytes)));
  return PDone.in(input.getPipeline());
 }

Code example source: origin: gojektech/feast

/** Writes to different file sinks based on a */
 @Override
 public PDone expand(PCollection<FeatureRowExtended> input) {
  final String folderName = options.jobName != null ? options.jobName : "unknown-jobs";
  FileIO.Write<String, FeatureRowExtended> write =
    FileIO.<String, FeatureRowExtended>writeDynamic()
      .by((rowExtended) -> rowExtended.getRow().getEntityName())
      .withDestinationCoder(StringUtf8Coder.of())
      .withNaming(
        Contextful.fn(
          (entityName) -> FileIO.Write.defaultNaming(folderName + "/" + entityName, suffix)))
      .via(Contextful.fn(toTextFunction), Contextful.fn((entityName) -> TextIO.sink()))
      .to(options.path);
  if (input.isBounded().equals(IsBounded.UNBOUNDED)) {
   Window<FeatureRowExtended> minuteWindow =
     Window.<FeatureRowExtended>into(FixedWindows.of(options.getWindowDuration()))
       .triggering(AfterWatermark.pastEndOfWindow())
       .discardingFiredPanes()
       .withAllowedLateness(Duration.ZERO);
   input = input.apply(minuteWindow);
   write = write.withNumShards(10);
  }
  WriteFilesResult<String> outputFiles = input.apply(write);
  return PDone.in(outputFiles.getPipeline());
 }

Code example source: origin: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testWindowedWritesWithOnceTrigger() throws Throwable {
 // Tests for https://issues.apache.org/jira/browse/BEAM-3169
 PCollection<String> data =
   p.apply(Create.of("0", "1", "2"))
     .apply(
       Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
         // According to this trigger, all data should be written.
         // However, the continuation of this trigger is elementCountAtLeast(1),
         // so with a buggy implementation that used a GBK before renaming files,
         // only 1 file would be renamed.
         .triggering(AfterPane.elementCountAtLeast(3))
         .withAllowedLateness(Duration.standardMinutes(1))
         .discardingFiredPanes());
 PCollection<String> filenames =
   data.apply(
       TextIO.write()
         .to(new File(tempFolder.getRoot(), "windowed-writes").getAbsolutePath())
         .withNumShards(2)
         .withWindowedWrites()
         .<Void>withOutputFilenames())
     .getPerDestinationOutputFilenames()
     .apply(Values.create());
 PAssert.that(filenames.apply(TextIO.readAll())).containsInAnyOrder("0", "1", "2");
 p.run();
}

Code example source: origin: org.apache.beam/beam-sdks-java-io-google-cloud-platform

@Override
public PCollection<KV<TableDestination, String>> expand(
  PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
 PCollectionTuple writeTablesOutputs =
   input.apply(
     ParDo.of(new WriteTablesDoFn())
       .withSideInputs(sideInputs)
       .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag)));
 // Garbage collect temporary files.
 // We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has
 // succeeded in loading those files and won't be retried. Otherwise, we might fail part of the
 // way through deleting temporary files, and retry WriteTablesDoFn. This will then fail due
 // to missing files, causing either the entire workflow to fail or get stuck (depending on how
 // the runner handles persistent failures).
 writeTablesOutputs
   .get(temporaryFilesTag)
   .setCoder(StringUtf8Coder.of())
   .apply(WithKeys.of((Void) null))
   .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
   .apply(
     Window.<KV<Void, String>>into(new GlobalWindows())
       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
       .discardingFiredPanes())
   .apply(GroupByKey.create())
   .apply(Values.create())
   .apply(ParDo.of(new GarbageCollectTemporaryFiles()));
 return writeTablesOutputs.get(mainOutputTag);
}

Code example source: origin: jbonofre/beam-samples

public static final void main(String[] args) throws Exception {
    final Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes()
        )
        .apply(TextIO.write()
            .to(options.getOutput())
            .withWindowedWrites()
            .withNumShards(1));
    pipeline.run().waitUntilFinish();
}

Code example source: origin: org.apache.beam/beam-sdks-java-extensions-sql

Window.<Row>into(new GlobalWindows())
  .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
  .discardingFiredPanes()
  .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);

Code example source: origin: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void singlePaneSingleReifiedPane() {
 PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes =
   p.apply(GenerateSequence.from(0).to(20000))
     .apply(WithTimestamps.of(input -> new Instant(input * 10)))
     .apply(
       Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
         .triggering(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.ZERO)
         .discardingFiredPanes())
     .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
     .apply(GroupByKey.create())
     .apply(Values.create())
     .apply(GatherAllPanes.globally());
 PAssert.that(accumulatedPanes)
   .satisfies(
     input -> {
      for (Iterable<ValueInSingleWindow<Iterable<Long>>> windowedInput : input) {
       if (Iterables.size(windowedInput) > 1) {
        fail("Expected all windows to have exactly one pane, got " + windowedInput);
        return null;
       }
      }
      return null;
     });
 p.run();
}

Code example source: origin: org.apache.beam/beam-sdks-java-core

Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
  .withAllowedLateness(Duration.standardMinutes(1))
  .discardingFiredPanes();

Code example source: origin: org.apache.beam/beam-sdks-java-core

.plusDelayOf(Duration.standardSeconds(30))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply(
  Distinct.withRepresentativeValueFn(new Keys<Integer>())

Code example source: origin: org.apache.beam/beam-sdks-java-core

.withEarlyFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
.apply(GroupByKey.create())
