I am trying to start a Kafka server with the following code:
public class MockKafkaServer {
    private static final String LOCALHOST = "127.0.0" + ".1";
    private static final int CONSUMER_TIMEOUT_MS = 5000;
    private static final int CONSUMER_BUFFER_SIZE = 64 * 1024;
    private static final int PRODUCER_SLEEP_INTERVAL = 100;

    private final KafkaServerStartable broker;
    private final MockZooKeeper mockZooKeeper;
    private KafkaProducer<byte[], byte[]> kafkaProducer;
    private SimpleConsumer simpleConsumer;
    private final int port;

    public MockKafkaServer() throws IOException, InterruptedException {
        this.mockZooKeeper = new MockZooKeeper();
        final int zkPort = mockZooKeeper.start();
        this.port = getAvailablePort();
        final File logDirectory = Files.createTempDir();
        logDirectory.deleteOnExit();
        final Properties properties = new Properties();
        properties.put("zookeeper.connect", LOCALHOST + ":" + zkPort);
        properties.put("broker.id", "0");
        properties.put("num.partitions", "1");
        properties.put("host.name", "localhost");
        properties.put("port", String.valueOf(port));
        properties.put("log.dir", logDirectory.getAbsolutePath());
        properties.put("auto.create.topics.enable", "true");
        this.broker = new KafkaServerStartable(new KafkaConfig(properties));
    }

    public void start() throws IOException, InterruptedException {
        broker.startup();
    }

    public void stop() {
        broker.shutdown();
        broker.awaitShutdown();
        mockZooKeeper.stop();
    }
}
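The getAvailablePort() helper called in the constructor is not shown in the question. As a minimal sketch only, assuming it simply asks the operating system for a free ephemeral port, it might look like the following. The class name PortFinder is hypothetical, and the real helper may work differently.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical stand-in for the helper the question calls but does not show.
class PortFinder {

    // Binding to port 0 lets the OS choose any currently free port.
    // The socket is closed again before returning, so the port is only
    // "probably free" by the time the broker binds to it.
    static int getAvailablePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("free port: " + getAvailablePort());
    }
}
```

Because the port is released before Kafka binds it, another process could grab it in between; for a local unit test that race is usually acceptable.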
Every unit test case calls start() and stop(). What I observe is that after a few start/stop cycles I get the following error:
7790 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:44)
at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:196)
at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:86)
at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.log.LogCleaner.<init>(LogCleaner.scala:86)
at kafka.log.LogManager.<init>(LogManager.scala:66)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:647)
at kafka.server.KafkaServer.startup(KafkaServer.scala:209)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
at com.viper.mockkafkaj.MockKafkaServer.start(MockKafkaServer.java:87)
at com.viper.project.restj.RestControllerTest.setUp(RestControllerTest.java:68)
at junit.framework.TestCase.runBare(TestCase.java:139)
at junit.framework.TestResult$1.protect(TestResult.java:122)
at junit.framework.TestResult.runProtected(TestResult.java:142)
at junit.framework.TestResult.run(TestResult.java:125)
at junit.framework.TestCase.run(TestCase.java:129)
at junit.framework.TestSuite.runTest(TestSuite.java:255)
at junit.framework.TestSuite.run(TestSuite.java:250)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:84)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:272)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:236)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:310)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:261)
I tried setting the following values for the Maven test plugin (Surefire):
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <configuration>
        <argLine>-server -Xss128m -Xms64m -Xmx256m -XX:+UseCompressedOops -XX:+UseG1GC -XX:G1HeapWastePercent=50 -XX:+CMSClassUnloadingEnabled -XX:MaxGCPauseMillis=10 -XX:+CMSScavengeBeforeRemark</argLine>
    </configuration>
</plugin>
So even with -Xmx set to 1024m, I still get the OutOfMemoryError. Thanks for your help!
1 Answer
This is an issue with Kafka 0.10.2.0: by default, the log cleaner thread uses a lot of memory. I set the following values and no longer see the heap space error.
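The values themselves are missing from this copy of the answer. As a hedged sketch only, and not necessarily what the answerer used: the broker setting that governs the log cleaner's buffer is log.cleaner.dedupe.buffer.size, whose documented default is 134217728 bytes (128 MB), shared across log.cleaner.threads. The stack trace in the question fails inside SkimpyOffsetMap while a LogCleaner thread is being constructed, which is where that buffer is allocated on the heap, so shrinking it for a test broker is one plausible fix. The class name, method name, and the 2 MB figure below are assumptions chosen for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds broker properties with a small log cleaner buffer.
class LowMemoryBrokerOverrides {

    // Assumed value for a test broker: 2 MB instead of the 128 MB default.
    // It is kept above 1 MB per cleaner thread, which Kafka's config
    // validation is understood to require.
    static final int DEDUPE_BUFFER_BYTES = 2 * 1024 * 1024;

    // Returns a copy of the given properties with the override applied;
    // the input object is left unchanged.
    static Properties withSmallCleanerBuffer(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.put("log.cleaner.dedupe.buffer.size", String.valueOf(DEDUPE_BUFFER_BYTES));
        // Alternative for tests that never use compacted topics (also an assumption):
        // copy.put("log.cleaner.enable", "false");
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("broker.id", "0");
        System.out.println(withSmallCleanerBuffer(base));
    }
}
```

In the MockKafkaServer constructor from the question, the same key could be added to properties before new KafkaConfig(properties) is called. With a 128 MB buffer allocated on every broker startup, a 256 MB heap leaves little room if the buffer from a previous start() has not yet been garbage collected, which would be consistent with the error appearing only after several start/stop cycles.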