This article collects Java code examples for org.apache.beam.sdk.transforms.windowing.Window.discardingFiredPanes(), showing how Window.discardingFiredPanes() is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references.

Details of the Window.discardingFiredPanes() method:

Package: org.apache.beam.sdk.transforms.windowing.Window
Class: Window
Method: discardingFiredPanes

Returns a new Window PTransform that uses the registered WindowFn and triggering behavior, and that discards elements in a pane after they are triggered.

Does not modify this transform. The resulting PTransform is sufficiently specified to be applied, but more properties can still be specified.
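Before the collected examples, here is a minimal usage sketch (not taken from any of the projects below; the input collection "events" and the window and trigger sizes are hypothetical placeholders). discardingFiredPanes() selects discarding accumulation mode, so each pane that fires contains only the elements that arrived since the previous firing; its counterpart accumulatingFiredPanes() would instead re-emit the window's accumulated contents on every firing. Note that when a non-default trigger is set, Beam also requires the allowed lateness and an accumulation mode to be specified before the transform can be applied.

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// "events" is a hypothetical unbounded PCollection<String> defined elsewhere.
// Fixed one-minute windows, firing a pane for every 100 buffered elements;
// discardingFiredPanes() drops a pane's elements once the pane has fired,
// so later panes in the same window do not repeat them.
PCollection<String> windowed =
    events.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());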
Code example source: org.apache.beam/beam-sdks-java-core
private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
  return input
      .apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
      // Perform a per-window pre-combine so that our performance does not critically depend
      // on combiner lifting.
      .apply(ParDo.of(new CollectWindowsFn<>()))
      .apply(Sample.any(1))
      .apply(View.asList());
}
Code example source: GoogleCloudPlatform/DataflowTemplates
@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {
  int numShards = spec.getNumShards();
  if (numShards <= 0) {
    try (Consumer<?, ?> consumer = openConsumer(spec)) {
      numShards = consumer.partitionsFor(spec.getTopic()).size();
      LOG.info(
          "Using {} shards for exactly-once writer, matching number of partitions "
              + "for topic '{}'",
          numShards,
          spec.getTopic());
    }
  }
  checkState(numShards > 0, "Could not set number of shards");

  return input
      .apply(
          Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .discardingFiredPanes())
      .apply(
          String.format("Shuffle across %d shards", numShards),
          ParDo.of(new Reshard<>(numShards)))
      .apply("Persist sharding", GroupByKey.create())
      .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
      .apply("Persist ids", GroupByKey.create())
      .apply(
          String.format("Write to Kafka topic '%s'", spec.getTopic()),
          ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}
Code example source: org.apache.beam/beam-sdks-java-io-kafka
This snippet is identical to the preceding GoogleCloudPlatform/DataflowTemplates example, so the code is not repeated here.
Code example source: takidau/streamingbook
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
      .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
          .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.standardDays(1000))
          .discardingFiredPanes())
      .apply(Sum.integersPerKey())
      .apply(ParDo.of(new FormatAsStrings()));
}
Code example source: org.apache.beam/beam-sdks-java-core
@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
  Window<?> onlyHasAccumulationMode = Window.configure().discardingFiredPanes();
  assertThat(
      DisplayData.from(onlyHasAccumulationMode),
      not(
          hasDisplayItem(
              hasKey(
                  isOneOf(
                      "windowFn",
                      "trigger",
                      "timestampCombiner",
                      "allowedLateness",
                      "closingBehavior")))));

  Window<?> noAccumulationMode = Window.into(new GlobalWindows());
  assertThat(
      DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode"))));
}
Code example source: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testReadWatchForNewFiles() throws IOException, InterruptedException {
  final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch");
  basePath.toFile().mkdir();

  p.apply(GenerateSequence.from(0).to(10).withRate(1, Duration.millis(100)))
      .apply(
          Window.<Long>into(FixedWindows.of(Duration.millis(150)))
              .withAllowedLateness(Duration.ZERO)
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .discardingFiredPanes())
      .apply(ToString.elements())
      .apply(
          TextIO.write()
              .to(basePath.resolve("data").toString())
              .withNumShards(1)
              .withWindowedWrites());

  PCollection<String> lines =
      p.apply(
          TextIO.read()
              .from(basePath.resolve("*").toString())
              .watchForNewFiles(
                  Duration.millis(100),
                  Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))));

  PAssert.that(lines).containsInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
  p.run();
}
Code example source: gojektech/feast
public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
  PCollection<FeatureRowExtended> main = pFeatureRows.getMain();
  PCollection<FeatureRowExtended> errors = pFeatureRows.getErrors();

  if (main.isBounded().equals(IsBounded.UNBOUNDED)) {
    Window<FeatureRowExtended> minuteWindow =
        Window.<FeatureRowExtended>into(FixedWindows.of(Duration.standardMinutes(1L)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardMinutes(1));
    main = main.apply(minuteWindow);
    errors = errors.apply(minuteWindow);
  }

  main.apply("Sample success", Sample.any(limit))
      .apply("Log success sample", ParDo.of(new LoggerDoFn(Level.INFO, name + " MAIN ")));
  errors
      .apply("Sample errors", Sample.any(limit))
      .apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}
Code example source: org.apache.beam/beam-sdks-java-core
@Override
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
  WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
  return input
      .apply(Reify.windows())
      .apply(
          WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
              .withKeyType(new TypeDescriptor<Integer>() {}))
      .apply(
          Window.into(
                  new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
                      originalWindowFn.windowCoder()))
              .triggering(Never.ever())
              .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
              .discardingFiredPanes())
      // all values have the same key so they all appear as a single output element
      .apply(GroupByKey.create())
      .apply(Values.create())
      .setWindowingStrategyInternal(input.getWindowingStrategy());
}
Code example source: jbonofre/beam-samples
public final static void main(String[] args) throws Exception {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
  ClientDataSource dataSource = new ClientDataSource();
  dataSource.setPortNumber(1527);
  dataSource.setServerName("localhost");
  dataSource.setDatabaseName("test");

  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
      .apply(ParDo.of(new DoFn<JmsRecord, String>() {
        @ProcessElement
        public void processElement(ProcessContext processContext) {
          JmsRecord element = processContext.element();
          processContext.output(element.getPayload());
        }
      }))
      .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))
          .triggering(AfterWatermark.pastEndOfWindow())
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(JdbcIO.<String>write().withDataSourceConfiguration(
              JdbcIO.DataSourceConfiguration.create(dataSource))
          .withStatement("insert into test values(?)")
          .withPreparedStatementSetter((element, statement) -> {
            statement.setString(1, element);
          }));
  pipeline.run();
}
Code example source: org.apache.beam/beam-sdks-java-core
private void testOutputAfterCheckpoint(IsBounded bounded) {
  PCollection<Integer> outputs =
      p.apply(Create.of("foo"))
          .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
          .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
  PAssert.thatSingleton(outputs.apply(Count.globally()))
      .isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
  p.run();
}
Code example source: org.apache.beam/beam-sdks-java-io-google-cloud-platform
@Override
public PDone expand(PCollection<PubsubMessage> input) {
  input
      .apply(
          "PubsubUnboundedSink.Window",
          Window.<PubsubMessage>into(new GlobalWindows())
              .triggering(
                  Repeatedly.forever(
                      AfterFirst.of(
                          AfterPane.elementCountAtLeast(publishBatchSize),
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
              .discardingFiredPanes())
      .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
      .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
      .apply(GroupByKey.create())
      .apply(
          "PubsubUnboundedSink.Writer",
          ParDo.of(
              new WriterFn(
                  pubsubFactory,
                  topic,
                  timestampAttribute,
                  idAttribute,
                  publishBatchSize,
                  publishBatchBytes)));
  return PDone.in(input.getPipeline());
}
Code example source: gojektech/feast
/** Writes to different file sinks based on the entity name of each row. */
@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
  final String folderName = options.jobName != null ? options.jobName : "unknown-jobs";

  FileIO.Write<String, FeatureRowExtended> write =
      FileIO.<String, FeatureRowExtended>writeDynamic()
          .by((rowExtended) -> rowExtended.getRow().getEntityName())
          .withDestinationCoder(StringUtf8Coder.of())
          .withNaming(
              Contextful.fn(
                  (entityName) -> FileIO.Write.defaultNaming(folderName + "/" + entityName, suffix)))
          .via(Contextful.fn(toTextFunction), Contextful.fn((entityName) -> TextIO.sink()))
          .to(options.path);

  if (input.isBounded().equals(IsBounded.UNBOUNDED)) {
    Window<FeatureRowExtended> minuteWindow =
        Window.<FeatureRowExtended>into(FixedWindows.of(options.getWindowDuration()))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO);
    input = input.apply(minuteWindow);
    write = write.withNumShards(10);
  }

  WriteFilesResult<String> outputFiles = input.apply(write);
  return PDone.in(outputFiles.getPipeline());
}
Code example source: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testWindowedWritesWithOnceTrigger() throws Throwable {
  // Tests for https://issues.apache.org/jira/browse/BEAM-3169
  PCollection<String> data =
      p.apply(Create.of("0", "1", "2"))
          .apply(
              Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
                  // According to this trigger, all data should be written.
                  // However, the continuation of this trigger is elementCountAtLeast(1),
                  // so with a buggy implementation that used a GBK before renaming files,
                  // only 1 file would be renamed.
                  .triggering(AfterPane.elementCountAtLeast(3))
                  .withAllowedLateness(Duration.standardMinutes(1))
                  .discardingFiredPanes());

  PCollection<String> filenames =
      data.apply(
              TextIO.write()
                  .to(new File(tempFolder.getRoot(), "windowed-writes").getAbsolutePath())
                  .withNumShards(2)
                  .withWindowedWrites()
                  .<Void>withOutputFilenames())
          .getPerDestinationOutputFilenames()
          .apply(Values.create());

  PAssert.that(filenames.apply(TextIO.readAll())).containsInAnyOrder("0", "1", "2");
  p.run();
}
Code example source: org.apache.beam/beam-sdks-java-io-google-cloud-platform
@Override
public PCollection<KV<TableDestination, String>> expand(
    PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
  PCollectionTuple writeTablesOutputs =
      input.apply(
          ParDo.of(new WriteTablesDoFn())
              .withSideInputs(sideInputs)
              .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag)));

  // Garbage collect temporary files.
  // We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has
  // succeeded in loading those files and won't be retried. Otherwise, we might fail part of the
  // way through deleting temporary files, and retry WriteTablesDoFn. This will then fail due
  // to missing files, causing either the entire workflow to fail or get stuck (depending on how
  // the runner handles persistent failures).
  writeTablesOutputs
      .get(temporaryFilesTag)
      .setCoder(StringUtf8Coder.of())
      .apply(WithKeys.of((Void) null))
      .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
      .apply(
          Window.<KV<Void, String>>into(new GlobalWindows())
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .discardingFiredPanes())
      .apply(GroupByKey.create())
      .apply(Values.create())
      .apply(ParDo.of(new GarbageCollectTemporaryFiles()));

  return writeTablesOutputs.get(mainOutputTag);
}
Code example source: jbonofre/beam-samples
public final static void main(String[] args) throws Exception {
  final Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(KafkaIO.<Long, String>read()
          .withBootstrapServers(options.getBootstrap())
          .withTopic(options.getTopic())
          .withKeyDeserializer(LongDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withoutMetadata())
      .apply(Values.<String>create())
      .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
          .triggering(Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(TextIO.write()
          .to(options.getOutput())
          .withWindowedWrites()
          .withNumShards(1));
  pipeline.run().waitUntilFinish();
}
Code example source: org.apache.beam/beam-sdks-java-extensions-sql
Window.<Row>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
    .discardingFiredPanes()
    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY));
Code example source: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void singlePaneSingleReifiedPane() {
  PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes =
      p.apply(GenerateSequence.from(0).to(20000))
          .apply(WithTimestamps.of(input -> new Instant(input * 10)))
          .apply(
              Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .withAllowedLateness(Duration.ZERO)
                  .discardingFiredPanes())
          .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(GatherAllPanes.globally());

  PAssert.that(accumulatedPanes)
      .satisfies(
          input -> {
            for (Iterable<ValueInSingleWindow<Iterable<Long>>> windowedInput : input) {
              if (Iterables.size(windowedInput) > 1) {
                fail("Expected all windows to have exactly one pane, got " + windowedInput);
                return null;
              }
            }
            return null;
          });
  p.run();
}
Code example source: org.apache.beam/beam-sdks-java-core
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    .withAllowedLateness(Duration.standardMinutes(1))
    .discardingFiredPanes());
Code example source: org.apache.beam/beam-sdks-java-core
            .plusDelayOf(Duration.standardSeconds(30))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(
        Distinct.withRepresentativeValueFn(new Keys<Integer>())
Code example source: org.apache.beam/beam-sdks-java-core
            .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
    .apply(GroupByKey.create())