我试图在不同的集群系统中进行分布式发布订阅,但无论我怎么尝试,它都不起作用。
我所要做的就是创建一个简单的例子。
1)我创建了一个主题,说“内容”。
2)比如说,jvm A中的一个节点创建主题,订阅该主题,以及发布该主题的发布者。
3)在另一个节点中,比如说另一个端口上的B,我创建了一个订阅者。
4)当我从jvm A向主题发送消息时,我希望jvm B上的订阅者也能收到它,因为它订阅了同一个主题。
任何帮助都将非常感谢或一个简单的工作示例分布式pub sub与订阅者和发布者在不同的集群系统在不同的端口,在Java。
下面是app1的代码及其配置文件。
public class App1{
public static void main(String[] args) {
System.setProperty("akka.remote.netty.tcp.port", "2551");
ActorSystem clusterSystem = ActorSystem.create("ClusterSystem");
ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem);
ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1");
clusterClientReceptionist1.registerSubscriber("content", subcriber1);
ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1");
clusterClientReceptionist1.registerSubscriber("content", publisher1);
publisher1.tell("testMessage1", ActorRef.noSender());
}
}
app1.confi
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}
akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
}
}
app2及其配置文件的代码
public class App
{
public static Set<ActorPath> initialContacts() {
return new HashSet<ActorPath>(Arrays.asList(
ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")));
}
public static void main( String[] args ) {
System.setProperty("akka.remote.netty.tcp.port", "2553");
ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2");
ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem);
final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
clusterSystem).withInitialContacts(initialContacts())), "client");
ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2");
clusterClientReceptionist2.registerSubscriber("content", subcriber2);
ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2");
publisher2.tell("testMessage2", ActorRef.noSender());
clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null);
}
}
app2.confi
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2553
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2553"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}
akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
}
}
发布服务器类和订阅服务器类对于两个应用程序是相同,如下所示
出版商:
public class Publisher extends UntypedActor {
private final ActorRef mediator =
DistributedPubSub.get(getContext().system()).mediator();
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof String) {
mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
} else {
unhandled(msg);
}
}
}
订阅者:
public class Subscriber extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public Subscriber(){
ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());
}
public void onReceive(Object msg) throws Throwable {
if (msg instanceof String) {
log.info("Got: {}", msg);
} else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
log.info("subscribing");
} else {
unhandled(msg);
}
}
}
我在运行两个应用程序时在接收器端应用程序中遇到此错误。遇到死信
[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
并且在发送方侧应用中,在日志中显示消息发送成功。
[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist
2条答案
按热度按时间l2osamch1#
以这种方式使用ClusterClient实际上没有意义,并且与使用分布式发布sub没有任何关系,因为您的两个节点都是集群的一部分,您可以直接使用分布式发布sub API。
下面是一个简单的主配置,它使用您的发布服务器执行元和订阅服务器执行元创建一个双节点群集,并按预期工作:
注意,分布式pub sub不给予任何传递保证,因此如果您在中介体彼此联系之前发送消息,消息将丢失(因此使用
Thread.sleep
语句,这通常不是您在实际代码中应该做的事情)。ljo96ir52#
我认为问题是您的参与者系统具有不同的名称ClusterSystem和ClusterSystem2。至少我遇到了相同的问题,因为我在集群中有两个不同的服务,但我用不同的名称命名每个服务中的系统。