kafka.message.Message.payload()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.3k)|赞(0)|评价(0)|浏览(206)

本文整理了Java中kafka.message.Message.payload()方法的一些代码示例,展示了Message.payload()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.payload()方法的具体详情如下:
包路径:kafka.message.Message
类名称:Message
方法名:payload

Message.payload介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-pinot

public byte[] getMessageAtIndex(int index) {
 return messageList.get(index).message().payload().array();
}

代码示例来源:origin: prestodb/presto

ByteBuffer message = messageAndOffset.message().payload();
if (message != null) {
  messageData = new byte[message.remaining()];

代码示例来源:origin: apache/incubator-pinot

public int getMessageOffsetAtIndex(int index) {
 return messageList.get(index).message().payload().arrayOffset();
}

代码示例来源:origin: alibaba/jstorm

@SuppressWarnings("unchecked")
public Iterable<List<Object>> generateTuples(Message msg) {
  Iterable<List<Object>> tups = null;
  ByteBuffer payload = msg.payload();
  if (payload == null) {
    return null;
  }
  tups = Arrays.asList(Utils.tuple(Utils.toByteArray(payload)));
  return tups;
}

代码示例来源:origin: apache/incubator-gobblin

@Override
public byte[] getMessageBytes() {
 return getBytes(this.messageAndOffset.message().payload());
}

代码示例来源:origin: apache/incubator-gobblin

private ByteArrayBasedKafkaRecord getMockMessageAndOffset(ByteBuffer payload) {
 MessageAndOffset mockMessageAndOffset = mock(MessageAndOffset.class);
 Message mockMessage = mock(Message.class);
 when(mockMessage.payload()).thenReturn(payload);
 when(mockMessageAndOffset.message()).thenReturn(mockMessage);
 return new Kafka08ConsumerRecord(mockMessageAndOffset);
}

代码示例来源:origin: Graylog2/graylog2-server

final byte[] payloadBytes = ByteBufferUtils.readBytes(messageAndOffset.message().payload());
if (LOG.isTraceEnabled()) {
  final byte[] keyBytes = ByteBufferUtils.readBytes(messageAndOffset.message().key());

代码示例来源:origin: alibaba/jstorm

public EmitState emit(SpoutOutputCollector collector) {
  if (emittingMessages.isEmpty()) {
    fillMessages();
  }
  int count = 0;
  while (true) {
    MessageAndOffset toEmitMsg = emittingMessages.pollFirst();
    if (toEmitMsg == null) {
      return EmitState.EMIT_END;
    }
    count ++;
    Iterable<List<Object>> tups = generateTuples(toEmitMsg.message());
    if (tups != null) {
      for (List<Object> tuple : tups) {
        LOG.debug("emit message {}", new String(Utils.toByteArray(toEmitMsg.message().payload())));
        collector.emit(tuple, new KafkaMessageId(partition, toEmitMsg.offset()));
      }
      if(count>=config.batchSendCount) {
        break;
      }
    } else {
      ack(toEmitMsg.offset());
    }
  }
  if (emittingMessages.isEmpty()) {
    return EmitState.EMIT_END;
  } else {
    return EmitState.EMIT_MORE;
  }
}

代码示例来源:origin: apache/flink

if (running) {
  messagesInFetch++;
  final ByteBuffer payload = msg.message().payload();
  final long offset = msg.offset();

代码示例来源:origin: pinterest/secor

ByteBuffer payload = messageAndOffset.message().payload();
payloadBytes = new byte[payload.limit()];
payload.get(payloadBytes);

代码示例来源:origin: edwardcapriolo/IronCount

public static String getMessage(Message message) {
  ByteBuffer buffer = message.payload();
  byte[] bytes = new byte[buffer.remaining()];
  buffer.get(bytes);
  return new String(bytes);
 }
}

代码示例来源:origin: linkedin/camus

/**
 * Fetches the next Kafka message and stuffs the results into the key and
 * value
 *
 * @param etlKey
 * @return true if there exists more events
 * @throws IOException
 */
public KafkaMessage getNext(EtlKey etlKey) throws IOException {
 if (hasNext()) {
  MessageAndOffset msgAndOffset = messageIter.next();
  Message message = msgAndOffset.message();
  byte[] payload = getBytes(message.payload());
  byte[] key = getBytes(message.key());
  if (payload == null) {
   log.warn("Received message with null message.payload(): " + msgAndOffset);
  }
  etlKey.clear();
  etlKey.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(), kafkaRequest.getPartition(), currentOffset,
    msgAndOffset.offset() + 1, message.checksum());
  etlKey.setMessageSize(msgAndOffset.message().size());
  currentOffset = msgAndOffset.offset() + 1; // increase offset
  currentCount++; // increase count
  return new KafkaMessage(payload, key, kafkaRequest.getTopic(), kafkaRequest.getPartition(),
    msgAndOffset.offset(), message.checksum());
 } else {
  return null;
 }
}

代码示例来源:origin: org.apache.storm/storm-kafka

public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
  ByteBuffer payload = msg.payload();
  if (payload == null) {
    return null;
  }
  return scheme.deserializeMessageWithMetadata(payload, partition, offset);
}

代码示例来源:origin: fjfd/microscope

private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
  for (MessageAndOffset messageAndOffset : messageSet) {
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(new String(bytes, "UTF-8"));
  }
}

代码示例来源:origin: jetoile/hadoop-unit

private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
  for (MessageAndOffset messageAndOffset : messageSet) {
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    LOG.debug(new String(bytes, "UTF-8"));
  }
}

代码示例来源:origin: com.butor/butor-kafka

protected String mesasgeAndOffsetToString(MessageAndOffset messageAndOffset) {
   ByteBuffer payload = messageAndOffset.message().payload();
   byte[] bytes = new byte[payload.limit()];
   payload.get(bytes);
   return messageDecoder.fromBytes(bytes).toString();
}

代码示例来源:origin: Allianzcortex/code_collection

@SuppressWarnings("unchecked")
public Iterable<List<Object>> generateTuples(Message msg) {
  Iterable<List<Object>> tups = null;
  ByteBuffer payload = msg.payload();
  if (payload == null) {
    return null;
  }
  tups = Arrays.asList(Utils.tuple(Utils.toByteArray(payload)));
  return tups;
}

代码示例来源:origin: org.apache.gobblin/gobblin-kafka-08

@Override
public byte[] getMessageBytes() {
 return getBytes(this.messageAndOffset.message().payload());
}

代码示例来源:origin: apache/apex-malhar

public String getMessage(Message message)
{
 ByteBuffer buffer = message.payload();
 byte[] bytes = new byte[buffer.remaining()];
 buffer.get(bytes);
 return new String(bytes);
}

代码示例来源:origin: wurstmeister/storm-kafka-0.8-plus

public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
  Iterable<List<Object>> tups;
  ByteBuffer payload = msg.payload();
  ByteBuffer key = msg.key();
  if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
    tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
  } else {
    tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
  }
  return tups;
}

相关文章