本文整理了Java中io.aeron.logbuffer.Header
类的一些代码示例,展示了Header
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Header
类的具体详情如下:
包路径:io.aeron.logbuffer.Header
类名称:Header
[英]Represents the header of the data frame for accessing meta data fields.
[中]表示用于访问元数据字段的数据帧的标头。
代码示例来源:origin: real-logic/aeron
(buffer, offset, length, header) ->
assertThat(header.type(), is(HeaderFlyweight.HDR_TYPE_DATA));
assertThat(header.termId(), is(ACTIVE_TERM_ID));
assertThat(header.streamId(), is(STREAM_ID));
assertThat(header.sessionId(), is(SESSION_ID));
assertThat(header.termOffset(), is(0));
assertThat(header.frameLength(), is(DataHeaderFlyweight.HEADER_LENGTH + FAKE_PAYLOAD.length));
},
Integer.MAX_VALUE,
代码示例来源:origin: real-logic/aeron
@Test
public void shouldAssembleTwoPartMessage()
{
when(header.flags())
.thenReturn(FrameDescriptor.BEGIN_FRAG_FLAG)
.thenReturn(FrameDescriptor.END_FRAG_FLAG);
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[1024]);
final int offset = 0;
final int length = srcBuffer.capacity() / 2;
srcBuffer.setMemory(0, length, (byte)65);
srcBuffer.setMemory(length, length, (byte)66);
adapter.onFragment(srcBuffer, offset, length, header);
adapter.onFragment(srcBuffer, length, length, header);
final ArgumentCaptor<UnsafeBuffer> bufferArg = ArgumentCaptor.forClass(UnsafeBuffer.class);
final ArgumentCaptor<Header> headerArg = ArgumentCaptor.forClass(Header.class);
verify(delegateFragmentHandler, times(1)).onFragment(
bufferArg.capture(), eq(offset), eq(length * 2), headerArg.capture());
final UnsafeBuffer capturedBuffer = bufferArg.getValue();
for (int i = 0; i < srcBuffer.capacity(); i++)
{
assertThat("same at i=" + i, capturedBuffer.getByte(i), is(srcBuffer.getByte(i)));
}
final Header capturedHeader = headerArg.getValue();
assertThat(capturedHeader.sessionId(), is(SESSION_ID));
assertThat(capturedHeader.flags(), is(FrameDescriptor.END_FRAG_FLAG));
}
代码示例来源:origin: real-logic/aeron
int offset = termOffset;
final int capacity = termBuffer.capacity();
header.buffer(termBuffer);
header.offset(frameOffset);
handler.onFragment(termBuffer, frameOffset + HEADER_LENGTH, frameLength - HEADER_LENGTH, header);
代码示例来源:origin: real-logic/aeron
/**
* Get the current position to which the image has advanced on reading this message.
*
* @return the current position to which the image has advanced on reading this message.
*/
public final long position()
{
final int resultingOffset = BitUtil.align(termOffset() + frameLength(), FRAME_ALIGNMENT);
return computePosition(termId(), resultingOffset, positionBitsToShift, initialTermId);
}
代码示例来源:origin: real-logic/aeron
/**
* Return a reusable, parameterized {@link FragmentHandler} that prints to stdout for the first stream(STREAM)
*
* @param streamId to show when printing
* @return subscription data handler function that prints the message contents
*/
public static FragmentHandler reassembledStringMessage1(final int streamId)
{
return (buffer, offset, length, header) ->
{
final byte[] data = new byte[length];
buffer.getBytes(offset, data);
System.out.format(
"message to stream %d from session %x term id %x term offset %d (%d@%d)%n",
streamId, header.sessionId(), header.termId(), header.termOffset(), length, offset);
if (length != 10000)
{
System.out.format(
"Received message was not assembled properly;" +
" received length was %d, but was expecting 10000%n",
length);
}
};
}
代码示例来源:origin: real-logic/aeron
final UnsafeBuffer termBuffer = activeTermBuffer(initialPosition);
final int capacity = termBuffer.capacity();
header.buffer(termBuffer);
long resultingPosition = initialPosition;
header.offset(frameOffset);
initialOffset = offset;
if ((header.flags() & END_FRAG_FLAG) == END_FRAG_FLAG)
代码示例来源:origin: real-logic/aeron
/**
* Return a reusable, parametrised {@link FragmentHandler} that prints to stdout
*
* @param streamId to show when printing
* @return subscription data handler function that prints the message contents
*/
public static FragmentHandler printStringMessage(final int streamId)
{
return (buffer, offset, length, header) ->
{
final byte[] data = new byte[length];
buffer.getBytes(offset, data);
System.out.println(String.format(
"Message to stream %d from session %d (%d@%d) <<%s>>",
streamId, header.sessionId(), length, offset, new String(data)));
};
}
代码示例来源:origin: real-logic/aeron
/**
* The implementation of {@link FragmentHandler} that reassembles and forwards whole messages.
*
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
*/
public void onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
final byte flags = header.flags();
if ((flags & UNFRAGMENTED) == UNFRAGMENTED)
{
delegate.onFragment(buffer, offset, length, header);
}
else
{
handleFragment(buffer, offset, length, header, flags);
}
}
代码示例来源:origin: real-logic/artio
final Header header)
final int streamId = header.streamId();
final long endPosition = header.position();
final int aeronSessionId = header.sessionId();
if ((header.flags() & BEGIN_FLAG) != BEGIN_FLAG)
代码示例来源:origin: real-logic/artio
private Action quiesceFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
if (completedPosition(header.sessionId()) <= header.position())
{
return onFragment(buffer, offset, length, header);
}
return CONTINUE;
}
代码示例来源:origin: real-logic/artio
public Action onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
final int streamId = header.streamId();
final int aeronSessionId = header.sessionId();
final long endPosition = header.position();
DebugLogger.log(
LogTag.INDEX,
"Indexing @ %d from [%d, %d]%n",
endPosition,
streamId,
aeronSessionId);
for (int i = 0, size = indices.size(); i < size; i++)
{
indices.get(i).onFragment(buffer, offset, length, header);
}
return CONTINUE;
}
代码示例来源:origin: real-logic/aeron
private void validateFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
int actual = buffer.getInt(offset, LITTLE_ENDIAN);
if (fragmentCount != actual)
{
throw new IllegalStateException("expected=" + fragmentCount + " actual=" + actual);
}
actual = buffer.getInt(offset + (length - 4), LITTLE_ENDIAN);
if (fragmentCount != actual)
{
throw new IllegalStateException("expected=" + fragmentCount + " actual=" + actual);
}
remaining -= length;
fragmentCount++;
receivedPosition = header.position();
}
代码示例来源:origin: real-logic/artio
final Header header)
final int streamId = header.streamId();
final long endPosition = header.position();
final byte flags = header.flags();
final int length = BitUtil.align(srcLength, FRAME_ALIGNMENT);
代码示例来源:origin: real-logic/aeron
final Header header = new Header(INITIAL_TERM_ID, Integer.numberOfLeadingZeros(TERM_BUFFER_LENGTH));
header.buffer(buffer);
代码示例来源:origin: real-logic/aeron
@Before
public void setUp()
{
header.buffer(termBuffer);
when(termBuffer.getInt(anyInt(), any(ByteOrder.class))).thenReturn(SESSION_ID);
}
代码示例来源:origin: real-logic/aeron
this.positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
this.initialTermId = LogBufferDescriptor.initialTermId(logBuffers.metaDataBuffer());
header = new Header(initialTermId, positionBitsToShift, this);
代码示例来源:origin: real-logic/aeron
private void recordFragment(
final RecordingWriter recordingWriter,
final UnsafeBuffer buffer,
final DataHeaderFlyweight headerFlyweight,
final Header header,
final int message,
final byte flags,
final int type)
{
final int offset = INITIAL_TERM_OFFSET + message * FRAME_LENGTH;
headerFlyweight.wrap(buffer, offset, HEADER_LENGTH);
headerFlyweight
.streamId(STREAM_ID)
.sessionId(SESSION_ID)
.termOffset(offset)
.termId(INITIAL_TERM_ID)
.reservedValue(message)
.headerType(type)
.flags(flags)
.frameLength(FRAME_LENGTH);
buffer.setMemory(
offset + HEADER_LENGTH,
FRAME_LENGTH - HEADER_LENGTH,
(byte)message);
header.offset(offset);
recordingWriter.onBlock(buffer, offset, FRAME_LENGTH, SESSION_ID, INITIAL_TERM_ID);
recordingPosition += FRAME_LENGTH;
}
代码示例来源:origin: real-logic/aeron
/**
* Return a reusable, parameterised {@link FragmentHandler} that prints to stdout for the second stream (STREAM + 1)
*
* @param streamId to show when printing
* @return subscription data handler function that prints the message contents
*/
public static FragmentHandler reassembledStringMessage2(final int streamId)
{
return (buffer, offset, length, header) ->
{
final byte[] data = new byte[length];
buffer.getBytes(offset, data);
System.out.format(
"message to stream %d from session %x term id %x term offset %d (%d@%d)%n",
streamId, header.sessionId(), header.termId(), header.termOffset(), length, offset);
if (length != 9000)
{
System.out.format(
"Received message was not assembled properly; received length was %d, but was expecting 9000%n",
length);
}
};
}
}
代码示例来源:origin: real-logic/aeron
private void handleFragment(
final DirectBuffer buffer, final int offset, final int length, final Header header, final byte flags)
{
if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG)
{
final BufferBuilder builder = getBufferBuilder(header.sessionId());
builder.reset().append(buffer, offset, length);
}
else
{
final BufferBuilder builder = builderBySessionIdMap.get(header.sessionId());
if (null != builder && builder.limit() != 0)
{
builder.append(buffer, offset, length);
if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
final int msgLength = builder.limit();
delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder.reset();
}
}
}
}
代码示例来源:origin: real-logic/aeron
/**
* The implementation of {@link FragmentHandler} that reassembles and forwards whole messages.
*
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
*/
public void onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
final byte flags = header.flags();
if ((flags & UNFRAGMENTED) == UNFRAGMENTED)
{
delegate.onFragment(buffer, offset, length, header);
}
else
{
handleFragment(buffer, offset, length, header, flags);
}
}
内容来源于网络,如有侵权,请联系作者删除!