Logging more data from a Kafka Source Connector

72qzrwbm posted on 2023-10-15 in Apache

I installed the Kinesis Connector plugin in AWS to connect my Kinesis stream to my MSK cluster. It works, but the logs I see in CloudWatch are not very helpful:

[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)

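How can I get more useful logging? For example, I would like to see the number of records written to the cluster per minute.
I read the documentation at https://docs.confluent.io/platform/current/connect/logging.html, and I think what I am seeing is the default output written to stdout. However, I am not sure how to change the log level of the Kinesis connector so that it shows more information.
This is my current configuration: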

name=msk-connector-kinesis
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=3

kafka.topic=my-topic
kinesis.region=eu-central-1
kinesis.stream=kinesis_stream_eu-central-1

confluent.topic.bootstrap.servers=<server1>:9098,<server2>:9098,<server3>:9098
confluent.topic.replication.factor=3

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.producer.security.protocol=SASL_SSL
confluent.topic.producer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all


Answer #1, by xfb7svmp

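The metric the OP asked for, "the number of records written to the cluster per minute", can be found in CloudWatch as part of the metrics emitted by MSK Connect. This is because you are running the connector through the MSK Connect feature (see the OP's comment on the question).
Since this is a source connector (it pushes data from the source into MSK), the metrics you are looking for are SourceRecordPollRate and SourceRecordWriteRate.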
Example queries for these metrics (taken from the AWS Big Data Blog):
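As a rough sketch of querying these two metrics programmatically, the Python below builds a CloudWatch GetMetricData request and adds a metric-math expression that turns the write rate into an approximate per-minute count. It rests on assumptions not stated in the answer: the namespace "AWS/KafkaConnect", a "ConnectorName" dimension, and SourceRecordWriteRate being a per-second average (as Kafka Connect's source-record-write-rate is). The helper names build_queries and fetch are hypothetical; verify the namespace and dimensions in your CloudWatch console before relying on them.

```python
"""Sketch: approximate per-minute MSK Connect source-record metrics.

ASSUMPTIONS (not from the original answer): namespace "AWS/KafkaConnect",
dimension "ConnectorName", and SourceRecordWriteRate is a per-second average.
"""
from datetime import datetime, timedelta, timezone

NAMESPACE = "AWS/KafkaConnect"  # assumed MSK Connect metric namespace
METRICS = ("SourceRecordPollRate", "SourceRecordWriteRate")


def build_queries(connector_name: str, period: int = 60) -> list:
    """Build the MetricDataQueries list for CloudWatch GetMetricData."""
    queries = []
    for metric in METRICS:
        queries.append({
            # Query Ids must start with a lowercase letter
            "Id": metric.lower(),
            "MetricStat": {
                "Metric": {
                    "Namespace": NAMESPACE,
                    "MetricName": metric,
                    "Dimensions": [{"Name": "ConnectorName", "Value": connector_name}],
                },
                "Period": period,
                "Stat": "Average",
            },
            "ReturnData": True,
        })
    # Metric math: per-second average rate * 60 ~= records written per minute
    queries.append({
        "Id": "writes_per_minute",
        "Expression": "sourcerecordwriterate * 60",
        "Label": "Approx. records written per minute",
        "ReturnData": True,
    })
    return queries


def fetch(connector_name: str, minutes: int = 60, region: str = "eu-central-1") -> dict:
    """Call GetMetricData for the last `minutes` minutes (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so build_queries() works without boto3

    cloudwatch = boto3.client("cloudwatch", region_name=region)
    end = datetime.now(timezone.utc)
    return cloudwatch.get_metric_data(
        MetricDataQueries=build_queries(connector_name),
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
    )
```

With the connector name from the question, fetch("msk-connector-kinesis") would return one time series per query. Using a 60-second period keeps each data point aligned with the per-minute figure the OP asked for.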
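I don't think the OP's original approach (increasing logging to obtain metrics such as the connector's producer rates) will work, but it is worth mentioning that MSK Connect forwards connector-generated log records with severity INFO, WARN, ERROR, and FATAL to CloudWatch Logs, Amazon S3, or a Kinesis Data Firehose stream.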
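The log destination is chosen when the connector is created. A minimal sketch, assuming the connector is created with boto3's kafkaconnect client: the dictionary below is the logDelivery argument that sends worker logs to a CloudWatch Logs group only. The helper name and the log group name are hypothetical. Firehose and S3 delivery use sibling "firehose" and "s3" keys under "workerLogDelivery". This setting selects where logs go, not which levels are emitted.

```python
"""Sketch: logDelivery argument for MSK Connect's CreateConnector call.

ASSUMPTION: the connector is created via boto3's "kafkaconnect" client;
the helper name and log group below are hypothetical.
"""


def worker_log_delivery(log_group: str) -> dict:
    """Deliver MSK Connect worker logs to a single CloudWatch Logs group."""
    return {
        "workerLogDelivery": {
            # Other optional destinations: "firehose" and "s3"
            "cloudWatchLogs": {"enabled": True, "logGroup": log_group},
        }
    }


# Usage (not executed here; create_connector needs many other required arguments):
#   boto3.client("kafkaconnect").create_connector(
#       ..., logDelivery=worker_log_delivery("/msk-connect/msk-connector-kinesis"))
```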
Judging from the log lines the OP posted in the question (INFO-level offset commits from WorkerSourceTask), this is working as expected.
