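This article collects code examples for the Java method io.aeron.Aeron.nextCorrelationId() and shows how Aeron.nextCorrelationId() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of Aeron.nextCorrelationId() are as follows: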
Fully qualified class: io.aeron.Aeron
Class name: Aeron
Method name: nextCorrelationId
Generate the next correlation id that is unique for the connected Media Driver.
This is useful for generating correlation identifiers for pairing requests with responses in a client's own application protocol.
This method is thread safe and will work across processes that all use the same media driver.
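To make the request/response pairing described above concrete, here is a minimal sketch that uses only the JDK. The class name CorrelationPairingSketch and its methods are hypothetical, and the id source is a plain LongSupplier backed by a local AtomicLong. That counter is only a stand-in: it is unique within one JVM, whereas the real method draws from the connected media driver. With an Aeron client available, the method reference aeron::nextCorrelationId could presumably be passed instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch: pairs responses with requests by correlation id.
final class CorrelationPairingSketch
{
    // Assumption: stands in for aeron::nextCorrelationId in this sketch.
    private final LongSupplier idSupplier;
    // Requests still waiting for a response, keyed by correlation id.
    private final Map<Long, String> pendingRequests = new ConcurrentHashMap<>();

    CorrelationPairingSketch(final LongSupplier idSupplier)
    {
        this.idSupplier = idSupplier;
    }

    // Records the request and returns the id that would be encoded into the outgoing message.
    long sendRequest(final String payload)
    {
        final long correlationId = idSupplier.getAsLong();
        pendingRequests.put(correlationId, payload);
        return correlationId;
    }

    // Returns the request that the response belongs to, or null if the id is unknown.
    String onResponse(final long correlationId)
    {
        return pendingRequests.remove(correlationId);
    }

    int pendingCount()
    {
        return pendingRequests.size();
    }

    public static void main(final String[] args)
    {
        final AtomicLong localCounter = new AtomicLong();
        final CorrelationPairingSketch client = new CorrelationPairingSketch(localCounter::incrementAndGet);

        final long first = client.sendRequest("get-position");
        final long second = client.sendRequest("stop-recording");

        // Responses may arrive in any order; the id identifies the matching request.
        System.out.println(client.onResponse(second)); // prints "stop-recording"
        System.out.println(client.onResponse(first));  // prints "get-position"
        System.out.println(client.pendingCount());     // prints 0
    }
}
```

Keying the pending map by the generated id is what lets responses arrive out of order without ambiguity.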
Code example source: real-logic/aeron
private int passiveFollower(final long nowMs)
{
    if (nowMs > (timeOfLastActivityMs + intervalMs))
    {
        correlationId = ctx.aeron().nextCorrelationId();
        if (memberStatusPublisher.snapshotRecordingQuery(clusterPublication, correlationId, memberId))
        {
            timeOfLastActivityMs = nowMs;
            return 1;
        }
    }
    return 0;
}
Code example source: real-logic/aeron
/**
 * Stop a replay session.
 *
 * @param replaySessionId to stop replay for.
 */
public void stopReplay(final long replaySessionId)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.stopReplay(replaySessionId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send stop replay request");
        }
        pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Get the stop position for a recording.
 *
 * @param recordingId of the active recording for which the position is required.
 * @return the stop position, or {@link #NULL_POSITION} if still active.
 * @see #getRecordingPosition(long)
 */
public long getStopPosition(final long recordingId)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.getStopPosition(recordingId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send get stop position request");
        }
        return pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Truncate a stopped recording to a given position that is less than the stopped position. The provided position
 * must be on a fragment boundary. Truncating a recording to the start position effectively deletes the recording.
 *
 * @param recordingId of the stopped recording to be truncated.
 * @param position to which the recording will be truncated.
 */
public void truncateRecording(final long recordingId, final long position)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.truncateRecording(recordingId, position, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send truncate recording request");
        }
        pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Get the position recorded for an active recording. If no active recording then return {@link #NULL_POSITION}.
 *
 * @param recordingId of the active recording for which the position is required.
 * @return the recorded position for the active recording or {@link #NULL_POSITION} if recording not active.
 * @see #getStopPosition(long)
 */
public long getRecordingPosition(final long recordingId)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.getRecordingPosition(recordingId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send get recording position request");
        }
        return pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Stop recording for a subscriptionId that has been returned from
 * {@link #startRecording(String, int, SourceLocation)} or
 * {@link #extendRecording(long, String, int, SourceLocation)}.
 *
 * @param subscriptionId is the {@link Subscription#registrationId()} for the recording in the archive.
 */
public void stopRecording(final long subscriptionId)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.stopRecording(subscriptionId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send stop recording request");
        }
        pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Stop recording for a channel and stream pairing.
 * <p>
 * Channels that include sessionId parameters are considered different than channels without sessionIds. Stopping
 * a recording on a channel without a sessionId parameter will not stop the recording of any sessionId specific
 * recordings that use the same channel and streamId.
 *
 * @param channel to stop recording for.
 * @param streamId to stop recording for.
 */
public void stopRecording(final String channel, final int streamId)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.stopRecording(channel, streamId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send stop recording request");
        }
        pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * List a recording descriptor for a single recording id.
 * <p>
 * If the recording id is greater than the largest known id then nothing is returned.
 *
 * @param recordingId at which to begin the listing.
 * @param consumer to which the descriptors are dispatched.
 * @return the number of descriptors found and consumed.
 */
public int listRecording(final long recordingId, final RecordingDescriptorConsumer consumer)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.listRecording(recordingId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send list recording request");
        }
        return pollForDescriptors(correlationId, 1, consumer);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Find the last recording that matches the given criteria.
 *
 * @param minRecordingId to search back to.
 * @param channelFragment for a contains match on the original channel stored with the archive descriptor.
 * @param streamId of the recording to match.
 * @param sessionId of the recording to match.
 * @return the recordingId if found otherwise {@link Aeron#NULL_VALUE} if not found.
 */
public long findLastMatchingRecording(
    final long minRecordingId, final String channelFragment, final int streamId, final int sessionId)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.findLastMatchingRecording(
            minRecordingId, channelFragment, streamId, sessionId, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send find last matching request");
        }
        return pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * List all recording descriptors from a recording id with a limit of record count.
 * <p>
 * If the recording id is greater than the largest known id then nothing is returned.
 *
 * @param fromRecordingId at which to begin the listing.
 * @param recordCount to limit for each query.
 * @param consumer to which the descriptors are dispatched.
 * @return the number of descriptors found and consumed.
 */
public int listRecordings(
    final long fromRecordingId, final int recordCount, final RecordingDescriptorConsumer consumer)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.listRecordings(fromRecordingId, recordCount, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send list recordings request");
        }
        return pollForDescriptors(correlationId, recordCount, consumer);
    }
    finally
    {
        lock.unlock();
    }
}
Code example source: real-logic/aeron
/**
 * Start recording a channel and stream pairing.
 * <p>
 * Channels that include sessionId parameters are considered different than channels without sessionIds. If a
 * publication matches both a sessionId specific channel recording and a non-sessionId specific recording, it will
 * be recorded twice.
 *
 * @param channel to be recorded.
 * @param streamId to be recorded.
 * @param sourceLocation of the publication to be recorded.
 * @return the subscriptionId, i.e. {@link Subscription#registrationId()}, of the recording.
 */
public long startRecording(final String channel, final int streamId, final SourceLocation sourceLocation)
{
    lock.lock();
    try
    {
        ensureOpen();
        final long correlationId = aeron.nextCorrelationId();
        if (!archiveProxy.startRecording(channel, streamId, sourceLocation, correlationId, controlSessionId))
        {
            throw new ArchiveException("failed to send start recording request");
        }
        return pollForResponse(correlationId);
    }
    finally
    {
        lock.unlock();
    }
}
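The archive client methods above all share one shape: take the lock, obtain a fresh correlation id, send the request, fail fast if the send was rejected, then poll for the response carrying the same id. The following is a minimal JDK-only sketch of that shape, not the Aeron implementation. LockedRequestSketch, idSupplier, poller, and sender are hypothetical names, the ensureOpen() step is omitted, and IllegalStateException stands in for ArchiveException.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

// Hypothetical sketch of the lock / correlate / send / poll pattern.
final class LockedRequestSketch
{
    private final Lock lock = new ReentrantLock();
    // Assumption: stands in for aeron::nextCorrelationId.
    private final LongSupplier idSupplier;
    // Assumption: stands in for pollForResponse(correlationId).
    private final LongUnaryOperator poller;

    LockedRequestSketch(final LongSupplier idSupplier, final LongUnaryOperator poller)
    {
        this.idSupplier = idSupplier;
        this.poller = poller;
    }

    // sender receives the correlation id and returns false if the request could not be sent.
    long request(final LongPredicate sender, final String failureMessage)
    {
        lock.lock();
        try
        {
            final long correlationId = idSupplier.getAsLong();
            if (!sender.test(correlationId))
            {
                throw new IllegalStateException(failureMessage);
            }
            // Only a response with the same id answers this request.
            return poller.applyAsLong(correlationId);
        }
        finally
        {
            lock.unlock();
        }
    }

    public static void main(final String[] args)
    {
        final AtomicLong localCounter = new AtomicLong();
        final LockedRequestSketch client = new LockedRequestSketch(
            localCounter::incrementAndGet, (correlationId) -> correlationId + 1000);

        // The first id is 1, so the stand-in poller returns 1001.
        System.out.println(client.request((correlationId) -> true, "failed to send request")); // prints 1001
    }
}
```

Generating the id inside the locked region keeps the send and the matching poll together, so two callers on the same client cannot interleave their request and response handling.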
Code example source: real-logic/aeron
public static void main(final String[] args) throws Exception
{
    if (args.length != 1)
    {
        System.out.println("Filename to be sent must be supplied as a command line argument");
        System.exit(1);
    }
    try (Aeron aeron = Aeron.connect();
        Publication publication = aeron.addExclusivePublication(CHANNEL, STREAM_ID))
    {
        while (!publication.isConnected())
        {
            Thread.sleep(1);
        }
        final File file = new File(args[0]);
        final UnsafeBuffer buffer = new UnsafeBuffer(IoUtil.mapExistingFile(file, "sending"));
        final long correlationId = aeron.nextCorrelationId();
        sendFileCreate(publication, correlationId, buffer.capacity(), file.getName());
        streamChunks(publication, correlationId, buffer);
    }
}
Code example source: real-logic/aeron
final long correlationId = aeron.nextCorrelationId();
Code example source: real-logic/aeron
private int init(final long nowMs)
{
    if (nowMs > (timeOfLastActivityMs + intervalMs))
    {
        clusterMembersStatusEndpointsCursor = Math.min(
            clusterMembersStatusEndpointsCursor + 1, clusterMemberStatusEndpoints.length - 1);
        CloseHelper.close(clusterPublication);
        final ChannelUri memberStatusUri = ChannelUri.parse(ctx.memberStatusChannel());
        memberStatusUri.put(ENDPOINT_PARAM_NAME, clusterMemberStatusEndpoints[clusterMembersStatusEndpointsCursor]);
        clusterPublication = ctx.aeron().addExclusivePublication(
            memberStatusUri.toString(), ctx.memberStatusStreamId());
        correlationId = ctx.aeron().nextCorrelationId();
        if (memberStatusPublisher.addPassiveMember(clusterPublication, correlationId, memberEndpoints))
        {
            timeOfLastActivityMs = nowMs;
            return 1;
        }
    }
    return 0;
}
Code example source: real-logic/aeron
private long sendChallengeResponse(final long sessionId, final byte[] encodedCredentials, final long deadlineNs)
{
    final long correlationId = aeron.nextCorrelationId();
    final ChallengeResponseEncoder challengeResponseEncoder = new ChallengeResponseEncoder();
    final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer();
    challengeResponseEncoder
        .wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
        .correlationId(correlationId)
        .clusterSessionId(sessionId)
        .putEncodedCredentials(encodedCredentials, 0, encodedCredentials.length);
    idleStrategy.reset();
    while (true)
    {
        final long result = publication.offer(buffer);
        if (result > 0)
        {
            break;
        }
        checkResult(result);
        checkDeadline(deadlineNs, "failed to connect to cluster");
        idleStrategy.idle();
    }
    return correlationId;
}
Code example source: real-logic/aeron
private long sendConnectRequest(
    final Publication publication, final byte[] encodedCredentials, final long deadlineNs)
{
    final long correlationId = aeron.nextCorrelationId();
    final SessionConnectRequestEncoder sessionConnectRequestEncoder = new SessionConnectRequestEncoder();
    final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer();
    sessionConnectRequestEncoder
        .wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
        .correlationId(correlationId)
        .responseStreamId(ctx.egressStreamId())
        .responseChannel(ctx.egressChannel())
        .putEncodedCredentials(encodedCredentials, 0, encodedCredentials.length);
    idleStrategy.reset();
    while (true)
    {
        final long result = publication.offer(buffer);
        if (result > 0)
        {
            break;
        }
        if (Publication.CLOSED == result)
        {
            throw new ClusterException("unexpected close from cluster");
        }
        checkDeadline(deadlineNs, "failed to connect to cluster");
        idleStrategy.idle();
    }
    return correlationId;
}
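The two cluster client methods above retry an offer until it is accepted, the publication reports that it is closed, or a deadline passes. The retry loop can be sketched in isolation with the JDK alone. Everything named here is an assumption made for illustration: OfferRetrySketch and offerUntil are hypothetical, the offer is a LongSupplier rather than a real Publication, CLOSED_STAND_IN is a local constant playing the role of Publication.CLOSED, IllegalStateException replaces ClusterException, and Thread.yield() replaces the idle strategy.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of an offer retry loop bounded by a deadline.
final class OfferRetrySketch
{
    // Assumption: local stand-in for Publication.CLOSED, not imported from Aeron.
    static final long CLOSED_STAND_IN = -4L;

    // Retries until offer returns a positive value, treating non-positive values as "not accepted yet".
    static long offerUntil(final LongSupplier offer, final LongSupplier nanoClock, final long deadlineNs)
    {
        while (true)
        {
            final long result = offer.getAsLong();
            if (result > 0)
            {
                return result;
            }
            if (CLOSED_STAND_IN == result)
            {
                throw new IllegalStateException("unexpected close");
            }
            // Subtraction keeps the comparison correct even if nanoTime values wrap.
            if (nanoClock.getAsLong() - deadlineNs > 0)
            {
                throw new IllegalStateException("deadline reached before the offer was accepted");
            }
            Thread.yield();
        }
    }

    public static void main(final String[] args)
    {
        final long[] attempts = {0};
        // Simulated offer: rejected twice, then accepted with a new position of 128.
        final LongSupplier offer = () -> ++attempts[0] < 3 ? -2L : 128L;
        final long deadlineNs = System.nanoTime() + 1_000_000_000L;

        System.out.println(offerUntil(offer, System::nanoTime, deadlineNs)); // prints 128
        System.out.println(attempts[0]);                                     // prints 3
    }
}
```

Because the correlation id is generated once before the loop, every retry resends the same id, so the eventual response can still be matched to the original request.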
Code example source: real-logic/aeron
public static boolean removeMember(
    final ClusterMarkFile markFile,
    final int memberId,
    final boolean isPassive)
{
    final String aeronDirectoryName = markFile.decoder().aeronDirectory();
    final String archiveChannel = markFile.decoder().archiveChannel();
    final String channel = markFile.decoder().serviceControlChannel();
    final int toServiceStreamId = markFile.decoder().serviceStreamId();
    final int toConsensusModuleStreamId = markFile.decoder().consensusModuleStreamId();
    try (Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(aeronDirectoryName));
        ConsensusModuleProxy consensusModuleProxy = new ConsensusModuleProxy(
            aeron.addPublication(channel, toConsensusModuleStreamId)))
    {
        if (consensusModuleProxy.removeMember(
            aeron.nextCorrelationId(), memberId, isPassive ? BooleanType.TRUE : BooleanType.FALSE))
        {
            return true;
        }
    }
    return false;
}
Code example source: real-logic/aeron
publication, idleStrategy, nanoClock, messageTimeoutNs, DEFAULT_RETRY_ATTEMPTS);
final long correlationId = aeron.nextCorrelationId();
if (!archiveProxy.connect(
ctx.controlResponseChannel(), ctx.controlResponseStreamId(), correlationId, aeronClientInvoker))