我在sql server中有一个表,我想流到kafka主题,结构如下:
(UserID, ReportID)
此表将持续更改(记录已添加、已插入、无更新)
我想把它转换成这种结构,放到elasticsearch中:
{
"UserID": 1,
"Reports": [1, 2, 3, 4, 5, 6]
}
到目前为止,我看到的例子是日志或click stream,它们在我的案例中不起作用。
这种用例可能吗?我可以一直看着你 UserID
更改和查询数据库,但这似乎是幼稚的,不是最好的方法。
更新
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.ArrayList;
import java.util.Properties;
public class MyDemo {
public static void main(String... args) {
System.out.println("Hello KTable!");
final Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<Long, Long> reportPermission = builder.stream(TOPIC);
KTable<Long, ArrayList<Long>> result = reportPermission
.groupByKey()
.aggregate(
new Initializer<ArrayList<Long>>() {
@Override
public ArrayList<Long> apply() {
return null;
}
},
new Aggregator<Long, Long, ArrayList<Long>>() {
@Override
public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
aggregate.add(value);
return aggregate;
}
},
new Serde<ArrayList<Long>>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public void close() {}
@Override
public Serializer<ArrayList<Long>> serializer() {
return null;
}
@Override
public Deserializer<ArrayList<Long>> deserializer() {
return null;
}
});
result.to("report-aggregated-topic");
KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static final String TOPIC = "report-permission";
private static final Properties createStreamProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
return props;
}
}
我实际上陷入了聚合阶段,因为我不能为它编写一个合适的serde ArrayList<Long>
(还没有足够的技能),lambdas似乎没有在aggregator上工作-它不知道聚合器是什么类型的 agg
:
KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
.groupByKey()
.aggregate(
() -> new ArrayList<Long>(),
(key, val, agg) -> agg.add(val),
longSerde
);
2条答案
按热度按时间mec1mxoz1#
直接来说,这种方法在sql和kafka流中是不允许的,但是用例是可能的,可以实现如下:
1) 使用solrjapi在sqlserver上编写一个定制的应用程序,每当在sql中执行dml(insert、update、delete等)操作时,它都会命中solr示例。https://wiki.apache.org/solr/solrj
2) usesolrsql数据导入处理程序在sql中发生dml(insert、update、delete等)操作时,sqlserver将自动通知solr。https://wiki.apache.org/solr/dataimporthandler
jgwigjjp2#
您可以使用kafka的connectapi将数据从sqlserver获取到kafka中。我不知道sql server的任何特定连接器,但您可以使用任何基于jdbc的通用连接器:https://www.confluent.io/product/connectors/
要处理数据,可以使用kafka的streams api。你可以简单地
aggregate()
每个用户的所有报表。像这样:有关streams api的更多详细信息,请查看文档:https://docs.confluent.io/current/streams/index.html
请注意,您需要确保报告列表不会无限增长。kafka有一些(可配置的)最大消息大小,整个列表将包含在单个消息中。因此,您应该估计最大消息大小并应用相应的配置(->
max.message.bytes
)在投入生产之前。在网页上 checkout 配置:http://kafka.apache.org/documentation/#brokerconfigs最后,使用connectapi将数据推入ElasticSearch。有多种不同的连接器可用(我当然会推荐合流的一个)。有关connect api的更多详细信息:https://docs.confluent.io/current/connect/userguide.html