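This article collects code examples for the Java method io.aeron.Aeron.addSubscription() and shows how Aeron.addSubscription() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of Aeron.addSubscription() are as follows: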
Package path: io.aeron.Aeron
Class name: Aeron
Method name: addSubscription
Description: Add a new Subscription for subscribing to messages from publishers.
The method will set up the Subscription to use the Aeron.Context#availableImageHandler(AvailableImageHandler) and Aeron.Context#unavailableImageHandler(UnavailableImageHandler) from the Aeron.Context.
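The handler behaviour described above can be pictured with a small stand-alone model. The sketch below does not use Aeron at all: `ClientModel`, `SubscriptionModel`, and the `Consumer<String>` handler type are hypothetical stand-ins, chosen only to show how a two-argument overload can fall back to handlers configured once on a context, while a four-argument overload takes handlers per subscription. Treating an explicit `null` as "no callback" is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-alone model with hypothetical types; this is NOT Aeron's implementation.
final class HandlerDefaultsSketch
{
    /** Holds the handlers a subscription was created with. */
    static final class SubscriptionModel
    {
        private final Consumer<String> onAvailable;
        private final Consumer<String> onUnavailable;

        SubscriptionModel(final Consumer<String> onAvailable, final Consumer<String> onUnavailable)
        {
            this.onAvailable = onAvailable;
            this.onUnavailable = onUnavailable;
        }

        void imageAvailable(final String image)
        {
            if (onAvailable != null)
            {
                onAvailable.accept(image);
            }
        }

        void imageUnavailable(final String image)
        {
            if (onUnavailable != null)
            {
                onUnavailable.accept(image);
            }
        }
    }

    /** Plays the role of a client whose context carries default handlers. */
    static final class ClientModel
    {
        private final Consumer<String> defaultAvailable;
        private final Consumer<String> defaultUnavailable;

        ClientModel(final Consumer<String> defaultAvailable, final Consumer<String> defaultUnavailable)
        {
            this.defaultAvailable = defaultAvailable;
            this.defaultUnavailable = defaultUnavailable;
        }

        // Two-argument form: reuse the handlers configured on the context.
        SubscriptionModel addSubscription(final String channel, final int streamId)
        {
            return addSubscription(channel, streamId, defaultAvailable, defaultUnavailable);
        }

        // Four-argument form: use exactly what the caller passes (null means no callback here).
        SubscriptionModel addSubscription(
            final String channel,
            final int streamId,
            final Consumer<String> available,
            final Consumer<String> unavailable)
        {
            return new SubscriptionModel(available, unavailable);
        }
    }

    static List<String> demo()
    {
        final List<String> log = new ArrayList<>();
        final ClientModel client = new ClientModel(
            image -> log.add("default-available:" + image),
            image -> log.add("default-unavailable:" + image));

        final SubscriptionModel withDefaults = client.addSubscription("aeron:ipc", 1);
        final SubscriptionModel withCustom = client.addSubscription(
            "aeron:ipc", 2, image -> log.add("custom:" + image), null);

        withDefaults.imageAvailable("img-1");
        withCustom.imageAvailable("img-2");
        withCustom.imageUnavailable("img-2"); // no handler registered, so nothing is logged

        return log;
    }

    public static void main(final String[] args)
    {
        System.out.println(demo());
    }
}
```

In this model the first subscription reports through the default handler and the second through its own, which mirrors the split between the two-argument and four-argument calls that appear in the examples below.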
Code example source: real-logic/aeron
Subscription createAndRecordLogSubscriptionAsFollower(final String logChannel)
{
    closeExistingLog();
    final Subscription subscription = aeron.addSubscription(logChannel, ctx.logStreamId());
    startLogRecording(logChannel, SourceLocation.REMOTE);
    return subscription;
}
Code example source: real-logic/aeron
@Before
public void before()
{
    when(mockAeron.conductorAgentInvoker()).thenReturn(mock(AgentInvoker.class));
    when(mockEgressPublisher.sendEvent(any(), anyLong(), anyInt(), any(), any())).thenReturn(TRUE);
    when(mockLogPublisher.appendSessionClose(any(), anyLong(), anyLong())).thenReturn(TRUE);
    when(mockLogPublisher.appendSessionOpen(any(), anyLong(), anyLong())).thenReturn(128L);
    when(mockLogPublisher.appendClusterAction(anyLong(), anyLong(), anyLong(), any(ClusterAction.class)))
        .thenReturn(TRUE);
    when(mockAeron.addPublication(anyString(), anyInt())).thenReturn(mockResponsePublication);
    // Stub both the two-argument and the four-argument addSubscription overloads.
    when(mockAeron.addSubscription(anyString(), anyInt())).thenReturn(mock(Subscription.class));
    when(mockAeron.addSubscription(anyString(), anyInt(), eq(null), any(UnavailableImageHandler.class)))
        .thenReturn(mock(Subscription.class));
    when(mockResponsePublication.isConnected()).thenReturn(TRUE);
}
Code example source: real-logic/aeron
@Test(timeout = 5000)
public void shouldBeAbleToQueryChannelStatusForSubscription()
{
    final Subscription subscription = clientA.addSubscription(URI, STREAM_ID);
    // Spin until the channel endpoint has left the INITIALIZING state.
    while (subscription.channelStatus() == ChannelEndpointStatus.INITIALIZING)
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
    assertThat(subscription.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
}
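Many of the examples on this page share the same waiting loop: check a condition, check for interruption, yield, repeat. A minimal stand-alone sketch of that pattern follows. The names `SpinWaitSketch` and `await` are hypothetical, the explicit deadline stands in for the JUnit `timeout` used by the tests, and the interrupt handling is only an assumption about what a helper like `SystemTest.checkInterruptedStatus()` does.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating the spin-and-yield wait used in the tests.
final class SpinWaitSketch
{
    /**
     * Spin until the condition holds.
     *
     * @return true if the condition became true, false if the timeout expired first.
     */
    static boolean await(final BooleanSupplier condition, final long timeoutMs)
    {
        final long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

        while (!condition.getAsBoolean())
        {
            if (Thread.interrupted())
            {
                // Restore the flag so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting");
            }

            if (System.nanoTime() - deadlineNs >= 0)
            {
                return false;
            }

            Thread.yield();
        }

        return true;
    }

    public static void main(final String[] args)
    {
        final long startNs = System.nanoTime();
        final long readyAfterNs = TimeUnit.MILLISECONDS.toNanos(20);

        // Stands in for a status that leaves its initial state after a short delay.
        final boolean ready = await(() -> System.nanoTime() - startNs > readyAfterNs, 1_000);
        System.out.println("became ready: " + ready);

        // A condition that never holds runs into the deadline instead of spinning forever.
        System.out.println("never ready: " + await(() -> false, 50));
    }
}
```

With the real client, the condition passed in would be an expression such as the `channelStatus()` comparison from the sample above.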
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithDynamic()
{
    launch();
    publication = clientA.addPublication(PUB_MDC_DYNAMIC_URI, STREAM_ID);
    subscriptionA = clientA.addSubscription(SUB1_MDC_DYNAMIC_URI, STREAM_ID);
    subscriptionB = clientB.addSubscription(SUB2_MDC_DYNAMIC_URI, STREAM_ID);
    subscriptionC = clientA.addSubscription(SUB3_MDC_DYNAMIC_URI, STREAM_ID);
    // Wait until every subscription has at least one image.
    while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
Code example source: real-logic/aeron
@Test
public void shouldCatchErrorOnAddressAlreadyInUseForSubscriptions()
{
    final Subscription subscriptionA = clientA.addSubscription(URI, STREAM_ID);
    while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING)
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
    assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
    // A second client subscribing on the same URI should have an error reported to its handler.
    final Subscription subscriptionB = clientB.addSubscription(URI, STREAM_ID);
    final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
    verify(errorHandlerClientB, timeout(5000)).onError(captor.capture());
    assertThat(captor.getValue(), instanceOf(ChannelEndpointException.class));
    final ChannelEndpointException channelEndpointException = (ChannelEndpointException)captor.getValue();
    final long status = clientB.countersReader().getCounterValue(channelEndpointException.statusIndicatorId());
    assertThat(status, is(ChannelEndpointStatus.ERRORED));
    assertThat(errorCounter.get(), greaterThan(0));
    assertThat(subscriptionB.channelStatusId(), is(channelEndpointException.statusIndicatorId()));
    assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
}
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdown()
{
    launch();
    subscriptionA = clientA.addSubscription(MULTICAST_URI, STREAM_ID);
    subscriptionB = clientB.addSubscription(MULTICAST_URI, STREAM_ID);
    publication = clientA.addPublication(MULTICAST_URI, STREAM_ID);
    while (!subscriptionA.isConnected() && !subscriptionB.isConnected())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdown()
{
    launch();
    subscriptionA = clientA.addSubscription(MULTICAST_URI, STREAM_ID);
    subscriptionB = clientB.addSubscription(MULTICAST_URI, STREAM_ID);
    publication = clientA.addPublication(MULTICAST_URI, STREAM_ID);
    while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
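The two shouldSpinUpAndShutdown samples above differ only in their loop condition, and the difference matters. By De Morgan's laws, `!a && !b` keeps waiting only while neither subscription is connected, so the loop ends as soon as either one connects. `!a || !b` keeps waiting while at least one is still disconnected, so the loop ends only when both are connected. The stand-alone sketch below (class and method names are hypothetical) prints the truth table for both forms.

```java
// Hypothetical names; compares the two loop conditions used in the samples.
final class WaitConditionSketch
{
    /** Condition from the first sample: wait while neither side is connected. */
    static boolean keepWaitingAnd(final boolean aConnected, final boolean bConnected)
    {
        return !aConnected && !bConnected;
    }

    /** Condition from the second sample: wait while at least one side is not connected. */
    static boolean keepWaitingOr(final boolean aConnected, final boolean bConnected)
    {
        return !aConnected || !bConnected;
    }

    public static void main(final String[] args)
    {
        final boolean[] values = {false, true};

        for (final boolean a : values)
        {
            for (final boolean b : values)
            {
                System.out.println(
                    "a=" + a + " b=" + b +
                    " keepWaiting(&&)=" + keepWaitingAnd(a, b) +
                    " keepWaiting(||)=" + keepWaitingOr(a, b));
            }
        }
    }
}
```

When a test needs both subscriptions connected before it continues, the `||` form is the one that guarantees it.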
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithManual()
{
    launch();
    subscriptionA = clientA.addSubscription(SUB1_MDC_MANUAL_URI, STREAM_ID);
    subscriptionB = clientB.addSubscription(SUB2_MDC_MANUAL_URI, STREAM_ID);
    subscriptionC = clientA.addSubscription(SUB3_MDC_MANUAL_URI, STREAM_ID);
    publication = clientA.addPublication(PUB_MDC_MANUAL_URI, STREAM_ID);
    // With manual control, destinations are added to the publication explicitly.
    publication.addDestination(SUB1_MDC_MANUAL_URI);
    publication.addDestination(SUB2_MDC_MANUAL_URI);
    while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
Code example source: real-logic/aeron
@Test
public void shouldThrowWhenReentering()
{
    final MutableReference<Throwable> expectedException = new MutableReference<>();
    final ErrorHandler errorHandler = expectedException::set;
    try (Aeron aeron = Aeron.connect(new Aeron.Context().errorHandler(errorHandler)))
    {
        final String channel = CommonContext.IPC_CHANNEL;
        final AvailableImageHandler mockHandler = mock(AvailableImageHandler.class);
        // The handler calls back into the client from inside the callback.
        doAnswer((invocation) -> aeron.addSubscription(channel, 3))
            .when(mockHandler).onAvailableImage(any(Image.class));
        final Subscription sub = aeron.addSubscription(channel, 1, mockHandler, null);
        final Publication pub = aeron.addPublication(channel, 1);
        verify(mockHandler, timeout(5000)).onAvailableImage(any(Image.class));
        pub.close();
        sub.close();
        assertThat(expectedException.get(), instanceOf(AeronException.class));
    }
}
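The test above expects an error when addSubscription is called from inside an available-image callback. One common way to enforce such a rule is a reentrancy guard, sketched below with the standard library only. Everything here is hypothetical: `ClientModel` is not Aeron's client, the callback fires immediately instead of when an image arrives, and `IllegalStateException` stands in for the `AeronException` the real test asserts on.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Stand-alone model with hypothetical types; this is NOT Aeron's implementation.
final class ReentrancyGuardSketch
{
    static final class ClientModel
    {
        private final Consumer<RuntimeException> errorHandler;
        private boolean inCallback;

        ClientModel(final Consumer<RuntimeException> errorHandler)
        {
            this.errorHandler = errorHandler;
        }

        void addSubscription(final String channel, final int streamId, final Runnable onAvailableImage)
        {
            if (inCallback)
            {
                throw new IllegalStateException(
                    "reentrant call from a callback: " + channel + " stream " + streamId);
            }

            if (onAvailableImage != null)
            {
                inCallback = true;
                try
                {
                    // Simplification: invoke the callback right away.
                    onAvailableImage.run();
                }
                catch (final RuntimeException ex)
                {
                    // Errors raised inside callbacks are routed to the error handler.
                    errorHandler.accept(ex);
                }
                finally
                {
                    inCallback = false;
                }
            }
        }
    }

    static RuntimeException demo()
    {
        final AtomicReference<RuntimeException> captured = new AtomicReference<>();
        final ClientModel client = new ClientModel(captured::set);

        // The callback tries to add another subscription, which the guard rejects.
        client.addSubscription("aeron:ipc", 1, () -> client.addSubscription("aeron:ipc", 3, null));

        return captured.get();
    }

    public static void main(final String[] args)
    {
        System.out.println("captured: " + demo());
    }
}
```

The practical takeaway is the same as in the test: do the follow-up work outside the callback, for example by recording the event and acting on it from the main loop.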
Code example source: real-logic/aeron
private void loadSnapshot(final long recordingId)
{
    try (AeronArchive archive = AeronArchive.connect(archiveCtx))
    {
        final String channel = ctx.replayChannel();
        final int streamId = ctx.replayStreamId();
        final int sessionId = (int)archive.startReplay(recordingId, 0, NULL_VALUE, channel, streamId);
        // Restrict the subscription to the replay session by adding its session id to the channel.
        final String replaySessionChannel = ChannelUri.addSessionId(channel, sessionId);
        try (Subscription subscription = aeron.addSubscription(replaySessionChannel, streamId))
        {
            final Image image = awaitImage(sessionId, subscription);
            loadState(image);
            service.onLoadSnapshot(image);
        }
    }
}
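Two details in the snippet above are easy to miss: the replay session id returned as a `long` is narrowed to an `int`, and that value is then added to the channel so the subscription only matches the replay stream. The stand-alone sketch below illustrates both steps with plain strings. The `addSessionId` method here is a simplified, hypothetical stand-in for `ChannelUri.addSessionId` (it does not parse the URI or replace an existing parameter), and the `session-id` parameter name and `|` separator reflect Aeron's channel URI syntax as far as I know it.

```java
// Hypothetical stand-in for the channel handling in the sample; not Aeron code.
final class SessionChannelSketch
{
    /** Append a session-id parameter to a channel URI such as aeron:udp?endpoint=host:port. */
    static String addSessionId(final String channel, final int sessionId)
    {
        // The first parameter follows '?', later ones are separated by '|'.
        final char separator = channel.indexOf('?') < 0 ? '?' : '|';
        return channel + separator + "session-id=" + sessionId;
    }

    /** Keep only the low 32 bits, as the (int) cast in the sample does. */
    static int sessionIdOf(final long replaySessionId)
    {
        return (int)replaySessionId;
    }

    public static void main(final String[] args)
    {
        // Made-up value: high 32 bits hold 7, low 32 bits hold 42.
        final long replaySessionId = (7L << 32) | 42L;
        final int sessionId = sessionIdOf(replaySessionId);

        System.out.println(addSessionId("aeron:udp?endpoint=localhost:40123", sessionId));
        System.out.println(addSessionId("aeron:ipc", sessionId));
    }
}
```

In real code, prefer the library's own `ChannelUri.addSessionId`, as the sample does, since it works on the parsed URI.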
Code example source: real-logic/aeron
private void checkForReplay(final CountersReader counters, final int recoveryCounterId)
{
    if (RecoveryState.hasReplay(counters, recoveryCounterId))
    {
        awaitActiveLog();
        try (Subscription subscription = aeron.addSubscription(activeLogEvent.channel, activeLogEvent.streamId))
        {
            consensusModuleProxy.ack(activeLogEvent.logPosition, ackId++, serviceId);
            final Image image = awaitImage(activeLogEvent.sessionId, subscription);
            final BoundedLogAdapter adapter = new BoundedLogAdapter(image, commitPosition, this);
            consumeImage(image, adapter, activeLogEvent.maxLogPosition);
        }
        activeLogEvent = null;
        heartbeatCounter.setOrdered(epochClock.time());
    }
}
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithMulticast()
{
    launch();
    subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
    subscription.addDestination(PUB_MULTICAST_URI);
    publicationA = clientA.addPublication(PUB_MULTICAST_URI, STREAM_ID);
    while (subscription.hasNoImages())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithUnicast()
{
    launch();
    subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
    subscription.addDestination(PUB_UNICAST_URI);
    publicationA = clientA.addPublication(PUB_UNICAST_URI, STREAM_ID);
    while (subscription.hasNoImages())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithDynamicMdc()
{
    launch();
    subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
    subscription.addDestination(SUB_MDC_DESTINATION_URI);
    publicationA = clientA.addPublication(PUB_MDC_URI, STREAM_ID);
    while (subscription.hasNoImages())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
}
Code example source: real-logic/aeron
private void launch(final String channel)
{
    context
        .threadingMode(THREADING_MODE)
        .errorHandler(Throwable::printStackTrace)
        .publicationConnectionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(500))
        .timerIntervalNs(TimeUnit.MILLISECONDS.toNanos(100));
    driver = MediaDriver.launch(context);
    subscribingClient = Aeron.connect();
    publishingClient = Aeron.connect();
    subscription = subscribingClient.addSubscription(channel, STREAM_ID);
    publication = publishingClient.addPublication(channel, STREAM_ID);
}
Code example source: real-logic/aeron
@Before
public void before()
{
    driver = MediaDriver.launch(
        new MediaDriver.Context()
            .errorHandler(Throwable::printStackTrace)
            .publicationTermBufferLength(LogBufferDescriptor.TERM_MIN_LENGTH)
            .threadingMode(ThreadingMode.SHARED));
    pingClient = Aeron.connect();
    pongClient = Aeron.connect();
    // Each client publishes on one stream and subscribes to the other.
    pingSubscription = pongClient.addSubscription(PING_URI, PING_STREAM_ID);
    pingPublication = pingClient.addPublication(PING_URI, PING_STREAM_ID);
    pongSubscription = pingClient.addSubscription(PONG_URI, PONG_STREAM_ID);
    pongPublication = pongClient.addPublication(PONG_URI, PONG_STREAM_ID);
}
Code example source: real-logic/aeron
@Theory
@Test(timeout = 10_000)
public void shouldNotSimulateConnectionWhenNotConfiguredTo(final String channel)
{
    launch();
    spy = client.addSubscription(spyForChannel(channel), STREAM_ID);
    publication = client.addPublication(channel, STREAM_ID);
    while (!spy.isConnected())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
    assertFalse(publication.isConnected());
}
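The helper `spyForChannel` is not shown in the sample above. A plausible sketch, assuming it simply prepends Aeron's spy prefix to the channel, is given below. The `aeron-spy:` value is believed to match the `CommonContext.SPY_PREFIX` constant, but it is hard-coded here so the sketch runs without the library. The class name and example endpoint are hypothetical.

```java
// Hypothetical reconstruction of a test helper; not taken from the Aeron sources.
final class SpyChannelSketch
{
    // Assumed to match Aeron's spy prefix constant (CommonContext.SPY_PREFIX).
    static final String SPY_PREFIX = "aeron-spy:";

    /** Build the channel a spy subscription would use for the given publication channel. */
    static String spyForChannel(final String channel)
    {
        return SPY_PREFIX + channel;
    }

    public static void main(final String[] args)
    {
        System.out.println(spyForChannel("aeron:udp?endpoint=localhost:54325"));
    }
}
```

A subscription added on such a channel observes the publication from inside the same media driver, which is why the test can see the spy connect while the publication itself still reports no connected subscriber.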
Code example source: real-logic/aeron
ClusteredServiceAgent(final ClusteredServiceContainer.Context ctx)
{
    this.ctx = ctx;
    archiveCtx = ctx.archiveContext();
    aeron = ctx.aeron();
    service = ctx.clusteredService();
    idleStrategy = ctx.idleStrategy();
    serviceId = ctx.serviceId();
    epochClock = ctx.epochClock();
    markFile = ctx.clusterMarkFile();
    final String channel = ctx.serviceControlChannel();
    consensusModuleProxy = new ConsensusModuleProxy(aeron.addPublication(channel, ctx.consensusModuleStreamId()));
    serviceAdapter = new ServiceAdapter(aeron.addSubscription(channel, ctx.serviceStreamId()), this);
    egressMessageHeaderEncoder.wrapAndApplyHeader(headerBuffer, 0, new MessageHeaderEncoder());
}
Code example source: real-logic/aeron
@Test(timeout = 10_000)
public void shouldSendToSingleDestinationSubscriptionWithUnicast()
{
    final int numMessagesToSend = NUM_MESSAGES_PER_TERM * 3;
    launch();
    subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
    subscription.addDestination(PUB_UNICAST_URI);
    publicationA = clientA.addPublication(PUB_UNICAST_URI, STREAM_ID);
    while (subscription.hasNoImages())
    {
        SystemTest.checkInterruptedStatus();
        Thread.yield();
    }
    for (int i = 0; i < numMessagesToSend; i++)
    {
        // A negative result means the offer was not accepted, so retry.
        while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L)
        {
            SystemTest.checkInterruptedStatus();
            Thread.yield();
        }
        final MutableInteger fragmentsRead = new MutableInteger();
        pollForFragment(subscription, fragmentHandler, fragmentsRead);
    }
    verifyFragments(fragmentHandler, numMessagesToSend);
}
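The send loop above retries `offer` while it returns a negative value and polls the subscription between messages. The stand-alone sketch below models that interplay with a bounded in-memory queue. `BoundedStream`, `Result`, and the `BACK_PRESSURED` value are hypothetical and exist only in this model; the real `Publication.offer` and `Subscription.poll` have richer semantics.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Stand-alone model with hypothetical types; this is NOT Aeron's implementation.
final class OfferRetrySketch
{
    static final long BACK_PRESSURED = -1L; // result code used by this model only

    /** A tiny bounded stream: offers fail while it is full, polls drain it. */
    static final class BoundedStream
    {
        private final ArrayDeque<Integer> queue = new ArrayDeque<>();
        private final int capacity;
        private long position;

        BoundedStream(final int capacity)
        {
            this.capacity = capacity;
        }

        long offer(final int message)
        {
            if (queue.size() == capacity)
            {
                return BACK_PRESSURED;
            }

            queue.addLast(message);
            return ++position;
        }

        int poll(final IntConsumer handler, final int fragmentLimit)
        {
            int count = 0;
            while (count < fragmentLimit && !queue.isEmpty())
            {
                handler.accept(queue.removeFirst());
                count++;
            }

            return count;
        }
    }

    static final class Result
    {
        final List<Integer> received = new ArrayList<>();
        int retries;
    }

    static Result sendAll(final int numMessages, final int capacity)
    {
        final BoundedStream stream = new BoundedStream(capacity);
        final Result result = new Result();
        final IntConsumer handler = value -> result.received.add(value);

        for (int i = 0; i < numMessages; i++)
        {
            // Retry until the offer is accepted, consuming one message to free space.
            while (stream.offer(i) < 0L)
            {
                result.retries++;
                stream.poll(handler, 1);
            }
        }

        // Drain whatever is still queued once sending has finished.
        while (stream.poll(handler, 10) > 0)
        {
            // keep polling until the stream is empty
        }

        return result;
    }

    public static void main(final String[] args)
    {
        final Result result = sendAll(5, 2);
        System.out.println("received=" + result.received + " retries=" + result.retries);
    }
}
```

Sending five messages through a stream of capacity two forces the producer to retry whenever the consumer has fallen behind, yet all messages still arrive in order.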
Code example source: real-logic/aeron
private void launch(final String channelOne, final int streamOne, final String channelTwo, final int streamTwo)
{
    driverOne = MediaDriver.launchEmbedded(
        new MediaDriver.Context()
            .errorHandler(Throwable::printStackTrace)
            .termBufferSparseFile(true));
    driverTwo = MediaDriver.launchEmbedded(
        new MediaDriver.Context()
            .errorHandler(Throwable::printStackTrace)
            .termBufferSparseFile(true));
    // Publishers attach to driver one, subscribers to driver two.
    publisherOne = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverOne.aeronDirectoryName()));
    subscriberOne = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverTwo.aeronDirectoryName()));
    publisherTwo = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverOne.aeronDirectoryName()));
    subscriberTwo = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverTwo.aeronDirectoryName()));
    subscriptionOne = subscriberOne.addSubscription(channelOne, streamOne);
    subscriptionTwo = subscriberTwo.addSubscription(channelTwo, streamTwo);
    publicationOne = publisherOne.addPublication(channelOne, streamOne);
    publicationTwo = publisherTwo.addPublication(channelTwo, streamTwo);
}