我在加入带有globalktable的kstream时遇到了一个问题,非常感谢您的帮助。
考虑到Kafka的两个主题 orders
以及 customers
:
订单
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
客户
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
要求是用客户名称丰富订单流
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
我正在尝试以下方法:
从 orders
主题
从 customers
主题
构建另一个连接orders和customers的流(在customer表中查找order.custid)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
但它不会给出任何结果,也不会抛出任何异常。
当使用 leftJoin
,我得到一个客户的nullpointer异常。
请让我知道,如果你遇到了类似的问题,并建议如何解决这个问题。
2条答案
按热度按时间shyt4zoc1#
@deepak您可能需要实现您的ktable
然后流式处理订单并建立连接。
lztngnrs2#
让我们仔细看看复制粘贴的内容:
在
customers
主题:您可以注意到键是一个字符串,该字符串包含双引号:
"100"
. 通常,打印字符串键时不带双引号。我宁愿看到:换句话说,键的java字符串表示是
""100""
(或"\"100\""
)而不是"100"
正如我们所料。另一方面,你的价值
orders
主题是json{"ID":"1","Name":"Myorder1","CustID":"100"}
,以及属性CustID
是一个字符串,这次用java表示"100"
.当你加入
orders
以及customers
,尝试匹配订单custid100
使用客户密钥"100"
. 这将失败,因为custid中缺少键中的双引号。