我有一个接收大量get请求的应用程序(5分钟内大约250000个)。应用程序解析查询参数并发布到kafka。要发布的代码如下:
public class KafkaProcessor {
private static final String BATCH_SIZE = "batch.size";
private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
private static final String PRODUCER_TYPE = "producer.type";
private static final String VALUE_SERIALIZER = "value.serializer";
private static final String KEY_SERIALIZER = "key.serializer";
private static final String METADATA_BROKER_LIST = "bootstrap.servers";
private static final String MAX_BLOCK_MS = "max.block.ms";
private static final String KAFKA_ENABLED = "enabled";
private static Properties props = new Properties();
private static KafkaProducer<String, String> producer;
private static ProducerRecord<String, String> producerRecord;
private static String topic;
static {
boolean isEnabled = Boolean.parseBoolean(ResourceProps.INSTANCE.getKafkaProps(KAFKA_ENABLED));
if (isEnabled) {
//Setting up a producer configuration.
props.put(METADATA_BROKER_LIST, "x.x.x.x:9092,y.y.y.y:9092");
props.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
props.put(PRODUCER_TYPE, "async");
props.put(REQUEST_REQUIRED_ACKS, "1");
props.put(BATCH_SIZE, "1000");
props.put(MAX_BLOCK_MS, "10000");
producer = new KafkaProducer<>(props);
topic = "pixel-server";
}
}
private static void publishToKafka(JSONObject data) {
producerRecord = new ProducerRecord<String, String>(topic, data.toString());
producer.send(producerRecord, new Callback() {
@Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
}
}
});
}
}
我的应用程序托管在aws示例中。kafka服务器也在另一台aws机器中。
但是,如果Kafka关闭或Kafka由于任何原因响应缓慢,则我的应用程序将冻结,无法进一步处理任何请求。我想知道如何使我的应用程序独立于Kafka,也就是说,如果Kafka出现故障(或响应缓慢),那么它应该不会影响我的应用程序。
我尝试了两种方法,比如如果kafka给出了一个超时,然后计算超时异常的数量,然后停止发布到kafka,但是由于请求量非常大,所以当我得到任何超时异常时,我的应用程序就会冻结。
任何帮助或指示都将不胜感激。
我用的是Kafka0.8.2。我的服务器在vertx中。ubuntu中使用的操作系统。ulimit设置为max。
1条答案
按热度按时间ncecgwcz1#
假设您的kafka集群中有三个或更多节点(这对于任何高负载的应用程序都是至关重要的),您可以尝试以下技巧:
尝试设置
acks
生产者配置到0
. 这将影响应用程序的一致性(有些消息可能会在生产者端被丢弃,并将永远丢失)。文件上说:如果设置为零,那么生产者根本不会等待服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下,不能保证服务器已收到记录
套
max.block.ms
生产者配置到0
. 这将导致应用程序在每次发送到kafka集群时立即抛出timeoutexception,而不进行任何阻塞,但仅在内存缓冲区溢出时抛出。请注意,它只影响客户端阻塞,而不影响网络调用!减少
request.timeout.ms
小值(如10
或者100
). 这将导致kafka客户机在任何耗时超过的网络操作上抛出timeoutexceptionrequest.timeout.ms
.还有一些建议:
将您的kafka示例更新到最新版本以获得更稳定的集群;
为了获得高可用性,您的kafka集群必须至少包含三个节点(并且总是奇数个节点以避免大脑分裂的情况)
你应该试着和我一起玩
max.batch.size
以及linger.ms
producer配置以达到应用程序的最佳延迟吞吐量比率