kafka.javaapi.producer.Producer.send()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(225)

本文整理了Java中kafka.javaapi.producer.Producer.send()方法的一些代码示例,展示了Producer.send()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.send()方法的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer
方法名:send

Producer.send介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-pinot

private void publish(JsonNode message)
  throws IOException {
 if (!keepIndexing) {
  avroDataStream.close();
  avroDataStream = null;
  return;
 }
 KeyedMessage<String, byte[]> data =
   new KeyedMessage<String, byte[]>("airlineStatsEvents", message.toString().getBytes("UTF-8"));
 producer.send(data);
}

代码示例来源:origin: apache/incubator-pinot

bytes);
producer.send(data);

代码示例来源:origin: prestodb/presto

@Override
public void addResults(QueryStatusInfo statusInfo, QueryData data)
{
  if (types.get() == null && statusInfo.getColumns() != null) {
    types.set(getTypes(statusInfo.getColumns()));
  }
  if (data.getData() != null) {
    checkState(types.get() != null, "Data without types received!");
    List<Column> columns = statusInfo.getColumns();
    for (List<Object> fields : data.getData()) {
      ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
      for (int i = 0; i < fields.size(); i++) {
        Type type = types.get().get(i);
        Object value = convertValue(fields.get(i), type);
        if (value != null) {
          builder.put(columns.get(i).getName(), value);
        }
      }
      producer.send(new KeyedMessage<>(topicName, count.getAndIncrement(), builder.build()));
    }
  }
}

代码示例来源:origin: apache/incubator-pinot

producer.send(messagesToWrite);
  messagesToWrite.clear();
producer.send(messagesToWrite);

代码示例来源:origin: apache/incubator-pinot

producer.send(messagesToWrite);
  messagesToWrite.clear();
producer.send(messagesToWrite);

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

public void run() {
 try{
  while(true) {
   KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
   if(!data.equals(shutdownMessage)) {
    producer.send(data);
    if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
   }
   else
    break;
  }
  logger.info("Producer thread " + threadName + " finished running");
 } catch (Throwable t){
  logger.fatal("Producer thread failure due to ", t);
 } finally {
  shutdownComplete.countDown();
 }
}

代码示例来源:origin: linkedin/camus

try {
 KeyedMessage keyedMessage = new KeyedMessage("TrackingMonitoringEvent", message);
 producer.send(keyedMessage);
 break;
} catch (Exception e) {

代码示例来源:origin: rakam-io/rakam

@Override
public void store(Event event) {
  GenericDatumWriter writer = new SourceFilteredRecordWriter(event.properties().getSchema(), GenericData.get(), sourceFields);
  ByteBuf buffer = Unpooled.buffer(100);
  BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(
      new ByteBufOutputStream(buffer), null);
  try {
    writer.write(event.properties(), encoder);
  } catch (Exception e) {
    throw new RuntimeException("Couldn't serialize event", e);
  }
  try {
    producer.send(new KeyedMessage<>(event.project() + "_" + event.collection(), buffer.array()));
  } catch (FailedToSendMessageException e) {
    throw new RuntimeException("Couldn't send event to Kafka", e);
  }
}

代码示例来源:origin: linkedin/camus

private static List<Message> writeKafka(String topic, int numOfMessages) {
 List<Message> messages = new ArrayList<Message>();
 List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
 for (int i = 0; i < numOfMessages; i++) {
  Message msg = new Message(RANDOM.nextInt());
  messages.add(msg);
  kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
 }
 Properties producerProps = cluster.getProps();
 producerProps.setProperty("serializer.class", StringEncoder.class.getName());
 producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
 try {
  producer.send(kafkaMessages);
 } finally {
  producer.close();
 }
 return messages;
}

代码示例来源:origin: linkedin/camus

private Producer mockProducerThirdSendSucceed() {
 Producer mockProducer = EasyMock.createMock(Producer.class);
 mockProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);
 mockProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().times(1);
 mockProducer.close();
 EasyMock.expectLastCall().anyTimes();
 EasyMock.replay(mockProducer);
 return mockProducer;
}

代码示例来源:origin: linkedin/camus

private Producer mockProducerSendThrowsException() {
 Producer mockProducer = EasyMock.createMock(Producer.class);
 mockProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();
 mockProducer.close();
 EasyMock.expectLastCall().anyTimes();
 EasyMock.replay(mockProducer);
 return mockProducer;
}

代码示例来源:origin: caskdata/cdap

void publish(List<KeyedMessage<String, byte[]>> messages) {
 // Clear the interrupt flag, otherwise it won't be able to publish
 boolean threadInterrupted = Thread.interrupted();
 try {
  producer.send(messages);
 } finally {
  // Reset the interrupt flag if needed
  if (threadInterrupted) {
   Thread.currentThread().interrupt();
  }
 }
}

代码示例来源:origin: apache/eagle

@Override
protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
  try {
    String output = new ObjectMapper().writeValueAsString(event);
    // partition key may cause data skew
    //producer.send(new KeyedMessage(this.topicId, key, output));
    producer.send(new KeyedMessage(this.topicId, output));
  } catch (Exception ex) {
    LOG.error(ex.getMessage(), ex);
    throw ex;
  }
}

代码示例来源:origin: spotify/ffwd

@Override
  public Void call() throws Exception {
    producer.send(eventConverter.toMessage(event));
    return null;
  }
}, executorService);

代码示例来源:origin: hengyunabc/metrics-kafka

@Override
  public void run() {
    try {
    KeyedMessage<String, String> message = new KeyedMessage<String, String>(
        topic, "" + count++, mapper.writeValueAsString(result));
      producer.send(message);
    } catch (Exception e) {
      logger.error("send metrics to kafka error!", e);
    }
  }
});

代码示例来源:origin: ottogroup/SPQR

public void run() {
    try {
      kafkaProducer.send(new KeyedMessage<byte[], byte[]>(topicId, jsonMapper.writeValueAsBytes(registry)));
    } catch (JsonProcessingException e) {
      logger.error("Failed to send message to kafka [topic="+topicId+"]. Reason: " + e.getMessage(), e);
    }
  }
});

代码示例来源:origin: com.github.hackerwin7/jlib-utils

/**
 * sendKafkaMsg batch
 * @param msgs
 */
public void sendKafkaMsg(List<KafkaMsg> msgs) {
  if(msgs.size() > 0)
    producer.send(toKeyedMessage(msgs));
}

代码示例来源:origin: gwenshap/kafka-examples

@Override
public void produce(String s) {
  KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, null, s);
  producer.send(message);
}

代码示例来源:origin: Stratio/Decision

@Override
public void process(Iterable<StratioStreamingMessage> messages) throws Exception {
  List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<>();
  for (StratioStreamingMessage message : messages) {
    kafkaMessages.add(new KeyedMessage<String, String>(message.getStreamName(), getSerializer().deserialize(
        message)));
  }
  getProducer().send(kafkaMessages);
}

代码示例来源:origin: olacabs/fabric

@Before
public void setUp() throws InitializationException {
  processor = spy(new KafkaWriter());
  producer = mock(Producer.class);
  processor.setTopicOnJsonPath(true);
  processor.setKafkaTopic("/default");
  processor.setIngestionPoolSize(10);
  doReturn(producer).when(processor).getProducer();
  doNothing().when(producer).send(anyList());
}

相关文章