在Spring中使用什么类型的“EventBus”?内置的、Reactor、Akka?

sg2wtvxw  于 2022-11-06  发布在  Spring
关注(0)|答案(3)|浏览(272)

我们将在几周内启动一个新的Spring 4应用程序。我们希望使用一些事件驱动的架构。今年我到处读到关于“Reactor”的文章,在网上寻找它的时候,我偶然发现了“Akka”。
所以现在我们有三个选择:

我找不到一个真实的的比较。
现在我们只需要这样的东西:

  • 用于侦听Event EX寄存器
  • 用于侦听Event EY寄存器
  • Z发送一个

然后,XY将接收并处理该事件。
我们很可能会以异步方式使用它,但肯定也会有一些同步场景。而且我们很可能总是将一个类作为事件发送。(Reactor示例主要使用String和String模式,但它也支持Object)。
据我所知,ApplicationEvent默认是同步工作的,Reactor是异步工作的。Reactor还允许使用await()方法来使它有点同步。Akka提供了与Reactor差不多的功能,但也支持远程处理。
关于React堆的await()方法:它能等待多个线程完成吗?或者甚至是这些线程的一部分?如果我们以上面的例子为例:

  • 用于侦听Event EX寄存器
  • 用于侦听Event EY寄存器
  • Z发送一个Event E

有没有可能让它同步,通过说:等待XY完成。是否有可能使它只等待X,而不等待Y
也许还有其他的选择,比如JMS呢?
有很多问题,但希望你能提供一些答案!
谢谢你,谢谢你

EDIT:用例示例

1.当一个特定的事件被触发时,我想创建10000封电子邮件。每封电子邮件都必须用用户特定的内容来生成。所以我会创建很多线程(最大=系统cpu内核)来创建邮件,而不会阻塞调用者线程,因为这可能需要几分钟。
1.当一个特定的事件被触发时,我想从未知数量的服务中收集信息。每次获取大约需要100 ms。在这里我可以想象使用Reactor的await,因为我需要这些信息来继续我在主线程中的工作。
1.当一个特定的事件被触发时,我想根据应用程序的配置来执行一些操作。所以应用程序必须能够动态地注册消费者/事件处理程序。他们会用事件做他们自己的事情,我不在乎。所以我会为每个处理程序创建一个线程,然后继续在主线程中做我的工作。
1.简单去耦:我基本上知道所有的接收器,但是我不想在我的代码中调用每个接收器。这应该同步完成。
听起来我需要一个ThreadPool或者一个RingBuffer。这些框架有动态的RingBuffer吗?如果需要的话,它的大小会增加吗?

nfs0ujit

nfs0ujit1#

我不确定我能在这么小的空间里充分回答你的问题。但我会给予的!:)
Spring的ApplicationEvent系统和Reactor在功能上是完全不同的。ApplicationEvent路由是基于ApplicationListener处理的类型。(不过,这也不一定是坏事)。然而,提供了一个全面的路由层,它也是非常轻量级的和完全可扩展的。在订阅和发布事件的能力方面,这实际上是任何事件驱动系统的一个特性。另外,不要忘记Spring 4中新的spring-messaging模块。它是SpringIntegration中可用工具的一个子集,还提供了围绕事件驱动架构进行构建的抽象。
Reactor将帮助您解决几个关键问题,否则您将不得不自己管理:

  • 选择器匹配 *:Reactor进行Selector匹配,包括一系列的匹配--从简单的.equals(Object other)调用,到允许占位符提取的更复杂的URI模板匹配。您还可以使用自己的自定义逻辑扩展内置的选择器,以便使用富对象作为通知键(例如域对象)。
  • 流和承诺API *:您已经提到了Promise API,并引用了.await()方法,它实际上是为期望阻塞行为的现有代码准备的。使用组合和回调通过不阻塞线程来有效地利用系统资源的压力再大也不为过。在一个依赖少量线程执行大量任务的架构中,阻塞调用者几乎从来都不是一个好主意。未来是不可云扩展的,这就是为什么现代应用程序会利用替代解决方案。

你的应用程序可以用Streams或Promises架构,不过老实说,我认为你会发现Stream更灵活。关键的好处是API的可组合性,它允许你在依赖链中将动作连接在一起,而不会阻塞。作为一个基于你的电子邮件用例的完全即兴的例子,你描述如下:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}

