Using the com.hazelcast.jet.pipeline.Pipeline class, with code examples

x33g5p2x · Reposted 2022-01-26, category: Other

This article collects a number of Java code examples for the com.hazelcast.jet.pipeline.Pipeline class and shows how the class is used in practice. The examples come from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should have solid reference value. Details of the Pipeline class:
Package: com.hazelcast.jet.pipeline
Class name: Pipeline

Pipeline overview

Models a distributed computation job using an analogy with a system of interconnected water pipes. The basic element is a stage which can be attached to one or more other stages. A stage accepts the data coming from its upstream stages, transforms it, and directs the resulting data to its downstream stages.

The Pipeline object is a container of all the stages defined on a pipeline: the source stages obtained directly from it by calling #drawFrom(BatchSource) as well as all the stages attached (directly or indirectly) to them.

Note that there is no simple one-to-one correspondence between pipeline stages and Core API's DAG vertices. Some stages map to several vertices (e.g., grouping and co-grouping are implemented as a cascade of two vertices) and some stages may be merged with others into a single vertex (e.g., a cascade of map/filter/flatMap stages can be fused into one vertex).
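
To make the model concrete, here is a minimal sketch of a three-stage batch pipeline (an illustration, not taken from the samples below; it assumes an IMap named "source" exists and the usual Pipeline/Sources/Sinks imports):

Pipeline p = Pipeline.create();
// drawFrom() creates the source stage; each chained call attaches a downstream stage
p.drawFrom(Sources.<String, Integer>map("source"))
 .map(e -> e.getKey() + "=" + e.getValue())  // may be fused with neighboring stages into one DAG vertex
 .drainTo(Sinks.logger());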

Code examples

Code example source: hazelcast/hazelcast-jet-demos

public static Pipeline build(String bootstrapServers) {
  Properties properties = new Properties();
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  Pipeline pipeline = Pipeline.create();
  pipeline
      .drawFrom(KafkaSources.kafka(properties, Constants.TOPIC_NAME_PRECIOUS))
      .drainTo(Sinks.map(Constants.IMAP_NAME_PRECIOUS));
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet

/**
 * Creates and returns an executable job based on the supplied pipeline.
 * Jet will asynchronously start executing the job.
 */
@Nonnull
default Job newJob(@Nonnull Pipeline pipeline) {
  return newJob(pipeline.toDag());
}
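
A minimal usage sketch for this method (assuming a local Jet member; pipeline is any Pipeline built as in the examples here):

JetInstance jet = Jet.newJetInstance();  // or Jet.newJetClient() to submit from a client
Job job = jet.newJob(pipeline);          // returns immediately; execution is asynchronous
job.join();                              // block until a batch job completes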

Code example source: hazelcast/hazelcast-jet-code-samples

/**
 * Takes the contents of the source map and writes them into the sink map.
 */
private static Pipeline mapSourceAndSink(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.map(sourceMapName))
      .drainTo(Sinks.map(sinkMapName));
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet

/**
 * Creates and returns a Jet job based on the supplied pipeline and job
 * configuration. Jet will asynchronously start executing the job.
 *
 * <p>If the name in the JobConfig is non-null, Jet checks if there is an
 * active job with the same name, in which case it throws {@link
 * JobAlreadyExistsException}. A job is active if it is running,
 * suspended, or waiting to be run; that is, it has not completed or failed.
 * Thus there can be at most one active job with a given name at a time, and
 * you can reuse the job name after the previous job has completed.
 *
 * <p>See also {@link #newJobIfAbsent}.
 *
 * @throws JobAlreadyExistsException if there is an active job with
 *      the same name
 */
@Nonnull
default Job newJob(@Nonnull Pipeline pipeline, @Nonnull JobConfig config) {
  return newJob(pipeline.toDag(), config);
}
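
A usage sketch of the naming behavior (the job name is illustrative): a second submission with the same name fails while the first job is still active:

JobConfig config = new JobConfig().setName("word-count");
Job job = jet.newJob(pipeline, config);
try {
    jet.newJob(pipeline, config);        // same name while the first job is active
} catch (JobAlreadyExistsException e) {
    // expected: at most one active job per name
}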

Code example source: hazelcast/hazelcast-jet-demos

public static Pipeline build() {
  Pipeline p = Pipeline.create();
  // Palladium and Platinum only
  p.drawFrom(Sources.<String, Object>mapJournal(
      Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST))
   .map(e -> e.getKey() + "==" + e.getValue())
   .filter(str -> str.toLowerCase().startsWith("p"))
   .drainTo(Sinks.logger());
  return p;
}
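
Note that a map-journal source such as the one above only works if the event journal is enabled for that map on the members. A minimal configuration sketch, assuming the Hazelcast 3.x-era config API that this version of Jet uses:

JetConfig jetConfig = new JetConfig();
jetConfig.getHazelcastConfig().addEventJournalConfig(
    new EventJournalConfig()
        .setMapName(Constants.IMAP_NAME_PRECIOUS)
        .setEnabled(true)
        .setCapacity(100_000));
JetInstance jet = Jet.newJetInstance(jetConfig);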

Code example source: hazelcast/hazelcast-jet

/**
 * Creates and returns a Jet job based on the supplied pipeline and job
 * configuration. Jet will asynchronously start executing the job.
 *
 * <p>If the name in the JobConfig is non-null, Jet checks if there is an
 * active job with the same name. If there is, it joins that job instead
 * of submitting a new one. A job is active if it is running, suspended, or
 * waiting to be run; that is, it has not completed or failed. In other
 * words, this method ensures that a job with this name is running and is
 * not running multiple times in parallel.
 *
 * <p>This method is useful for microservice deployments where each package
 * contains a Jet member plus the job, and you want the job to run only once.
 * However, if the job is a batch job that finishes quickly, it may still
 * execute multiple times, because the job name can be reused after a
 * previous execution has completed.
 *
 * <p>If the job name is null, a new job is always submitted.
 *
 * <p>See also {@link #newJob}.
 */
@Nonnull
default Job newJobIfAbsent(@Nonnull Pipeline pipeline, @Nonnull JobConfig config) {
  return newJobIfAbsent(pipeline.toDag(), config);
}
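
A usage sketch (the job name is illustrative): every service instance can run the same submission code at startup, and only one job instance ends up active:

JobConfig config = new JobConfig().setName("ingest-pipeline");
Job job = jet.newJobIfAbsent(pipeline, config);  // submits, or joins the already-active job
job.join();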

Code example source: hazelcast/hazelcast-jet-code-samples

private Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(KafkaSources.kafka(brokerProperties(), "t1", "t2"))
   .drainTo(Sinks.map(SINK_NAME));
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<String, User>map(MAP_NAME))
   .map(Map.Entry::getValue)
   .drainTo(AvroSinks.files(DIRECTORY_NAME, AvroSink::schemaForUser, User.class));
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
   .drainTo(Sinks.logger());
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.files(getBooksPath()))
   .filter(line -> line.startsWith("The "))
   .drainTo(buildTopicSink());
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline(String connectionUrl) {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.jdbc(connectionUrl,
      "SELECT * FROM " + TABLE_NAME,
      resultSet -> new User(resultSet.getInt(1), resultSet.getString(2))))
   .map(user -> Util.entry(user.getId(), user))
   .drainTo(Sinks.map(MAP_NAME));
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

/**
 * Takes the contents of the source map, converts each value to a string, and
 * suffixes the value with {@code -odd} if the key is odd and with {@code -even} if the key is even.
 */
private static Pipeline mapWithUpdating(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
      .map(e -> entry(e.getKey(), String.valueOf(e.getValue())))
      .drainTo(
          Sinks.mapWithUpdating(
              sinkMapName,
              (oldValue, item) ->
                  item.getKey() % 2 == 0
                      ? oldValue + "-even"
                      : oldValue + "-odd"
          )
      );
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline(String connectionUrl) {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Integer, User>map(MAP_NAME))
   .map(Map.Entry::getValue)
   .drainTo(Sinks.jdbc("INSERT INTO " + TABLE_NAME + "(id, name) VALUES(?, ?)",
       connectionUrl,
       (stmt, user) -> {
         // Bind the values from the stream item to a PreparedStatement created from
         // the above query.
         stmt.setInt(1, user.getId());
         stmt.setString(2, user.getName());
       }));
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

/**
 * Takes the contents of the source map, maps all entries to a single key
 * called {@code sum}, and writes them into the sink map using a merge
 * function that adds the old and new values together.
 */
private static Pipeline mapWithMerging(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
      .map(e -> entry("sum", e.getValue()))
      .drainTo(
          Sinks.mapWithMerging(
              sinkMapName,
              (oldValue, newValue) -> oldValue + newValue
          )
      );
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pattern delimiter = Pattern.compile("\\W+");
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Long, String>map(BOOK_LINES))
   .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
   .filter(word -> !word.isEmpty())
   .groupingKey(wholeItem())
   .aggregate(counting())
   .drainTo(Sinks.map(COUNTS));
  return p;
}
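
The word-count pipelines (this one and the variant below) rely on static imports that the snippets omit; for this era of the Jet API they would most likely be:

import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;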

Code example source: hazelcast/hazelcast-jet-code-samples

public static Pipeline buildPipeline(String sourceName, String sinkName) {
  Pattern pattern = Pattern.compile("\\W+");
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, String>map(sourceName))
      .flatMap(e -> Traversers.traverseArray(pattern.split(e.getValue().toLowerCase()))
                  .filter(w -> !w.isEmpty()))
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.map(sinkName));
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline(JobConf jobConfig) {
  Pipeline p = Pipeline.create();
  p.drawFrom(HdfsSources.<AvroWrapper<User>, NullWritable>hdfs(jobConfig))
   .filter(entry -> entry.getKey().datum().get(3).equals(Boolean.TRUE))
   .peek(entry -> entry.getKey().datum().toString())
   .drainTo(HdfsSinks.hdfs(jobConfig));
  return p;
}

Code example source: hazelcast/hazelcast-jet-code-samples

/**
 * Takes the contents of the source map and, for each entry, applies an entry
 * processor to the sink map that increments the value by 5.
 */
private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
      .drainTo(
          Sinks.mapWithEntryProcessor(
              sinkMapName,
              entryKey(),
              item -> new IncrementEntryProcessor(5)
          )
      );
  return pipeline;
}
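
The IncrementEntryProcessor referenced above is not shown in the snippet. A hypothetical reconstruction, assuming the Hazelcast 3.x EntryProcessor API (the actual class lives in hazelcast-jet-code-samples):

public class IncrementEntryProcessor extends AbstractEntryProcessor<Integer, Integer> {
    private final int delta;

    IncrementEntryProcessor(int delta) {
        this.delta = delta;
    }

    @Override
    public Object process(Map.Entry<Integer, Integer> entry) {
        Integer value = entry.getValue();
        entry.setValue(value == null ? delta : value + delta);  // increment, treating an absent entry as 0
        return null;  // nothing to return to the caller
    }
}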

Code example source: hazelcast/hazelcast-jet-code-samples

@RequestMapping("/submitJob")
public void submitJob() {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(CustomSourceP.customSource())
      .drainTo(Sinks.logger());
  JobConfig jobConfig = new JobConfig()
      .addClass(SpringBootSample.class)
      .addClass(CustomSourceP.class);
  instance.newJob(pipeline, jobConfig).join();
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
      DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
   .groupingKey(Trade::getTicker)
   .rollingAggregate(summingLong(Trade::getPrice))
   .drainTo(Sinks.map(VOLUME_MAP_NAME));
  return p;
}
