org.apache.kafka.clients.producer.Producer.flush()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(8.1k)|赞(0)|评价(0)|浏览(223)

本文整理了Java中org.apache.kafka.clients.producer.Producer.flush()方法的一些代码示例,展示了Producer.flush()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.flush()方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:flush

Producer.flush介绍

[英]See KafkaProducer#flush()
[中]参见《卡夫卡制作人》#flush()

代码示例

代码示例来源:origin: apache/incubator-druid

@Override
public void flush()
{
 producer.flush();
}

代码示例来源:origin: apache/incubator-gobblin

@Override
public void flush()
  throws IOException {
  this.producer.flush();
}

代码示例来源:origin: openzipkin/brave

@Override public void flush() {
 delegate.flush();
}

代码示例来源:origin: uber-common/jvm-profiler

@Override
public void close() {
  synchronized (this) {
    if (producer == null) {
      return;
    }
    producer.flush();
    producer.close();
    producer = null;
  }
}

代码示例来源:origin: uber-common/jvm-profiler

@Override
public void report(String profilerName, Map<String, Object> metrics) {
  ensureProducer();
  String topicName = getTopic(profilerName);
  
  String str = JsonUtils.serialize(metrics);
  byte[] message = str.getBytes(StandardCharsets.UTF_8);
  Future<RecordMetadata> future = producer.send(
      new ProducerRecord<String, byte[]>(topicName, message));
  if (syncMode) {
    producer.flush();
    try {
      future.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}

代码示例来源:origin: apache/nifi

public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() == 0L) {
      return PublishResult.EMPTY;
    }
    throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
  }
  producer.flush();
  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException e) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(e);
  } catch (final TimeoutException e) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(e);
  } finally {
    tracker = null;
  }
}

代码示例来源:origin: apache/nifi

public PublishResult complete() {
  if (tracker == null) {
    throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
  }
  producer.flush();
  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException e) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(e);
  } catch (final TimeoutException e) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(e);
  } finally {
    tracker = null;
  }
}

代码示例来源:origin: linkedin/cruise-control

_producer.flush();
if (LOG.isDebugEnabled()) {
 LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",

代码示例来源:origin: apache/nifi

public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() == 0L) {
      return PublishResult.EMPTY;
    }
    rollback();
    throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
  }
  producer.flush();
  if (activeTransaction) {
    producer.commitTransaction();
    activeTransaction = false;
  }
  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException e) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(e);
  } catch (final TimeoutException e) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(e);
  } finally {
    tracker = null;
  }
}

代码示例来源:origin: apache/nifi

public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() == 0L) {
      return PublishResult.EMPTY;
    }
    rollback();
    throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
  }
  producer.flush();
  if (activeTransaction) {
    producer.commitTransaction();
    activeTransaction = false;
  }
  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException e) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(e);
  } catch (final TimeoutException e) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(e);
  } finally {
    tracker = null;
  }
}

代码示例来源:origin: apache/nifi

public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() == 0L) {
      return PublishResult.EMPTY;
    }
    rollback();
    throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
  }
  producer.flush();
  if (activeTransaction) {
    producer.commitTransaction();
    activeTransaction = false;
  }
  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException e) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(e);
  } catch (final TimeoutException e) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(e);
  } finally {
    tracker = null;
  }
}

代码示例来源:origin: spring-projects/spring-kafka

@Override
public void flush() {
  this.delegate.flush();
}

代码示例来源:origin: apache/incubator-gobblin

producer.flush();
producer.flush();
producer.flush();

代码示例来源:origin: spring-projects/spring-kafka

/**
 * {@inheritDoc}
 * <p><b>Note</b> It only makes sense to invoke this method if the
 * {@link ProducerFactory} serves up a singleton producer (such as the
 * {@link DefaultKafkaProducerFactory}).
 */
@Override
public void flush() {
  Producer<K, V> producer = getTheProducer();
  try {
    producer.flush();
  }
  finally {
    closeProducer(producer, inTransaction());
  }
}

代码示例来源:origin: confluentinc/kafka-streams-examples

/**
 * @param topic          Kafka topic to write the data records to
 * @param records        Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K>            Key type of the data records
 * @param <V>            Value type of the data records
 */
public static <K, V> void produceKeyValuesSynchronously(
  String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
  throws ExecutionException, InterruptedException {
 Producer<K, V> producer = new KafkaProducer<>(producerConfig);
 for (KeyValue<K, V> record : records) {
  Future<RecordMetadata> f = producer.send(
    new ProducerRecord<>(topic, record.key, record.value));
  f.get();
 }
 producer.flush();
 producer.close();
}

代码示例来源:origin: org.apache.kafka/connect-runtime

/**
 * Flush the underlying producer to ensure that all pending writes have been sent.
 */
public void flush() {
  producer.flush();
}

代码示例来源:origin: org.apache.kafka/kafka-streams

@Override
public void flush() {
  log.debug("Flushing producer");
  producer.flush();
  checkForException();
}

代码示例来源:origin: org.jbpm.contrib/kafka-workitem

@Override
  public void close() {
    if (producer != null) {
      producer.flush();
      producer.close();
    }
  }
}

代码示例来源:origin: apache/incubator-rya

@Override
  public void fromCollection(final Collection<VisibilityStatement> statements) throws RyaStreamsException {
    requireNonNull(statements);

    for(final VisibilityStatement statement : statements) {
      producer.send(new ProducerRecord<>(topic, statement));
    }
    producer.flush();
  }
}

代码示例来源:origin: reactor/reactor-kafka

/**
 * Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}
 */
@Test
public void producerMethods() {
  testProducerMethod(p -> assertEquals(0, p.metrics().size()));
  testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
  testProducerMethod(p -> p.flush());
}

相关文章