我正在连接到一个websocket(股票市场)并在Flume中从中获取交易,我想将这些交易发布给Kafka。我可以发送一些测试消息(字符串)到Kafka,但无法连接到Kafka出版商的实际交易
这里我有actorsystem,actor materializer,system dispatcher,然后是9092(端口)的React式kafka生产者。然后我在控制台上打印交易,这是好的。而不是打印到控制台,我想把这些交易Kafka生产者。
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val kafka = new ReactiveKafka()
val producer = ReactiveKafkaProducer[Array[Byte], String](ProducerProperties(
bootstrapServers = "localhost:9092",
topic = "binance",
valueSerializer = new StringSerializer()
))
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach(println),
Source.maybe[Message])(Keep.right)
// Test messages to Kafka Producer is working fine
producer.producer.send(new ProducerRecord("binance","foo"))
producer.producer.send(new ProducerRecord("binance","bar"))
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("wss://stream.binance.com:9443/ws/bnbbtc@trade"),
flow)
val connected = upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
connected.onComplete(println)
控制台上打印的交易:success(done)textmessage.strict({“e”:“trade”,“e”:1518536267285,“s”:“bnbbtc”,“t”:9161710,“p”:“0.00106130”,“q”:“7.43000000”,“b”:23819006,“a”:23819013,“t”:1518536267283,“m”:true,“m”:true})textmessage.strict({“e”:“trade”,“e”:1518536267920,“s”:“bnbbtc”,“t”:9161711,“p”:“0.00106210”,“q”:“20.00000000”,“b”:23819014,“a”:23819010,“t”:1518536267917,“m”:false,“m”:true})文本信息。严格({“e”:“trade”,“e”:1518536272108,“s”:“bnbbtc”,“t”:9161712,“p”:“0.00106150”,“q”:“47.03000000”,“b”:23819019,“a”:23819020,“t”:1518536272104,“m”:true,“m”:true})文本信息。严格({“e”:“trade”,“e”:1518536276145,“s”:“bnbbtc”,“t”:9161713,“p”:“0.00106180”,“q”:“1.29000000”,“b”:23819028,“a”:23819027,“t”:1518536276142,“m”:假,“m”:真})
也请让我知道如何处理的消息,因为它是json和发送给Kafka生产者
1条答案
按热度按时间lstz6jyr1#
它和这段代码一起工作