本文整理了Java中kafka.javaapi.producer.Producer.send()方法的一些代码示例,展示了Producer.send()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer.send()方法的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer
方法名:send
暂无
代码示例来源:origin: apache/incubator-pinot
/**
 * Sends one JSON message to the {@code "airlineStatsEvents"} topic while indexing is enabled.
 *
 * <p>When {@code keepIndexing} is false nothing is sent; instead the Avro data stream is closed
 * and its reference cleared.
 *
 * @param message the JSON node whose {@code toString()} form is sent as UTF-8 bytes
 * @throws IOException if closing the stream or encoding the message fails
 */
private void publish(JsonNode message)
    throws IOException {
  if (keepIndexing) {
    byte[] payload = message.toString().getBytes("UTF-8");
    producer.send(new KeyedMessage<String, byte[]>("airlineStatsEvents", payload));
    return;
  }

  // Indexing was switched off: release the stream instead of publishing.
  avroDataStream.close();
  avroDataStream = null;
}
代码示例来源:origin: apache/incubator-pinot
bytes);
producer.send(data);
代码示例来源:origin: prestodb/presto
/**
 * Converts every row of {@code data} into a column-name to value map and sends it to Kafka.
 *
 * <p>Column types are captured the first time {@code statusInfo} carries column metadata.
 * Null values are left out of the map. Each row is keyed by the next value of {@code count}.
 *
 * @param statusInfo query status; supplies the column metadata
 * @param data the result rows; ignored when it holds no data
 */
@Override
public void addResults(QueryStatusInfo statusInfo, QueryData data)
{
    boolean typesMissing = types.get() == null;
    if (typesMissing && statusInfo.getColumns() != null) {
        types.set(getTypes(statusInfo.getColumns()));
    }

    if (data.getData() == null) {
        return;
    }

    checkState(types.get() != null, "Data without types received!");
    List<Column> columns = statusInfo.getColumns();
    for (List<Object> row : data.getData()) {
        ImmutableMap.Builder<String, Object> rowBuilder = ImmutableMap.builder();
        for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
            Type columnType = types.get().get(columnIndex);
            Object converted = convertValue(row.get(columnIndex), columnType);
            if (converted == null) {
                // Null cells are skipped rather than stored in the row map.
                continue;
            }
            rowBuilder.put(columns.get(columnIndex).getName(), converted);
        }
        producer.send(new KeyedMessage<>(topicName, count.getAndIncrement(), rowBuilder.build()));
    }
}
代码示例来源:origin: apache/incubator-pinot
producer.send(messagesToWrite);
messagesToWrite.clear();
producer.send(messagesToWrite);
代码示例来源:origin: apache/incubator-pinot
producer.send(messagesToWrite);
messagesToWrite.clear();
producer.send(messagesToWrite);
代码示例来源:origin: org.apache.kafka/kafka_2.9.2
/**
 * Producer thread loop: takes messages from {@code producerDataChannel} and sends each one
 * through {@code producer} until a message equal to {@code shutdownMessage} is received.
 *
 * <p>Any {@link Throwable} raised inside the loop is logged as fatal and ends the loop.
 * {@code shutdownComplete} is counted down in all cases.
 */
