我有一个使用Vertx框架和Kafka的Java应用程序。
有以下类可以实现Kafka使用者和Web服务:
package com.example.starter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.vertx.core.*;
import io.vertx.kafka.client.consumer.KafkaConsumer;
public class ConsumerVerticle extends AbstractVerticle {
public void initKafkaConsumer() {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "latest");
config.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
try {TimeUnit.SECONDS.sleep(1);}
catch(Exception e){}
});
consumer.subscribe("quickstart");
}
@Override
public void start() throws Exception {
System.out.println("CONSUMER THREAD: Thread name -> " + Thread.currentThread().getName());
initKafkaConsumer();
}
}
package com.example.starter;
import io.vertx.core.AbstractVerticle;
import io.vertx.ext.web.Router;
public class WebServiceVerticle extends AbstractVerticle {
private void initWebService() {
Router router = Router.router(vertx);
router.get("/healthcheck").handler(rc -> rc.response().end("OK"));
vertx.createHttpServer().requestHandler(router)
.listen(5555, result -> {
System.out.println("HTTP server started");
});
}
@Override
public void start() throws Exception {
System.out.println("WEB SERVICE: Thread name -> " + Thread.currentThread().getName());
initWebService();
}
}
如果主类定义如下:
package com.example.starter;
import java.util.HashMap;
import java.util.Map;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
public class MainVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
System.out.println("MAIN THREAD: Thread name -> " + Thread.currentThread().getName());
DeploymentOptions options = new DeploymentOptions();
vertx.deployVerticle(WebServiceVerticle.class, options);
vertx.deployVerticle(ConsumerVerticle.class, options);
}
输出如下所示:
MAIN THREAD: Thread name -> vert.x-eventloop-thread-1
WEB SERVICE: Thread name -> vert.x-eventloop-thread-2
CONSUMER THREAD: Thread name -> vert.x-eventloop-thread-3
这对我来说很有意义,因为每个垂直线都被分配给一个特定的线程。
另一方面,我不理解以下情况:
public class MainVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
System.out.println("MAIN THREAD: Thread name -> " + Thread.currentThread().getName());
DeploymentOptions options = new DeploymentOptions();
vertx = Vertx.vertx(new VertxOptions().setMaxEventLoopExecuteTime(1)); // The vertx object is explicitly initialized
vertx.deployVerticle(WebServiceVerticle.class, options);
vertx.deployVerticle(ConsumerVerticle.class, options);
}
在这种情况下,输出如下:
MAIN THREAD: Thread name -> vert.x-eventloop-thread-1
WEB SERVICE: Thread name -> vert.x-eventloop-thread-0
CONSUMER THREAD: Thread name -> vert.x-eventloop-thread-1
我希望在这个实现中不同的顶点也有不同的线程。有人能解释这个行为吗?
1条答案
按热度按时间9o685dep1#
在顶点内部时,不应创建其他顶点示例
不同之处在于,当在verticle内部创建vertx时,它只能访问verticle内部的事件循环/线程,而verticle内部的事件循环/线程为2 -因为verticle是单线程的-这就是Kafka/http仅部署在事件循环0和1上的原因-因为它们是它唯一可用的
作为一个经验法则,你不需要一个主要的垂直线-做任何事情直接从你的主要
部分参考代码:https://github.com/asad-awadia/echo-server/blob/main/src/main/kotlin/dev/aawadia/Main.kt#L13
https://github.com/asad-awadia/kvdb/blob/main/src/main/kotlin/dev/aawadia/Main.kt
在我的课程中,我将介绍所有这些内容:https://www.udemy.com/course/backend-development-with-vertx/learn/?referralCode=063C2D57CCB957C5088C