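This article collects code examples for the Java method org.apache.kafka.clients.producer.Producer.send() and shows how Producer.send() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of the Producer.send() method are as follows: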
Package path: org.apache.kafka.clients.producer
Class name: Producer
Method name: send
Description: See KafkaProducer#send(ProducerRecord)
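For orientation, the sketch below contrasts three common ways of calling an asynchronous send method: ignoring the result, blocking on the returned Future, and passing a callback. It uses only the JDK. `InMemoryProducer` and `Ack` are hypothetical stand-ins that mirror the shape of Producer.send() and RecordMetadata; they are not the Kafka API, and the rule that a topic containing a space fails is a simplifying assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Hypothetical stand-ins that mirror the shape of Producer.send(); NOT the Kafka API.
class SendPatternsSketch {

    /** Hypothetical, simplified counterpart of RecordMetadata. */
    static final class Ack {
        final String topic;
        final long offset;

        Ack(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    /** In-memory stand-in: every valid send is acknowledged at once with the next offset. */
    static final class InMemoryProducer {
        private long nextOffset = 0;

        Future<Ack> send(String topic, String value) {
            return send(topic, value, null);
        }

        Future<Ack> send(String topic, String value, BiConsumer<Ack, Exception> callback) {
            CompletableFuture<Ack> future = new CompletableFuture<>();
            if (topic == null || topic.contains(" ")) {
                // Assumption for this sketch: such topics are rejected.
                Exception error = new IllegalArgumentException("invalid topic: " + topic);
                future.completeExceptionally(error);
                if (callback != null) callback.accept(null, error);
            } else {
                Ack ack = new Ack(topic, nextOffset++);
                future.complete(ack);
                if (callback != null) callback.accept(ack, null);
            }
            return future;
        }
    }

    /** Blocking style: waits for the acknowledgement and returns its offset, or -1 on failure. */
    static long sendAndWait(InMemoryProducer producer, String topic, String value) {
        try {
            return producer.send(topic, value).get().offset;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        InMemoryProducer producer = new InMemoryProducer();

        // 1. Fire and forget: the returned Future is ignored.
        producer.send("events", "first");

        // 2. Blocking: wait for the result of this one record.
        System.out.println("offset = " + sendAndWait(producer, "events", "second"));

        // 3. Callback: react to success or failure without blocking the caller.
        producer.send("bad topic", "third", (ack, error) -> {
            if (error != null) {
                System.out.println("send failed: " + error.getMessage());
            }
        });
    }
}
```

The project examples that follow use the same three styles against a real producer: some ignore the Future, some call get() on it, and some pass a callback.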
Code example source: QNJR-GROUP/EasyTransaction
public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String,byte[]> record){
    // Asynchronous send: the caller decides whether to wait on the returned Future.
    return kafkaProducer.send(record);
}
Code example source: alibaba/canal
private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
                                                                                InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
        partition,
        null,
        JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
    if (kafkaProperties.getTransaction()) {
        // Transactional mode: send without blocking on each record.
        producer2.send(record);
    } else {
        // Otherwise block until this record has been sent.
        producer2.send(record).get();
    }
}
Code example source: line/armeria
@Override
public void log(RequestLog log) {
    final V value = valueExtractor.apply(log);
    if (value == null) {
        return;
    }
    final K key = keyExtractor.apply(log);
    final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, value);
    // The callback only logs failures; successful sends are ignored.
    producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
            logger.warn("Failed to send a record to Kafka: {}", producerRecord, exception);
        }
    });
}
Code example source: jmxtrans/jmxtrans
@Override
protected void internalWrite(Server server, Query query, ImmutableList<Result> results) throws Exception {
    for (Result result : results) {
        log.debug("Query result: [{}]", result);
        String message = resultSerializer.serialize(server, query, result);
        // Publish the same serialized message to every configured topic.
        for (String topic : this.topics) {
            log.debug("Topic: [{}] ; Kafka Message: [{}]", topic, message);
            producer.send(new ProducerRecord<String, String>(topic, message));
        }
    }
}
Code example source: line/armeria
@Override
protected void writeLog(RequestLog log, L structuredLog) {
    final byte[] key = keySelector.selectKey(log, structuredLog);
    final ProducerRecord<byte[], L> producerRecord = new ProducerRecord<>(topic, key, structuredLog);
    producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
            logger.warn("failed to send service log to Kafka {}", producerRecord, exception);
        }
    });
}
Code example source: jmxtrans/jmxtrans
@Override
public void doWrite(Server server, Query query, Iterable<Result> results) throws Exception {
    for (Result result : results) {
        String message = resultSerializer.serialize(server, query, result);
        if (message != null) {
            producer.send(new ProducerRecord<String, String>(topic, message));
        }
    }
}
Code example source: apache/incubator-gobblin
@Override
public Future<WriteResponse> write(final D record, final WriteCallback callback) {
    // Forward the producer callback to the writer's callback and wrap the returned Future.
    return new WriteResponseFuture<>(this.producer.send(new ProducerRecord<String, D>(topic, record), new Callback() {
        @Override
        public void onCompletion(final RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                callback.onFailure(exception);
            } else {
                callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
            }
        }
    }), WRITE_RESPONSE_WRAPPER);
}
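The example above adapts the producer's callback and Future to the writer's own types. A related idea, exposing a callback-based send as a `CompletableFuture`, can be sketched with JDK types only. `SendCallback` and `CallbackSender` are hypothetical interfaces invented for this illustration; they are not Kafka or Gobblin types.

```java
import java.util.concurrent.CompletableFuture;

class CallbackBridgeSketch {

    /** Hypothetical callback: exactly one of offset or error is expected to be non-null. */
    interface SendCallback {
        void onCompletion(Long offset, Exception error);
    }

    /** Hypothetical callback-based sender, standing in for a producer. */
    interface CallbackSender {
        void send(String value, SendCallback callback);
    }

    /** Exposes a callback-based send as a CompletableFuture. */
    static CompletableFuture<Long> sendAsFuture(CallbackSender sender, String value) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        sender.send(value, (offset, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(offset);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // A sender that acknowledges every value with a fixed offset.
        CallbackSender sender = (value, callback) -> callback.onCompletion(42L, null);
        System.out.println(sendAsFuture(sender, "hello").join());
    }
}
```

Returning a `CompletableFuture` lets callers chain further steps with methods such as `thenApply` instead of nesting callbacks.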
Code example source: OryxProject/oryx
@Override
public void send(K key, M message) {
    getProducer().send(new ProducerRecord<>(topic, key, message));
}
Code example source: openzipkin/brave
@Benchmark public RecordMetadata send_baseCase() throws Exception {
    // Blocks until the send completes so the benchmark measures the full round trip.
    return producer.send(record).get();
}
Code example source: apache/incubator-druid
private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
{
    ObjectContainer<String> objectToSend;
    try {
        // Drain the queue forever; take() blocks until a record is available.
        while (true) {
            objectToSend = recordQueue.take();
            producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
        }
    }
    catch (InterruptedException e) {
        log.warn(e, "Failed to take record from queue!");
    }
}
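The loop above logs the interruption and returns. A common variant also restores the thread's interrupt flag so that the code owning the thread can see the shutdown request. The JDK-only sketch below shows that variant; the `Consumer<String>` sink is a hypothetical stand-in for the `producer.send(...)` call, and `LinkedBlockingQueue` replaces the project's own queue class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class QueueDrainSketch {

    /**
     * Takes items from the queue and hands each one to the sink until the
     * current thread is interrupted. Returns the number of items forwarded.
     */
    static int drain(BlockingQueue<String> queue, Consumer<String> sink) {
        int forwarded = 0;
        try {
            while (true) {
                String item = queue.take(); // blocks until an item is available
                sink.accept(item);          // stand-in for producer.send(...)
                forwarded++;
            }
        } catch (InterruptedException e) {
            // Restore the flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return forwarded;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        List<String> sent = new ArrayList<>();
        // For the demo, the sink requests shutdown after the second item.
        int forwarded = drain(queue, item -> {
            sent.add(item);
            if (sent.size() == 2) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(forwarded + " items forwarded: " + sent);
    }
}
```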
Code example source: apache/kylin
protected void send(String topic, Record record, Callback callback) {
    producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback);
}
Code example source: confluentinc/ksql
@Test
public void shouldSendCommandCorrectly() throws Exception {
    // When
    commandTopic.send(commandId1, command1);
    // Then
    verify(commandProducer).send(new ProducerRecord<>(COMMAND_TOPIC_NAME, 0, commandId1, command1));
    verify(future).get();
}
Code example source: confluentinc/ksql
@Before
@SuppressWarnings("unchecked")
public void setup() {
    commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandProducer);
    // Stub send() on the mocked producer so that it returns the mocked Future.
    when(commandProducer.send(any(ProducerRecord.class))).thenReturn(future);
}
Code example source: line/armeria
@Test
public void withKeyExtractor() {
    final RequestLog log = mock(RequestLog.class);
    when(log.authority()).thenReturn("kawamuray");
    when(log.decodedPath()).thenReturn("kyuto");
    final KafkaAccessLogWriter<String, String> service =
            new KafkaAccessLogWriter<>(producer, TOPIC_NAME,
                                       RequestLog::decodedPath, RequestLog::authority);
    service.log(log);
    // Capture the record passed to send() and check its key and value.
    verify(producer, times(1)).send(captor.capture(), any(Callback.class));
    final ProducerRecord<String, String> record = captor.getValue();
    assertThat(record.key()).isEqualTo("kyuto");
    assertThat(record.value()).isEqualTo("kawamuray");
}
Code example source: line/armeria
@Test
public void withoutKeyExtractor() {
    final RequestLog log = mock(RequestLog.class);
    when(log.authority()).thenReturn("kawamuray");
    final KafkaAccessLogWriter<String, String> service =
            new KafkaAccessLogWriter<>(producer, TOPIC_NAME, RequestLog::authority);
    service.log(log);
    verify(producer, times(1)).send(captor.capture(), any(Callback.class));
    final ProducerRecord<String, String> record = captor.getValue();
    assertThat(record.key()).isNull();
    assertThat(record.value()).isEqualTo("kawamuray");
}
Code example source: line/armeria
@Test
public void testWithKeySelector() {
    final KafkaStructuredLoggingServiceExposed service = new KafkaStructuredLoggingServiceExposed(
            producer, (res, log) -> log.name.getBytes(), false);
    final SimpleStructuredLog log = new SimpleStructuredLog("kawamuray");
    service.writeLog(null, log);
    verify(producer, times(1)).send(captor.capture(), any(Callback.class));
    final ProducerRecord<byte[], SimpleStructuredLog> record = captor.getValue();
    assertThat(record.key()).isNotNull();
    assertThat(new String(record.key())).isEqualTo(log.name);
    assertThat(record.value()).isEqualTo(log);
}
Code example source: line/armeria
@Test
public void testServiceWithoutKeySelector() {
    final KafkaStructuredLoggingServiceExposed service =
            new KafkaStructuredLoggingServiceExposed(producer, null, false);
    final SimpleStructuredLog log = new SimpleStructuredLog("kawamuray");
    service.writeLog(null, log);
    verify(producer, times(1)).send(captor.capture(), any(Callback.class));
    final ProducerRecord<byte[], SimpleStructuredLog> record = captor.getValue();
    assertThat(record.key()).isNull();
    assertThat(record.value()).isEqualTo(log);
}
Code example source: apache/kafka
@Test
public void testSendToInvalidTopic() throws Exception {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
    Time time = new MockTime();
    MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
    Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
    metadata.update(initialUpdateResponse, time.milliseconds());
    MockClient client = new MockClient(time, metadata);
    Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
            metadata, client, null, time);
    String invalidTopicName = "topic abc"; // Invalid topic name due to space
    ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
    // Prepare a metadata response that reports the topic as invalid.
    List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
    topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
            invalidTopicName, false, Collections.emptyList()));
    MetadataResponse updateResponse = new MetadataResponse(
            new ArrayList<>(initialUpdateResponse.brokers()),
            initialUpdateResponse.clusterId(),
            initialUpdateResponse.controller().id(),
            topicMetadata);
    client.prepareMetadataUpdate(updateResponse);
    // The returned Future is expected to fail with InvalidTopicException.
    Future<RecordMetadata> future = producer.send(record);
    assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
            metadata.fetch().invalidTopics());
    TestUtils.assertFutureError(future, InvalidTopicException.class);
    producer.close(Duration.ofMillis(0));
}