Usage and code examples of the org.apache.beam.sdk.Pipeline.traverseTopologically() method

x33g5p2x, reposted on 2022-01-26 under Other

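This article collects code examples for the Java method org.apache.beam.sdk.Pipeline.traverseTopologically() and shows how Pipeline.traverseTopologically() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of Pipeline.traverseTopologically() are as follows:
Package path: org.apache.beam.sdk.Pipeline
Class name: Pipeline
Method name: traverseTopologically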

Introduction to Pipeline.traverseTopologically

For internal use only; no backwards-compatibility guarantees.

Invokes the PipelineVisitor's PipelineVisitor#visitPrimitiveTransform and PipelineVisitor#visitValue operations on each of this Pipeline's transform and value nodes, in forward topological order.

Traversal of the Pipeline causes the PTransforms and PValues owned by the Pipeline to be marked as finished, at which point they may no longer be modified.

Typically invoked by PipelineRunner subclasses.
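The contract described above (callbacks fired in forward topological order, and the graph frozen once traversed) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: `ToyPipeline`, `Visitor`, and the string-based nodes are hypothetical stand-ins written for this illustration, not Beam's `Pipeline` or `PipelineVisitor` classes, and the toy assumes that a step may only consume a value that already exists, so application order is already a valid topological order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a visitor-based traversal; hypothetical, NOT Beam's API. */
final class TraversalSketch {

  /** Hypothetical visitor with the two callbacks named in the description. */
  interface Visitor {
    default void visitPrimitiveTransform(String transform) {}

    default void visitValue(String value, String producer) {}
  }

  /** Hypothetical pipeline that records steps in the order they are applied. */
  static final class ToyPipeline {
    private final List<String[]> steps = new ArrayList<>(); // {transform, output}
    private final Set<String> values = new HashSet<>();
    private boolean finished;

    ToyPipeline apply(String transform, String input, String output) {
      if (finished) {
        throw new IllegalStateException("already traversed: " + transform);
      }
      // A step may only read a value produced earlier, so the list of
      // steps is always in a valid forward topological order.
      if (input != null && !values.contains(input)) {
        throw new IllegalArgumentException("unknown input: " + input);
      }
      steps.add(new String[] {transform, output});
      values.add(output);
      return this;
    }

    void traverseTopologically(Visitor visitor) {
      finished = true; // models "marked as finished" from the description
      for (String[] step : steps) {
        visitor.visitPrimitiveTransform(step[0]);
        visitor.visitValue(step[1], step[0]);
      }
    }
  }

  /** Builds a three-step chain and returns the order of visitor callbacks. */
  static List<String> visitOrder() {
    ToyPipeline p =
        new ToyPipeline()
            .apply("Read", null, "lines")
            .apply("Parse", "lines", "records")
            .apply("Write", "records", "done");
    List<String> log = new ArrayList<>();
    p.traverseTopologically(
        new Visitor() {
          @Override
          public void visitPrimitiveTransform(String transform) {
            log.add("transform:" + transform);
          }

          @Override
          public void visitValue(String value, String producer) {
            log.add("value:" + value);
          }
        });
    return log;
  }

  /** Returns true if the toy pipeline refuses new steps after traversal. */
  static boolean rejectsChangesAfterTraversal() {
    ToyPipeline p = new ToyPipeline().apply("Read", null, "lines");
    p.traverseTopologically(new Visitor() {});
    try {
      p.apply("Late", "lines", "more");
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(visitOrder());
    System.out.println(rejectsChangesAfterTraversal());
  }
}
```

In this model every transform is visited before the value it produces, and every value before any transform that consumes it, which is the ordering a runner relies on when it translates a pipeline node by node.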

Code examples

Code example source: org.apache.beam/beam-runners-flink_2.10

/**
 * Translates the pipeline by passing this class as a visitor.
 * @param pipeline The pipeline to be translated
 */
public void translate(Pipeline pipeline) {
 pipeline.traverseTopologically(this);
}

Code example source: org.apache.beam/beam-runners-direct-java

private static void validateTransforms(Pipeline pipeline) {
 pipeline.traverseTopologically(Visitor.INSTANCE);
}

Code example source: org.apache.beam/beam-runners-flink_2.11

/**
 * Translates the pipeline by passing this class as a visitor.
 *
 * @param pipeline The pipeline to be translated
 */
public void translate(Pipeline pipeline) {
 pipeline.traverseTopologically(this);
}

Code example source: org.apache.beam/beam-runners-flink

/**
 * Translates the pipeline by passing this class as a visitor.
 *
 * @param pipeline The pipeline to be translated
 */
public void translate(Pipeline pipeline) {
 pipeline.traverseTopologically(this);
}

Code example source: org.apache.beam/beam-sdks-java-core

private List<TransformHierarchy.Node> recordPipelineNodes(final Pipeline pipeline) {
 final NodeRecorder nodeRecorder = new NodeRecorder();
 pipeline.traverseTopologically(nodeRecorder);
 return nodeRecorder.visited;
}

Code example source: org.apache.beam/beam-runners-spark

/** Evaluator that updates/populates the cache candidates. */
public static void updateCacheCandidates(
  Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
 CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext);
 pipeline.traverseTopologically(cacheVisitor);
}

Code example source: org.apache.beam/beam-runners-apex

public void translate(Pipeline pipeline, DAG dag) {
 pipeline.traverseTopologically(this);
 translationContext.populateDAG(dag);
}

Code example source: org.apache.beam/beam-sdks-java-core

public static int countAsserts(Pipeline pipeline) {
 AssertionCountingVisitor visitor = new AssertionCountingVisitor();
 pipeline.traverseTopologically(visitor);
 return visitor.getPAssertCount();
}

Code example source: org.apache.beam/beam-sdks-java-core

private boolean isEmptyPipeline(final Pipeline pipeline) {
 final IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor();
 pipeline.traverseTopologically(isEmptyVisitor);
 return isEmptyVisitor.isEmpty();
}
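Several of the helpers above share one shape: create a visitor, hand it to the traversal, then read the state it accumulated. The sketch below shows that shape in isolation. Everything in it is a hypothetical stand-in written for this illustration (`NodeVisitor`, `CountingVisitor`, and the list-based `traverse` are not Beam classes, and the real `AssertionCountingVisitor` and `IsEmptyVisitor` are not shown in the snippets, so their internals are not reproduced here).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of a result-collecting visitor; hypothetical, NOT Beam's API. */
final class CountingVisitorSketch {

  /** Hypothetical visitor with no-op defaults, so implementations override only what they need. */
  interface NodeVisitor {
    default void visitPrimitiveTransform(String transform) {}

    default void visitValue(String value) {}
  }

  /** Accumulates a count while the traversal runs; callers read it afterwards. */
  static final class CountingVisitor implements NodeVisitor {
    private int primitives;

    @Override
    public void visitPrimitiveTransform(String transform) {
      primitives++;
    }

    int getCount() {
      return primitives;
    }

    boolean isEmpty() {
      return primitives == 0;
    }
  }

  /** Stand-in for the traversal: feeds nodes, assumed already in topological order, to the visitor. */
  static void traverse(List<String> transforms, NodeVisitor visitor) {
    for (String transform : transforms) {
      visitor.visitPrimitiveTransform(transform);
      visitor.visitValue(transform + ".out");
    }
  }

  /** Same shape as the helpers above: build visitor, traverse, return its result. */
  static int countTransforms(List<String> transforms) {
    CountingVisitor visitor = new CountingVisitor();
    traverse(transforms, visitor);
    return visitor.getCount();
  }

  static boolean isEmpty(List<String> transforms) {
    CountingVisitor visitor = new CountingVisitor();
    traverse(transforms, visitor);
    return visitor.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(countTransforms(Arrays.asList("Read", "Parse")));
    System.out.println(isEmpty(Collections.<String>emptyList()));
  }
}
```

Keeping the state inside the visitor and exposing it through a getter keeps the traversal call itself a single line, which is why the calling helpers in these examples stay so short.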

Code example source: org.apache.beam/beam-runners-spark

/** Visit the pipeline to determine the translation mode (batch/streaming). */
private void detectTranslationMode(Pipeline pipeline) {
 TranslationModeDetector detector = new TranslationModeDetector();
 pipeline.traverseTopologically(detector);
 if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
  // set streaming mode if it's a streaming pipeline
  this.mOptions.setStreaming(true);
 }
}

Code example source: org.apache.beam/beam-runners-direct-java

public static DirectGraph getGraph(Pipeline p) {
 DirectGraphVisitor visitor = new DirectGraphVisitor();
 p.traverseTopologically(visitor);
 return visitor.getGraph();
}

Code example source: org.apache.beam/beam-sdks-java-core

private static Set<DisplayData> displayDataForPipeline(Pipeline pipeline, PTransform<?, ?> root) {
 PrimitiveDisplayDataPTransformVisitor visitor = new PrimitiveDisplayDataPTransformVisitor(root);
 pipeline.traverseTopologically(visitor);
 return visitor.getPrimitivesDisplayData();
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

private static DummyStatefulDoFn findBatchStatefulDoFn(Pipeline p) {
 FindBatchStatefulDoFnVisitor findBatchStatefulDoFnVisitor = new FindBatchStatefulDoFnVisitor();
 p.traverseTopologically(findBatchStatefulDoFnVisitor);
 return (DummyStatefulDoFn) findBatchStatefulDoFnVisitor.getStatefulDoFn();
}

Code example source: apache/incubator-nemo

/**
 * Method to run the Pipeline.
 *
 * @param pipeline the Pipeline to run.
 * @return The result of the pipeline.
 */
public NemoPipelineResult run(final Pipeline pipeline) {
 final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
 pipeline.traverseTopologically(pipelineVisitor);
 final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
 JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName());
 return nemoPipelineResult;
}

Code example source: org.apache.beam/beam-runners-core-construction-java

@Test
public void testProtoDirectly() {
 final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);
 pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto, false));
}

Code example source: org.apache.beam/beam-runners-core-construction-java

@Test
public void testProtoDirectlyWithViewTransform() {
 final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, true);
 pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto, true));
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testNetworkConfigMissing() throws IOException {
 DataflowPipelineOptions options = buildPipelineOptions();
 Pipeline p = buildPipeline(options);
 p.traverseTopologically(new RecordingPipelineVisitor());
 Job job =
   DataflowPipelineTranslator.fromOptions(options)
     .translate(p, DataflowRunner.fromOptions(options), Collections.emptyList())
     .getJob();
 assertEquals(1, job.getEnvironment().getWorkerPools().size());
 assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testSubnetworkConfigMissing() throws IOException {
 DataflowPipelineOptions options = buildPipelineOptions();
 Pipeline p = buildPipeline(options);
 p.traverseTopologically(new RecordingPipelineVisitor());
 Job job =
   DataflowPipelineTranslator.fromOptions(options)
     .translate(p, DataflowRunner.fromOptions(options), Collections.emptyList())
     .getJob();
 assertEquals(1, job.getEnvironment().getWorkerPools().size());
 assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testSubnetworkConfig() throws IOException {
 final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK";
 DataflowPipelineOptions options = buildPipelineOptions();
 options.setSubnetwork(testSubnetwork);
 Pipeline p = buildPipeline(options);
 p.traverseTopologically(new RecordingPipelineVisitor());
 Job job =
   DataflowPipelineTranslator.fromOptions(options)
     .translate(p, DataflowRunner.fromOptions(options), Collections.emptyList())
     .getJob();
 assertEquals(1, job.getEnvironment().getWorkerPools().size());
 assertEquals(testSubnetwork, job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
}
