kafka流表转换

brccelvz  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(407)

我在sql server中有一个表,我想流到kafka主题,结构如下:

(UserID, ReportID)

此表将持续更改(记录已添加、已插入、无更新)
我想把它转换成这种结构,放到elasticsearch中:

{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}

到目前为止,我看到的例子是日志或click stream,它们在我的案例中不起作用。
这种用例可能吗?我可以一直看着你 UserID 更改和查询数据库,但这似乎是幼稚的,不是最好的方法。

更新

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission = builder.stream(TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey()
        .aggregate(
            new Initializer<ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply() {
                return null;
              }
            },
            new Aggregator<Long, Long, ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                aggregate.add(value);
                return aggregate;
              }
            },
            new Serde<ArrayList<Long>>() {
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) {}

              @Override
              public void close() {}

              @Override
              public Serializer<ArrayList<Long>> serializer() {
                return null;
              }

              @Override
              public Deserializer<ArrayList<Long>> deserializer() {
                return null;
              }
            });

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  }
}

我实际上陷入了聚合阶段,因为我不能为它编写一个合适的serde ArrayList<Long> (还没有足够的技能),lambdas似乎没有在aggregator上工作-它不知道聚合器是什么类型的 agg :

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );
mec1mxoz

mec1mxoz1#

直接来说,这种方法在sql和kafka流中是不允许的,但是用例是可能的,可以实现如下:
1) 使用solrjapi在sqlserver上编写一个定制的应用程序,每当在sql中执行dml(insert、update、delete等)操作时,它都会命中solr示例。https://wiki.apache.org/solr/solrj
2) usesolrsql数据导入处理程序在sql中发生dml(insert、update、delete等)操作时,sqlserver将自动通知solr。https://wiki.apache.org/solr/dataimporthandler

jgwigjjp

jgwigjjp2#

您可以使用kafka的connectapi将数据从sqlserver获取到kafka中。我不知道sql server的任何特定连接器,但您可以使用任何基于jdbc的通用连接器:https://www.confluent.io/product/connectors/
要处理数据,可以使用kafka的streams api。你可以简单地 aggregate() 每个用户的所有报表。像这样:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");

有关streams api的更多详细信息,请查看文档:https://docs.confluent.io/current/streams/index.html
请注意,您需要确保报告列表不会无限增长。kafka有一些(可配置的)最大消息大小,整个列表将包含在单个消息中。因此,您应该估计最大消息大小并应用相应的配置(-> max.message.bytes )在投入生产之前。在网页上 checkout 配置:http://kafka.apache.org/documentation/#brokerconfigs
最后,使用connectapi将数据推入ElasticSearch。有多种不同的连接器可用(我当然会推荐合流的一个)。有关connect api的更多详细信息:https://docs.confluent.io/current/connect/userguide.html

相关问题