我有Kafka流应用程序与1.0.0Kafka流api。我有单代理0.10.2.0Kafka和单主题单分区。除producer request.timeout.ms外,所有可配置参数都相同。我将producer request.timeout.ms配置为5分钟,以修复kafka streams程序在生成问题时引发的异常。
在我的流应用程序中,我读取来自Kafka的事件,处理它们并转发到同一个Kafka的另一个主题。
经过统计,我发现处理的时间占5%,剩下的95%用于读写。
尽管我在Kafka有上千万个事件,但有时Kafka民意测验返回的记录是个位数,有时Kafka民意测验返回的记录是上千个。
有时上下文转发需要更多的时间向Kafka发送更少的记录,有时上下文转发需要更少的时间向Kafka发送更多的记录。
我试图通过增加max.poll.records、poll.ms值来提高读取性能。但运气不好。
如何在读取和转发时提高性能?Kafka民调和前进将如何工作?哪些参数有助于提高性能?
下面是我的应用程序中几个重要的producer配置参数。
acks = 1
batch.size = 16384
buffer.memory = 33554432
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 240000
retries = 10
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
transaction.timeout.ms = 60000
transactional.id = null
以下是我的应用程序中几个重要的使用者配置参数:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 10000
metadata.max.age.ms = 300000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
以下是我的应用程序中几个重要的流配置参数:
application.server =
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
key.serde = null
metadata.max.age.ms = 300000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 1000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =
2条答案
按热度按时间ve7v8dk21#
通过控制键和增加主题的分区数,可以在操作中引入并行性。
以上将增加kafka流并行处理的数量。这可以通过增加kafka streams应用程序的线程数来处理
rmbxnbpk2#
您可以在不同的线程中创建多个kafka使用者,并将其分配给同一使用者组。它们将并行地消耗消息,并且不会丢失消息。
你如何发送信息?有了Kafka,你可以用一种“火而忘”的方式发送消息:它提高了吞吐量。
acks参数控制在生产者认为写入成功之前,必须有多少分区副本接收到记录。
如果设置ack=0,则在假定消息已成功发送之前,生产者不会等待代理的答复。但是,由于生产者不等待服务器的任何响应,因此它可以以网络支持的速度发送消息,因此可以使用此设置来实现非常高的吞吐量。