本文整理了Java中org.apache.kafka.streams.kstream.Materialized.with()
方法的一些代码示例,展示了Materialized.with()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Materialized.with()
方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.Materialized
类名称:Materialized
方法名:with
[英]Materialize a StateStore with the provided key and value Serdes. An internal name will be used for the store.
[中]使用提供的键和值序列实现状态存储。商店将使用内部名称。
代码示例来源:origin: confluentinc/kafka-streams-examples
},
Materialized.with(windowedStringSerde, new PriorityQueueSerde<>(comparator, valueAvroSerde))
);
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> subgraph(Predicate<K, VV> vertexFilter, Predicate<Edge<K>, EV> edgeFilter) {
KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
KTable<Edge<K>, EV> remainingEdges = edgesBySource()
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(edge.target(), edge))
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
.groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
.reduce((v1, v2) -> v2, Materialized.with(new KryoSerde<>(), edgeValueSerde()));
KTable<Edge<K>, EV> filteredEdges = remainingEdges
.filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(filteredVertices, filteredEdges, serialized);
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger) {
return aggregate(initializer, aggregator, sessionMerger, Materialized.with(keySerde, null));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
final Aggregator<? super K, ? super V, T> subtractor) {
return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor) {
return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
return reduce(reducer, Materialized.with(keySerde, valSerde));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
return reduce(reducer, Materialized.with(keySerde, valSerde));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<K, V> reduce(final Reducer<V> reducer) {
return reduce(reducer, Materialized.with(keySerde, valSerde));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<K, Long> count() {
return count(Materialized.with(keySerde, Serdes.Long()));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<Windowed<K>, Long> count() {
return doCount(Materialized.with(keySerde, Serdes.Long()));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<Windowed<K>, Long> count() {
return doCount(Materialized.with(keySerde, Serdes.Long()));
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public KTable<K, Long> count() {
return doCount(Materialized.with(keySerde, Serdes.Long()));
}
代码示例来源:origin: confluentinc/kafka-streams-examples
total.getValue() + order.getQuantity() * order.getPrice()),
Materialized.with(null, Schemas.ORDER_VALUE_SERDE));
代码示例来源:origin: rayokota/kafka-graphs
private KTable<K, Iterable<EdgeWithValue<K, EV>>> edgesGroupedBy(Function<Edge<K>, K> fun) {
return edges()
.groupBy(new GroupEdges(fun), Serialized.with(keySerde(), new KryoSerde<>()))
.aggregate(
HashSet::new,
(aggKey, value, aggregate) -> {
((Set<EdgeWithValue<K, EV>>) aggregate).add(value);
return aggregate;
},
(aggKey, value, aggregate) -> {
((Set<EdgeWithValue<K, EV>>) aggregate).remove(value);
return aggregate;
},
Materialized.with(keySerde(), new KryoSerde<>()));
}
代码示例来源:origin: rayokota/kafka-graphs
public <T> KGraph<K, VV, EV> joinWithVertices(KTable<K, T> inputDataSet,
final VertexJoinFunction<VV, T> vertexJoinFunction) {
KTable<K, VV> resultedVertices = vertices()
.leftJoin(inputDataSet,
new ApplyLeftJoinToVertexValues<>(vertexJoinFunction), Materialized.with(keySerde(), vertexValueSerde()));
return new KGraph<>(resultedVertices, edges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public <T> KGraph<K, VV, EV> joinWithEdges(KTable<Edge<K>, T> inputDataSet,
final EdgeJoinFunction<EV, T> edgeJoinFunction) {
KTable<Edge<K>, EV> resultedEdges = edges()
.leftJoin(inputDataSet,
new ApplyLeftJoinToEdgeValues<>(edgeJoinFunction), Materialized.with(new KryoSerde<>(), edgeValueSerde()));
return new KGraph<>(vertices, resultedEdges, serialized);
}
代码示例来源:origin: confluentinc/kafka-streams-examples
(id, result, total) -> PASS.equals(result.getValidationResult()) ? total + 1 : total,
Materialized.with(null, Serdes.Long())
代码示例来源:origin: rayokota/kafka-graphs
public static <K, VV, EV> KGraph<K, VV, EV> fromEdges(
KTable<Edge<K>, EV> edges,
ValueMapper<K, VV> vertexValueInitializer,
GraphSerialized<K, VV, EV> serialized) {
KTable<K, VV> vertices = edges
.toStream()
.flatMap(new EmitSrcAndTargetAsTuple1<>(vertexValueInitializer))
.groupByKey(Serialized.with(serialized.keySerde(), new KryoSerde<>()))
.<VV>reduce((v1, v2) -> v2,
Materialized.with(serialized.keySerde(), serialized.vertexValueSerde()));
return new KGraph<>(vertices, edges, serialized);
}
内容来源于网络,如有侵权,请联系作者删除!