作为我们当前kafka集群的一部分,高可用性测试(ha)正在进行中。目标是,当生产者作业将数据推送到某个主题的特定分区时,kafka集群中的所有代理都会依次重新启动(停止第一个代理-重新启动它,在第一个代理出现后,对第二个代理执行相同的步骤,依此类推)。在测试进行期间,制作人的工作将在大约30分钟内推送700万张唱片。在作业结束时,发现大约有1000条记录丢失。
下面是我们Kafka集群的细节:(Kafka2.10-0.8.2.0)
-3个kafka代理,每个代理有2个100gb挂载
创建主题时使用:-复制因子3-min.insync.replica=2
服务器属性:
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/drive1,/drive2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=1800000
log.cleaner.enable=false
zookeeper.connect=ZK1:2181,ZK2:2181,ZK3:2181
zookeeper.connection.timeout.ms=10000
advertised.host.name=XXXX
auto.leader.rebalance.enable=true
auto.create.topics.enable=false
queued.max.requests=500
delete.topic.enable=true
controlled.shutdown.enable=true
unclean.leader.election=false
num.replica.fetchers=4
controller.message.queue.size=10
producer.properties(具有新producer api的aync producer)
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
acks=all
buffer.memory=33554432
compression.type=snappy
batch.size=32768
linger.ms=5
max.request.size=1048576
block.on.buffer.full=true
reconnect.backoff.ms=10
retry.backoff.ms=100
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
是否有人可以共享有关kafka集群和ha的任何信息,以确保在启动kafka代理时不会丢失数据?
还有,这是我的制片人代码。这是一个火和忘记类型的生产者。到目前为止,我们还没有明确地处理故障。几乎数百万张唱片都能正常工作。我看到的问题,只有当Kafka经纪人重新启动如上所述。
public void sendMessage(List<byte[]> messages, String destination, Integer parition, String kafkaDBKey) {
for(byte[] message : messages) {
producer.send(new ProducerRecord<byte[], byte[]>(destination, parition, kafkaDBKey.getBytes(), message));
}
}
1条答案
按热度按时间wr98u20j1#
通过在生产者端将默认重试值从0增加到4000,我们能够成功地发送数据而不会丢失数据。
retries=4000
由于此设置,有可能发送两次相同的消息,并且消息在使用者接收到消息时顺序不一致(第二个消息可能在第一个消息之前到达)。但对于我们目前的问题,这不是一个问题,而是在消费者方面处理的,以确保一切正常。