io.aeron.Aeron类的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(14.1k)|赞(0)|评价(0)|浏览(217)

本文整理了Java中io.aeron.Aeron类的一些代码示例,展示了Aeron类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Aeron类的具体详情如下:
包路径:io.aeron.Aeron
类名称:Aeron

Aeron介绍

[英]Aeron entry point for communicating to the Media Driver for creating Publications and Subscriptions. Use an Aeron.Context to configure the Aeron object.

A client application requires only one Aeron object per Media Driver.

Note: If Aeron.Context#errorHandler(ErrorHandler) is not set and a DriverTimeoutException occurs then the process will face the wrath of System#exit(int). See Aeron.Configuration#DEFAULT_ERROR_HANDLER.
[中]Aeron入口点,用于与媒体驱动程序(Media Driver)通信以创建发布(Publication)和订阅(Subscription)。使用Aeron.Context来配置Aeron对象。
客户端应用程序每个媒体驱动程序只需要一个Aeron对象。
注:如果未设置Aeron.Context#errorHandler(ErrorHandler),并且发生了DriverTimeoutException,则进程将面临System#exit(int)的惩罚。参见Aeron.Configuration#DEFAULT_ERROR_HANDLER。

代码示例

代码示例来源:origin: real-logic/aeron

/**
 * Launches a media driver in shared threading mode, connects two Aeron clients and
 * creates the ping/pong publications and subscriptions used by the tests.
 */
@Before
public void before()
{
  // Driver errors are printed to stderr; term buffers use the minimum term length.
  final MediaDriver.Context driverContext = new MediaDriver.Context()
    .errorHandler(Throwable::printStackTrace)
    .publicationTermBufferLength(LogBufferDescriptor.TERM_MIN_LENGTH)
    .threadingMode(ThreadingMode.SHARED);
  driver = MediaDriver.launch(driverContext);

  pingClient = Aeron.connect();
  pongClient = Aeron.connect();

  // Ping stream: published by pingClient, received by pongClient.
  pingSubscription = pongClient.addSubscription(PING_URI, PING_STREAM_ID);
  pingPublication = pingClient.addPublication(PING_URI, PING_STREAM_ID);

  // Pong stream: published by pongClient, received by pingClient.
  pongSubscription = pingClient.addSubscription(PONG_URI, PONG_STREAM_ID);
  pongPublication = pongClient.addPublication(PONG_URI, PONG_STREAM_ID);
}

代码示例来源:origin: real-logic/aeron

/**
 * Runs an embedded media driver together with a publisher, a subscriber and a rate reporter,
 * each on its own named thread, until the running flag is cleared by SIGINT.
 *
 * @param args passed to {@code loadPropertiesFiles}.
 * @throws Exception if joining a thread is interrupted or a resource fails to close.
 */