public void run() {
  try {
    while (true) {
      KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
      if (data.equals(shutdownMessage)) {
        break;
      }
      producer.send(data);
      if (logger.isDebugEnabled()) {
        // String.format is static. Calling it as "…%s".format(payload) used the payload itself
        // as the format string, which dropped the prefix and could throw on a '%' in the data.
        logger.debug(String.format("Sending message %s", new String(data.message())));
      }
    }
    logger.info("Producer thread " + threadName + " finished running");
  } catch (Throwable t) {
    logger.fatal("Producer thread failure due to ", t);
  } finally {
    shutdownComplete.countDown();
  }
}
代码示例来源:origin: linkedin/camus
try {
KeyedMessage keyedMessage = new KeyedMessage("TrackingMonitoringEvent", message);
producer.send(keyedMessage);
break;
} catch (Exception e) {
代码示例来源:origin: rakam-io/rakam
/**
 * Serializes the event's properties with Avro binary encoding and sends the bytes to the Kafka
 * topic named {@code <project>_<collection>}.
 *
 * @param event the event to store
 * @throws RuntimeException if serialization fails, or if the producer raises
 *     {@code FailedToSendMessageException}; the original exception is kept as the cause
 */
@Override
public void store(Event event) {
    GenericDatumWriter writer = new SourceFilteredRecordWriter(event.properties().getSchema(), GenericData.get(), sourceFields);
    ByteBuf buffer = Unpooled.buffer(100);
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(
            new ByteBufOutputStream(buffer), null);
    try {
        writer.write(event.properties(), encoder);
    } catch (Exception e) {
        throw new RuntimeException("Couldn't serialize event", e);
    }

    // buffer.array() exposes the whole backing array, including unwritten capacity beyond the
    // writer index. Copy only the bytes that were actually written so no padding is sent.
    byte[] payload = new byte[buffer.readableBytes()];
    buffer.readBytes(payload);

    try {
        producer.send(new KeyedMessage<>(event.project() + "_" + event.collection(), payload));
    } catch (FailedToSendMessageException e) {
        throw new RuntimeException("Couldn't send event to Kafka", e);
    }
}
代码示例来源:origin: linkedin/camus
/**
 * Generates {@code numOfMessages} random messages, sends them to {@code topic} as JSON strings
 * keyed by their index, and returns the generated messages.
 *
 * @param topic the Kafka topic to write to
 * @param numOfMessages how many messages to generate and send
 * @return the generated messages, in the order they were created
 */
private static List<Message> writeKafka(String topic, int numOfMessages) {
  List<Message> generated = new ArrayList<Message>();
  List<KeyedMessage<String, String>> outgoing = new ArrayList<KeyedMessage<String, String>>();

  for (int index = 0; index < numOfMessages; index++) {
    Message message = new Message(RANDOM.nextInt());
    generated.add(message);
    String key = Integer.toString(index);
    String json = gson.toJson(message);
    outgoing.add(new KeyedMessage<String, String>(topic, key, json));
  }

  // Both the message value and the key are encoded as strings.
  Properties config = cluster.getProps();
  config.setProperty("serializer.class", StringEncoder.class.getName());
  config.setProperty("key.serializer.class", StringEncoder.class.getName());

  Producer<String, String> stringProducer = new Producer<String, String>(new ProducerConfig(config));
  try {
    stringProducer.send(outgoing);
  } finally {
    stringProducer.close();
  }

  return generated;
}
代码示例来源:origin: linkedin/camus
/**
 * Builds an EasyMock {@code Producer} whose recorded expectations are: two {@code send} calls
 * that throw {@code RuntimeException("dummyException")}, then one {@code send} call that
 * completes normally. {@code close()} is allowed any number of times.
 *
 * @return the mock, already switched to replay mode
 */
private Producer mockProducerThirdSendSucceed() {
Producer mockProducer = EasyMock.createMock(Producer.class);
// Record a send(...) with any argument, expected twice, each time throwing.
mockProducer.send((KeyedMessage) EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);
// Record one further send(...) with any argument that does not throw.
mockProducer.send((KeyedMessage) EasyMock.anyObject());
EasyMock.expectLastCall().times(1);
// close() may be invoked any number of times.
mockProducer.close();
EasyMock.expectLastCall().anyTimes();
// Finish recording; from here on the mock replays the expectations above.
EasyMock.replay(mockProducer);
return mockProducer;
}
代码示例来源:origin: linkedin/camus
/**
 * Builds an EasyMock {@code Producer} for which every recorded {@code send} call throws
 * {@code RuntimeException("dummyException")}. {@code close()} is allowed any number of times.
 *
 * @return the mock, already switched to replay mode
 */
private Producer mockProducerSendThrowsException() {
Producer mockProducer = EasyMock.createMock(Producer.class);
// Record a send(...) with any argument that throws, permitted any number of times.
mockProducer.send((KeyedMessage) EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();
// close() may be invoked any number of times.
mockProducer.close();
EasyMock.expectLastCall().anyTimes();
// Finish recording; from here on the mock replays the expectations above.
EasyMock.replay(mockProducer);
return mockProducer;
}
代码示例来源:origin: caskdata/cdap
/**
 * Sends the given messages through {@code producer}, temporarily clearing the calling thread's
 * interrupt flag for the duration of the send.
 *
 * <p>If the thread was interrupted on entry, the flag is set again after the send, whether the
 * send completed or threw.
 *
 * @param messages the batch of messages to send
 */
void publish(List<KeyedMessage<String, byte[]>> messages) {
  // Thread.interrupted() both reports and clears the flag; a set flag would block publishing.
  if (!Thread.interrupted()) {
    producer.send(messages);
    return;
  }

  try {
    producer.send(messages);
  } finally {
    // The flag was set on entry, so put it back for the caller.
    Thread.currentThread().interrupt();
  }
}
代码示例来源:origin: apache/eagle
/**
 * Serializes {@code event} to a JSON string and sends it to the topic {@code topicId}.
 *
 * <p>The message is sent without a key: {@code key} is deliberately not used as the partition
 * key because it may cause data skew.
 *
 * @param key the event key; not used when building the Kafka message
 * @param event the event to serialize
 * @param collector not used by this method
 * @throws Exception any failure from serialization or sending, after it has been logged
 */
@Override
protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
  try {
    ObjectMapper jsonMapper = new ObjectMapper();
    String json = jsonMapper.writeValueAsString(event);
    producer.send(new KeyedMessage(this.topicId, json));
  } catch (Exception failure) {
    LOG.error(failure.getMessage(), failure);
    throw failure;
  }
}
代码示例来源:origin: spotify/ffwd
/**
 * Converts {@code event} with {@code eventConverter.toMessage} and sends the result through
 * {@code producer}.
 *
 * @return always {@code null}; the declared result type is {@code Void}
 * @throws Exception if the conversion or the send fails
 */
@Override
public Void call() throws Exception {
producer.send(eventConverter.toMessage(event));
return null;
}
}, executorService);
代码示例来源:origin: hengyunabc/metrics-kafka
/**
 * Sends {@code result}, serialized to a string by {@code mapper}, to {@code topic}. The message
 * key is the current value of {@code count}, which is incremented afterwards.
 *
 * <p>Any exception is logged and not rethrown.
 */
