com.hazelcast.jet.Util.mapPutEvents()方法的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(181)

本文整理了Java中com.hazelcast.jet.Util.mapPutEvents()方法的一些代码示例,展示了Util.mapPutEvents()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Util.mapPutEvents()方法的具体详情如下:
包路径:com.hazelcast.jet.Util
类名称:Util
方法名:mapPutEvents

Util.mapPutEvents介绍

[英]Returns a predicate for Sources#mapJournal and Sources#remoteMapJournal that passes only EntryEventType#ADDED and EntryEventType#UPDATED events.
[中]返回仅传递EntryEventType#ADDED和EntryEventType#UPDATED事件的Sources#mapJournal和Sources#remoteMapJournal的谓词。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a supplier of processors for
 * {@link Sources#mapJournal(String, JournalInitialPosition)} )}.
 */
@Nonnull
public static <K, V> ProcessorMetaSupplier streamMapP(
    @Nonnull String mapName,
    @Nonnull JournalInitialPosition initialPos,
    @Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
) {
  return streamMapP(mapName, mapPutEvents(), mapEventToEntry(), initialPos, eventTimePolicy);
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #mapJournal(String, DistributedPredicate,
 * DistributedFunction, JournalInitialPosition)}
 * which will pass only {@link EntryEventType#ADDED ADDED} and
 * {@link EntryEventType#UPDATED UPDATED} events and will project the
 * event's key and new value into a {@code Map.Entry}.
 */
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> mapJournal(
    @Nonnull String mapName,
    @Nonnull JournalInitialPosition initialPos
) {
  return mapJournal(mapName, mapPutEvents(), mapEventToEntry(), initialPos);
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a supplier of processors for
 * {@link Sources#remoteMapJournal(String, ClientConfig, JournalInitialPosition)}.
 */
@Nonnull
public static <K, V> ProcessorMetaSupplier streamRemoteMapP(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull JournalInitialPosition initialPos,
    @Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
) {
  return streamRemoteMapP(mapName, clientConfig, mapPutEvents(), mapEventToEntry(), initialPos,
      eventTimePolicy);
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #remoteMapJournal(String, ClientConfig,
 * DistributedPredicate, DistributedFunction, JournalInitialPosition)}
 * which will pass only {@link EntryEventType#ADDED ADDED}
 * and {@link EntryEventType#UPDATED UPDATED} events and will
 * project the event's key and new value into a {@code Map.Entry}.
 */
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> remoteMapJournal(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull JournalInitialPosition initialPos
) {
  return remoteMapJournal(mapName, clientConfig, mapPutEvents(), mapEventToEntry(), initialPos);
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #mapJournal(IMap, DistributedPredicate,
 * DistributedFunction, JournalInitialPosition)}
 * which will pass only {@link EntryEventType#ADDED
 * ADDED} and {@link EntryEventType#UPDATED UPDATED}
 * events and will project the event's key and new value into a {@code
 * Map.Entry}.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the map you supply
 * and acquires a map with that name on the local cluster. If you supply a
 * map instance from another cluster, no error will be thrown to indicate
 * this.
 */
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> mapJournal(
    @Nonnull IMap<? extends K, ? extends V> map,
    @Nonnull JournalInitialPosition initialPos
) {
  return mapJournal(map.getName(), mapPutEvents(), mapEventToEntry(), initialPos);
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
      mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
   .addTimestamps(pv -> pv.timestamp(), 100)
   .window(sliding(10, 1))
   .aggregate(counting())
   .drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroup() {
  Pipeline p = Pipeline.create();
  StreamStageWithKey<PageVisit, Integer> pageVisits = p
      .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
          mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
      .addTimestamps(pv -> pv.timestamp(), 100)
      .groupingKey(pv -> pv.userId());
  StreamStageWithKey<Payment, Integer> payments = p
      .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
          mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
      .addTimestamps(pm -> pm.timestamp(), 100)
      .groupingKey(pm -> pm.userId());
  StreamStageWithKey<AddToCart, Integer> addToCarts = p
      .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
          mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
      .addTimestamps(atc -> atc.timestamp(), 100)
      .groupingKey(atc -> atc.userId());
  StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
  StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
      windowStage.aggregate3(toList(), addToCarts, toList(), payments, toList());
  coGrouped.drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
      mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
   .addTimestamps(pv -> pv.timestamp(), 100)
   .window(sliding(10, 1))
   .groupingKey(pv -> pv.userId())
   .aggregate(toList())
   .drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroupWithBuilder() {
  Pipeline p = Pipeline.create();
  StreamStageWithKey<PageVisit, Integer> pageVisits = p
      .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
          mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
      .addTimestamps(pv -> pv.timestamp(), 100)
      .groupingKey(pv -> pv.userId());
  StreamStageWithKey<AddToCart, Integer> addToCarts = p
      .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
          mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
      .addTimestamps(atc -> atc.timestamp(), 100)
      .groupingKey(atc -> atc.userId());
  StreamStageWithKey<Payment, Integer> payments = p
      .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
          mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
      .addTimestamps(pm -> pm.timestamp(), 100)
      .groupingKey(pm -> pm.userId());
  StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
  WindowGroupAggregateBuilder<Integer, List<PageVisit>> builder = windowStage.aggregateBuilder(toList());
  Tag<List<PageVisit>> pageVisitTag = builder.tag0();
  Tag<List<AddToCart>> addToCartTag = builder.add(addToCarts, toList());
  Tag<List<Payment>> paymentTag = builder.add(payments, toList());
  StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
      builder.build((winStart, winEnd, key, r) -> new TimestampedEntry<>(
          winEnd, key, tuple3(r.get(pageVisitTag), r.get(addToCartTag), r.get(paymentTag))));
  coGrouped.drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<PriceUpdateEvent, String, Tuple2<Integer, Long>>mapJournal(
      "prices",
      mapPutEvents(),
      e -> new PriceUpdateEvent(e.getKey(), e.getNewValue().f0(), e.getNewValue().f1()),
      START_FROM_CURRENT
  ))
   .addTimestamps(PriceUpdateEvent::timestamp, LAG_SECONDS * 1000)
   .setLocalParallelism(1)
   .groupingKey(PriceUpdateEvent::ticker)
   .window(WindowDefinition.sliding(WINDOW_SIZE_SECONDS * 1000, 1000))
   .aggregate(AggregateOperations.counting())
   .drainTo(Sinks.logger());
  return p;
}

相关文章