public static void main(final String[] args) throws Exception
{
  loadPropertiesFiles(args);

  final AtomicBoolean keepRunning = new AtomicBoolean(true);
  SigInt.register(() -> keepRunning.set(false));

  final MediaDriver.Context driverContext = new MediaDriver.Context()
    .threadingMode(ThreadingMode.SHARED)
    .sharedIdleStrategy(new NoOpIdleStrategy());

  try (MediaDriver ignore = MediaDriver.launch(driverContext);
    Aeron aeron = Aeron.connect();
    Publication publication = aeron.addExclusivePublication(CHANNEL, STREAM_ID);
    Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID))
  {
    final Subscriber subscriber = new Subscriber(keepRunning, subscription);
    final Thread subscriberThread = new Thread(subscriber, "subscriber");
    final Thread publisherThread = new Thread(new Publisher(keepRunning, publication), "publisher");
    final Thread rateReporterThread = new Thread(new RateReporter(keepRunning, subscriber), "rate-reporter");

    // The reporter starts first, then the subscriber, and the publisher last.
    rateReporterThread.start();
    subscriberThread.start();
    publisherThread.start();

    subscriberThread.join();
    publisherThread.join();
    rateReporterThread.join();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Adds a subscription on clientA, then a second subscription on clientB with the same URI and
 * stream id, and checks that clientB's error handler receives a {@link ChannelEndpointException}
 * while clientA's subscription stays ACTIVE.
 */
@Test
public void shouldCatchErrorOnAddressAlreadyInUseForSubscriptions()
{
  final Subscription subscriptionA = clientA.addSubscription(URI, STREAM_ID);
  // Spin until the channel endpoint has left the INITIALIZING state.
  while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING)
  {
    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
  assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
  final Subscription subscriptionB = clientB.addSubscription(URI, STREAM_ID);
  final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
  // Wait up to 5000 ms for clientB's error handler to be invoked and capture the error.
  verify(errorHandlerClientB, timeout(5000)).onError(captor.capture());
  assertThat(captor.getValue(), instanceOf(ChannelEndpointException.class));
  final ChannelEndpointException channelEndpointException = (ChannelEndpointException)captor.getValue();
  // Read the counter identified by the exception's status indicator id; it must be ERRORED.
  final long status = clientB.countersReader().getCounterValue(channelEndpointException.statusIndicatorId());
  assertThat(status, is(ChannelEndpointStatus.ERRORED));
  assertThat(errorCounter.get(), greaterThan(0));
  // The failing subscription and the reported exception must refer to the same status counter.
  assertThat(subscriptionB.channelStatusId(), is(channelEndpointException.statusIndicatorId()));
  // The first subscription must not be affected by the second one's failure.
  assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
}

代码示例来源:origin: real-logic/aeron

/**
 * Adds the ingress publication, choosing an exclusive publication when the context
 * reports {@code isIngressExclusive()} and a regular publication otherwise.
 *
 * @param channel  for the publication.
 * @param streamId for the publication.
 * @return the publication added via the Aeron client.
 */
private Publication addIngressPublication(final String channel, final int streamId)
{
  return ctx.isIngressExclusive() ?
    aeron.addExclusivePublication(channel, streamId) :
    aeron.addPublication(channel, streamId);
}

代码示例来源:origin: real-logic/aeron

/**
 * Sets the status value of the controllable idle strategy counter, if one can be found.
 * <p>
 * The single argument is validated as an integer before connecting to the media driver, so a
 * malformed value is reported as a usage error instead of surfacing as an uncaught
 * {@link NumberFormatException} after the client has connected.
 *
 * @param args a single element: the integer status value to set.
 */
public static void main(final String[] args)
  {
    if (args.length != 1)
    {
      // println so the usage text is terminated with a line separator.
      System.out.println("Usage: SetControllableIdleStrategy <n>");
      System.exit(0);
    }

    final int status;
    try
    {
      status = Integer.parseInt(args[0]);
    }
    catch (final NumberFormatException ex)
    {
      System.out.println("Invalid status '" + args[0] + "': expected an integer");
      System.out.println("Usage: SetControllableIdleStrategy <n>");
      // Non-zero exit, matching the exit code of the uncaught exception this replaces.
      System.exit(1);
      return;
    }

    try (Aeron aeron = Aeron.connect())
    {
      final CountersReader countersReader = aeron.countersReader();
      final StatusIndicator statusIndicator = StatusUtil.controllableIdleStrategy(countersReader);

      if (null != statusIndicator)
      {
        statusIndicator.setOrdered(status);
        System.out.println("Set ControllableIdleStrategy status to " + status);
      }
      else
      {
        System.out.println("Could not find ControllableIdleStrategy status.");
      }
    }
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Sends the file named by the single command line argument over an exclusive publication.
 * <p>
 * Waits for the publication to be connected, maps the file, then sends a create message
 * followed by the file contents, all tagged with one correlation id.
 *
 * @param args a single element: the name of the file to send.
 * @throws Exception if the wait for a connection is interrupted or a resource fails to close.
 */
public static void main(final String[] args) throws Exception
{
  if (1 != args.length)
  {
    System.out.println("Filename to be sent must be supplied as a command line argument");
    System.exit(1);
  }

  try (Aeron aeron = Aeron.connect();
    Publication publication = aeron.addExclusivePublication(CHANNEL, STREAM_ID))
  {
    // Poll once per millisecond until the publication reports it is connected.
    while (!publication.isConnected())
    {
      Thread.sleep(1);
    }

    final File sourceFile = new File(args[0]);
    final UnsafeBuffer fileContents = new UnsafeBuffer(IoUtil.mapExistingFile(sourceFile, "sending"));
    final long transferId = aeron.nextCorrelationId();

    sendFileCreate(publication, transferId, fileContents.capacity(), sourceFile.getName());
    streamChunks(publication, transferId, fileContents);
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Adds one publication and three subscriptions on the dynamic MDC URIs, then spins until
 * every subscription has at least one image. The test timeout bounds the wait.
 */
@Test(timeout = 10_000)
public void shouldSpinUpAndShutdownWithDynamic()
{
  launch();

  publication = clientA.addPublication(PUB_MDC_DYNAMIC_URI, STREAM_ID);
  subscriptionA = clientA.addSubscription(SUB1_MDC_DYNAMIC_URI, STREAM_ID);
  subscriptionB = clientB.addSubscription(SUB2_MDC_DYNAMIC_URI, STREAM_ID);
  subscriptionC = clientA.addSubscription(SUB3_MDC_DYNAMIC_URI, STREAM_ID);

  for (;;)
  {
    final boolean awaitingImages =
      subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages();
    if (!awaitingImages)
    {
      break;
    }

    SystemTest.checkInterruptedStatus();
    Thread.yield();
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Exercises recording of an exclusive publication: sets up the archive control publication and
 * recording events subscription, adds the exclusive publication to be recorded, checks its start
 * position and initial term id against the requested values, sends messages and then runs the
 * post-publication validations.
 */
@Test(timeout = 10_000)
public void recordAndReplayExclusivePublication()
{
  // Control and recording-events endpoints are taken from the archive's own context.
  final String controlChannel = archive.context().localControlChannel();
  final int controlStreamId = archive.context().localControlStreamId();
  final String recordingChannel = archive.context().recordingEventsChannel();
  final int recordingStreamId = archive.context().recordingEventsStreamId();
  final Publication controlPublication = client.addPublication(controlChannel, controlStreamId);
  final Subscription recordingEvents = client.addSubscription(recordingChannel, recordingStreamId);
  final ArchiveProxy archiveProxy = new ArchiveProxy(controlPublication);
  prePublicationActionsAndVerifications(archiveProxy, controlPublication, recordingEvents);
  final ExclusivePublication recordedPublication =
    client.addExclusivePublication(publishUri, PUBLISH_STREAM_ID);
  // Capture the publication's properties up front; they are passed to the check helpers below.
  final int sessionId = recordedPublication.sessionId();
  final int termBufferLength = recordedPublication.termBufferLength();
  final int initialTermId = recordedPublication.initialTermId();
  final int maxPayloadLength = recordedPublication.maxPayloadLength();
  final long startPosition = recordedPublication.position();
  assertThat(startPosition, is(requestedStartPosition));
  assertThat(recordedPublication.initialTermId(), is(requestedInitialTermId));
  preSendChecks(archiveProxy, recordingEvents, sessionId, termBufferLength, startPosition);
  final int messageCount = prepAndSendMessages(recordingEvents, recordedPublication);
  postPublicationValidations(
    archiveProxy,
    recordingEvents,
    termBufferLength,
    initialTermId,
    maxPayloadLength,
    messageCount);
}

代码示例来源:origin: real-logic/aeron

/**
 * Creates (but does not start) a thread that subscribes to the ping stream and hands each
 * assembled message to {@code pingHandler} together with the pong publication.
 *
 * @param embeddedDirName Aeron directory name used for the client context.
 * @return the new, unstarted thread.
 */
private static Thread startPong(final String embeddedDirName)
{
  return new Thread(() ->
  {
    System.out.println("Subscribing Ping at " + PING_CHANNEL + " on stream Id " + PING_STREAM_ID);
    System.out.println("Publishing Pong at " + PONG_CHANNEL + " on stream Id " + PONG_STREAM_ID);
    final Aeron.Context ctx = new Aeron.Context().aeronDirectoryName(embeddedDirName);
    // The client, subscription and publication are all closed when the loop below exits.
    try (Aeron aeron = Aeron.connect(ctx);
      Subscription pingSubscription = aeron.addSubscription(PING_CHANNEL, PING_STREAM_ID);
      Publication pongPublication = EXCLUSIVE_PUBLICATIONS ?
        aeron.addExclusivePublication(PONG_CHANNEL, PONG_STREAM_ID) :
        aeron.addPublication(PONG_CHANNEL, PONG_STREAM_ID))
    {
      final FragmentAssembler dataHandler = new FragmentAssembler(
        (buffer, offset, length, header) -> pingHandler(pongPublication, buffer, offset, length));
      // Poll until RUNNING is cleared; the poll result is passed to the idle strategy.
      while (RUNNING.get())
      {
        PING_HANDLER_IDLE_STRATEGY.idle(pingSubscription.poll(dataHandler, FRAME_COUNT_LIMIT));
      }
      System.out.println("Shutting down...");
    }
  });
}

代码示例来源:origin: real-logic/aeron

/**
 * Stubs the mocked Aeron client, egress publisher, log publisher and response publication with
 * the return values the tests rely on.
 */
@Before
public void before()
{
  when(mockAeron.conductorAgentInvoker()).thenReturn(mock(AgentInvoker.class));
  when(mockEgressPublisher.sendEvent(any(), anyLong(), anyInt(), any(), any())).thenReturn(TRUE);
  when(mockLogPublisher.appendSessionClose(any(), anyLong(), anyLong())).thenReturn(TRUE);
  // Unlike the other log publisher stubs, appendSessionOpen is stubbed with a long value.
  when(mockLogPublisher.appendSessionOpen(any(), anyLong(), anyLong())).thenReturn(128L);
  when(mockLogPublisher.appendClusterAction(anyLong(), anyLong(), anyLong(), any(ClusterAction.class)))
    .thenReturn(TRUE);
  when(mockAeron.addPublication(anyString(), anyInt())).thenReturn(mockResponsePublication);
  // Both addSubscription overloads used by the code under test return a fresh mock.
  when(mockAeron.addSubscription(anyString(), anyInt())).thenReturn(mock(Subscription.class));
  when(mockAeron.addSubscription(anyString(), anyInt(), eq(null), any(UnavailableImageHandler.class)))
    .thenReturn(mock(Subscription.class));
  when(mockResponsePublication.isConnected()).thenReturn(TRUE);
}

代码示例来源:origin: real-logic/aeron

aeronClientInvoker = aeron.conductorAgentInvoker();
idleStrategy = ctx.idleStrategy();
messageTimeoutNs = ctx.messageTimeoutNs();
lock = ctx.lock();
nanoClock = aeron.context().nanoClock();
subscription = aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId());
controlResponsePoller = new ControlResponsePoller(subscription);
publication = aeron.addExclusivePublication(ctx.controlRequestChannel(), ctx.controlRequestStreamId());
archiveProxy = new ArchiveProxy(
  publication, idleStrategy, nanoClock, messageTimeoutNs, DEFAULT_RETRY_ATTEMPTS);
final long correlationId = aeron.nextCorrelationId();
if (!archiveProxy.connect(
  ctx.controlResponseChannel(), ctx.controlResponseStreamId(), correlationId, aeronClientInvoker))

代码示例来源:origin: real-logic/aeron

/**
 * Connect to the media driver using a freshly constructed, default {@link Context} and
 * return the resulting Aeron instance.
 * <p>
 * Any threads needed to interact with the media driver are created and managed by the
 * Aeron instance itself.
 *
 * @return the new {@link Aeron} instance connected to the Media Driver.
 */
public static Aeron connect()
{
  final Context defaultContext = new Context();

  return connect(defaultContext);
}

代码示例来源:origin: real-logic/aeron

aeron = Aeron.connect(
    new Aeron.Context()
      .aeronDirectoryName(aeronDirectoryName)
    errorCounter = aeron.addCounter(SYSTEM_COUNTER_TYPE_ID, "Archive errors");
if (null == aeron.conductorAgentInvoker())
  if (ownsAeronClient)
    aeron.context().errorHandler(countedErrorHandler);

代码示例来源:origin: real-logic/aeron

/**
 * Initialises the conductor from the archive context: caches the clocks, invokers, limits and
 * directories it exposes, adds the control subscriptions and creates the recording events proxy.
 *
 * @param ctx archive context supplying all configuration read here.
 */
ArchiveConductor(final Archive.Context ctx)
{
  super("archive-conductor", ctx.countedErrorHandler());
  this.ctx = ctx;
  aeron = ctx.aeron();
  aeronAgentInvoker = aeron.conductorAgentInvoker();
  driverAgentInvoker = ctx.mediaDriverAgentInvoker();
  epochClock = ctx.epochClock();
  archiveDir = ctx.archiveDir();
  archiveDirChannel = ctx.archiveDirChannel();
  maxConcurrentRecordings = ctx.maxConcurrentRecordings();
  maxConcurrentReplays = ctx.maxConcurrentReplays();
  // The context holds the timeout in nanoseconds; it is kept here in milliseconds.
  connectTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.connectTimeoutNs());
  // Apply the context's sparse setting to the control channel URI before subscribing.
  final ChannelUri controlChannelUri = ChannelUri.parse(ctx.controlChannel());
  controlChannelUri.put(CommonContext.SPARSE_PARAM_NAME, Boolean.toString(ctx.controlTermBufferSparse()));
  // NOTE(review): 'this' and null are presumably the available/unavailable image handlers — confirm
  // against the addSubscription overload.
  controlSubscription = aeron.addSubscription(controlChannelUri.toString(), ctx.controlStreamId(), this, null);
  localControlSubscription = aeron.addSubscription(
    ctx.localControlChannel(), ctx.localControlStreamId(), this, null);
  recordingEventsProxy = new RecordingEventsProxy(
    aeron.addExclusivePublication(ctx.recordingEventsChannel(), ctx.recordingEventsStreamId()));
  // Seed the cached clock with the current epoch clock reading.
  cachedEpochClock.update(epochClock.time());
  catalog = ctx.catalog();
  markFile = ctx.archiveMarkFile();
}

代码示例来源:origin: real-logic/aeron

try (Subscription subscription = aeron.addSubscription(RECORDING_CHANNEL, RECORDING_STREAM_ID);
  Publication publication = aeron.addPublication(RECORDING_CHANNEL, RECORDING_STREAM_ID))
  final CountersReader counters = aeron.countersReader();
  final int counterId = getRecordingCounterId(publication.sessionId(), counters);
  final long recordingId = RecordingPos.getRecordingId(counters, counterId);

代码示例来源:origin: real-logic/aeron

/**
 * Asks the consensus module to remove a cluster member, using the connection details stored
 * in the given mark file.
 *
 * @param markFile  whose decoder supplies the Aeron directory, control channel and stream ids.
 * @param memberId  of the member to remove.
 * @param isPassive passed to the consensus module proxy as a {@code BooleanType}.
 * @return the result of the consensus module proxy's {@code removeMember} call.
 */
public static boolean removeMember(
  final ClusterMarkFile markFile,
  final int memberId,
  final boolean isPassive)
{
  // NOTE(review): archiveChannel and toServiceStreamId are not used below, but the decoder
  // accessors may be position-dependent, so every read is kept in its original order — confirm
  // before removing any of them.
  final String aeronDirectoryName = markFile.decoder().aeronDirectory();
  final String archiveChannel = markFile.decoder().archiveChannel();
  final String controlChannel = markFile.decoder().serviceControlChannel();
  final int toServiceStreamId = markFile.decoder().serviceStreamId();
  final int consensusModuleStreamId = markFile.decoder().consensusModuleStreamId();

  final BooleanType passive = isPassive ? BooleanType.TRUE : BooleanType.FALSE;
  final Aeron.Context clientContext = new Aeron.Context().aeronDirectoryName(aeronDirectoryName);

  try (Aeron aeron = Aeron.connect(clientContext);
    ConsensusModuleProxy consensusModuleProxy = new ConsensusModuleProxy(
      aeron.addPublication(controlChannel, consensusModuleStreamId)))
  {
    return consensusModuleProxy.removeMember(aeron.nextCorrelationId(), memberId, passive);
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Launches a media driver whose default publication and IPC term buffer lengths are
 * {@code TEST_TERM_LENGTH * 2}, adds a publication on the supplied channel and asserts that its
 * term buffer length is {@code TEST_TERM_LENGTH}.
 * <p>
 * NOTE(review): the expected value differs from the driver defaults, so the channel presumably
 * carries its own term length setting — confirm against the channel data points.
 *
 * @param channel to add the publication on.
 */
@Theory
  @Test
  public void shouldHaveCorrectTermBufferLength(final String channel)
  {
    final MediaDriver.Context ctx = new MediaDriver.Context()
      .errorHandler(Throwable::printStackTrace)
      .publicationTermBufferLength(TEST_TERM_LENGTH * 2)
      .ipcTermBufferLength(TEST_TERM_LENGTH * 2);

    try (MediaDriver ignore = MediaDriver.launch(ctx);
      Aeron aeron = Aeron.connect();
      Publication publication = aeron.addPublication(channel, STREAM_ID))
    {
      assertThat(publication.termBufferLength(), is(TEST_TERM_LENGTH));
    }
    finally
    {
      // Remove the driver's directory whether or not the assertion passed.
      ctx.deleteAeronDirectory();
    }
  }
}

代码示例来源:origin: real-logic/aeron

/**
 * Write every counter exposed by {@link #countersReader()} to the given stream, one per line
 * as id, value and label. Intended as a debugging aid.
 *
 * @param out to where the counters get printed.
 */
public void printCounters(final PrintStream out)
{
  countersReader().forEach(
    (counterValue, counterId, counterLabel) ->
      out.format("%3d: %,20d - %s%n", counterId, counterValue, counterLabel));
}

代码示例来源:origin: real-logic/aeron

/**
 * Closes any existing log, subscribes to the given log channel on the context's log stream id
 * and starts recording that channel with a REMOTE source location.
 *
 * @param logChannel to subscribe to and record.
 * @return the newly added log subscription.
 */
Subscription createAndRecordLogSubscriptionAsFollower(final String logChannel)
{
  closeExistingLog();

  final int logStreamId = ctx.logStreamId();
  final Subscription logSubscription = aeron.addSubscription(logChannel, logStreamId);
  startLogRecording(logChannel, SourceLocation.REMOTE);

  return logSubscription;
}

代码示例来源:origin: real-logic/aeron

aeron = Aeron.connect(
  new Aeron.Context()
    .aeronDirectoryName(aeronDirectoryName)
errorCounter = aeron.addCounter(SYSTEM_COUNTER_TYPE_ID, "Cluster errors - service " + serviceId);
if (ownsAeronClient)
  aeron.context().errorHandler(countedErrorHandler);

相关文章