事件源-如何使用kafka+kafkastreams api查询某些实体状态的示例

vxbzzdmp  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(419)

我正在使用kafka实现一个基于事件源的体系结构。
假设我以json格式存储事件:

{"name": "ProductAdded", "productId":"1", quantity=3, dateAdded="2017-04-04" }

我想实现一个查询来获取productid=x的产品在某个日期的数量。
您能用spring-kafka-kstreams展示这个查询的近似实现吗?
更新:我使用springkafkakstreams对此做了一点改进,但是我得到了一个反序列化错误。
这是我的Spring云流Kafka制作人:

public interface ProductProducer{

    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();

}

配置:

spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut: 
          destination: orders
          content-type: application/json

我使用以下代码发送一条消息,将Map正确序列化为json对象:

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
        productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);
``` `MessageBuilder.withPayload(event).build()` ->  `GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]` 在productservice应用程序中,我可以使用spring云流监听器读取以下消息:

@Component
public class ProductListener{

@StreamListener(ProductConsumer.INPUT)
public void handleProduct(Map<String, Object> event){
但是,对于kstream,我得到了一个反序列化错误:

@Configuration
public class KStreamsConfig {

private static final String STREAMING_TOPIC1 = "orders";

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    return new StreamsConfig(props);
}

@Bean
public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
    return new KStreamBuilderFactoryBean(streamsConfig);
}

@Bean
public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {

    Serde<Integer> integerSerde = Serdes.Integer();
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
    stream.print();
    return stream;
}

}

例外情况:

org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@288e4e9a; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

更新2:
为了找出kstream的内容,我将键和值都改为字符串反序列化程序,这是正在打印的内容:

KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);

打印值:

[KSTREAM-SOURCE-0000000000]: null , �contentType

为什么我没有得到json字符串?
更新3:我修复了反序列化问题,原因是消息生产者(springcloudstream)默认情况下添加了一些头作为有效负载的一部分。我只需禁用此标头包含即可开始在kafka流中正确接收消息:

spring:
application:
name: product-service
cloud:
stream:
kafka:
binder:
brokers:
- kafka
zk-nodes:
- kafka
bindings:
productsOut:
producer:
sync: true
bindings:
productsIn:
group: product-service
destination: orders
consumer:
max-attempts: 5
header-mode: raw
productsOut:
destination: orders
content-type: application/json
producer:
header-mode: raw

kstream定义:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

输出:

[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}

现在一切都设置正确了:如何实现像我需要的那样的交互式查询?->获取某个日期内productid=x的产品的数量
a9wyjsp7

a9wyjsp71#

我通过混合使用spring云流(生成消息)和spring kafka来处理kafkastreams并实现交互式查询(重要提示:注意问题更新3:能够将两者结合起来):
Kafka流配置:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        stream.map( (key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");

        stream.print();
        return stream;
    }

}

注意我是如何生成ktable存储的 ProductsStock 稍后我将在服务中查询。
产品服务:

@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
    public Integer getProductStock(Integer id) {
        KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
        return keyValueStore.get(id);
}
h9vpoimq

h9vpoimq2#

即将发布的1.3.0.m1版本的springcloudstreamkafka绑定器将支持kstream绑定。有一个公关,你可以跟踪这一举措的进展。
下面是一个使用kstream绑定器的更通用的示例(wordcount):wordcount示例使用spring云流支持kafka流
有了这个,你可以通过以下方式实现你想要的。
这个streamlistener方法将监听一个kafka主题,并在最后30秒的时间窗口内,以id等于123的产品计数写入另一个主题。

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ProductCountApplication {

  public static final int = 123;

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, String> process(KStream<?, Product> input) {

        return input
                .filter((key, product) -> product.getID() == PRODUCT_ID)
                .map((k,v) -> new KeyValue<>(v, v))
                .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
                .count(TimeWindows.of(30000), "product-store")
                .toStream()
                .map((w,c) -> new KeyValue<>(null, "Product with id 123 count: " + c));
  }

}

下面是使用的application.yml:

spring.cloud.stream.kstream.binder.streamConfiguration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the key
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the value
spring.cloud.stream.bindings.output.producer:
  headerMode: raw # Incoming data has no embedded headers
  useNativeEncoding: true # Write data using the native Serde
spring.cloud.stream.bindings.input.consumer:
  headerMode: raw # Outbound data has no embedded headers

运行程序时,需要传入输入/输出目标(主题):

--spring.cloud.stream.bindings.input.destination=products 
--spring.cloud.stream.bindings.output.destination=counts

相关问题