Usage and code examples of the org.apache.beam.sdk.Pipeline.create() method


This article collects code examples of the Java method org.apache.beam.sdk.Pipeline.create() and shows how it is used in practice. The examples are drawn from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Pipeline.create() method:
Package: org.apache.beam.sdk.Pipeline
Class name: Pipeline
Method name: create

Introduction to Pipeline.create

Constructs a pipeline from default PipelineOptions.
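Pipeline.create() is overloaded: the no-argument form builds a pipeline from default PipelineOptions, while Pipeline.create(PipelineOptions) takes options you construct yourself, typically parsed from command-line arguments. The minimal sketch below illustrates both forms; the class name is hypothetical, and the snippet is an illustration of the API rather than code from the projects quoted below.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PipelineCreateSketch { // hypothetical class name
 public static void main(String[] args) {
  // Form 1: construct a pipeline from default PipelineOptions.
  Pipeline defaultPipeline = Pipeline.create();

  // Form 2: construct a pipeline from explicitly built options,
  // here parsed and validated from the command line.
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
  Pipeline pipeline = Pipeline.create(options);
 }
}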

Code examples

Code example source: GoogleCloudPlatform/java-docs-samples (an Avro-to-CSV conversion pipeline)

public static void runAvroToCsv(SampleOptions options)
  throws IOException, IllegalArgumentException {
 FileSystems.setDefaultPipelineOptions(options);
 // Get Avro Schema
 String schemaJson = getSchema(options.getAvroSchema());
 Schema schema = new Schema.Parser().parse(schemaJson);
 // Check schema field types before starting the Dataflow job
 checkFieldTypes(schema);
 // Create the Pipeline object with the options we defined above.
 Pipeline pipeline = Pipeline.create(options);
 // Convert Avro To CSV
 pipeline.apply("Read Avro files",
   AvroIO.readGenericRecords(schemaJson).from(options.getInputFile()))
   .apply("Convert Avro to CSV formatted data",
     ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
   .apply("Write CSV formatted data", TextIO.write().to(options.getOutput())
     .withSuffix(".csv"));
 // Run the pipeline.
 pipeline.run().waitUntilFinish();
}

Code example source: GoogleCloudPlatform/java-docs-samples (a CSV-to-Avro conversion pipeline)

public static void runCsvToAvro(SampleOptions options)
  throws IOException, IllegalArgumentException {
 FileSystems.setDefaultPipelineOptions(options);
 // Get Avro Schema
 String schemaJson = getSchema(options.getAvroSchema());
 Schema schema = new Schema.Parser().parse(schemaJson);
 // Check schema field types before starting the Dataflow job
 checkFieldTypes(schema);
 // Create the Pipeline object with the options we defined above.
 Pipeline pipeline = Pipeline.create(options);
 // Convert CSV to Avro
 pipeline.apply("Read CSV files", TextIO.read().from(options.getInputFile()))
   .apply("Convert CSV to Avro formatted data",
     ParDo.of(new ConvertCsvToAvro(schemaJson, options.getCsvDelimiter())))
   .setCoder(AvroCoder.of(GenericRecord.class, schema))
   .apply("Write Avro formatted data", AvroIO.writeGenericRecords(schemaJson)
     .to(options.getOutput()).withCodec(CodecFactory.snappyCodec()).withSuffix(".avro"));
 // Run the pipeline.
 pipeline.run().waitUntilFinish();
}

Code example source: GoogleCloudPlatform/java-docs-samples (option parsing and pipeline creation)

public static void main(String[] args) {
 Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 Pipeline p = Pipeline.create(options);
 // ... the remainder of this sample is truncated at the source
}

Code example source: GoogleCloudPlatform/java-docs-samples (reads a Spanner table and estimates its total size)

public static void main(String[] args) {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);

  String instanceId = options.getInstanceId();
  String databaseId = options.getDatabaseId();
  // [START spanner_dataflow_read]
  // Query for all the columns and rows in the specified Spanner table
  PCollection<Struct> records = p.apply(
    SpannerIO.read()
      .withInstanceId(instanceId)
      .withDatabaseId(databaseId)
      .withQuery("SELECT * FROM " + options.getTable()));
  // [END spanner_dataflow_read]

  PCollection<Long> tableEstimatedSize = records
    // Estimate the size of every row
    .apply(EstimateSize.create())
    // Sum all the row sizes to get the total estimated size of the table
    .apply(Sum.longsGlobally());

  // Write the total size to a file
  tableEstimatedSize
    .apply(ToString.elements())
    .apply(TextIO.write().to(options.getOutput()).withoutSharding());

  p.run().waitUntilFinish();
 }
}

Code example source: GoogleCloudPlatform/java-docs-samples (reads every table in a Spanner database via SpannerIO.readAll() and estimates the total size)

public static void main(String[] args) {
 Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 Pipeline p = Pipeline.create(options);
 SpannerConfig spannerConfig = SpannerConfig.create()
   .withInstanceId(options.getInstanceId())
   .withDatabaseId(options.getDatabaseId());
 // [START spanner_dataflow_readall]
 PCollection<Struct> allRecords = p.apply(SpannerIO.read()
   .withSpannerConfig(spannerConfig)
   .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
     + ".table_catalog = '' AND t.table_schema = ''")).apply(
   MapElements.into(TypeDescriptor.of(ReadOperation.class))
     .via((SerializableFunction<Struct, ReadOperation>) input -> {
      String tableName = input.getString(0);
      return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
     })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));
 // [END spanner_dataflow_readall]
 PCollection<Long> dbEstimatedSize = allRecords.apply(EstimateSize.create())
   .apply(Sum.longsGlobally());
 dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput())
   .withoutSharding());
 p.run().waitUntilFinish();
}

Code example source: org.apache.beam/beam-runners-direct-java (a helper that builds a pipeline on the DirectRunner)

private Pipeline getPipeline() {
 PipelineOptions opts = PipelineOptionsFactory.create();
 opts.setRunner(DirectRunner.class);
 return Pipeline.create(opts);
}

Code example source: org.apache.beam/beam-runners-direct-java (a unit test of root-transform discovery in a portable pipeline graph)

@Test
public void getRootTransformsSucceeds() {
 Pipeline pipeline = Pipeline.create();
 pipeline.apply("impulse", Impulse.create());
 pipeline.apply("otherImpulse", Impulse.create());
 PortableGraph graph = PortableGraph.forPipeline(PipelineTranslation.toProto(pipeline));
 assertThat(graph.getRootTransforms(), hasSize(2));
 assertThat(
   graph.getRootTransforms().stream().map(PTransformNode::getId).collect(Collectors.toSet()),
   containsInAnyOrder("impulse", "otherImpulse"));
}

Code example source: org.apache.beam/beam-sdks-java-io-google-cloud-platform (a unit test of PubsubIO topic ValueProviders)

@Test
public void testValueProviderTopic() {
 StaticValueProvider<String> provider = StaticValueProvider.of("projects/project/topics/topic");
 Read<String> pubsubRead = PubsubIO.readStrings().fromTopic(provider);
 Pipeline.create().apply(pubsubRead);
 assertThat(pubsubRead.getTopicProvider(), not(nullValue()));
 assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true));
 assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get()));
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java (a helper that configures a batch pipeline for testing the DataflowRunner)

private Pipeline createTestBatchRunner() {
 DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 options.setRunner(DataflowRunner.class);
 options.setProject("someproject");
 options.setGcpTempLocation("gs://staging");
 options.setPathValidatorClass(NoopPathValidator.class);
 options.setDataflowClient(dataflow);
 return Pipeline.create(options);
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java (a test that translation succeeds even when a ValueProvider is inaccessible)

@Test
public void testInaccessibleProvider() throws Exception {
 DataflowPipelineOptions options = buildPipelineOptions();
 Pipeline pipeline = Pipeline.create(options);
 DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
 pipeline.apply(TextIO.read().from(new TestValueProvider()));
 // Check that translation does not fail.
 t.translate(pipeline, DataflowRunner.fromOptions(options), Collections.emptyList());
}

Code example source: org.apache.beam/beam-runners-flink_2.11 (a test that an unbounded source switches the FlinkRunner into streaming mode)

@Test
public void testTranslationModeOverrideWithUnboundedSources() {
 FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
 options.setRunner(FlinkRunner.class);
 options.setStreaming(false);
 FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
 Pipeline pipeline = Pipeline.create(options);
 pipeline.apply(GenerateSequence.from(0));
 flinkEnv.translate(pipeline);
 assertThat(options.isStreaming(), is(true));
}

Code example source: org.apache.beam/beam-runners-flink_2.10-examples (a WordCount pipeline on the FlinkRunner; note the pre-2.0 TextIO.Read/TextIO.Write API)

public static void main(String[] args) {
 Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
   .as(Options.class);
 options.setRunner(FlinkRunner.class);
 Pipeline p = Pipeline.create(options);
 p.apply("ReadLines", TextIO.Read.from(options.getInput()))
   .apply(new CountWords())
   .apply(MapElements.via(new FormatAsTextFn()))
   .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 p.run();
}

Code example source: GoogleCloudPlatform/DataflowTemplates (a WordCount pipeline)

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
   .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
   .apply(new CountWords())
   .apply(MapElements.via(new FormatAsTextFn()))
   .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run();
 }
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java (a test that a partially bound output fails with IllegalArgumentException)

@Test
public void testPartiallyBoundFailure() throws IOException {
 Pipeline p = Pipeline.create(buildPipelineOptions());
 PCollection<Integer> input = p.begin().apply(Create.of(1, 2, 3));
 thrown.expect(IllegalArgumentException.class);
 input.apply(new PartiallyBoundOutputCreator());
 Assert.fail("Failure expected from use of partially bound output");
}

Code example source: org.apache.beam/beam-sdks-java-core (a test that running a pipeline with the CrashingRunner throws)

@Test
 public void runThrows() {
  PipelineOptions opts = PipelineOptionsFactory.create();
  opts.setRunner(CrashingRunner.class);

  Pipeline p = Pipeline.create(opts);
  p.apply(Create.of(1, 2, 3));

  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("Cannot call #run");
  thrown.expectMessage(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);

  p.run();
 }
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java (a test that the batch stateful DoFn override preserves the user's DoFn)

@Test
public void testSingleOutputOverrideNonCrashing() throws Exception {
 DataflowPipelineOptions options = buildPipelineOptions();
 options.setRunner(DataflowRunner.class);
 Pipeline pipeline = Pipeline.create(options);
 DummyStatefulDoFn fn = new DummyStatefulDoFn();
 pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn));
 DataflowRunner runner = DataflowRunner.fromOptions(options);
 runner.replaceTransforms(pipeline);
 assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java (the same override test with the beam_fn_api experiment enabled)

@Test
public void testFnApiSingleOutputOverrideNonCrashing() throws Exception {
 DataflowPipelineOptions options = buildPipelineOptions("--experiments=beam_fn_api");
 options.setRunner(DataflowRunner.class);
 Pipeline pipeline = Pipeline.create(options);
 DummyStatefulDoFn fn = new DummyStatefulDoFn();
 pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn));
 DataflowRunner runner = DataflowRunner.fromOptions(options);
 runner.replaceTransforms(pipeline);
 assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
}
