Usage of org.apache.beam.sdk.Pipeline.getCoderRegistry() with code examples


This article collects a number of Java code examples for org.apache.beam.sdk.Pipeline.getCoderRegistry(), showing how the method is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of Pipeline.getCoderRegistry() are as follows:
Package: org.apache.beam.sdk
Class: Pipeline
Method: getCoderRegistry

About Pipeline.getCoderRegistry

Returns the CoderRegistry that this Pipeline uses.
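Before the extracted examples, here is a minimal, self-contained sketch of the typical pattern, assuming the Beam 2.x API (the MyEvent class is hypothetical): create the pipeline, fetch its registry once, and register coders for custom types before applying the transforms that need them.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CoderRegistryExample {
  // Hypothetical user type; Serializable so SerializableCoder can encode it.
  static class MyEvent implements java.io.Serializable {}

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // The registry is per-pipeline: anything registered here is visible to
    // every coder-inference lookup made while this pipeline is constructed.
    CoderRegistry registry = p.getCoderRegistry();
    registry.registerCoderForClass(MyEvent.class, SerializableCoder.of(MyEvent.class));

    // ... apply transforms here; inference for MyEvent now resolves to SerializableCoder.
    p.run().waitUntilFinish();
  }
}

Registration must happen before the transforms that consume the type are applied, because coder inference runs while the pipeline graph is being built.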

Code examples

Code example source: stackoverflow.com

void registerCoders(Pipeline p) {
  CoderRegistry cr = p.getCoderRegistry();
  // Scan the model package for every IndexedRecord subtype and register an
  // AvroCoder for each, so new Avro model classes need no manual registration.
  Reflections r = new Reflections("com.brightcove.rna.model");
  Set<Class<? extends IndexedRecord>> classes = r.getSubTypesOf(IndexedRecord.class);
  for (Class<? extends IndexedRecord> clazz : classes) {
    registerAvroType(cr, clazz);
  }
}

<T extends IndexedRecord> void registerAvroType(CoderRegistry cr, Class<T> clazz) {
  cr.registerCoder(clazz, AvroCoder.of(clazz));
}
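This reflection-driven bulk registration is a useful pattern when a project has many Avro model classes. Note that registerCoder(Class, Coder) is the spelling from older SDK versions; the Beam 2.x snippets below achieve the same thing with registerCoderForClass.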

Code example source: com.google.cloud.genomics/google-genomics-dataflow (the same snippet appears verbatim in googlegenomics/dataflow-java)

public static void registerPipelineCoders(Pipeline p) {
 CoderRegistry cr = p.getCoderRegistry();
 cr.registerCoderForClass(ReadCounts.class,
  (Coder<ReadCounts>) GenericJsonCoder.of(ReadCounts.class));
 cr.registerCoderForClass(Position.class, ProtoCoder.of(Position.class));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public void finishSpecifyingOutput(
  String transformName, PInput input, PTransform<?, ?> transform) {
 this.coderOrFailure =
   inferCoderOrFail(
     input, transform, getPipeline().getCoderRegistry(), getPipeline().getSchemaRegistry());
 super.finishSpecifyingOutput(transformName, input, transform);
}

Code example source: org.apache.beam/beam-sdks-java-core

private Coder<DestinationT> resolveDestinationCoder(PCollection<UserT> input) {
 Coder<DestinationT> destinationCoder = getDestinationCoder();
 if (destinationCoder == null) {
  TypeDescriptor<DestinationT> destinationT =
    TypeDescriptors.outputOf(getDestinationFn().getClosure());
  try {
   destinationCoder = input.getPipeline().getCoderRegistry().getCoder(destinationT);
  } catch (CannotProvideCoderException e) {
   throw new IllegalArgumentException(
     "Unable to infer a coder for destination type (inferred from .by() as \""
       + destinationT
       + "\") - specify it explicitly using .withDestinationCoder()");
  }
 }
 return destinationCoder;
}
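This is a common pattern in IO builders: consult the pipeline's CoderRegistry first, and when inference fails with CannotProvideCoderException, fail fast with a message naming the builder method (.withDestinationCoder()) through which the caller can supply the coder explicitly.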

Code example source: org.apache.beam/beam-sdks-java-core

/**
 * After building, finalizes this {@link PValue} to make it ready for running. Automatically
 * invoked whenever the {@link PValue} is "used" (e.g., when apply() is called on it) and when the
 * Pipeline is run (useful if this is a {@link PValue} with no consumers).
 */
@Override
public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
 if (isFinishedSpecifying()) {
  return;
 }
 this.coderOrFailure =
   inferCoderOrFail(
     input, transform, getPipeline().getCoderRegistry(), getPipeline().getSchemaRegistry());
 // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
 // this will throw an exception.
 getCoder();
 super.finishSpecifying(input, transform);
}

Code example source: org.apache.beam/beam-runners-core-construction-java

private static <K, InputT, AccumT> Coder<AccumT> extractAccumulatorCoder(
  GlobalCombineFn<InputT, AccumT, ?> combineFn,
  AppliedPTransform<PCollection<KV<K, InputT>>, ?, Combine.PerKey<K, InputT, ?>> transform)
  throws CannotProvideCoderException {
 @SuppressWarnings("unchecked")
 PCollection<KV<K, InputT>> mainInput =
   (PCollection<KV<K, InputT>>)
     Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(transform));
 KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) mainInput.getCoder();
 return AppliedCombineFn.withInputCoder(
     combineFn,
     transform.getPipeline().getCoderRegistry(),
     inputCoder,
     transform.getTransform().getSideInputs(),
     ((PCollection<?>) Iterables.getOnlyElement(transform.getOutputs().values()))
       .getWindowingStrategy())
   .getAccumulatorCoder();
}
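A Combine needs a coder for its intermediate accumulator type, not just for its input and output; here the accumulator coder is resolved by handing the pipeline's CoderRegistry, together with the input's KvCoder, to AppliedCombineFn.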

Code example source: googlegenomics/dataflow-java (the same snippet appears verbatim in com.google.cloud.genomics/google-genomics-dataflow)

public static void registerPipelineCoders(Pipeline p) {
 CoderRegistry cr = p.getCoderRegistry();
 cr.registerCoderForClass(Annotation.class,
  (Coder<Annotation>) GenericJsonCoder.of(Annotation.class));
 cr.registerCoderForClass(AnnotationSet.class,
  (Coder<AnnotationSet>) GenericJsonCoder.of(AnnotationSet.class));
 cr.registerCoderForClass(BatchCreateAnnotationsRequest.class,
  (Coder<BatchCreateAnnotationsRequest>) GenericJsonCoder
   .of(BatchCreateAnnotationsRequest.class));
 cr.registerCoderForClass(PosRgsMq.class,
  (Coder<PosRgsMq>) GenericJsonCoder.of(PosRgsMq.class));
 cr.registerCoderForClass(Position.class,
  (Coder<Position>) GenericJsonCoder.of(Position.class));
}

Code example source: org.apache.beam/beam-examples-java

static void runTfIdf(Options options) throws Exception {
 Pipeline pipeline = Pipeline.create(options);
 pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
 pipeline
   .apply(new ReadDocuments(listInputDocuments(options)))
   .apply(new ComputeTfIdf())
   .apply(new WriteTfIdf(options.getOutput()));
 pipeline.run().waitUntilFinish();
}
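StringDelegateCoder.of(URI.class) works because URI round-trips through its string form: the coder encodes the value's toString() output and decodes by invoking the class's single-String-argument constructor.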

Code example source: org.apache.beam/beam-sdks-java-io-hadoop-input-format (the same snippet appears verbatim in its successor module, org.apache.beam/beam-sdks-java-io-hadoop-format)

@Override
public PCollection<KV<K, V>> expand(PBegin input) {
 validateTransform();
 // Get the key and value coders based on the key and value classes.
 CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
 Coder<K> keyCoder = getDefaultCoder(getKeyTypeDescriptor(), coderRegistry);
 Coder<V> valueCoder = getDefaultCoder(getValueTypeDescriptor(), coderRegistry);
 HadoopInputFormatBoundedSource<K, V> source =
   new HadoopInputFormatBoundedSource<>(
     getConfiguration(),
     keyCoder,
     valueCoder,
     getKeyTranslationFunction(),
     getValueTranslationFunction());
 return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
 public PCollection<KV<K, V>> expand(PCollection<V> in) {
  PCollection<KV<K, V>> result =
    in.apply(
      "AddKeys",
      MapElements.via(
        new SimpleFunction<V, KV<K, V>>() {
         @Override
         public KV<K, V> apply(V element) {
          return KV.of(fn.apply(element), element);
         }
        }));

  try {
   Coder<K> keyCoder;
   CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
   if (keyClass == null) {
    keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
   } else {
    keyCoder = coderRegistry.getCoder(TypeDescriptor.of(keyClass));
   }
   // TODO: Remove when we can set the coder inference context.
   result.setCoder(KvCoder.of(keyCoder, in.getCoder()));
  } catch (CannotProvideCoderException exc) {
   // let lazy coder inference have a try
  }

  return result;
 }
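Two inference paths are visible in this WithKeys-style transform: when a key class was supplied, the registry is consulted directly through getCoder(TypeDescriptor); otherwise getOutputCoder infers the key coder from the key-extraction function and the input coder. If both fail, the coder is deliberately left unset so that later, lazy inference can still have a try.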

Code example source: org.apache.beam/beam-sdks-java-io-hadoop-format

@Override
public PDone expand(PCollection<KV<KeyT, ValueT>> input) {
 // streamed pipeline must have defined configuration transformation
 if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
   || !input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
  checkArgument(
    configTransform != null,
    "Writing of unbounded data can be processed only with configuration transformation provider. See %s.withConfigurationTransform()",
    Write.class);
 }
 verifyInputWindowing(input);
 TypeDescriptor<Configuration> configType = new TypeDescriptor<Configuration>() {};
 input
   .getPipeline()
   .getCoderRegistry()
   .registerCoderForType(configType, new ConfigurationCoder());
 PCollectionView<Configuration> configView = createConfigurationView(input);
 return processJob(input, configView);
}
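registerCoderForType is the TypeDescriptor-based sibling of registerCoderForClass; registering ConfigurationCoder for TypeDescriptor<Configuration> up front lets the PCollectionView<Configuration> created next resolve its coder through the pipeline's registry.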

Code example source: org.apache.beam/beam-runners-flink_2.10-examples

public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

  options.setRunner(FlinkRunner.class);

  Pipeline pipeline = Pipeline.create(options);
  pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

  pipeline
    .apply(new ReadDocuments(listInputDocuments(options)))
    .apply(new ComputeTfIdf())
    .apply(new WriteTfIdf(options.getOutput()));

  pipeline.run();
 }

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public PCollection<T> expand(PCollection<String> input) {
 final Coder<T> coder =
   Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
 final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
 final SerializableFunction<String, FileBasedSource<T>> createSource =
   new CreateParseSourceFn<>(parseFn, coder);
 return input
   .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
   .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
   .apply(
     "Parse all via FileBasedSource",
     new ReadAllViaFileBasedSource<>(getDesiredBundleSizeBytes(), createSource, coder));
}

Code example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

// Fragment from the Dataflow runner: the coder registry, input coder, and
// windowing strategy of a primitive transform's input are retrieved together.
context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
context.getInput(primitiveTransform).getCoder(),
context.getInput(primitiveTransform).getWindowingStrategy());

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public PCollection<T> expand(PBegin input) {
 checkNotNull(getFilepattern(), "filepattern");
 Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
 if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
  return input.apply(
    org.apache.beam.sdk.io.Read.from(
      AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
 }
 // All other cases go through ParseAllGenericRecords.
 return input
   .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
   .apply(
     "Via ParseAll",
     parseAllGenericRecords(getParseFn())
       .withCoder(coder)
       .withMatchConfiguration(getMatchConfiguration()));
}

Code example source: org.apache.beam/beam-runners-core-construction-java

// Fragment from splittable-DoFn expansion: the DoFn is asked for its restriction
// coder (the pipeline's registry backs the lookup), then a KvCoder pairs each
// input element with its restriction.
Coder<RestrictionT> restrictionCoder =
  DoFnInvokers.invokerFor(doFn)
    .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder);

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
 SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
 CoderRegistry registry = input.getPipeline().getCoderRegistry();
 finishSpecifyingStateSpecs(fn, registry, input.getCoder());
 TupleTag<OutputT> mainOutput = new TupleTag<>(MAIN_OUTPUT_TAG);
 PCollection<OutputT> res =
   input.apply(withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
 try {
  res.setSchema(
    schemaRegistry.getSchema(getFn().getOutputTypeDescriptor()),
    schemaRegistry.getToRowFunction(getFn().getOutputTypeDescriptor()),
    schemaRegistry.getFromRowFunction(getFn().getOutputTypeDescriptor()));
 } catch (NoSuchSchemaException e) {
  try {
   res.setCoder(
     registry.getCoder(
       getFn().getOutputTypeDescriptor(),
       getFn().getInputTypeDescriptor(),
       ((PCollection<InputT>) input).getCoder()));
  } catch (CannotProvideCoderException e2) {
   // Ignore and leave coder unset.
  }
 }
 return res;
}
