我如何连接一个播放后路线到KafkaFlume?
我发现https://github.com/jamesward/hello-play-kafka. 但它使用随机数字的滴答声源连接到Kafka接收器上。1
如何使post-request路由成为连接到kafka接收器的源?
编辑:post请求主体的格式是json,内容类型为application/json。发送给Kafka的消息应该是完全相同的json。路由需要一条json消息。
我如何连接一个播放后路线到KafkaFlume?
我发现https://github.com/jamesward/hello-play-kafka. 但它使用随机数字的滴答声源连接到Kafka接收器上。1
如何使post-request路由成为连接到kafka接收器的源?
编辑:post请求主体的格式是json,内容类型为application/json。发送给Kafka的消息应该是完全相同的json。路由需要一条json消息。
2条答案
按热度按时间ni65a41a1#
也许我误解了您的问题,但您可以使用rest代理向Kafka发送消息:
参考:融合rest代理文档
mw3dktmi2#
由于您的post请求是直接进入kafka的,一对一,因此不清楚akkastreams提供的流模型是否适合。使用
KafkaProducer
api直接发送消息,一次发送一条消息到一个Kafka主题。这将使代码,特别是错误处理变得更加简单。可能仍然有很好的理由选择使用akka流,例如管理并发或排序。具体的原因可能会影响最佳的解决方法,所以如果你有更多的细节,请添加到问题,我会更新答案。
例如,如果要确保没有并发写操作,可以在guice中将类注册为单例(假设使用guice进行依赖注入)。当它被创建时,它运行一个akka流,使用
Source.queue
,Source.actorRef
,或Source.actorRefWithBackpressure
还有KafkaSink
. 它还需要使用RestartSource
以确保流在出现错误时重新启动。公开一个公共方法,该方法允许调用者向具体化的源队列或参与者发送消息,然后将singleton注入到控制器类中,以允许它使用该方法发布其post数据。这种方法的一个大缺点是,控制器操作只能知道消息已成功发送到流,而不能知道消息已成功写入kafka。如果kafka代理关闭,应用程序崩溃,或者由于其他原因写操作失败,则消息可能会丢失,即使在将成功的结果返回给发起客户端之后也是如此。这不是所有用例的问题,但是需要考虑。