本文整理了Java中org.apache.kafka.streams.kstream.KTable.mapValues()
方法的一些代码示例,展示了KTable.mapValues()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KTable.mapValues()
方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.KTable
类名称:KTable
方法名:mapValues
[英]Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record can be transformed into an output record . This is a stateless record-by-record operation.
The example below counts the number of token of the value string.
KTable inputTable = builder.table("topic");});
}
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than KStream#mapValues(ValueMapper), because KeyValue with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
[中]通过使用默认序列化程序、反序列化程序和状态存储,将此KTable中每个记录的值转换为新KTable中的新值(可能具有新类型),从而创建一个新的KTable。对于每个KTable更新,提供的ValueMapper将应用于更新记录的值,并为其计算新值,从而生成结果KTable的更新记录。因此,可以将输入记录转换为输出记录。这是一个逐记录的无状态操作。
下面的示例统计值字符串的标记数。
KTable inputTable = builder.table("topic");});
}
此操作保留与密钥相关的数据同位。因此,如果对结果KTable应用基于键的运算符(如join),则需要no内部数据重新分布。
请注意,changelog流的mapValues与KStream#mapValues(ValueMapper)的工作方式不同,因为带有null值的KeyValue(所谓的墓碑记录)具有删除语义。因此,对于墓碑,不评估提供的值映射器,而是直接转发墓碑记录以删除结果表中的相应记录。
代码示例来源:origin: confluentinc/kafka-streams-examples
final KTable<String, String> userRegions = userProfiles.mapValues(record ->
record.get("region").toString());
代码示例来源:origin: confluentinc/kafka-streams-examples
final KTable<String, String> userRegions = userProfiles.mapValues(new ValueMapper<GenericRecord, String>() {
@Override
public String apply(final GenericRecord record) {
代码示例来源:origin: confluentinc/kafka-streams-examples
.mapValues(queue -> {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < topN; i++) {
代码示例来源:origin: SciSpike/kafka-lab
.mapValues(value -> value.toString())
代码示例来源:origin: JohnReedLOL/kafka-streams
KTable<String, String> userRegions = userProfiles.mapValues(record ->
record.get("region").toString());
代码示例来源:origin: JohnReedLOL/kafka-streams
.mapValues(queue -> {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < topN; i++) {
代码示例来源:origin: rayokota/kafka-graphs
public <NV> KGraph<K, VV, NV> mapEdges(ValueMapperWithKey<Edge<K>, EV, NV> mapper, Serde<NV> newEdgeValueSerde) {
KTable<Edge<K>, NV> mappedEdges = edges.mapValues(mapper, Materialized.<Edge<K>, NV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(newEdgeValueSerde));
return new KGraph<>(vertices, mappedEdges,
GraphSerialized.with(keySerde(), vertexValueSerde(), newEdgeValueSerde));
}
代码示例来源:origin: rayokota/kafka-graphs
public <NV> KGraph<K, NV, EV> mapVertices(ValueMapperWithKey<K, VV, NV> mapper, Serde<NV> newVertexValueSerde) {
KTable<K, NV> mappedVertices = vertices.mapValues(mapper, Materialized.<K, NV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(keySerde()).withValueSerde(newVertexValueSerde));
return new KGraph<>(mappedVertices, edges,
GraphSerialized.with(keySerde(), newVertexValueSerde, edgeValueSerde()));
}
代码示例来源:origin: rayokota/kafka-graphs
public KTable<K, EV> reduceOnEdges(Reducer<EV> reducer,
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
return edgesGroupedByTarget()
.mapValues(v -> {
EV result = null;
for (EdgeWithValue<K, EV> edge : v) {
result = result != null ? reducer.apply(result, edge.value()) : edge.value();
}
return result;
},
Materialized.<K, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(keySerde()).withValueSerde(edgeValueSerde()));
case OUT:
return edgesGroupedBySource()
.mapValues(v -> {
EV result = null;
for (EdgeWithValue<K, EV> edge : v) {
result = result != null ? reducer.apply(result, edge.value()) : edge.value();
}
return result;
},
Materialized.<K, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(keySerde()).withValueSerde(edgeValueSerde()));
case BOTH:
throw new UnsupportedOperationException();
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
代码示例来源:origin: rayokota/kafka-graphs
Materialized.with(keySerde(), new KryoSerde<>()));
KTable<K, VV> neighborsReducedByTarget = neighborsGroupedByTarget
.mapValues(v -> v.values().stream().reduce(reducer::apply).orElse(null),
Materialized.<K, VV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
.withKeySerde(keySerde()).withValueSerde(vertexValueSerde()));
Materialized.with(keySerde(), new KryoSerde<>()));
KTable<K, VV> neighborsReducedBySource = neighborsGroupedBySource
.mapValues(v -> v.values().stream().reduce(reducer::apply).orElse(null),
Materialized.<K, VV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
.withKeySerde(keySerde()).withValueSerde(vertexValueSerde()));
代码示例来源:origin: rayokota/kafka-graphs
joinedGraph.vertices().mapValues((k, v) -> new Tuple2<>(0.0, 0.0));
代码示例来源:origin: rayokota/kafka-graphs
joinedGraph.vertices().mapValues((k, v) -> new Tuple2<>(0.0, k == srcVertexId ? Double.NEGATIVE_INFINITY
: 0.0));
代码示例来源:origin: rayokota/kafka-graphs
joinedGraph.vertices().mapValues((k, v) -> new Tuple2<>(0.0, 0.0));
代码示例来源:origin: rayokota/kafka-graphs
KTable<Long, Long> initialVertices = gridGraph.vertices().mapValues(new InitVerticesFromId<>());
KGraph<Long, Long, Long> graph = new KGraph<>(initialVertices, gridGraph.edges(),
GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
代码示例来源:origin: rayokota/kafka-graphs
KTable<Long, Long> initialVertices = gridGraph.vertices().mapValues(new InitVerticesFromId<>());
KGraph<Long, Long, Long> graph = new KGraph<>(initialVertices, gridGraph.edges(),
GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
代码示例来源:origin: rayokota/kafka-graphs
.mapValues(v -> v._4, Materialized.as(solutionSetStore));
代码示例来源:origin: rayokota/kafka-graphs
KTable<Long, Double> initialVertices = starGraph.vertices().mapValues(v -> 1.0);
KTable<Edge<Long>, Double> initialEdges = starGraph.edges().mapValues(v -> 1.0);
KGraph<Long, Double, Double> initialGraph =
new KGraph<>(initialVertices, initialEdges, GraphSerialized.with(Serdes.Long(), Serdes.Double(), Serdes
joinedGraph.vertices().mapValues((k, v) -> new Tuple2<>(0.0, 0.0));
代码示例来源:origin: rayokota/kafka-graphs
KTable<Long, Double> initialVertices = completeGraph.vertices().mapValues(v -> 1.0);
KTable<Edge<Long>, Double> initialEdges = completeGraph.edges().mapValues(v -> 1.0);
KGraph<Long, Double, Double> initialGraph =
new KGraph<>(initialVertices, initialEdges, GraphSerialized.with(Serdes.Long(), Serdes.Double(), Serdes
joinedGraph.vertices().mapValues((k, v) -> new Tuple2<>(0.0, 0.0));
代码示例来源:origin: jresoort/kafkastreams-workshop
.windowedBy(TimeWindows.of(300000).advanceBy(60000))
.aggregate(SumCount::new, (key, value, aggregate) -> aggregate.addValue(value.getTemperature()), Materialized.with(Serdes.String(), new JsonSerde<>(SumCount.class)))
.mapValues(SumCount::average, Materialized.with(new WindowedSerde<>(Serdes.String()), Serdes.Double()))
.toStream()
.map(((key, average) -> new KeyValue<>(key.key(), new Average(average, key.window().start(), key.window().start() + 300000))))
代码示例来源:origin: rayokota/kafka-graphs
@Test
public void testJoinWithVertexSet() throws Exception {
Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class,
LongSerializer.class, new Properties()
);
StreamsBuilder builder = new StreamsBuilder();
KTable<Long, Long> vertices =
StreamUtils.tableFromCollection(builder, producerConfig, Serdes.Long(), Serdes.Long(),
TestGraphUtils.getLongLongVertices());
KTable<Edge<Long>, Long> edges =
StreamUtils.tableFromCollection(builder, producerConfig, new KryoSerde<>(), Serdes.Long(),
TestGraphUtils.getLongLongEdges());
KGraph<Long, Long, Long> graph = new KGraph<>(
vertices, edges, GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
KGraph<Long, Long, Long> res = graph.joinWithVertices(graph.vertices()
.mapValues(v -> v), new AddValuesMapper());
KTable<Long, Long> data = res.vertices();
startStreams(builder, Serdes.Long(), Serdes.Long());
Thread.sleep(5000);
List<KeyValue<Long, Long>> result = StreamUtils.listFromTable(streams, data);
expectedResult = "1,2\n" +
"2,4\n" +
"3,6\n" +
"4,8\n" +
"5,10\n";
TestUtils.compareResultAsTuples(result, expectedResult);
}
内容来源于网络,如有侵权,请联系作者删除!