如何通过Kinesis Data Firehose转换coloudwatch日志事件以在ElasticSearch中插入记录

bwleehnv  于 2023-03-29  发布在  ElasticSearch
关注(0)|答案(1)|浏览(181)

我正在尝试通过创建Kinesis firehose订阅过滤器将我的cloudwatch日志流式传输到AWS Opensearch(ElasticSearch)。我想了解Kinesis数据firehose是否支持通过批量API批量插入到Elastic Search,还是插入单个记录?
所以基本上,我已经创建了一个lambda转换器,它聚合了Cloudwatch日志消息并将其转换为ElasticSearch的批量插入API所需的格式,我已经通过直接调用elasticsearch的批量API测试了我的数据格式,它工作正常,但是当我尝试通过firehose做同样的事情时,我得到了以下错误:

message : One or more records are malformed. Please ensure that each record is single valid JSON object and that it does not contain newlines
error code : OS.MalformedData

转换后的数据:

{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535505340260631192776509739326813505134549729280"}}
{"timeMillis":1636521972493,"thread":"main","level":"INFO","loggerName":"helloworld.App","message":"Inflating logs to increase size","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{"AWSRequestId":"332de390-42b3-44dd-937a-cc4089ff9510"},"threadId":1,"threadPriority":5,"jobId":"${ctx:request_id}","clientUUID":"${ctx:client_uuid}","clientUUIDHeader":"${ctx:client_uuid_header}"}
{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535884452929006213369915846537448527280151396358"}}
{"timeMillis":1636521973189,"thread":"main","level":"INFO","loggerName":"helloworld.App","message":"{ \"message\": \"hello world\", \"location\": \"3.137.161.28\" }","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{"AWSRequestId":"332de390-42b3-44dd-937a-cc4089ff9510"},"threadId":1,"threadPriority":5,"jobId":"${ctx:request_id}","clientUUID":"${ctx:client_uuid}","clientUUIDHeader":"${ctx:client_uuid_header}"}

firehose似乎不支持批量插入,就像cloud watch到opensearch订阅过滤器一样,但是我不能在opensearch订阅过滤器中定义索引名称。
以前有人遇到过类似的问题吗?

hrirmatl

hrirmatl1#

从lambda转换器获取转换后的数据后,Kinesis Firehose会在其上添加索引细节。因此,如果您想将其用于opensearch批量API,则必须从第一个对象中删除索引数据,因为它将由firehose添加。
在本例中,从lambda转换器返回的数据是

{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535505340260631192776509739326813505134549729280"}}
{"timeMillis":1636521972493,"thread":"main","level":"INFO","loggerName":"helloworld.App","message":"Inflating logs to increase size","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{"AWSRequestId":"332de390-42b3-44dd-937a-cc4089ff9510"},"threadId":1,"threadPriority":5,"jobId":"${ctx:request_id}","clientUUID":"${ctx:client_uuid}","clientUUIDHeader":"${ctx:client_uuid_header}"}
{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535884452929006213369915846537448527280151396358"}}
{"timeMillis":1636521973189,"thread":"main","level":"INFO","loggerName":"helloworld.App","message":"{ \"message\": \"hello world\", \"location\": \"3.137.161.28\" }","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{"AWSRequestId":"332de390-42b3-44dd-937a-cc4089ff9510"},"threadId":1,"threadPriority":5,"jobId":"${ctx:request_id}","clientUUID":"${ctx:client_uuid}","clientUUIDHeader":"${ctx:client_uuid_header}"}

到达消防水管后,它会变成

{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535505340260631192776509739326813505134549729280"}}
{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535505340260631192776509739326813505134549729280"}}
{"timeMillis":1636521972493,"thread":"main","level":"INFO","loggerName":"helloworld.App","message":"Inflating logs to increase size","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{"AWSRequestId":"332de390-42b3-44dd-937a-cc4089ff9510"},"threadId":1,"threadPriority":5,"jobId":"${ctx:request_id}","clientUUID":"${ctx:client_uuid}","clientUUIDHeader":"${ctx:client_uuid_header}"}
{"index":{"_index":"test","_type":"/aws/lambda/HelloWorldLogProducer","_id":"36495659535884452929006213369915846537448527280151396358"}}
{"timeMillis":1636521973189,"thread":"main","level":"INFO","loggerName":"helloworld.App","message":"{ \"message\": \"hello world\", \"location\": \"3.137.161.28\" }","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{"AWSRequestId":"332de390-42b3-44dd-937a-cc4089ff9510"},"threadId":1,"threadPriority":5,"jobId":"${ctx:request_id}","clientUUID":"${ctx:client_uuid}","clientUUIDHeader":"${ctx:client_uuid_header}"}

这破坏了批量API的语法,因此尝试删除第一个对象的索引,之后应该可以正常工作。

相关问题