kafkastreams left join dsl:在外部空值上插入

y0u0uwnf  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(346)

我有一个混合和匹配的dsl papi拓扑。dsl部分将pageviews(“pageviews”主题)与那些pageviews的用户(“users”主题)连接起来。我想两者都加入,所以如果用户是新的,那么就从pvs信息中创建一个新的“user”到“users”主题中,否则什么也不做。
所以我尝试在pageviews和用户之间做一个左连接,如果用户为null,这意味着还没有用这个键创建用户,所以在这种情况下我创建了一个。
在代码中,我以流的形式获取页面视图,以表的形式获取用户,当用户在连接中为空时,将它们连接起来生成新用户,然后过滤并将这些新用户发送给“用户”。

val builder = new StreamsBuilder()
    val pageviewsTopic: KStream[Key, Pageview] = builder.stream("pageviews")
      .map((muipk, pageview) => (new MerchantUserPartitionKey(muipk.merchantSiteId, muipk.uid) -> pageview))

    val usersTopic: KTable[MerchantUserPartitionKey, user] = builder.table("users")

    val joinedPageviewsWithUsers: KStream[MerchantUserPartitionKey, User] =
      pageviewsTopic.leftJoin(
        usersTopic,
        new ValueJoiner[Pageview, User, User] {
          override def apply(pageview: Pageview, user: User): User = {
            logger.info("JOIN PAGEVIEW-user")
            if (user == null) {
              new User(UUIDUtils.generateRandomId(), pageview.uid /*, some other data */)
            } else {
              logger.info("user already created.")
              null
            }
          }
        })
    // Generate users.
    joinedPageviewsWithUsers.
      filter((key, user) => user != null ).
      to("users")

生成的dsl拓扑如下所示:

Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [pageviews])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000006
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-MAP-0000000001
    Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000001-repartition)
      <-- KSTREAM-FILTER-0000000006
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000001-repartition])
      --> KSTREAM-LEFTJOIN-0000000008
    Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [users-STATE-STORE-0000000002])
      --> KSTREAM-FILTER-0000000009
      <-- KSTREAM-SOURCE-0000000007
    Processor: KSTREAM-FILTER-0000000009 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-LEFTJOIN-0000000008
    Source: KSTREAM-SOURCE-0000000003 (topics: [users])
      --> KTABLE-SOURCE-0000000004
    Sink: KSTREAM-SINK-0000000010 (topic: users)
      <-- KSTREAM-FILTER-0000000009
    Processor: KTABLE-SOURCE-0000000004 (stores: [user-STATE-STORE-0000000002])
      --> none
      <-- KSTREAM-SOURCE-0000000003

但是,当对具有相同键的多个页面视图运行此操作时,“users”会创建新用户,但它总是与“null”合并。因此,看起来存储并没有使用“users”主题中新生成的数据进行更新,即使它显示使用 user-STATE-STORE-0000000002 .
你需要做些额外的事情来把数据输入商店吗?这是不是Kafka团队的反模式(写下你之前加入的主题)?
更新更多信息:
键不为空
valuejoiner代码被执行(显示打印输出),只有用户值总是空的。
用户被写入“users”主题(在这种情况下,逻辑上,它每次进入valuejoiner时都这样做,因为它总是发现外部值为null,因此它会将用户插入“users”中)

yyyllmsg

yyyllmsg1#

当一个流在一个子策略中,该子策略在另一个子策略中查找一个表时,可能会涉及到常规的消费/生产延迟。例如,当您直接从主题定义流或表时,就会发生这种情况。如果你能使用更有意义的指令,比如 through (这会写入主题,但会让topology知道它仍将在该拓扑中使用)它会有所帮助 KafkaStreams 想知道怎么会有这样的关系。

相关问题