Reactor还提供了Boundary,它基本上是一个用于阻塞任意消费者的CountDownLatch(因此,如果您只想阻塞一个Consumer完成,则不必构造一个Promise)。在这种情况下,您可以使用原始的Reactor,并使用on()notify()方法来触发服务状态检查。
然而,对于某些事情,似乎您想要的是从ExecutorService返回的Future,不是吗?为什么不简单点呢?Reactor只在吞吐量性能和开销效率很重要的情况下才有真实的的好处。如果您阻塞了调用线程,那么您很可能会抹杀Reactor无论如何都会给您带来的效率提升。因此在这种情况下,使用更传统的工具集可能会更好。
Reactor的开放性的好处是没有什么可以阻止这两个组件的交互。你可以自由地混合FuturesConsumers,而不会产生静电。在这种情况下,请记住,你的速度只能和最慢的组件一样快。

u3r8eeie

u3r8eeie2#

让我们忽略Spring的ApplicationEvent,因为它实际上不是为您所要求的而设计的(它更多的是关于bean生命周期管理)。
你需要弄清楚的是你是否想做
1.面向对象的方式(即参与者、动态消费者、动态注册)
1.服务方式(静态使用者,启动时注册)。
XY为例,它们是:
1.短暂的示例(1)或他们
1.长期单例/服务对象(2)?
如果您需要动态注册消费者,Akka是一个不错的选择(我不确定reactor,因为我从未使用过它)。如果您不想在短暂对象中进行消费,则可以使用JMS或AMQP。
您还需要了解,这些类型的库试图解决两个问题:
1.并发性(即在同一台机器上并行处理)
1.分布(即在多台机器上并行执行操作)
Reactor和Akka主要关注#1。Akka最近刚刚添加了集群支持,参与者抽象使#2更容易实现。消息队列(JMS、AMQP)关注#2。
对于我自己的工作,我做了服务路由,并使用了经过大量修改的Guava EventBus和RabbitMQ。我使用了类似于Guava Eventbus的注解,但也有对总线上发送的对象的注解,然而,您可以在异步模式下使用Guava的EventBus作为POC,然后像我一样创建您自己的。
您可能认为您需要动态的消费者(1),但是大多数问题可以通过一个简单的pub/sub来解决。此外,管理动态的消费者可能很棘手(因此Akka是一个很好的选择,因为演员模型对此有各种各样的管理)

knpiaxh1

knpiaxh13#

仔细定义您想要从框架中得到什么。如果一个框架的功能比您需要的多,它并不总是好的。更多的功能意味着更多的错误,更多的代码需要学习,性能也会下降。
需要关注的一些特性包括:

  • 执行元的性质(线程或轻量级对象)
  • 能够在计算机集群上工作(Akka)
  • 持久性消息队列(JMS)
  • 信号(没有信息的事件)、转换(将来自不同端口的消息组合成复杂事件的对象,参见Petri网)等特定功能。

注意像await这样的同步特性--它会阻塞整个线程,并且当参与者在线程池中执行时是危险的(线程饥饿)。
更多要查看的框架:
Fork-Join Pool-在某些情况下,允许await而不发生线程饥饿
Scientific workflow systems
Dataflow framework for Java-信号、转换

附加:两种参与者。

通常,并行工作系统可以表示为一个图,图中的活动节点相互发送消息.在Java中,与大多数其他主流语言一样,活动节点(执行元)可以实现为线程或任务(Runnable或Callable)由线程池执行。通常,部分参与者是线程,部分是任务。这两种方法各有优缺点。因此,为系统中的每个参与者选择最合适的实现是至关重要的。简单地说,线程可以阻塞(并等待事件),但它们的堆栈会消耗大量内存。任务可能不会阻塞,但会使用共享堆栈(池中线程的)。
如果一个任务调用阻塞操作,它会从服务中排除一个池线程。如果许多任务阻塞,它们会排除所有线程,从而导致死锁-那些可以解除阻塞阻塞的任务的任务无法运行。这种死锁称为 * 线程饥饿 *。如果为了防止线程饥饿,将线程池配置为无限制,我们只是将任务转换为线程,从而失去任务的优势。
为了消除任务中对阻塞操作的调用,任务应该被分成两个(或更多)--第一个任务调用阻塞操作并退出,其余的被格式化为异步任务,在阻塞操作完成时启动。当然,阻塞操作必须有一个替代的异步接口。因此,例如,应该使用NIO或NIO 2库,而不是同步阅读套接字。
不幸的是,标准java库缺少与流行的同步工具(如队列和信号量)相对应的异步函数,幸运的是,它们很容易从头开始实现(参见Dataflow framework for Java示例)。
因此,完全使用非阻塞任务进行计算是可能的,但会增加代码的大小。明显的建议是在可能的情况下使用线程,而任务只用于简单的大规模计算。

相关问题