We found that the Java Kafka producer 0.9 client performs very poorly when sending small messages. Messages are not accumulated into a larger request batch, so each small record is sent individually.
What is wrong with our client configuration? Or is it some other problem?
We are using Kafka client 0.9.0.0. We did not see anything related in the Kafka unreleased 0.9.0.1 or 0.9.1 fixed or unresolved lists, so we are focusing on our client configuration and the server instance.
We understand that linger.ms should cause the client to accumulate records into a batch.
We set linger.ms to 10 (and also tried 100 and 1000), but this did not cause records to be accumulated into batches. With a record size of about 100 bytes and a request buffer size of 16 KB, we expected roughly 160 messages to be sent in a single request.
Even though a new Bluemix Message Hub (Kafka server 0.9) service instance had just been provisioned, a trace on the client seemed to indicate that the partition might be full. The test client sends multiple messages in a loop with no other I/O.
The log shows a repeating sequence with one suspicious line: "Waking up the sender since topic mytopic partition 0 is either full or getting a new batch".
So in our test case the newly provisioned partition should be essentially empty - why would the producer client be getting a new batch?
2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - Allocating a new 16384 byte message buffer for topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Received produce response from node 0 with correlation id 11
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Send returned metadata: Topic='mytopic', Partition=0, Offset=130
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'
Log entries repeat like the above for each record sent
We supplied the following properties file:
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - acks=-1
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - client.id=ExploreProducer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - security.protocol=SASL_SSL
Plus we added linger.ms=10 in code.
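For reference, this is roughly how we set it (a sketch; the props variable holding the settings loaded from kafka-producer.properties and the loadProperties helper are placeholders, not our exact code):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = loadProperties("kafka-producer.properties"); // hypothetical helper
props.put("linger.ms", "10");     // wait up to 10 ms so records can accumulate into a batch
props.put("batch.size", "16384"); // per-partition batch buffer, 16 KB (the default)

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
```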
The Kafka client logs the expanded/merged configuration list (which confirms the linger.ms setting):
2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = [hidden]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = ExploreProducer
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
acks = -1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 10
Kafka metrics after sending 100 records:
Duration for 100 sends 8787 ms. Sent 7687 bytes.
batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]
batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]
buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]
buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]
buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]
bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]
byte-rate = 291.8348916277093 []
compression-rate = 0.0 []
compression-rate-avg = 0.0 [The average compression rate of record batches.]
connection-close-rate = 0.0 [Connections closed per second in the window.]
connection-count = 2.0 [The current number of active connections.]
connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]
incoming-byte-rate = 10.342564641029007 []
io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]
io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]
io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]
io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]
network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]
outgoing-byte-rate = 451.2298783403283 []
produce-throttle-time-avg = 0.0 [The average throttle time in ms]
produce-throttle-time-max = 0.0 [The maximum throttle time in ms]
record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]
record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]
record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]
record-retry-rate = 0.0 []
record-send-rate = 2.65611304417116 [The average number of records sent per second.]
record-size-avg = 97.87 [The average record size]
record-size-max = 98.0 [The maximum record size]
records-per-request-avg = 1.0 [The average number of records per request.]
request-latency-avg = 0.0 [The average request latency in ms]
request-latency-max = 74.0 []
request-rate = 2.6468892499606897 [The average number of requests sent per second.]
request-size-avg = 42.0 [The average size of all requests in the window..]
request-size-max = 170.0 [The maximum size of any request sent in the window.]
requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]
response-rate = 2.651196976060479 [The average number of responses received per second.]
select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]
waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]
Thanks
1 Answer
Guozhang Wang on the kafka-users mailing list was able to identify the problem by looking at our application code:
Guozhang,
Yes - you found the problem!
We had inserted the .get() for debugging, but didn't think about the (huge!) side effect.
Using an asynchronous callback works fine.
We can now send 100,000 records from a laptop to the Bluemix cloud in 14 seconds - about 1000x faster.
Thank you very much!
Gary
On Dec 13, 2015, at 2:48 PM, Guozhang Wang wrote:
Gary,
You are calling "kafkaProducer.send(record).get();" for each message. The get() call blocks until the returned Future is completed, which effectively synchronizes all the sends - each message's ack is awaited before the next message is sent - so no batching can occur.
You can try "send(record, callback)" for asynchronous sending and let the callback handle any error reported alongside the returned metadata.
Guozhang
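In code, the suggested change looks roughly like this (a sketch against the Kafka 0.9 Java client API; the record and producer variables come from the surrounding test code):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Before: blocking on every send defeats batching
// RecordMetadata md = producer.send(record).get();

// After: asynchronous send; the callback receives the metadata or an error
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace(); // handle the failed send
        }
        // on success, metadata.partition() and metadata.offset() are available here
    }
});
```

With the calling thread no longer blocked, records pile up in the accumulator during the linger.ms window and are sent as batches.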