kafka流和kglobaltable连接问题

8e2ybdfx  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(217)

我在加入带有globalktable的kstream时遇到了一个问题,非常感谢您的帮助。
考虑到Kafka的两个主题 orders 以及 customers :
订单

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

客户

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

要求是用客户名称丰富订单流

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}

我正在尝试以下方法:
orders 主题
customers 主题
构建另一个连接orders和customers的流(在customer表中查找order.custid)

KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

但它不会给出任何结果,也不会抛出任何异常。
当使用 leftJoin ,我得到一个客户的nullpointer异常。
请让我知道,如果你遇到了类似的问题,并建议如何解决这个问题。

shyt4zoc

shyt4zoc1#

@deepak您可能需要实现您的ktable

builder.table(customers, Materialized.as(customerStore));

然后流式处理订单并建立连接。

lztngnrs

lztngnrs2#

让我们仔细看看复制粘贴的内容:
customers 主题:

"100"   {"CustID":"100","CustName":"Customer1"}

您可以注意到键是一个字符串,该字符串包含双引号: "100" . 通常,打印字符串键时不带双引号。我宁愿看到:

100    {"CustID":"100","CustName":"Customer1"}

换句话说,键的java字符串表示是 ""100"" (或 "\"100\"" )而不是 "100" 正如我们所料。
另一方面,你的价值 orders 主题是json {"ID":"1","Name":"Myorder1","CustID":"100"} ,以及属性 CustID 是一个字符串,这次用java表示 "100" .
当你加入 orders 以及 customers ,尝试匹配订单custid 100 使用客户密钥 "100" . 这将失败,因为custid中缺少键中的双引号。

相关问题