我正在运行一个简单的kafka流应用程序,它将使用node js记录的信息转换为kafka主题。
E.g.
Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage
client = new kafka.KafkaClient()
producer = new Producer(client)
km = new KeyedMessage('key', 'message')
kafka_message = JSON.stringify({ id: req.session.data.toString(), url: article.info })
payloads = [
{ topic: 'eventTopic', messages: kafka_message,timestamp:timestampNow}
];
producer.send(payloads, function (err, data) {
console.log(data);
});
还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。
我使用scala中的kafka流来使用这些数据。
例如。
val builder = new StreamsBuilder
val stream = builder
.stream[String, String]("TopicTest")
.foreach((k:String, v:String) => {
println(k)
println(v)
}
但是,我不确定如何将时间戳(我从nodejs发送的)提取到这个流中。
例如,如果我想做这样的事
val stream = builder
.stream[String, String,Long]("TopicTest")
.foreach((k:String, v:String,timeStamp:Long) => {
println(k)
println(v)
println(timeStamp)
}
这会导致错误“无法解析符号流”。我在想我怎么才能解决这个问题。这是我的拓扑结构和流的配置,仅供参考。val topology=builder.build
import java.util.Properties
val props = new Properties()
import org.apache.kafka.streams.StreamsConfig
val appId = this.getClass.getSimpleName.replace("$", "")
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
props.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// Step 4. Create Kafka Streams Client
import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, props)
ks.start
1条答案
按热度按时间cgyqldqp1#
确实存在时间戳提取器(https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/kafka-streams-timestampextractor.html). 但是,可以将时间戳作为任何常规的kafka消息发送到。我首先修改的是nodejs代码。
我发送的json消息中现在有一个timestamp字段。
最后,我们可以使用argonaut解析json消息。