Usage and code examples of org.apache.beam.sdk.Pipeline.apply()


This article collects code examples of the Java method org.apache.beam.sdk.Pipeline.apply() and shows how it is used in practice. The examples are drawn from selected open-source projects hosted on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful references. Details of Pipeline.apply() are as follows:
Package: org.apache.beam.sdk
Class: Pipeline
Method: apply

Introduction to Pipeline.apply

Adds a root PTransform, such as Read or Create, to this Pipeline.

The node in the Pipeline graph will use the provided name. This name is used in various places, including the monitoring UI, logging, and to stably identify this node in the Pipeline graph upon update.

Alias for begin().apply(name, root).
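
Before the collected examples, here is a minimal, self-contained sketch of the pattern they all share: a named root PTransform is attached with Pipeline.apply(name, root), and further transforms are chained on the resulting PCollection. The class name ApplyExample, the literal input values, and the lambda are illustrative assumptions, not code taken from any of the projects below.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApplyExample {
 public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);
  // Root PTransform: the name "CreateWords" labels this node in the pipeline
  // graph and appears in the monitoring UI, in logs, and in update-compatibility checks.
  pipeline.apply("CreateWords", Create.of("hello", "beam"))
    // Subsequent applies are called on the resulting PCollection, not on the Pipeline.
    .apply("ToUpperCase",
      MapElements.into(TypeDescriptors.strings()).via((String word) -> word.toUpperCase()));
  pipeline.run().waitUntilFinish();
 }
}

With no explicit runner in the options, this executes on the direct runner, provided beam-runners-direct-java is on the classpath.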

Code examples

Example source: GoogleCloudPlatform/java-docs-samples

public static void runAvroToCsv(SampleOptions options)
  throws IOException, IllegalArgumentException {
 FileSystems.setDefaultPipelineOptions(options);
 // Get Avro Schema
 String schemaJson = getSchema(options.getAvroSchema());
 Schema schema = new Schema.Parser().parse(schemaJson);
 // Check schema field types before starting the Dataflow job
 checkFieldTypes(schema);
 // Create the Pipeline object with the options we defined above.
 Pipeline pipeline = Pipeline.create(options);
 // Convert Avro To CSV
 pipeline.apply("Read Avro files",
   AvroIO.readGenericRecords(schemaJson).from(options.getInputFile()))
   .apply("Convert Avro to CSV formatted data",
     ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
   .apply("Write CSV formatted data", TextIO.write().to(options.getOutput())
     .withSuffix(".csv"));
 // Run the pipeline.
 pipeline.run().waitUntilFinish();
}

Example source: GoogleCloudPlatform/java-docs-samples

public static void runCsvToAvro(SampleOptions options)
  throws IOException, IllegalArgumentException {
 FileSystems.setDefaultPipelineOptions(options);
 // Get Avro Schema
 String schemaJson = getSchema(options.getAvroSchema());
 Schema schema = new Schema.Parser().parse(schemaJson);
 // Check schema field types before starting the Dataflow job
 checkFieldTypes(schema);
 // Create the Pipeline object with the options we defined above.
 Pipeline pipeline = Pipeline.create(options);
 // Convert CSV to Avro
 pipeline.apply("Read CSV files", TextIO.read().from(options.getInputFile()))
   .apply("Convert CSV to Avro formatted data",
     ParDo.of(new ConvertCsvToAvro(schemaJson, options.getCsvDelimiter())))
   .setCoder(AvroCoder.of(GenericRecord.class, schema))
   .apply("Write Avro formatted data", AvroIO.writeGenericRecords(schemaJson)
     .to(options.getOutput()).withCodec(CodecFactory.snappyCodec()).withSuffix(".avro"));
 // Run the pipeline.
 pipeline.run().waitUntilFinish();
}

Example source: GoogleCloudPlatform/java-docs-samples

public static void main(String[] args) {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);

  String instanceId = options.getInstanceId();
  String databaseId = options.getDatabaseId();
  // [START spanner_dataflow_read]
  // Query for all the columns and rows in the specified Spanner table
  PCollection<Struct> records = p.apply(
    SpannerIO.read()
      .withInstanceId(instanceId)
      .withDatabaseId(databaseId)
      .withQuery("SELECT * FROM " + options.getTable()));
  // [END spanner_dataflow_read]

  PCollection<Long> tableEstimatedSize = records
    // Estimate the size of every row
    .apply(EstimateSize.create())
    // Sum all the row sizes to get the total estimated size of the table
    .apply(Sum.longsGlobally());

  // Write the total size to a file
  tableEstimatedSize
    .apply(ToString.elements())
    .apply(TextIO.write().to(options.getOutput()).withoutSharding());

  p.run().waitUntilFinish();
 }
}

Example source: GoogleCloudPlatform/java-docs-samples

p.apply("ReadSingers", TextIO.read().from(options.getSingersFilename()))
  .apply("ParseSingers", ParDo.of(new ParseSinger()))
  .apply("CreateSingerMutation", ParDo.of(new DoFn<Singer, Mutation>() {
   @ProcessElement
   public void processElement(ProcessContext c) {
  .apply("WriteSingers", SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId));
  .apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename()))
  .apply("ParseAlbums", ParDo.of(new ParseAlbum()));

Example source: GoogleCloudPlatform/java-docs-samples

PCollection<Struct> records = p.apply(
  SpannerIO.read()
    .withInstanceId(instanceId)
  .apply(EstimateSize.create())
  .apply(Sum.longsGlobally());
  .apply(ToString.elements())
  .apply(TextIO.write().to(options.getOutput()).withoutSharding());

Example source: GoogleCloudPlatform/java-docs-samples

public static void main(String[] args) {
 Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 Pipeline p = Pipeline.create(options);
 SpannerConfig spannerConfig = SpannerConfig.create()
   .withInstanceId(options.getInstanceId())
   .withDatabaseId(options.getDatabaseId());
 // [START spanner_dataflow_readall]
 PCollection<Struct> allRecords = p.apply(SpannerIO.read()
   .withSpannerConfig(spannerConfig)
   .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
     + ".table_catalog = '' AND t.table_schema = ''")).apply(
   MapElements.into(TypeDescriptor.of(ReadOperation.class))
     .via((SerializableFunction<Struct, ReadOperation>) input -> {
      String tableName = input.getString(0);
      return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
     })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));
 // [END spanner_dataflow_readall]
 PCollection<Long> dbEstimatedSize = allRecords.apply(EstimateSize.create())
   .apply(Sum.longsGlobally());
 dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput())
   .withoutSharding());
 p.run().waitUntilFinish();
}

Example source: GoogleCloudPlatform/java-docs-samples

SpannerConfig spannerConfig = SpannerConfig.create()
  .withInstanceId(instanceId)
  .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = p.apply(
  SpannerIO.createTransaction()
    .withSpannerConfig(spannerConfig)
    .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers = p.apply(SpannerIO.read()
  .withSpannerConfig(spannerConfig)
  .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
  .withTransaction(tx));
PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
  .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
  .withTransaction(tx));
singers.apply(MapElements.via(new SimpleFunction<Struct, String>() {
  @Override
  public String apply(Struct input) {
   return Joiner.on(DELIMITER).join(input.getLong(0), input.getString(1), input.getString(2));
  }
 })).apply(TextIO.write().to(options.getSingersFilename()).withoutSharding());
albums.apply(MapElements.via(new SimpleFunction<Struct, String>() {
  // ... format album rows similarly (elided in the original excerpt) ...
 })).apply(TextIO.write().to(options.getAlbumsFilename()).withoutSharding());

Example source: GoogleCloudPlatform/java-docs-samples

PCollection<String> suspiciousUserIds = p.apply(TextIO.read().from(usersIdFile));
  .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {
mutations.apply(SpannerIO.write()
  .withInstanceId(instanceId)
  .withDatabaseId(databaseId)

Example source: jbonofre/beam-samples

public static void runPipeline(Pipeline p) {
    System.out.println("Sleep time: " + TearDown.SLEEP_TIME + " ms");

    long tId = Thread.currentThread().getId();
    long beginTs = System.currentTimeMillis();

    p.apply(Create.of("value"))
      .apply(ParDo.of(new LongTearDownFn()));
    p.run().waitUntilFinish();

    long endTs = System.currentTimeMillis();

    System.out.println("Thread #" + tId +  ", run for " + (endTs - beginTs) + " ms");
  }
}

Example source: GoogleCloudPlatform/cloud-bigtable-client

@VisibleForTesting
static Pipeline buildPipeline(ImportOptions opts) {
 Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
 pipeline
   .apply(
     "Read Sequence File",
     Read.from(new ShuffledSource<>(createSource(opts.getSourcePattern()))))
   .apply("Create Mutations", ParDo.of(new HBaseResultToMutationFn()))
   .apply("Write to Bigtable", createSink(opts));
 return pipeline;
}

Example source: jbonofre/beam-samples

private static void runReadPipeline(Options options) {
 Pipeline pipeline = Pipeline.create(options);
 pipeline
   .apply("Find files", FileIO.match().filepattern(options.getInput()))
   .apply("Read matched files", FileIO.readMatches())
   .apply("Read parquet files", ParquetIO.readFiles(SCHEMA))
   .apply("Map records to strings", MapElements.into(strings()).via(new GetRecordsFn()));
 pipeline.run();
}

Example source: org.apache.beam/beam-examples-java

static void runWordCount(WordCountOptions options) {
 Pipeline p = Pipeline.create(options);
 // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
 // static FormatAsTextFn() to the ParDo transform.
 p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
   .apply(new CountWords())
   .apply(MapElements.via(new FormatAsTextFn()))
   .apply("WriteCounts", TextIO.write().to(options.getOutput()));
 p.run().waitUntilFinish();
}

Example source: org.apache.beam/beam-examples-java

public static void run(Options options) {
 Pipeline p = Pipeline.create(options);
 double samplingThreshold = 0.1;
 p.apply(TextIO.read().from(options.getWikiInput()))
   .apply(MapElements.via(new ParseTableRowJson()))
   .apply(new ComputeTopSessions(samplingThreshold))
   .apply("Write", TextIO.write().to(options.getOutput()));
 p.run().waitUntilFinish();
}

Example source: org.apache.beam/beam-runners-flink_2.10-examples

public static void main(String[] args) {
 Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
   .as(Options.class);
 options.setRunner(FlinkRunner.class);
 Pipeline p = Pipeline.create(options);
 p.apply("ReadLines", TextIO.Read.from(options.getInput()))
   .apply(new CountWords())
   .apply(MapElements.via(new FormatAsTextFn()))
   .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 p.run();
}

Example source: org.apache.beam/beam-runners-flink_2.10-examples

public static void main(String[] args) {
 Pipeline p = initializePipeline(args);
 KafkaOptions options = getOptions(p);
 PCollection<String> words =
   p.apply(Create.of("These", "are", "some", "words"));
 FlinkKafkaProducer08<String> kafkaSink =
   new FlinkKafkaProducer08<>(options.getKafkaTopic(),
     new SimpleStringSchema(), getKafkaProps(options));
 words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
 p.run();
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Override
 public PCollection<Integer> expand(PCollection<Integer> input) {
  // Apply an operation so that this is a composite transform.
  input.apply(Count.perElement());
  // Return a value unrelated to the input.
  return input.getPipeline().apply(Create.of(1, 2, 3, 4));
 }
}

Example source: org.apache.beam/beam-examples-java

public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);
  // the following two 'applys' create multiple inputs to our pipeline, one for each
  // of our two input sources.
  PCollection<TableRow> eventsTable =
    p.apply(BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE));
  PCollection<TableRow> countryCodes = p.apply(BigQueryIO.readTableRows().from(COUNTRY_CODES));
  PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
  formattedResults.apply(TextIO.write().to(options.getOutput()));
  p.run().waitUntilFinish();
 }
}

Example source: org.apache.beam/beam-examples-java

static void runTfIdf(Options options) throws Exception {
 Pipeline pipeline = Pipeline.create(options);
 pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
 pipeline
   .apply(new ReadDocuments(listInputDocuments(options)))
   .apply(new ComputeTfIdf())
   .apply(new WriteTfIdf(options.getOutput()));
 pipeline.run().waitUntilFinish();
}

Example source: GoogleCloudPlatform/DataflowTemplates

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
   .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
   .apply(new CountWords())
   .apply(MapElements.via(new FormatAsTextFn()))
   .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run();
 }
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testTransformTranslatorMissing() throws IOException {
 DataflowPipelineOptions options = buildPipelineOptions();
 Pipeline p = Pipeline.create(options);
 p.apply(Create.of(Arrays.asList(1, 2, 3))).apply(new TestTransform());
 thrown.expect(IllegalStateException.class);
 thrown.expectMessage(containsString("no translator registered"));
 DataflowPipelineTranslator.fromOptions(options)
   .translate(p, DataflowRunner.fromOptions(options), Collections.emptyList());
 ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
 assertValidJob(jobCaptor.getValue());
}
