Usage and code examples of the org.apache.kafka.streams.kstream.Materialized.with() method

x33g5p2x, reposted on 2022-01-25 under Other

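This article collects code examples of the Java method org.apache.kafka.streams.kstream.Materialized.with() and shows how Materialized.with() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Materialized.with() method are as follows:
Package path: org.apache.kafka.streams.kstream.Materialized
Class name: Materialized
Method name: with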

About Materialized.with

Materialize a StateStore with the provided key and value Serdes. An internal name will be used for the store.
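
To make the description concrete, here is a minimal sketch (not taken from any of the projects quoted in this article) that passes Materialized.with() to count(). The class name MaterializedWithSketch and the topic names input-topic and counts-topic are assumptions made for illustration. Only the topology is built, so no broker is contacted.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class MaterializedWithSketch {

    // Builds a topology that counts records per key.
    // "input-topic" and "counts-topic" are hypothetical topic names.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, Long> counts = builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // Only the serdes are supplied here; the store name is generated internally.
            .count(Materialized.with(Serdes.String(), Serdes.Long()));

        // Write the changelog of the count table to the output topic.
        counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Print the topology so the generated store name can be inspected.
        System.out.println(build().describe());
    }
}
```

Running main prints the topology description, which is one way to see the internal name that was assigned to the store.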

Code examples

Code example source: confluentinc/kafka-streams-examples

},
Materialized.with(windowedStringSerde, new PriorityQueueSerde<>(comparator, valueAvroSerde))
);

Code example source: rayokota/kafka-graphs

public KGraph<K, VV, EV> subgraph(Predicate<K, VV> vertexFilter, Predicate<Edge<K>, EV> edgeFilter) {
  KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
  KTable<Edge<K>, EV> remainingEdges = edgesBySource()
    .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    .map((k, edge) -> new KeyValue<>(edge.target(), edge))
    .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    .map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
    .groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
    .reduce((v1, v2) -> v2, Materialized.with(new KryoSerde<>(), edgeValueSerde()));
  KTable<Edge<K>, EV> filteredEdges = remainingEdges
    .filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
  return new KGraph<>(filteredVertices, filteredEdges, serialized);
}
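
The subgraph method above uses two forms: Materialized.with(...) for the reduce step, and Materialized.as(...) followed by withKeySerde and withValueSerde for the filter step. The sketch below puts the two forms side by side on a simple count. The class name NamedStoreSketch, the store name counts-store, and the topic name input-topic are assumptions for illustration, not names from kafka-graphs.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedStoreSketch {

    // Serdes only: the store receives an internally generated name.
    public static Topology withInternalName() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .count(Materialized.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Explicit store name ("counts-store" is hypothetical) plus the same serdes.
    public static Topology withExplicitName() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
        return builder.build();
    }
}
```

An explicit name is mainly useful when the store has to be referred to by name later, for example in interactive queries. When that is not needed, Materialized.with(...) is the shorter form.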

Code example source: org.apache.kafka/kafka-streams

@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                            final Aggregator<? super K, ? super V, T> aggregator,
                                            final Merger<? super K, T> sessionMerger) {
  return aggregate(initializer, aggregator, sessionMerger, Materialized.with(keySerde, null));
}
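
The snippet above passes null as the value serde. According to the Javadoc of Materialized.with, a null serde means the default serde from the configuration is used. The following sketch illustrates that case with a null key serde. The class name NullSerdeSketch, the topic name amounts-topic, the application id, and the broker address are all assumptions for illustration, and the configuration is only constructed, never used to start an application.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

public class NullSerdeSketch {

    // Hypothetical configuration that declares a default key serde,
    // which is what a null key serde is documented to fall back to.
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "null-serde-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Sums Long values per key; the key serde is left as null on purpose.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("amounts-topic", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey()
            .reduce(Long::sum, Materialized.with(null, Serdes.Long()));
        return builder.build();
    }
}
```

Passing an explicit serde for both key and value, as several other snippets in this article do, avoids depending on the configured defaults.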

Code example source: org.apache.kafka/kafka-streams

@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                  final Aggregator<? super K, ? super V, T> adder,
                                  final Aggregator<? super K, ? super V, T> subtractor) {
  return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public KTable<K, V> reduce(final Reducer<V> adder,
                           final Reducer<V> subtractor) {
  return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                    final Aggregator<? super K, ? super V, VR> aggregator) {
  return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator) {
  return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
  return reduce(reducer, Materialized.with(keySerde, valSerde));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public KTable<K, V> reduce(final Reducer<V> reducer) {
  return reduce(reducer, Materialized.with(keySerde, valSerde));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public KTable<K, Long> count() {
  return count(Materialized.with(keySerde, Serdes.Long()));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public KTable<Windowed<K>, Long> count() {
  return doCount(Materialized.with(keySerde, Serdes.Long()));
}

Code example source: org.apache.kafka/kafka-streams

@Override
public KTable<K, Long> count() {
  return doCount(Materialized.with(keySerde, Serdes.Long()));
}

Code example source: confluentinc/kafka-streams-examples

total.getValue() + order.getQuantity() * order.getPrice()),
Materialized.with(null, Schemas.ORDER_VALUE_SERDE));

Code example source: rayokota/kafka-graphs

private KTable<K, Iterable<EdgeWithValue<K, EV>>> edgesGroupedBy(Function<Edge<K>, K> fun) {
  return edges()
    .groupBy(new GroupEdges(fun), Serialized.with(keySerde(), new KryoSerde<>()))
    .aggregate(
      HashSet::new,
      (aggKey, value, aggregate) -> {
        ((Set<EdgeWithValue<K, EV>>) aggregate).add(value);
        return aggregate;
      },
      (aggKey, value, aggregate) -> {
        ((Set<EdgeWithValue<K, EV>>) aggregate).remove(value);
        return aggregate;
      },
      Materialized.with(keySerde(), new KryoSerde<>()));
}

Code example source: rayokota/kafka-graphs

public <T> KGraph<K, VV, EV> joinWithVertices(KTable<K, T> inputDataSet,
                                              final VertexJoinFunction<VV, T> vertexJoinFunction) {
  KTable<K, VV> resultedVertices = vertices()
    .leftJoin(inputDataSet,
      new ApplyLeftJoinToVertexValues<>(vertexJoinFunction), Materialized.with(keySerde(), vertexValueSerde()));
  return new KGraph<>(resultedVertices, edges, serialized);
}

Code example source: rayokota/kafka-graphs

public <T> KGraph<K, VV, EV> joinWithEdges(KTable<Edge<K>, T> inputDataSet,
                                           final EdgeJoinFunction<EV, T> edgeJoinFunction) {
  KTable<Edge<K>, EV> resultedEdges = edges()
    .leftJoin(inputDataSet,
      new ApplyLeftJoinToEdgeValues<>(edgeJoinFunction), Materialized.with(new KryoSerde<>(), edgeValueSerde()));
  return new KGraph<>(vertices, resultedEdges, serialized);
}

Code example source: confluentinc/kafka-streams-examples

(id, result, total) -> PASS.equals(result.getValidationResult()) ? total + 1 : total,
Materialized.with(null, Serdes.Long())

Code example source: rayokota/kafka-graphs

public static <K, VV, EV> KGraph<K, VV, EV> fromEdges(
  KTable<Edge<K>, EV> edges,
  ValueMapper<K, VV> vertexValueInitializer,
  GraphSerialized<K, VV, EV> serialized) {
  KTable<K, VV> vertices = edges
    .toStream()
    .flatMap(new EmitSrcAndTargetAsTuple1<>(vertexValueInitializer))
    .groupByKey(Serialized.with(serialized.keySerde(), new KryoSerde<>()))
    .<VV>reduce((v1, v2) -> v2,
      Materialized.with(serialized.keySerde(), serialized.vertexValueSerde()));
  return new KGraph<>(vertices, edges, serialized);
}
