io.aeron.Aeron.addSubscription()方法的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(12.3k)|赞(0)|评价(0)|浏览(126)

本文整理了Java中io.aeron.Aeron.addSubscription()方法的一些代码示例,展示了Aeron.addSubscription()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Aeron.addSubscription()方法的具体详情如下:
包路径:io.aeron.Aeron
类名称:Aeron
方法名:addSubscription

Aeron.addSubscription介绍

[英]Add a new Subscription for subscribing to messages from publishers.

The method will set up the Subscription to use the Aeron.Context#availableImageHandler(AvailableImageHandler) and Aeron.Context#unavailableImageHandler(UnavailableImageHandler) from the Aeron.Context.
[中]添加一个新的订阅(Subscription),用于订阅来自发布者的消息。
该方法会将订阅设置为使用 Aeron.Context 中的 Aeron.Context#availableImageHandler(AvailableImageHandler) 和 Aeron.Context#unavailableImageHandler(UnavailableImageHandler)。

代码示例

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code closeExistingLog()}, adds a subscription for the given channel on the stream id
 * supplied by {@code ctx.logStreamId()}, and then starts recording the channel with
 * {@link SourceLocation#REMOTE}.
 *
 * @param logChannel channel to subscribe to and to pass to {@code startLogRecording}.
 * @return the subscription that was added for {@code logChannel}.
 */
Subscription createAndRecordLogSubscriptionAsFollower(final String logChannel)
{
  closeExistingLog();

  final int logStreamId = ctx.logStreamId();
  final Subscription logSubscription = aeron.addSubscription(logChannel, logStreamId);

  // Recording is requested only after the subscription has been added.
  startLogRecording(logChannel, SourceLocation.REMOTE);

  return logSubscription;
}

代码示例来源:origin: real-logic/aeron

/**
 * Test fixture setup: stubs the mocked Aeron client, egress publisher, log publisher and
 * response publication so that the calls below return fixed values.
 */
@Before
public void before()
{
  // The conductor invoker is replaced by a fresh mock.
  when(mockAeron.conductorAgentInvoker()).thenReturn(mock(AgentInvoker.class));
  // Egress and log publisher operations are stubbed to return TRUE (or 128L for session open).
  when(mockEgressPublisher.sendEvent(any(), anyLong(), anyInt(), any(), any())).thenReturn(TRUE);
  when(mockLogPublisher.appendSessionClose(any(), anyLong(), anyLong())).thenReturn(TRUE);
  when(mockLogPublisher.appendSessionOpen(any(), anyLong(), anyLong())).thenReturn(128L);
  when(mockLogPublisher.appendClusterAction(anyLong(), anyLong(), anyLong(), any(ClusterAction.class)))
    .thenReturn(TRUE);
  // Any publication request yields the shared mock response publication.
  when(mockAeron.addPublication(anyString(), anyInt())).thenReturn(mockResponsePublication);
  // Both addSubscription overloads are stubbed; each returns its own mock Subscription.
  when(mockAeron.addSubscription(anyString(), anyInt())).thenReturn(mock(Subscription.class));
  // This overload is matched only when the third argument (the available-image handler) is null.
  when(mockAeron.addSubscription(anyString(), anyInt(), eq(null), any(UnavailableImageHandler.class)))
    .thenReturn(mock(Subscription.class));
  when(mockResponsePublication.isConnected()).thenReturn(TRUE);
}

代码示例来源:origin: real-logic/aeron

/**
 * Adds a subscription on {@code URI}/{@code STREAM_ID} via {@code clientA}, yields while its
 * channel status is {@code INITIALIZING}, then asserts that the status is {@code ACTIVE}.
 */
@Test(timeout = 5000)
public void shouldBeAbleToQueryChannelStatusForSubscription()
{
  final Subscription sub = clientA.addSubscription(URI, STREAM_ID);

  // Spin until the channel endpoint has left the INITIALIZING state.
  while (ChannelEndpointStatus.INITIALIZING == sub.channelStatus())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }

  assertThat(sub.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds one publication and three subscriptions on the dynamic MDC
 * URIs, then yields until all three subscriptions report at least one image.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithDynamic()
{
  launch();
  // The publication is added before the subscriptions in this variant.
  publication = clientA.addPublication(PUB_MDC_DYNAMIC_URI, STREAM_ID);
  subscriptionA = clientA.addSubscription(SUB1_MDC_DYNAMIC_URI, STREAM_ID);
  subscriptionB = clientB.addSubscription(SUB2_MDC_DYNAMIC_URI, STREAM_ID);
  subscriptionC = clientA.addSubscription(SUB3_MDC_DYNAMIC_URI, STREAM_ID);
  // Keep waiting while any one of the subscriptions still has no images.
  while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Adds a subscription on {@code URI} with {@code clientA}, waits for it to become
 * {@code ACTIVE}, then adds a second subscription on the same {@code URI} with {@code clientB}
 * and verifies that client B's error handler receives a {@link ChannelEndpointException} whose
 * status indicator reads {@code ERRORED}, while subscription A stays {@code ACTIVE}.
 *
 * <p>A test timeout bounds the spin-wait below so the test cannot hang indefinitely if the
 * first subscription never leaves {@code INITIALIZING}. It is larger than the 5000 ms
 * Mockito verification timeout used further down.
 */
@Test(timeout = 10_000)
public void shouldCatchErrorOnAddressAlreadyInUseForSubscriptions()
{
  final Subscription subscriptionA = clientA.addSubscription(URI, STREAM_ID);
  // Spin until the first subscription's channel endpoint has left the INITIALIZING state.
  while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING)
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
  assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));

  // Second subscription on the same URI from the other client.
  final Subscription subscriptionB = clientB.addSubscription(URI, STREAM_ID);
  final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
  // Wait up to 5 seconds for client B's error handler to be invoked.
  verify(errorHandlerClientB, timeout(5000)).onError(captor.capture());
  assertThat(captor.getValue(), instanceOf(ChannelEndpointException.class));

  final ChannelEndpointException channelEndpointException = (ChannelEndpointException)captor.getValue();
  final long status = clientB.countersReader().getCounterValue(channelEndpointException.statusIndicatorId());
  assertThat(status, is(ChannelEndpointStatus.ERRORED));
  assertThat(errorCounter.get(), greaterThan(0));
  // The exception must refer to subscription B's own status counter.
  assertThat(subscriptionB.channelStatusId(), is(channelEndpointException.statusIndicatorId()));
  // The first subscription must be unaffected.
  assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds two subscriptions and one publication on
 * {@code MULTICAST_URI}, then yields until BOTH subscriptions report that they are connected.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdown()
{
  launch();
  subscriptionA = clientA.addSubscription(MULTICAST_URI, STREAM_ID);
  subscriptionB = clientB.addSubscription(MULTICAST_URI, STREAM_ID);
  publication = clientA.addPublication(MULTICAST_URI, STREAM_ID);
  // Keep waiting while either subscription is still unconnected. Using '&&' here would stop
  // waiting as soon as one of the two connected, leaving the other unchecked.
  while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds two subscriptions and one publication on
 * {@code MULTICAST_URI}, then yields until both subscriptions report that they are connected.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdown()
{
  launch();
  subscriptionA = clientA.addSubscription(MULTICAST_URI, STREAM_ID);
  subscriptionB = clientB.addSubscription(MULTICAST_URI, STREAM_ID);
  publication = clientA.addPublication(MULTICAST_URI, STREAM_ID);
  // Keep waiting while either subscription is still unconnected.
  while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds three subscriptions and one publication on the manual MDC
 * URIs, adds two destinations to the publication, then yields until all three subscriptions
 * report at least one image.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithManual()
{
  launch();
  subscriptionA = clientA.addSubscription(SUB1_MDC_MANUAL_URI, STREAM_ID);
  subscriptionB = clientB.addSubscription(SUB2_MDC_MANUAL_URI, STREAM_ID);
  subscriptionC = clientA.addSubscription(SUB3_MDC_MANUAL_URI, STREAM_ID);
  publication = clientA.addPublication(PUB_MDC_MANUAL_URI, STREAM_ID);
  // Only the SUB1 and SUB2 URIs are added as destinations.
  // NOTE(review): SUB3 is not added yet subscriptionC is awaited below - presumably
  // SUB3_MDC_MANUAL_URI is a spy on the publication channel; confirm against its definition.
  publication.addDestination(SUB1_MDC_MANUAL_URI);
  publication.addDestination(SUB2_MDC_MANUAL_URI);
  // Keep waiting while any one of the subscriptions still has no images.
  while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Registers an available-image handler that calls back into {@code aeron.addSubscription}
 * from within the callback, then asserts that the client's error handler was given an
 * {@link AeronException}.
 */
@Test
  public void shouldThrowWhenReentering()
  {
    // Holds whatever Throwable the client reports to its error handler.
    final MutableReference<Throwable> capturedError = new MutableReference<>();
    final ErrorHandler recordingHandler = capturedError::set;
    final Aeron.Context clientContext = new Aeron.Context().errorHandler(recordingHandler);

    try (Aeron aeron = Aeron.connect(clientContext))
    {
      final String ipcChannel = CommonContext.IPC_CHANNEL;

      // The handler re-enters the client by adding another subscription when an image arrives.
      final AvailableImageHandler reentrantHandler = mock(AvailableImageHandler.class);
      doAnswer((ignored) -> aeron.addSubscription(ipcChannel, 3))
        .when(reentrantHandler).onAvailableImage(any(Image.class));

      final Subscription subscription = aeron.addSubscription(ipcChannel, 1, reentrantHandler, null);
      final Publication publication = aeron.addPublication(ipcChannel, 1);

      // Wait up to 5 seconds for the handler to have been invoked.
      verify(reentrantHandler, timeout(5000)).onAvailableImage(any(Image.class));

      publication.close();
      subscription.close();

      assertThat(capturedError.get(), instanceOf(AeronException.class));
    }
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Connects to the archive, starts a replay of the given recording on the replay channel and
 * stream id taken from {@code ctx}, subscribes to the session-specific channel, and passes the
 * resulting image to {@code loadState} and then to {@code service.onLoadSnapshot}.
 *
 * @param recordingId id of the recording to replay.
 */
private void loadSnapshot(final long recordingId)
{
  try (AeronArchive archive = AeronArchive.connect(archiveCtx))
  {
    final String replayChannel = ctx.replayChannel();
    final int replayStreamId = ctx.replayStreamId();

    // Replay starts at position 0 with NULL_VALUE passed as the length argument.
    final int replaySessionId =
      (int)archive.startReplay(recordingId, 0, NULL_VALUE, replayChannel, replayStreamId);

    // The subscription channel is the replay channel with the session id added to it.
    final String sessionChannel = ChannelUri.addSessionId(replayChannel, replaySessionId);

    try (Subscription replaySubscription = aeron.addSubscription(sessionChannel, replayStreamId))
    {
      final Image snapshotImage = awaitImage(replaySessionId, replaySubscription);
      loadState(snapshotImage);
      service.onLoadSnapshot(snapshotImage);
    }
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * If {@code RecoveryState.hasReplay} reports a replay for the given counter, awaits the active
 * log, subscribes to it, acknowledges to the consensus module, and consumes the log image up
 * to {@code activeLogEvent.maxLogPosition}. Afterwards the active log event is cleared and the
 * heartbeat counter is set to the current epoch clock time.
 *
 * @param counters          counters reader passed to {@code RecoveryState.hasReplay}.
 * @param recoveryCounterId id of the recovery counter to inspect.
 */
private void checkForReplay(final CountersReader counters, final int recoveryCounterId)
{
  // Nothing to do when no replay is indicated.
  if (!RecoveryState.hasReplay(counters, recoveryCounterId))
  {
    return;
  }

  awaitActiveLog();

  try (Subscription logSubscription = aeron.addSubscription(activeLogEvent.channel, activeLogEvent.streamId))
  {
    // The ack is sent after subscribing but before awaiting the image.
    consensusModuleProxy.ack(activeLogEvent.logPosition, ackId++, serviceId);

    final Image logImage = awaitImage(activeLogEvent.sessionId, logSubscription);
    final BoundedLogAdapter boundedAdapter = new BoundedLogAdapter(logImage, commitPosition, this);

    consumeImage(logImage, boundedAdapter, activeLogEvent.maxLogPosition);
  }

  activeLogEvent = null;
  heartbeatCounter.setOrdered(epochClock.time());
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds a subscription on {@code SUB_URI} with
 * {@code PUB_MULTICAST_URI} as a destination, adds a publication on that same URI, then
 * yields until the subscription reports at least one image.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithMulticast()
{
  launch();
  subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
  // The destination is added to the subscription before the publication is created.
  subscription.addDestination(PUB_MULTICAST_URI);
  publicationA = clientA.addPublication(PUB_MULTICAST_URI, STREAM_ID);
  // Keep waiting while the subscription still has no images.
  while (subscription.hasNoImages())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds a subscription on {@code SUB_URI} with
 * {@code PUB_UNICAST_URI} as a destination, adds a publication on that same URI, then
 * yields until the subscription reports at least one image.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithUnicast()
{
  launch();
  subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
  // The destination is added to the subscription before the publication is created.
  subscription.addDestination(PUB_UNICAST_URI);
  publicationA = clientA.addPublication(PUB_UNICAST_URI, STREAM_ID);
  // Keep waiting while the subscription still has no images.
  while (subscription.hasNoImages())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds a subscription on {@code SUB_URI} with
 * {@code SUB_MDC_DESTINATION_URI} as a destination, adds a publication on
 * {@code PUB_MDC_URI}, then yields until the subscription reports at least one image.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithDynamicMdc()
{
  launch();
  subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
  // Unlike the unicast/multicast variants, the destination URI differs from the publication URI.
  subscription.addDestination(SUB_MDC_DESTINATION_URI);
  publicationA = clientA.addPublication(PUB_MDC_URI, STREAM_ID);
  // Keep waiting while the subscription still has no images.
  while (subscription.hasNoImages())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Configures the driver context, launches the media driver, connects two Aeron clients with
 * default contexts, and adds a subscription and a publication on the given channel.
 *
 * @param channel channel used for both the subscription and the publication.
 */
private void launch(final String channel)
{
  // 500 ms publication connection timeout and 100 ms timer interval, expressed in nanoseconds.
  final long connectionTimeoutNs = TimeUnit.MILLISECONDS.toNanos(500);
  final long timerTickNs = TimeUnit.MILLISECONDS.toNanos(100);

  context
    .threadingMode(THREADING_MODE)
    .errorHandler(Throwable::printStackTrace)
    .publicationConnectionTimeoutNs(connectionTimeoutNs)
    .timerIntervalNs(timerTickNs);

  driver = MediaDriver.launch(context);

  subscribingClient = Aeron.connect();
  publishingClient = Aeron.connect();

  // The subscription is added before the publication.
  subscription = subscribingClient.addSubscription(channel, STREAM_ID);
  publication = publishingClient.addPublication(channel, STREAM_ID);
}

代码示例来源:origin: real-logic/aeron

/**
 * Test fixture setup: launches a media driver in shared threading mode with the minimum term
 * buffer length, connects a ping client and a pong client, and wires up the ping and pong
 * subscriptions and publications between them.
 */
@Before
public void before()
{
  final MediaDriver.Context driverContext = new MediaDriver.Context()
    .errorHandler(Throwable::printStackTrace)
    .publicationTermBufferLength(LogBufferDescriptor.TERM_MIN_LENGTH)
    .threadingMode(ThreadingMode.SHARED);

  driver = MediaDriver.launch(driverContext);

  pingClient = Aeron.connect();
  pongClient = Aeron.connect();

  // Ping stream: published by the ping client, subscribed to by the pong client.
  pingSubscription = pongClient.addSubscription(PING_URI, PING_STREAM_ID);
  pingPublication = pingClient.addPublication(PING_URI, PING_STREAM_ID);

  // Pong stream: published by the pong client, subscribed to by the ping client.
  pongSubscription = pingClient.addSubscription(PONG_URI, PONG_STREAM_ID);
  pongPublication = pongClient.addPublication(PONG_URI, PONG_STREAM_ID);
}

代码示例来源:origin: real-logic/aeron

/**
 * Calls {@code launch()}, adds a subscription on the channel returned by
 * {@code spyForChannel(channel)} and a publication on {@code channel}, waits for the spy
 * subscription to be connected, and then asserts that the publication is NOT connected.
 *
 * @param channel channel supplied by the theory runner.
 */
@Theory
@Test(timeout = 10_000)
public void shouldNotSimulateConnectionWhenNotConfiguredTo(final String channel)
{
  launch();
  spy = client.addSubscription(spyForChannel(channel), STREAM_ID);
  publication = client.addPublication(channel, STREAM_ID);
  // Keep waiting while the spy subscription is not yet connected.
  while (!spy.isConnected())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
  // The spy being connected must not make the publication itself report as connected.
  assertFalse(publication.isConnected());
}

代码示例来源:origin: real-logic/aeron

/**
 * Initialises the agent from the given container context: copies the collaborators out of
 * the context, then adds a publication and a subscription on the service control channel and
 * wraps them in a {@code ConsensusModuleProxy} and a {@code ServiceAdapter}.
 *
 * @param ctx container context supplying the Aeron client, service, clocks and channel settings.
 */
ClusteredServiceAgent(final ClusteredServiceContainer.Context ctx)
{
  this.ctx = ctx;
  archiveCtx = ctx.archiveContext();
  aeron = ctx.aeron();
  service = ctx.clusteredService();
  idleStrategy = ctx.idleStrategy();
  serviceId = ctx.serviceId();
  epochClock = ctx.epochClock();
  markFile = ctx.clusterMarkFile();
  // The same control channel carries both directions, distinguished by stream id.
  final String channel = ctx.serviceControlChannel();
  consensusModuleProxy = new ConsensusModuleProxy(aeron.addPublication(channel, ctx.consensusModuleStreamId()));
  // NOTE: 'this' is handed to the adapter before the constructor has finished.
  serviceAdapter = new ServiceAdapter(aeron.addSubscription(channel, ctx.serviceStreamId()), this);
  egressMessageHeaderEncoder.wrapAndApplyHeader(headerBuffer, 0, new MessageHeaderEncoder());
}

代码示例来源:origin: real-logic/aeron

/**
 * Sets up a subscription with a single unicast destination and a matching publication, then
 * offers {@code NUM_MESSAGES_PER_TERM * 3} messages one at a time, polling the subscription
 * after each successful offer, and finally verifies the fragments seen by the handler.
 */
@Test(timeout = 10_000)
public void shouldSendToSingleDestinationSubscriptionWithUnicast()
{
  final int totalMessages = NUM_MESSAGES_PER_TERM * 3;

  launch();

  subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
  subscription.addDestination(PUB_UNICAST_URI);
  publicationA = clientA.addPublication(PUB_UNICAST_URI, STREAM_ID);

  // Wait for the subscription to have at least one image before sending.
  while (subscription.hasNoImages())
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }

  for (int sent = 0; sent < totalMessages; sent++)
  {
    // Retry the offer while it returns a negative result.
    while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L)
    {
      SystemTest.checkInterruptedStatus();
      Thread.yield();
    }

    // A fresh counter is used for every message.
    final MutableInteger readCount = new MutableInteger();
    pollForFragment(subscription, fragmentHandler, readCount);
  }

  verifyFragments(fragmentHandler, totalMessages);
}

代码示例来源:origin: real-logic/aeron

/**
 * Launches two embedded media drivers, connects two publishing clients to the first driver's
 * directory and two subscribing clients to the second driver's directory, then adds one
 * subscription and one publication per channel/stream pair.
 *
 * @param channelOne channel for the first subscription/publication pair.
 * @param streamOne  stream id for the first pair.
 * @param channelTwo channel for the second subscription/publication pair.
 * @param streamTwo  stream id for the second pair.
 */
private void launch(final String channelOne, final int streamOne, final String channelTwo, final int streamTwo)
{
  final MediaDriver.Context driverOneContext = new MediaDriver.Context()
    .errorHandler(Throwable::printStackTrace)
    .termBufferSparseFile(true);
  driverOne = MediaDriver.launchEmbedded(driverOneContext);

  final MediaDriver.Context driverTwoContext = new MediaDriver.Context()
    .errorHandler(Throwable::printStackTrace)
    .termBufferSparseFile(true);
  driverTwo = MediaDriver.launchEmbedded(driverTwoContext);

  // Publishers attach to driver one; subscribers attach to driver two.
  publisherOne = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverOne.aeronDirectoryName()));
  subscriberOne = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverTwo.aeronDirectoryName()));
  publisherTwo = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverOne.aeronDirectoryName()));
  subscriberTwo = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverTwo.aeronDirectoryName()));

  // Both subscriptions are added before either publication.
  subscriptionOne = subscriberOne.addSubscription(channelOne, streamOne);
  subscriptionTwo = subscriberTwo.addSubscription(channelTwo, streamTwo);

  publicationOne = publisherOne.addPublication(channelOne, streamOne);
  publicationTwo = publisherTwo.addPublication(channelTwo, streamTwo);
}

相关文章