本文整理了Java中kafka.message.Message.payload()
方法的一些代码示例,展示了Message.payload()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.payload()
方法的具体详情如下:
包路径:kafka.message.Message
类名称:Message
方法名:payload
暂无
代码示例来源:origin: apache/incubator-pinot
/**
 * Returns the raw payload bytes of the message stored at {@code index}.
 * NOTE(review): payload().array() requires an array-backed buffer — it throws
 * UnsupportedOperationException for direct buffers; confirm the source always
 * yields heap buffers.
 */
public byte[] getMessageAtIndex(int index) {
    Message msg = messageList.get(index).message();
    return msg.payload().array();
}
代码示例来源:origin: prestodb/presto
ByteBuffer message = messageAndOffset.message().payload();
if (message != null) {
messageData = new byte[message.remaining()];
代码示例来源:origin: apache/incubator-pinot
/**
 * Returns the backing-array offset of the payload of the message at {@code index}.
 * NOTE(review): arrayOffset() is only meaningful for array-backed buffers and
 * ignores the buffer's current position — verify callers pair this with
 * getMessageAtIndex correctly.
 */
public int getMessageOffsetAtIndex(int index) {
    Message msg = messageList.get(index).message();
    return msg.payload().arrayOffset();
}
代码示例来源:origin: alibaba/jstorm
/**
 * Converts a Kafka message payload into a single-element list of Storm tuples.
 *
 * @param msg the Kafka message whose payload is converted
 * @return one tuple wrapping the payload bytes, or null when the payload is absent
 */
@SuppressWarnings("unchecked")
public Iterable<List<Object>> generateTuples(Message msg) {
    ByteBuffer data = msg.payload();
    if (data != null) {
        return Arrays.asList(Utils.tuple(Utils.toByteArray(data)));
    }
    return null;
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Extracts the payload of the wrapped Kafka message as a byte array
 * via the shared getBytes helper.
 */
@Override
public byte[] getMessageBytes() {
    Message msg = this.messageAndOffset.message();
    return getBytes(msg.payload());
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Builds a Kafka-0.8 consumer record whose message payload is the given buffer,
 * using Mockito stubs for the Kafka wrapper types.
 */
private ByteArrayBasedKafkaRecord getMockMessageAndOffset(ByteBuffer payload) {
    // Stub a Message whose payload() yields the supplied buffer.
    Message message = mock(Message.class);
    when(message.payload()).thenReturn(payload);
    // Wrap it in a stubbed MessageAndOffset and adapt to the record type.
    MessageAndOffset messageAndOffset = mock(MessageAndOffset.class);
    when(messageAndOffset.message()).thenReturn(message);
    return new Kafka08ConsumerRecord(messageAndOffset);
}
代码示例来源:origin: Graylog2/graylog2-server
final byte[] payloadBytes = ByteBufferUtils.readBytes(messageAndOffset.message().payload());
if (LOG.isTraceEnabled()) {
final byte[] keyBytes = ByteBufferUtils.readBytes(messageAndOffset.message().key());
代码示例来源:origin: alibaba/jstorm
/**
 * Drains buffered Kafka messages into the Storm collector, emitting at most
 * {@code config.batchSendCount} decodable messages per call.
 *
 * @param collector the Storm collector to emit tuples into
 * @return EMIT_END when the buffer is exhausted, EMIT_MORE when messages remain
 */
public EmitState emit(SpoutOutputCollector collector) {
    if (emittingMessages.isEmpty()) {
        fillMessages();
    }
    int count = 0;
    while (true) {
        MessageAndOffset toEmitMsg = emittingMessages.pollFirst();
        if (toEmitMsg == null) {
            return EmitState.EMIT_END;
        }
        count++;
        Iterable<List<Object>> tups = generateTuples(toEmitMsg.message());
        if (tups != null) {
            for (List<Object> tuple : tups) {
                // FIX: the payload copy + String construction previously ran on
                // every emit; only pay that cost when debug logging is enabled.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("emit message {}", new String(Utils.toByteArray(toEmitMsg.message().payload())));
                }
                collector.emit(tuple, new KafkaMessageId(partition, toEmitMsg.offset()));
            }
            if (count >= config.batchSendCount) {
                break;
            }
        } else {
            // Message yielded no tuples — ack its offset so it is not replayed.
            ack(toEmitMsg.offset());
        }
    }
    if (emittingMessages.isEmpty()) {
        return EmitState.EMIT_END;
    } else {
        return EmitState.EMIT_MORE;
    }
}
代码示例来源:origin: apache/flink
if (running) {
messagesInFetch++;
final ByteBuffer payload = msg.message().payload();
final long offset = msg.offset();
代码示例来源:origin: pinterest/secor
ByteBuffer payload = messageAndOffset.message().payload();
payloadBytes = new byte[payload.limit()];
payload.get(payloadBytes);
代码示例来源:origin: edwardcapriolo/IronCount
/**
 * Decodes a Kafka Message payload into a String.
 *
 * FIX: decode with an explicit UTF-8 charset instead of the platform default,
 * which made the result machine-dependent; sibling examples in this file also
 * decode Kafka payloads as UTF-8.
 *
 * @param message the Kafka message to decode
 * @return the payload decoded as a UTF-8 string
 */
public static String getMessage(Message message) {
    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
}
}
代码示例来源:origin: linkedin/camus
/**
 * Fetches the next Kafka message and stuffs the results into the key and
 * value.
 *
 * @param etlKey the key object populated in place with topic, leader, partition
 *               and offset metadata for the fetched message
 * @return the next KafkaMessage, or null if there are no more events
 * @throws IOException
 */
public KafkaMessage getNext(EtlKey etlKey) throws IOException {
if (hasNext()) {
MessageAndOffset msgAndOffset = messageIter.next();
Message message = msgAndOffset.message();
byte[] payload = getBytes(message.payload());
byte[] key = getBytes(message.key());
if (payload == null) {
// Null payload is logged but NOT skipped — a KafkaMessage with a null
// payload is still returned below. Presumably downstream handles it;
// TODO(review): confirm that is intentional.
log.warn("Received message with null message.payload(): " + msgAndOffset);
}
etlKey.clear();
// offset() + 1 records the NEXT offset to consume, not the current one.
etlKey.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(), kafkaRequest.getPartition(), currentOffset,
msgAndOffset.offset() + 1, message.checksum());
etlKey.setMessageSize(msgAndOffset.message().size());
currentOffset = msgAndOffset.offset() + 1; // increase offset
currentCount++; // increase count
return new KafkaMessage(payload, key, kafkaRequest.getTopic(), kafkaRequest.getPartition(),
msgAndOffset.offset(), message.checksum());
} else {
return null;
}
}
代码示例来源:origin: org.apache.storm/storm-kafka
/**
 * Deserializes a Kafka message into tuples, passing partition and offset
 * metadata through to the scheme.
 *
 * @return the deserialized tuples, or null when the message has no payload
 */
public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
    ByteBuffer data = msg.payload();
    if (data != null) {
        return scheme.deserializeMessageWithMetadata(data, partition, offset);
    }
    return null;
}
代码示例来源:origin: fjfd/microscope
/**
 * Prints every message payload in the set to stdout, decoded as UTF-8.
 *
 * FIX: size the byte array by remaining() rather than limit(), so the copy is
 * correct even when the buffer's position is non-zero (with limit() the array
 * is over-sized and get() under-fills or underflows in that case). Also uses
 * StandardCharsets.UTF_8 so the charset lookup cannot fail at runtime; the
 * throws clause is kept for interface compatibility.
 */
private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
    for (MessageAndOffset messageAndOffset : messageSet) {
        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        System.out.println(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
    }
}
代码示例来源:origin: jetoile/hadoop-unit
/**
 * Logs every message payload in the set at debug level, decoded as UTF-8.
 *
 * FIX: size the byte array by remaining() rather than limit(), so the copy is
 * correct even when the buffer's position is non-zero (with limit() the array
 * is over-sized and get() under-fills or underflows in that case). Also uses
 * StandardCharsets.UTF_8 so the charset lookup cannot fail at runtime; the
 * throws clause is kept for interface compatibility.
 */
private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
    for (MessageAndOffset messageAndOffset : messageSet) {
        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        LOG.debug(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
    }
}
代码示例来源:origin: com.butor/butor-kafka
/**
 * Decodes the payload of a MessageAndOffset through the configured
 * messageDecoder and returns its string form.
 *
 * NOTE(review): the method name contains a typo ("mesasge"); it is kept
 * because renaming would break callers. Also, sizing the array by limit()
 * assumes the buffer's position is 0 — confirm, or switch to remaining().
 */
protected String mesasgeAndOffsetToString(MessageAndOffset messageAndOffset) {
    ByteBuffer buf = messageAndOffset.message().payload();
    byte[] raw = new byte[buf.limit()];
    buf.get(raw);
    return messageDecoder.fromBytes(raw).toString();
}
代码示例来源:origin: Allianzcortex/code_collection
/**
 * Wraps a Kafka message payload into a single Storm tuple.
 *
 * @param msg the Kafka message whose payload is converted
 * @return a one-element tuple list, or null if the message carries no payload
 */
@SuppressWarnings("unchecked")
public Iterable<List<Object>> generateTuples(Message msg) {
    ByteBuffer body = msg.payload();
    if (body == null) {
        return null;
    }
    List<Object> tuple = Utils.tuple(Utils.toByteArray(body));
    return Arrays.asList(tuple);
}
代码示例来源:origin: org.apache.gobblin/gobblin-kafka-08
/**
 * Returns the payload bytes of the wrapped Kafka message using the
 * shared getBytes helper.
 */
@Override
public byte[] getMessageBytes() {
    Message kafkaMessage = this.messageAndOffset.message();
    return getBytes(kafkaMessage.payload());
}
代码示例来源:origin: apache/apex-malhar
/**
 * Decodes a Kafka Message payload into a String.
 *
 * FIX: decode with an explicit UTF-8 charset instead of the platform default,
 * which made the result machine-dependent; sibling examples in this file also
 * decode Kafka payloads as UTF-8.
 *
 * @param message the Kafka message to decode
 * @return the payload decoded as a UTF-8 string
 */
public String getMessage(Message message)
{
    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
}
代码示例来源:origin: wurstmeister/storm-kafka-0.8-plus
/**
 * Deserializes a Kafka message into tuples using the configured scheme,
 * dispatching to the key-value scheme when a key is present and supported.
 *
 * FIX: guard against a null payload (e.g. a log-compaction tombstone) —
 * previously this fell through to Utils.toByteArray(payload) and threw a
 * NullPointerException. Returning null matches the other generateTuples
 * examples in this file.
 *
 * @return the deserialized tuples, or null when the message has no payload
 */
public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
    ByteBuffer payload = msg.payload();
    if (payload == null) {
        return null;
    }
    ByteBuffer key = msg.key();
    Iterable<List<Object>> tups;
    if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
        tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
    } else {
        tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
    }
    return tups;
}
内容来源于网络,如有侵权,请联系作者删除!