@Override
public void run() {
    try {
        // The counter is advanced before serialization, so it moves on even if that step fails.
        String key = String.valueOf(count++);
        String payload = mapper.writeValueAsString(result);
        producer.send(new KeyedMessage<String, String>(topic, key, payload));
    } catch (Exception e) {
        logger.error("send metrics to kafka error!", e);
    }
}
});
代码示例来源:origin: ottogroup/SPQR
/**
 * Serializes {@code registry} to JSON bytes and sends them, without a key, to {@code topicId}.
 *
 * <p>A {@code JsonProcessingException} is logged and not rethrown; other exceptions propagate.
 */
public void run() {
    try {
        byte[] payload = jsonMapper.writeValueAsBytes(registry);
        kafkaProducer.send(new KeyedMessage<byte[], byte[]>(topicId, payload));
    } catch (JsonProcessingException e) {
        String reason = e.getMessage();
        logger.error("Failed to send message to kafka [topic="+topicId+"]. Reason: " + reason, e);
    }
}
});
代码示例来源:origin: com.github.hackerwin7/jlib-utils
/**
 * Sends a batch of messages through {@code producer} after converting them with
 * {@code toKeyedMessage}. An empty list is ignored.
 *
 * @param msgs the messages to send; must not be {@code null}
 */
public void sendKafkaMsg(List<KafkaMsg> msgs) {
    if (msgs.isEmpty()) {
        return;
    }
    producer.send(toKeyedMessage(msgs));
}
代码示例来源:origin: gwenshap/kafka-examples
/**
 * Sends {@code s} to {@code topic} as a message with a {@code null} key.
 *
 * @param s the message value
 */
@Override
public void produce(String s) {
    producer.send(new KeyedMessage<String, String>(topic, null, s));
}
代码示例来源:origin: Stratio/Decision
/**
 * Converts each streaming message into a Kafka message and sends them all in a single batch.
 *
 * <p>The topic of each Kafka message is the message's stream name; the value is whatever
 * {@code getSerializer().deserialize(message)} returns for it.
 *
 * @param messages the messages to forward
 * @throws Exception if conversion or sending fails
 */
@Override
public void process(Iterable<StratioStreamingMessage> messages) throws Exception {
    List<KeyedMessage<String, String>> batch = new ArrayList<>();
    for (StratioStreamingMessage streamingMessage : messages) {
        String streamName = streamingMessage.getStreamName();
        String payload = getSerializer().deserialize(streamingMessage);
        batch.add(new KeyedMessage<String, String>(streamName, payload));
    }
    getProducer().send(batch);
}
代码示例来源:origin: olacabs/fabric
/**
 * Test fixture: creates a spied {@code KafkaWriter} wired to a mocked {@code Producer}, so
 * that no real producer is used.
 *
 * @throws InitializationException declared by the method; none of the visible calls shows where
 *     it would originate
 */
@Before
public void setUp() throws InitializationException {
processor = spy(new KafkaWriter());
producer = mock(Producer.class);
// NOTE(review): "/default" is presumably a JSON path because topicOnJsonPath is true; confirm.
processor.setTopicOnJsonPath(true);
processor.setKafkaTopic("/default");
processor.setIngestionPoolSize(10);
// Have the spy hand out the mocked producer instead of building a real one.
doReturn(producer).when(processor).getProducer();
// Batch sends on the mock are stubbed to do nothing.
doNothing().when(producer).send(anyList());
}
内容来源于网络,如有侵权,请联系作者删除!