Usage of the kafka.message.Message.key() method, with code examples


This article collects code examples of the Java method kafka.message.Message.key(), showing how the method is used in practice. The examples are drawn from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful references. Details of Message.key():
Package: kafka.message
Class: Message
Method: key()

About Message.key

Message.key() returns the message key as a java.nio.ByteBuffer, or null when the message carries no key; Message.hasKey() can be used to check for a key first. The examples below all follow the same pattern: allocate a byte[] sized from the buffer (via remaining() or limit()) and copy the key out with get(). For the freshly sliced buffer that key() returns, remaining() and limit() agree, since the buffer's position starts at 0.
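
Distilling that recurring pattern into one place, a minimal null-safe helper might look like the sketch below (the MessageKeys class and keyBytes method are illustrative names, not taken from any of the quoted projects):

import java.nio.ByteBuffer;

import kafka.message.Message;

public final class MessageKeys {

 /** Returns a copy of the message key, or null if the message has no key. */
 public static byte[] keyBytes(Message message) {
  if (!message.hasKey()) {
   return null; // message.key() would also return null here
  }
  ByteBuffer key = message.key();
  byte[] bytes = new byte[key.remaining()]; // readable bytes between position and limit
  key.get(bytes); // copies the key and advances the buffer's position
  return bytes;
 }
}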

Code examples

Code example from: apache/incubator-gobblin

@Override
public byte[] getKeyBytes() {
 return getBytes(this.messageAndOffset.message().key());
}

Code example from: apache/flink

ByteBuffer keyPayload = msg.message().key();
byte[] keyBytes = new byte[keyPayload.remaining()]; // allocate for the readable key bytes
keyPayload.get(keyBytes);

Code example from: prestodb/presto

byte[] keyData = null;
ByteBuffer key = messageAndOffset.message().key();
if (key != null) {
  keyData = new byte[key.remaining()];
  key.get(keyData); // copy the key bytes out of the buffer
}

Code example from: Graylog2/graylog2-server

final byte[] payloadBytes = ByteBufferUtils.readBytes(messageAndOffset.message().payload());
final byte[] keyBytes = ByteBufferUtils.readBytes(messageAndOffset.message().key());
LOG.trace("Read message {} contains {}", bytesToHex(keyBytes), bytesToHex(payloadBytes));

Code example from: pinterest/secor

byte[] keyBytes = null;
if (messageAndOffset.message().hasKey()) {
  ByteBuffer key = messageAndOffset.message().key();
  keyBytes = new byte[key.limit()];
  key.get(keyBytes);
}

Code example from: linkedin/camus

/**
 * Fetches the next Kafka message and stuffs the results into the key and
 * value
 *
 * @param etlKey the key object to populate with the next message's metadata
 * @return the next KafkaMessage, or null if there are no more messages
 * @throws IOException
 */
public KafkaMessage getNext(EtlKey etlKey) throws IOException {
 if (hasNext()) {
  MessageAndOffset msgAndOffset = messageIter.next();
  Message message = msgAndOffset.message();
  byte[] payload = getBytes(message.payload());
  byte[] key = getBytes(message.key());
  if (payload == null) {
   log.warn("Received message with null message.payload(): " + msgAndOffset);
  }
  etlKey.clear();
  etlKey.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(), kafkaRequest.getPartition(), currentOffset,
    msgAndOffset.offset() + 1, message.checksum());
  etlKey.setMessageSize(msgAndOffset.message().size());
  currentOffset = msgAndOffset.offset() + 1; // increase offset
  currentCount++; // increase count
  return new KafkaMessage(payload, key, kafkaRequest.getTopic(), kafkaRequest.getPartition(),
    msgAndOffset.offset(), message.checksum());
 } else {
  return null;
 }
}

Code example from: com.linkedin.gobblin/gobblin-kafka-08

@Override
public byte[] getKeyBytes() {
 return getBytes(this.messageAndOffset.message().key());
}

Code example from: org.apache.gobblin/gobblin-kafka-08

@Override
public byte[] getKeyBytes() {
 return getBytes(this.messageAndOffset.message().key());
}

Code example from: org.apache.storm/storm-kafka

public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
  Iterable<List<Object>> tups;
  ByteBuffer payload = msg.payload();
  if (payload == null) {
    return null;
  }
  ByteBuffer key = msg.key();
  if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
    tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
  } else {
    if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
      tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload);
    } else {
      tups = kafkaConfig.scheme.deserialize(payload);
    }
  }
  return tups;
}

Code example from: org.graylog2/graylog2-server

final byte[] payloadBytes = ByteBufferUtils.readBytes(messageAndOffset.message().payload());
final byte[] keyBytes = ByteBufferUtils.readBytes(messageAndOffset.message().key());
LOG.trace("Read message {} contains {}", bytesToHex(keyBytes), bytesToHex(payloadBytes));

Code example from: wurstmeister/storm-kafka-0.8-plus

public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
  Iterable<List<Object>> tups;
  ByteBuffer payload = msg.payload();
  ByteBuffer key = msg.key();
  if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
    tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
  } else {
    tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
  }
  return tups;
}

Code example from: org.graylog2/graylog2-shared

final byte[] payloadBytes = Utils.readBytes(messageAndOffset.message().payload());
final byte[] keyBytes = Utils.readBytes(messageAndOffset.message().key());
LOG.trace("Read message {} contains {}", bytesToHex(keyBytes), bytesToHex(payloadBytes));

Code example from: prestosql/presto

byte[] keyData = null;
ByteBuffer key = messageAndOffset.message().key();
if (key != null) {
  keyData = new byte[key.remaining()];
  key.get(keyData); // copy the key bytes out of the buffer
}

Code example from: apache/apex-malhar

private void initializeLastProcessingOffset()
{
 // read last received kafka message
 TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)), this.getTopic());
 if (tm == null) {
  throw new RuntimeException("Failed to retrieve topic metadata");
 }
 partitionNum = tm.partitionsMetadata().size();
 lastMsgs = new HashMap<Integer, Pair<byte[],byte[]>>(partitionNum);
 for (PartitionMetadata pm : tm.partitionsMetadata()) {
  String leadBroker = pm.leader().host();
  int port = pm.leader().port();
  String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
  SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
  long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName);
  FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1, 100000).build();
  FetchResponse fetchResponse = consumer.fetch(req);
  for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
   Message m = messageAndOffset.message();
   ByteBuffer payload = m.payload();
   ByteBuffer key = m.key();
   byte[] valueBytes = new byte[payload.limit()];
   byte[] keyBytes = new byte[key.limit()];
   payload.get(valueBytes);
   key.get(keyBytes);
   lastMsgs.put(pm.partitionId(), new Pair<byte[], byte[]>(keyBytes, valueBytes));
  }
 }
}

Code example from: org.apache.apex/malhar-contrib

private void initializeLastProcessingOffset()
{
 // read last received kafka message
 TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)), this.getTopic());
 if (tm == null) {
  throw new RuntimeException("Failed to retrieve topic metadata");
 }
 partitionNum = tm.partitionsMetadata().size();
 lastMsgs = new HashMap<Integer, Pair<byte[],byte[]>>(partitionNum);
 for (PartitionMetadata pm : tm.partitionsMetadata()) {
  String leadBroker = pm.leader().host();
  int port = pm.leader().port();
  String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
  SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
  long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName);
  FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1, 100000).build();
  FetchResponse fetchResponse = consumer.fetch(req);
  for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
   Message m = messageAndOffset.message();
   ByteBuffer payload = m.payload();
   ByteBuffer key = m.key();
   byte[] valueBytes = new byte[payload.limit()];
   byte[] keyBytes = new byte[key.limit()];
   payload.get(valueBytes);
   key.get(keyBytes);
   lastMsgs.put(pm.partitionId(), new Pair<byte[], byte[]>(keyBytes, valueBytes));
  }
 }
}

Code example from: HiveKa/HiveKa

payload.set(bytes, 0, origSize);
buf = message.key();
if (buf != null) {
 origSize = buf.remaining();
 bytes = new byte[origSize];
 buf.get(bytes, 0, origSize); // copy the key bytes into the reusable array
}

Code example from: com.github.hackerwin7/jlib-utils

ByteBuffer keyBuffer = messageAndOffset.message().key();
byte[] keyBytes = new byte[keyBuffer.limit()];
keyBuffer.get(keyBytes);

Code example from: com.ebay.jetstream/jetstream-messaging

ByteBuffer k = message.key();
ByteBuffer p = message.payload();
byte[] key = new byte[k.limit()];
k.get(key); // copy the key bytes out of the buffer

Code example from: com.linkedin.camus/camus-etl-kafka

/**
 * Fetches the next Kafka message and stuffs the results into the key and
 * value
 *
 * @param etlKey the key object to populate with the next message's metadata
 * @return the next KafkaMessage, or null if there are no more messages
 * @throws IOException
 */
public KafkaMessage getNext(EtlKey etlKey) throws IOException {
 if (hasNext()) {
  MessageAndOffset msgAndOffset = messageIter.next();
  Message message = msgAndOffset.message();
  byte[] payload = getBytes(message.payload());
  byte[] key = getBytes(message.key());
  if (payload == null) {
   log.warn("Received message with null message.payload(): " + msgAndOffset);
  }
  etlKey.clear();
  etlKey.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(), kafkaRequest.getPartition(), currentOffset,
    msgAndOffset.offset() + 1, message.checksum());
  etlKey.setMessageSize(msgAndOffset.message().size());
  currentOffset = msgAndOffset.offset() + 1; // increase offset
  currentCount++; // increase count
  return new KafkaMessage(payload, key, kafkaRequest.getTopic(), kafkaRequest.getPartition(),
    msgAndOffset.offset(), message.checksum());
 } else {
  return null;
 }
}

Code example from: HomeAdvisor/Kafdrop

private MessageVO createMessage(Message message, MessageDeserializer deserializer)
{
 MessageVO vo = new MessageVO();
 if (message.hasKey())
 {
   vo.setKey(ByteUtils.readString(message.key()));
 }
 if (!message.isNull())
 {
   final String messageString = deserializer.deserializeMessage(message.payload());
   vo.setMessage(messageString);
 }
 vo.setValid(message.isValid());
 vo.setCompressionCodec(message.compressionCodec().name());
 vo.setChecksum(message.checksum());
 vo.setComputedChecksum(message.computeChecksum());
 return vo;
}
