我试图用alpakkafka连接器(akkastreamkafka)创建一个简单的原型。
运行应用程序时,出现以下错误:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
我有以下代码 ./src/main/scala/App.scala
:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")
implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config // ConfigFactory.load()
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))
println("Done")
}
}
以下 build.sbt
:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
我使用 sbt run
. 我没有配置任何uber/程序集jar。
我可能错过了一些明显的东西,但我看不到。。。我怀疑akka依赖性有问题。
更新
正如@terminally chill calling所建议的 ProducerSettings(system, new StringSerializer, new StringSerializer)
(通过 ActorSystem
而不是配置)解决问题。我只是不明白这是设计的还是bug。
更新2
我已经创建了一个github问题,这个问题已经解决了。现在文档更加准确,并解释了创建 ProducerSettings
/ ConsumerSettings
.
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
或者你可以通过考试 ActorSystem
如上所述。
4条答案
按热度按时间n3h0vuf21#
感谢@terminally chill和@murray todd williams的回答。我做了一些进一步的研究,并试图总结如下:
两者
ConsumerSettings
以及ProducerSettings
有apply
需要Config
(见此处)或ActorSystem
(参见此处)。问题是当使用
ActorSystem
代码是:使用时
Config
代码是:因此,当直接传递配置时,代码将搜索
kafka-clients
属性,而不是在传递ActorSystem
代码检查akka.kafka.consumer/akka.kafka.producer
.最后,在创建
ActorSystem
示例默认情况下,大多数设置都是从嵌入的reference.conf
文件并与您的application.conf
文件(如果存在)。更多信息请点击此处。所以基本上唯一需要设置的属性通常是bootstrap.servers
.所以你现在可以理解为什么使用
system.settings.config
代码不起作用。此配置示例已加载reference.conf
(使用所有默认值,请参见此处)和自定义application.conf
. 这个kafka-clients
财产在里面akka.kafka.consumer/akka.kafka.producer
,但代码直接检查kafka-clients
.一些可能的解决方案:
直接通过
ActorSystem
使用另一个过载通过正确的部分使用
system.settings.config.getConfig("akka.kafka.consumer")
手动构造Config
示例与kafka-clients
部分对我来说,问题是这里提供的官方文件没有提到这些差异,所提供的例子也不完整和/或不准确。可能对于一个akkaMaven来说这是很清楚的,但是对于新开发人员来说这可能是非常混乱的。
我在这个要点中创建了一个更“易于使用”的示例,并打开了一个问题。
odopli942#
感谢您注意到并提交了一个在阿尔帕卡Kafka连接器项目的问题。文档现已更新:https://doc.akka.io/docs/akka-stream-kafka/current/producer.html
nnsrf1az3#
通常我将所有配置都保存在akkasystem示例中。我不把alpakka用于kafka,但是我的很多实现都是基于源代码的。
加载typesafe配置对象
val config = ConfigFactory.load()
然后通过config
加入val system = ActorSystem("fakeProducer", config)
.最后,通过
system.settings.config
至ProducerSettings
.您当前的代码没有通过任何设置,因为您还没有将配置加载到akka系统中。你的
val config = system.settings.config
正在引用一个空配置,该配置没有kafka clients部分(最佳猜测)。m0rkklqb4#
我想我遇到了和你一样的问题(几乎在同一时间),尽管我试图创建一个基本的“你好世界”Kafka消费者而不是生产者。我猜您只是在浏览alpakka-kafka连接器文档中的文档,并遵循它们最初定义的示例
然后将其传递到新的consumersettings对象中。我猜在线文档有一个缺陷,但我对akka streams还不够了解(这是我第一次尝试通过示例学习),我没有资格准确判断什么是对的什么是错的。
我尝试创建和application.conf文件,然后执行configfactory.load(),然后在创建时手动将其传递给actorsystem,然后将该系统传递给consumersettings构造函数,丢失的“kafka clients”错误消失了,但显然我甚至不必这样做。正如您所说,只需传递'system'变量而不是'config'变量就可以了。
希望这能帮助那些在黑暗中摸索的人。我要说的是,尽管阿克卡河周围到处都是嗡嗡声,但似乎真的缺少文档。我可能要写一篇博客文章,一旦我弄明白了这些东西!