数据从 Kinesis Stream 中丢失。这可能是什么原因?

Data is getting lost from the Kinesis Stream. What could be the reason for this?

我遇到了一个问题。我正在测试 3 个消费者和 1 个生产者。从生产者产生的所有击键中,消费者无法接收生产者发送的所有数据。这可能是什么原因?

在下面的截图中,生产者发送了abcd,但只收到了d

右下是生产者,其他3个是收听同一个流的消费者。如我们所见,只有左下角的一个消费者收到了d,其他数据都丢失了。

我正在测试的代码:

制作人:

var stdin = process.openStdin();

function insert( input ) {

    var params = {
    Data: input,
    PartitionKey: 'users',
    StreamName: 'test-stream1'
    };
    kinesis.putRecord( params, function ( err, data ) {
    if ( err ) console.log( err, err.stack ); // an error occurred
    else console.log( data );           // successful response
    } );
}



stdin.addListener( "data", function ( d ) {
    // PRODUCING THE KEY STROKES
    // TYPED BY USER INPUT
    insert( d.toString().trim() );
} );

消费者:

    function getRecord() {
        kinesis.describeStream( {
        StreamName: 'test-stream1'
        }, function ( err, streamData ) {
        if ( err ) {
            console.log( err, err.stack ); // an error occurred
        } else {
            // console.log( streamData ); // successful response
            streamData.StreamDescription.Shards.forEach( shard => {
            kinesis.getShardIterator( {
                ShardId: shard.ShardId,
                ShardIteratorType: 'LATEST',
                StreamName: 'test-stream1'
            }, function ( err, shardIteratordata ) {
                if ( err ) {
                    // console.log( err, err.stack ); // an error occurred
                } else {
                    //console.log(shardIteratordata); // successful response
                    kinesis.getRecords( {
                        ShardIterator: shardIteratordata.ShardIterator
                    }, function ( err, recordsData ) {
                        if ( err ) {
                            // console.log( err, err.stack ); // an error occurred
                        } else {
                            // console.log( JSON.stringify( recordsData ) ); // successful response
                            recordsData.Records.forEach( record => {
                                console.log( record.Data.toString(), shard.ShardId );
                            } );
                        }
                    } );
                }
            } );
            } );
        }
        } );
    }

    setInterval( getRecord, 1000 * 1 );

我使用迭代器类型 LATEST 这样每个消费者都能从生产者那里得到最新的数据。

如果我没记错的话,您总是在阅读最近的记录之后。这是通过 ShardIteratorType: 'Latest' 配置的。 根据 documentation 它说

LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.

这应该只用于获取第一个迭代器,之后您需要获取下一个迭代器,从与最后一个迭代器结束的完全相同的位置开始。

因此,您可以使用 GetIterator 请求的 NextShardIterator(如果存在)来跟进补充记录。参见 doc

目前您在每个间隔后丢弃迭代器并从最后开始。

例子

我获取了你的代码并将 setInterval 移动到仅使用下一个迭代器

重复 getRecords 请求
function getRecord() {
  kinesis.describeStream({ StreamName: 'test-stream1'}, function ( err, streamData ) {
    if ( err ) {
      console.log( err, err.stack ); // an error occurred
    } else {
      // console.log( streamData ); // successful response
      streamData.StreamDescription.Shards.forEach( shard => {
        kinesis.getShardIterator({
          ShardId: shard.ShardId,
          ShardIteratorType: 'LATEST',
          StreamName: 'test-stream1'
        }, function ( err, shardIteratordata ) {
          if ( err ) {
            console.log( err, err.stack ); // an error occurred
          } else {
            var shardIterator = shardIteratordata.ShardIterator;

            setInterval(function() {
              kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
                if ( err ) {
                  console.log( err, err.stack ); // an error occurred
                } else {
                  // console.log( JSON.stringify( recordsData ) ); // successful response
                  recordsData.Records.forEach(record => {
                    console.log( record.Data.toString(), shard.ShardId );
                  });
                  shardIterator = iterator = recordsData.NextShardIterator;
                }
              });
            }, 1000 * 1 );

          }
        });
      });
    }
  });
}