NodeJS putRecords之后没有关于AWS Kinesis的数据

dpiehjr4  于 2023-08-04  发布在  Node.js
关注(0)|答案(1)|浏览(90)

bounty还有6天到期。回答此问题可获得+100声望奖励。Rodrigo正在寻找一个答案从一个有信誉的来源

免责声明:这是我第一次使用AWS Kinesis,所以我可能会期待一些不正确的东西。
我有一个非常简单的AWS Lambda函数,可以将数据插入Kinesis。API响应指示没有错误,并且没有抛出异常; Kinesis只是确认一切正常。但是,当我去AWS控制台并尝试查询数据时,那里什么都没有!

const records = [
  { partitionKey: '1', data: 'Record 1' },
  { partitionKey: '2', data: 'Record 2' },
  { partitionKey: '3', data: 'Record 3' },
]

const params = {
  Records: records.map((record) => ({
    Data: record.data,
    PartitionKey: record.partitionKey,
  })),
  StreamName: streamName,
}

const response = await kinesis.putRecords(params).promise()

const suscess = response.FailedRecordCount === 0

字符串
suscess是真的。
因此,我尝试在同一个Lambda函数中使用代码检索数据。在将数据插入Kinesis之后,我添加了以下代码:

const params2 = {
  ShardIteratorType: 'LATEST',
  ShardId: 'shardId-000000000000',
  StreamName: streamName,
}

const response2 = await kinesis.getShardIterator(params2).promise()
const shardIterator = response2.ShardIterator

const records2 = await kinesis.getRecords({ ShardIterator: shardIterator! }).promise()


records2是一个空数组。
令我惊讶的是,没有任何记录被退回;响应为空,没有检索到有用的信息。此外,没有引发任何异常。
我检查了shardId,它确实存在。
所以我的问题是我做错了什么
为什么AWS Kinesis显示已插入数据,但似乎没有发生任何事情?在AWS控制台中,我可以从使用图中看到put和get操作中的活动。

vawmfj5a

vawmfj5a1#

由于您最近刚开始使用Kinesis数据流,因此建议您阅读architecture and terminology document。它将给予您深入了解数据在系统中的内部流传输方式。
根据AWS文档:
Kinesis数据流是一组碎片。每个碎片都有一个数据记录序列。每个数据记录都有一个由Kinesis Data Streams分配的序列号。
基于此定义,Kinesis数据流在内部使用碎片来流式传输数据。如果您使用的是 * 按需 * 容量模式,则最初将分配最少4个碎片,并根据应用程序的吞吐量增加碎片数。但是,如果您使用 Provisioned 模式创建Kinesis数据流,则必须至少使用2个碎片。因此,Kinesis数据流始终有多个碎片来处理数据流。
关于您在使用者端使用的代码,您明确地将shard ID值硬编码为shardId-000000000000,但是您无法确定您发布的数据是否发送到了该特定shard ID。因此,除了检查发布者端的FailedRecords计数外,还记录putRecords方法的完整响应,它将显示哪个shardId用于流式传输数据。
为了简化操作,我使用了AWS CLI命令来发布数据,您可以轻松地将其解释/转换为node.js以获得相同的结果。
PutRecords命令:

aws kinesis put-records \
    --stream-name <ENTER_STREAM_NAME_HERE> \
    --records Data="Record 1",PartitionKey="1" Data="Record 2",PartitionKey="2" Data="Record 3",PartitionKey="3" --cli-binary-format raw-in-base64-out

字符串
PutRecords回应:

{
    "FailedRecordCount": 0,
    "Records": [
        {
            "SequenceNumber": "49643231014796346134140772956682328171122645057423802450",
            "ShardId": "shardId-000000000005"
        },
        {
            "SequenceNumber": "49643231014796346134140772956683537096942259686598508626",
            "ShardId": "shardId-000000000005"
        },
        {
            "SequenceNumber": "49643231014796346134140772956684746022761874315773214802",
            "ShardId": "shardId-000000000005"
        }
    ]
}


如果您观察到上面的响应,则表明在执行put-records命令时,数据被发送到了shardId-000000000005
让我们假设,我之前没有使用过这个分片中的任何记录,所以我将使用TRIM_HORIZON作为ShardIteratorType来获取数据指针值。
GetShardIterator命令:

aws kinesis get-shard-iterator \
    --stream-name <ENTER_STREAM_NAME_HERE> \
    --shard-id shardId-000000000005 \
    --shard-iterator-type TRIM_HORIZON


GetShardIterator响应:

{
    "ShardIterator": "AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ=="
}


上面的响应包含指针记录,用于从碎片中读取最旧(未修剪)的数据记录:shardId-000000000005。一旦我们有了迭代器的值,我们就可以使用get-records方法来获取记录。
“获取记录”命令:

aws kinesis get-records \
    --shard-iterator AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ==


GetRecords响应:

{
    "Records": [
        {
            "SequenceNumber": "49643231014796346134140772956682328171122645057423802450",
            "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.035000+00:00",
            "Data": "UmVjb3JkIDE=",
            "PartitionKey": "1"
        },
        {
            "SequenceNumber": "49643231014796346134140772956683537096942259686598508626",
            "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00",
            "Data": "UmVjb3JkIDI=",
            "PartitionKey": "2"
        },
        {
            "SequenceNumber": "49643231014796346134140772956684746022761874315773214802",
            "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00",
            "Data": "UmVjb3JkIDM=",
            "PartitionKey": "3"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHIu30Hail1drAR8L9vok/zazMmRawSMqVACRymRKho+06rk6PHZ0G9JbYJLzIjUoo3UVT3XiqcfTL/QO6Dt1SJhY7p2P50V8Dhv2pkGavpNnh43114Mp4i3HAUSsYkwNRW8EJSIcJ/LZysNG1z0KLmbBp+Vau5UOj9mbZu4aU7H+97WqJkoHvK8/BC2AcMnVUlR03/xVHS8zy9fer8v6bRCjDgJMCU9CHyJamX5Douqg==",
    "MillisBehindLatest": 0
}


在您的代码中,您将ShardIteratorType值用作LATEST,这将在碎片中最后发布的记录之后创建一个指针。因此,如果您使用LATEST迭代器类型,请确保在发布数据之前先获取迭代器值。您也可以考虑使用其他迭代器类型,如this文档中所述。
我认为您现在已经确定了代码中的问题,这些问题可能存在于两个地方:
1.ShardId =>发布者可能使用了与您在代码中指定的值不同的shardId
1.ShardIteratorType =>由于您在发布数据后执行了消费者代码,因此它将在最近发布的记录后创建一个指针。因此,即使发布者向shardId-0000000000000发送了消息,您也无法检索之前发布的值。

相关问题