如何在nodejs中从kafka获取数据

xqk2d5yq  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(219)

目前我正在努力实现 KafkaNode.js . 我的整个项目结构是一个连接到 Node.js 服务器。我有一个基于python的平台来存储 sql 从数据库中获取数据。所以,我想用 Kafka 做沟通 Node.js server <-> Python Platform .
我可以在中创建简单的api Express 并将数据推入kafka(producer),然后在我的python平台中触发一些“fetch data class”。然而,现在我坚持用我的电脑来获取数据并显示它们 React 前端。我不知道如何从python plarform返回数据 Kafka consumerNode.js 还有我的 Kafka consumerNode.js <=> Express . 请帮助我了解如何从 consumerExpress ? 谢谢!
React.js:

let url = "localhost/GetData"
axios
    .post(url,params)
    .then(function(response){
    if(response.data.data.length !== 0) {
        'do something
    })
    .catch(err => {
      console.error(err);
    })

node.js-server.js:

router.post('/GetData', (req, res) => {
    getConnection(function(err, conn){
        if (err) {
            return res.json({ success: false, error: err })
        } else {
            let getData = new getDataObject(id, price, fruit);
            let getDataReq = Object.assign({}, producerRequest);
            let payloads = [];
            getDataReq .messages = [];
            getDataReq .messages.push(JSON.stringify(t.get()));
            payloads.push(getDataReq );

            kafkaProducer.send(payloads, function(err, result){
                if(err){
                    console.log(err)
                    return res.json({ success: false, error: err });
                }else{
                    console.log(result)
                    return res.json({ success: true, data: result });
                }
            })          
        }
    });
})

node.js-kafkaproducted.js:

const kafka = require('kafka-node');
const { NODE_ENV } = require('../../config/env');
const config = require('../../config/kafka.json');
const logger = require('../logger')

const kafkaProducerConfig = '11.11.11.11'; -fake address
const {producer_kafka_server} = kafkaProducerConfig;
const producer_client = new kafka.KafkaClient({kafkaHost: producer_kafka_server});
const producer = new kafka.Producer(producer_client);

producer.on('ready', function(){
    logger.info('kafka producer is ready...')
})

producer.on('error', function(err){
    logger.error(err)
});

exports.producerRequest = {topic: 'get_database_data', messages:[], timestamp: Date.now()};

exports.kafkaProducer = producer;

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题