kafka streams应用程序将读写分离

gc0ot86w  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(402)

我对Kafka和Kafka溪还很陌生,所以请你容忍我。我想知道我在这条路上走得对不对。
我正在给一个Kafka主题写信,并试图通过rest服务访问数据。原始数据在被访问之前需要进行转换。
到目前为止,我拥有的是一个将原始数据写入主题的生产者。
1.)现在我想要streams应用程序(应该是一个在容器中运行的jar),它只是将数据转换成我想要的形状。遵循物化视图范式。
过于简化的版本1。)

KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = 
    builder.stream("my-raw-data-topic");

    KafkaStreams streams = new KafkaStreams(builder,props);
    KTable<String, Long> t =  source.groupByKey().count("My-Table");
    streams.start();

2.)和另一个streams应用程序(应该是一个在容器中运行的jar),它只保存 KTable 作为某种存储库,可以通过 Package rest服务进行访问。
在这里,我有点拘泥于使用api的正确方法。什么是访问和查询 KTable ? 我需要再次将转换拓扑分配给生成器吗?

KStreamBuilder builder = new KStreamBuilder();
KTable table = builder.table("My-Table"); //Casting?
KafkaStreams streams = new KafkaStreams(builder, props);

RestService service = new RestService(table); 
// Use the Table as Repository which is wrapped by a Rest-Service and gets updated reactivly

现在这是伪代码
我走的路对吗?把1.)和2.)分开有意义吗?这是使用流来具体化视图的缩进方式吗?对我来说,通过容器独立地扩展写操作和读操作会有好处,这样我可以看到更多的流量。
在1.)或2.)崩溃时,如何处理ktable的重新填充。这是通过复制到流api完成的,还是我需要通过代码来解决的。比如重置光标和回复事件?

rm5edbpk

rm5edbpk1#

几点意见:
在代码段(1)中,在将生成器交给 KafkaStreams 施工单位:

KafkaStreams streams = new KafkaStreams(builder,props);
// don't modify builder anymore!

您不应该这样做,而是首先指定拓扑,然后创建 KafkaStreams 示例。
把你的申请一分为二。独立缩放这两个部分是有意义的。但总的来说很难说。但是,如果您同时使用这两个主题,第一个主题需要将转换后的日期写入输出主题,第二个主题应该将此输出主题作为表读取( builder.table("output-topic-of-transformation") 满足其余的要求。
要访问ktable的存储,需要通过提供的存储名称获取查询句柄:

ReadOnlyKeyValueStore keyValueStore =
streams.store("My-Table", QueryableStoreTypes.keyValueStore());

详见文件:
http://docs.confluent.io/current/streams/developer-guide.html#interactive-查询

相关问题