我有一个测试,它会让一个开放生产者线程连续记录错误。
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
测试是可行的,但有时会像上面那样失败。
test("My test") {
val topology = Application.getTopology(...)
val streams = new KafkaStreams(topology,properties)
withRunningKafka {
createCustomTopic(eventTopic)
val streamId = UUIDs.newUuid().toString
logger.info(s"Creating stream with Application ID: [$streamId]")
val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))
try {
publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
// ... several more publishings
Thread.sleep(publishingDelay) // Give time to initialize
streams.start()
Thread.sleep(deletionDelay)
withConsumer[MyKey, MyEvent, Unit] { consumer =>
val consumedMessages: Stream[(MyKey, MyEvent)] =
consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
val messages = consumedMessages.take(20).toList
messages.foreach(tuple => logger.info("EVENT END: " + tuple))
messages.size should be(6)
// several assertions here
}
} finally {
streams.close()
}
}(config)
}
一个特殊性是streams应用程序在其使用的同一主题上生成删除事件。
在这个套件中有两个类似的测试。我在sbt下执行测试套件,如下所示:
testOnly *MyTest
五分之四的执行会留下一个悬而未决的线程无限期地发布这些错误。他们三人一组出现,但我也不知道为什么。
我尝试在调用close()后设置延迟,但似乎没有帮助。如何避免悬挂生产者线程?
1条答案
按热度按时间3ks5zfa01#
在测试中,您将创建两个
KafkaStreams
示例,但仅限于close()
一个。我认为Producer
属于您未关闭的示例。注意,你需要打电话KafkaStreams#close()
即使你从没打过电话KafkaStreams#start()
.