在node js app中使用kafka consumer指示已进行计算

f0brbegy  于 2021-06-09  发布在  Cassandra
关注(0)|答案(1)|浏览(419)

因此,我的问题可能涉及一些头脑风暴的基础上的性质的应用程序。
我有一个node js应用程序,可以向kafka发送消息。例如,每次用户单击页面时,Kafka应用程序都会根据访问量运行一个计算。然后,在同一个示例中,我希望在通过kafka消息触发计算之后检索它。到目前为止,这个计算存储在一个cassandra数据库中。问题是,如果我们试图在计算完成之前从cassandra读取数据,那么我们将不会从数据库中查询任何内容(尚未插入密钥),也不会返回任何内容(错误),或者可能是计算已过时。这是我目前的代码。

router.get('/:slug', async (req, res) =>{

Producer = kafka.Producer

KeyedMessage = kafka.KeyedMessage
  client = new kafka.KafkaClient()

producer = new Producer(client)

km = new KeyedMessage('key', 'message')
  kafka_message = JSON.stringify({ id: req.session.session_id.toString(), url: arbitrary_url })
  payloads = [
    { topic: 'MakeComputationTopic', messages: kafka_message}
  ]; 
const clientCass = new cassandra.Client({
contactPoints: ['127.0.0.1:9042'],
localDataCenter: 'datacenter1', // here is the change required
keyspace: 'computation_space',
authProvider: new auth.PlainTextAuthProvider('cassandra', 'cassandra')
});

const query = 'SELECT * FROM computation  WHERE id = ?';

clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with email %s', result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });

}

首先,async和await出现在我的脑海中,但这是被排除的,因为这不会停止陈旧的计算。
第二,我考虑让我的应用程序休眠,但这种方式似乎会减慢我的应用程序。
我可能决定使用kafka consumer(在我的node js中)来使用一条消息,该消息指示现在可以安全地查看cassandra表。
例如(使用Kafka节点)

consumer.on('message', function (message) {
    clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with computation%s', result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });
});

这种方法虽然更好,但似乎有点不合适,因为每次用户单击一个页面时,我都必须让一个消费者,我只关心它被发送一条消息。
我在想我该如何应对这个挑战?我可能错过了一个场景,或者有没有一种方法可以使用kafka节点来解决这个问题?我还考虑做一个while循环,等待承诺成功,并且计算不会过时(比较缓存中的值)

yuvru6vn

yuvru6vn1#

这种方法虽然更好,但似乎有点不合适,因为每次用户单击一个页面时,我都必须让一个消费者,我只关心它被发送一条消息。
我也会得出同样的结论。cassandra不是为这种用例设计的。数据库最终是一致的。如果你把一些东西放在一起,你现在的方法可能会奏效,但是一旦你有了一个cassandra集群,肯定会导致未定义的行为。尤其是更新条目时。
计算表中的id是分区键。这意味着一旦你有了一个集群,cassandra就会按id分发数据,看起来它只包含一行。这是一种非常低效的cassandra表建模方法。
您的用例看起来像会话存储或缓存的用例。redis或leveldb非常适合这种用例。任何其他键值存储也可以完成这项工作。
为什么不将结果写入另一个主题,让另一个应用程序读取该主题并将结果写入数据库。所以你不需要保持任何状态。完成后,结果将出现在主题中。它看起来是这样的:
传入数据->第一个Kafka主题->计算应用程序->第二个Kafka主题->另一个应用程序将其写入数据库<-另一个应用程序定期读取数据。
如果它在那里,它就在那里,因此还没有完成。

相关问题