我刚接触 Node.js,有人能告诉我为什么下面的代码无法将事件发送到 Kafka 主题吗?代码运行时没有产生任何错误。
var express = require('express');
var path = require('path');
var favicon = require('serve-favicon');
var logger = require('morgan');
var cookieParser = require('cookie-parser');
var bodyParser = require('body-parser');
var config = require('./config.js');
var nforce = require('nforce');
var dateTime = require('node-datetime');
var kafka = require('kafka-node');
var routes = require('./routes/index');
// Shared node-datetime object, created once when this module is loaded.
// NOTE(review): create() presumably captures the time of the call, so formatting
// this object later would report the module load time rather than "now" —
// confirm against the node-datetime documentation.
var dt = dateTime.create();
// Settings for the Salesforce (nforce) connection, all sourced from ./config.js.
var connectionSettings = {
  clientId: config.CLIENT_ID,
  clientSecret: config.CLIENT_SECRET,
  redirectUri: config.CALLBACK_URL + '/oauth/_callback',
  mode: 'multi',
  // optional: sandbox or production (production is the default)
  environment: config.ENVIRONMENT
};

// Connection handle used below for authentication and streaming.
var org = nforce.createConnection(connectionSettings);
// ---------------------------------------------------------------------------
// Kafka producer setup.
//
// The names are declared with `var` so they are module-scoped instead of
// implicit globals (which would throw under 'use strict').
// ---------------------------------------------------------------------------
var HighLevelProducer = kafka.HighLevelProducer;
// NOTE(review): in kafka-node releases that expose `kafka.Client`, the
// constructor argument is documented as a ZooKeeper connection string
// (typically port 2181), not a broker address — confirm that 'localhost:9092'
// is what this deployment expects.
var client = new kafka.Client('localhost:9092');
var producer = new HighLevelProducer(client);

// 'ready' is a connection event, not a per-message event, so the listener is
// registered once, here, right after the producer is created. Registering it
// inside the stream's 'data' handler attaches it after the event may already
// have fired, in which case send() would never be called.
var producerReady = false;
// Payloads received from Salesforce before the producer became ready.
var pendingPayloads = [];

/**
 * Publishes one payload array to Kafka and logs the outcome.
 * @param {Array} payloads - kafka-node payload objects ({ topic, messages }).
 */
function sendToKafka(payloads) {
  producer.send(payloads, function (err, data) {
    if (err) {
      // Surface send failures instead of silently logging `undefined`.
      console.log('Error sending to Kafka: ' + err);
      return;
    }
    console.log(data);
  });
}

producer.on('ready', function () {
  producerReady = true;
  // Flush anything that arrived while the producer was still connecting.
  var queued = pendingPayloads;
  pendingPayloads = [];
  queued.forEach(function (payloads) {
    sendToKafka(payloads);
  });
});

// Without an 'error' listener an emitted 'error' event is thrown as an
// uncaught exception; log it the same way the rest of this file logs errors.
producer.on('error', function (err) {
  console.log('Kafka producer error: ' + err);
});

// ---------------------------------------------------------------------------
// Salesforce login, then forward every PushTopic event to Kafka.
// ---------------------------------------------------------------------------
org.authenticate({ username: config.USERNAME, password: config.PASSWORD }, function (err, oauth) {
  if (err) return console.log(err);

  console.log('***Successfully connected to Salesforce***');
  // add any logic to perform after login

  // subscribe to a pushtopic
  var str = org.stream({ topic: config.PUSH_TOPIC, oauth: oauth });

  str.on('connect', function () {
    console.log('Connected to pushtopic: ' + config.PUSH_TOPIC);
  });

  str.on('error', function (error) {
    console.log('Error received from pushtopic: ' + error);
  });

  str.on('data', function (data) {
    console.log('Received the following from pushtopic ---');
    var record = data['sobject'];
    // Create a fresh DateTime per event so each record carries its own
    // arrival time rather than the time the module-level `dt` was created.
    record['timestamp'] = dateTime.create().format('Y-m-d H:M:S');
    console.log(record);

    // The message body is the record serialized as a one-element JSON array.
    var payloads = [
      { topic: 'testing1', messages: '[' + JSON.stringify(record) + ']' }
    ];
    console.log(payloads);

    if (producerReady) {
      sendToKafka(payloads);
    } else {
      // Producer still connecting: hold the payload until 'ready' fires.
      pendingPayloads.push(payloads);
    }
  });
});
// Exports the `app` and `server` bindings for other modules.
// NOTE(review): neither `app` nor `server` is defined in the visible portion
// of this file — confirm they are created elsewhere; otherwise this line
// throws a ReferenceError when the module is loaded.
module.exports = {app: app, server: server};
我正在使用以下 Node.js 模块:https://github.com/sohu-co/kafka-node
上面我只贴出了相关代码;我的目标是从 Salesforce PushTopic 中提取事件,并将其发布到 Kafka。
1条答案
按热度按时间rnmwe5a21#
终于可以把数据推送到 Kafka 了,尽管 nforce 目前还不支持消息重播(replay)功能。