所以我有一个kafka主题,它从外部源接收数据,并将其写入cassandra表(通过sink连接器)。
在cassandra表中,我有以下格式的数据:
[
SessionID varchar,
TS timestamp,
TYPE varchar,
other data...
]
与 SessionID
作为分区键,以及 (TS, TYPE)
作为聚类键。
所以我的目标是能够“重播”任何已经存在的会话(当前是否活动)。为此,我需要从cassandra(即。 WHERE SessionID=XXX
)并让cassandra发布关于kafka主题的查询结果。
为了解决这个问题,我尝试在Cassandra和Kafka之间使用一个源连接器(如果有更好的方法,请告诉我)。这个连接器允许kql查询,当连接器启动时,kql查询是“固定的”。
所以问题是:有没有一种方法可以随着时间的推移更改kql查询以请求不同的sessionid,或者我应该在每次流式处理不同的会话时创建一个新的、自动生成的连接器?
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!