This article collects code examples for the Java class org.apache.kafka.streams.kstream.Window, showing how the class is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Window class:
Package: org.apache.kafka.streams.kstream
Class name: Window
A single window instance, defined by its start and end timestamps. Window is agnostic as to whether the start/end boundaries are inclusive or exclusive; this is defined by the concrete window implementations.
To specify how Window boundaries are defined, use Windows. For time semantics, see TimestampExtractor.
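Before the examples, here is a minimal sketch of where a Window typically surfaces in application code: after a windowed aggregation, each record key is a Windowed<K> whose window() exposes the start and end timestamps. This sketch assumes the Kafka Streams 2.x API; the topic name "events", the serdes, and the window sizes are illustrative assumptions, not taken from the examples below.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical input topic with String keys and values.
        final KStream<String, String> input =
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));
        // Count events per key in 5-minute hopping windows that advance every minute.
        input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
             .count()
             .toStream()
             // The key is now a Windowed<String>; its Window carries the boundaries.
             .foreach((windowedKey, count) -> {
                 final long start = windowedKey.window().start(); // epoch milliseconds
                 final long end = windowedKey.window().end();     // inclusive or exclusive per implementation
                 System.out.println(windowedKey.key() + "@" + start + "->" + end + " = " + count);
             });
        // Only the topology is built here; wiring it into a KafkaStreams instance is omitted.
        builder.build();
    }
}

Note that whether the end boundary is inclusive depends on the window type: time windows exclude their end timestamp, while session windows include it.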
Code example from confluentinc/kafka-streams-examples:
// Build a human-readable key of the form "<key>@<windowStart>-><windowEnd>"
.map((key, value) -> new KeyValue<>(key.key() + "@" + key.window().start() + "->" + key.window().end(), value))
Code example from org.apache.kafka/kafka-streams:
@Override
public boolean equals(final Object obj) {
    if (obj == this) {
        return true;
    }
    if (!(obj instanceof Windowed)) {
        return false;
    }
    final Windowed<?> that = (Windowed) obj;
    return window.equals(that.window) && key.equals(that.key);
}
Code example from org.apache.kafka/kafka-streams:
@Override
public int hashCode() {
    // Pack both hash codes into a single long, then fold it back into an int.
    final long n = ((long) window.hashCode() << 32) | key.hashCode();
    return (int) (n % 0xFFFFFFFFL);
}
Code example from confluentinc/kafka-streams-examples:
private static void consumeOutput(String bootstrapServers, String schemaRegistryUrl) {
    final Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "top-articles-lambda-example-consumer");
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Windowed keys need the time-windowed serde to recover both the key and its Window.
    final Deserializer<Windowed<String>> windowedDeserializer =
        WindowedSerdes.timeWindowedSerdeFrom(String.class).deserializer();
    final KafkaConsumer<Windowed<String>, String> consumer =
        new KafkaConsumer<>(consumerProperties, windowedDeserializer, Serdes.String().deserializer());
    consumer.subscribe(Collections.singleton(TopArticlesLambdaExample.TOP_NEWS_PER_INDUSTRY_TOPIC));
    while (true) {
        final ConsumerRecords<Windowed<String>, String> consumerRecords = consumer.poll(Long.MAX_VALUE);
        for (final ConsumerRecord<Windowed<String>, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord.key().key() + "@" + consumerRecord.key().window().start()
                + "=" + consumerRecord.value());
        }
    }
}
Code example from simplesteph/medium-blog-kafka-udemy:
private boolean keepCurrentWindow(Windowed<String> window, long advanceMs) {
    final long now = System.currentTimeMillis();
    // True only for the window that is still open and ends within the next
    // advance interval, i.e. the most recent of the overlapping hopping windows.
    return window.window().end() > now &&
           window.window().end() < now + advanceMs;
}
Code example from org.apache.kafka/kafka-streams:
public static <K> byte[] toBinary(final Windowed<K> timeKey,
                                  final Serializer<K> serializer,
                                  final String topic) {
    // Layout: serialized key bytes followed by the 8-byte window start timestamp.
    final byte[] bytes = serializer.serialize(topic, timeKey.key());
    final ByteBuffer buf = ByteBuffer.allocate(bytes.length + TIMESTAMP_SIZE);
    buf.put(bytes);
    buf.putLong(timeKey.window().start());
    return buf.array();
}
Code example from org.apache.kafka/kafka-streams:
@Override
public long time(final ProcessorContext context, final K key) {
    return key.window().end();
}
Code example from org.apache.kafka/kafka-streams:
@Override
public String toString() {
    return "[" + key + "@" + window.start() + "/" + window.end() + "]";
}
Code example from org.apache.kafka/kafka-streams:
public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
                                         final int seqnum,
                                         final StateSerdes<K, ?> serdes) {
    final byte[] serializedKey = serdes.rawKey(timeKey.key());
    return toStoreKeyBinary(serializedKey, timeKey.window().start(), seqnum);
}
Code example from org.apache.kafka/kafka-streams:
public static byte[] toBinary(final Windowed<Bytes> sessionKey) {
    final byte[] bytes = sessionKey.key().get();
    final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
    buf.put(bytes);
    // Session keys store the window end before the window start.
    buf.putLong(sessionKey.window().end());
    buf.putLong(sessionKey.window().start());
    return buf.array();
}
Code example from org.apache.kafka/kafka-streams:
@Override
int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
    final Bytes storeKeyBytes = WindowKeySchema.toStoreKeyBinary(storeKey.key(), storeKey.window().start(), 0);
    return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
}
Code example from org.apache.kafka/kafka-streams:
public static <K> byte[] toBinary(final Windowed<K> sessionKey,
                                  final Serializer<K> serializer,
                                  final String topic) {
    final byte[] bytes = serializer.serialize(topic, sessionKey.key());
    final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
    buf.put(bytes);
    buf.putLong(sessionKey.window().end());
    buf.putLong(sessionKey.window().start());
    return buf.array();
}
Code example from org.apache.kafka/kafka-streams:
public static Bytes toStoreKeyBinary(final Windowed<Bytes> timeKey,
                                     final int seqnum) {
    final byte[] bytes = timeKey.key().get();
    return toStoreKeyBinary(bytes, timeKey.window().start(), seqnum);
}
Code example from org.apache.kafka/kafka-streams:
private AGG fetchPrevious(final Bytes rawKey, final Window window) {
    // Look up any existing session for this key within the window's bounds.
    try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
             bytesStore.findSessions(rawKey, window.start(), window.end())) {
        if (!iterator.hasNext()) {
            return null;
        }
        return serdes.valueFrom(iterator.next().value);
    }
}
Code example from timothyrenner/kafka-streams-ex:
.map((k, v) ->
    KeyValue.pair(k.key(),
        Long.toString(k.window().start()) +
        "\t" + v.toString()))
.to(Serdes.String(),
Code example from spring-cloud/spring-cloud-stream-samples:
@StreamListener("binding2")
@SendTo("singleOutput")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
Code example from ebi-wp/kafka-streams-api-websockets:
    Serdes.serdeFrom(new MySerde(), new MySerde()),
    "data-store")
.toStream((key, value) -> key.key().toString() + " " + key.window().start())
.mapValues(job -> job.computeAvgTime().toString());
Code example from spring-cloud/spring-cloud-stream-samples:
@StreamListener("input")
@SendTo({"output1","output2","output3"})
@SuppressWarnings("unchecked")
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
}
Code example from org.apache.kafka/kafka-streams:
private void maybeForward(final ThreadCache.DirtyEntry entry,
                          final Bytes key,
                          final Windowed<K> windowedKey,
                          final InternalProcessorContext context) {
    if (flushListener != null) {
        final ProcessorRecordContext current = context.recordContext();
        context.setRecordContext(entry.entry().context());
        try {
            final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
            flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue);
        } finally {
            context.setRecordContext(current);
        }
    }
}
Code example from spring-cloud/spring-cloud-stream-samples:
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
}