数据从 Kinesis Stream 中丢失。这可能是什么原因?
Data is getting lost from the Kinesis Stream. What could be the reason for this?
我遇到了一个问题。我正在测试 3 个消费者和 1 个生产者。从生产者产生的所有击键中,消费者无法接收生产者发送的所有数据。这可能是什么原因?
在下面的截图中,生产者发送了a
、b
、c
和d
,但只收到了d
。
右下是生产者,其他3个是收听同一个流的消费者。如我们所见,只有左下角的一个消费者收到了d
,其他数据都丢失了。
我正在测试的代码:
制作人:
var stdin = process.openStdin();
function insert( input ) {
var params = {
Data: input,
PartitionKey: 'users',
StreamName: 'test-stream1'
};
kinesis.putRecord( params, function ( err, data ) {
if ( err ) console.log( err, err.stack ); // an error occurred
else console.log( data ); // successful response
} );
}
stdin.addListener( "data", function ( d ) {
// PRODUCING THE KEY STROKES
// TYPED BY USER INPUT
insert( d.toString().trim() );
} );
消费者:
function getRecord() {
kinesis.describeStream( {
StreamName: 'test-stream1'
}, function ( err, streamData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( streamData ); // successful response
streamData.StreamDescription.Shards.forEach( shard => {
kinesis.getShardIterator( {
ShardId: shard.ShardId,
ShardIteratorType: 'LATEST',
StreamName: 'test-stream1'
}, function ( err, shardIteratordata ) {
if ( err ) {
// console.log( err, err.stack ); // an error occurred
} else {
//console.log(shardIteratordata); // successful response
kinesis.getRecords( {
ShardIterator: shardIteratordata.ShardIterator
}, function ( err, recordsData ) {
if ( err ) {
// console.log( err, err.stack ); // an error occurred
} else {
// console.log( JSON.stringify( recordsData ) ); // successful response
recordsData.Records.forEach( record => {
console.log( record.Data.toString(), shard.ShardId );
} );
}
} );
}
} );
} );
}
} );
}
setInterval( getRecord, 1000 * 1 );
我使用迭代器类型 LATEST
这样每个消费者都能从生产者那里得到最新的数据。
如果我没记错的话,您总是在阅读最近的记录之后。这是通过 ShardIteratorType: 'Latest'
配置的。
根据 documentation 它说
LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
这应该只用于获取第一个迭代器,之后您需要获取下一个迭代器,从与最后一个迭代器结束的完全相同的位置开始。
因此,您可以使用 GetIterator
请求的 NextShardIterator
(如果存在)来跟进补充记录。参见 doc。
目前您在每个间隔后丢弃迭代器并从最后开始。
例子
我获取了你的代码并将 setInterval
移动到仅使用下一个迭代器
重复 getRecords
请求
function getRecord() {
kinesis.describeStream({ StreamName: 'test-stream1'}, function ( err, streamData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( streamData ); // successful response
streamData.StreamDescription.Shards.forEach( shard => {
kinesis.getShardIterator({
ShardId: shard.ShardId,
ShardIteratorType: 'LATEST',
StreamName: 'test-stream1'
}, function ( err, shardIteratordata ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
var shardIterator = shardIteratordata.ShardIterator;
setInterval(function() {
kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( JSON.stringify( recordsData ) ); // successful response
recordsData.Records.forEach(record => {
console.log( record.Data.toString(), shard.ShardId );
});
shardIterator = iterator = recordsData.NextShardIterator;
}
});
}, 1000 * 1 );
}
});
});
}
});
}
我遇到了一个问题。我正在测试 3 个消费者和 1 个生产者。从生产者产生的所有击键中,消费者无法接收生产者发送的所有数据。这可能是什么原因?
在下面的截图中,生产者发送了a
、b
、c
和d
,但只收到了d
。
右下是生产者,其他3个是收听同一个流的消费者。如我们所见,只有左下角的一个消费者收到了d
,其他数据都丢失了。
我正在测试的代码:
制作人:
var stdin = process.openStdin();
function insert( input ) {
var params = {
Data: input,
PartitionKey: 'users',
StreamName: 'test-stream1'
};
kinesis.putRecord( params, function ( err, data ) {
if ( err ) console.log( err, err.stack ); // an error occurred
else console.log( data ); // successful response
} );
}
stdin.addListener( "data", function ( d ) {
// PRODUCING THE KEY STROKES
// TYPED BY USER INPUT
insert( d.toString().trim() );
} );
消费者:
function getRecord() {
kinesis.describeStream( {
StreamName: 'test-stream1'
}, function ( err, streamData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( streamData ); // successful response
streamData.StreamDescription.Shards.forEach( shard => {
kinesis.getShardIterator( {
ShardId: shard.ShardId,
ShardIteratorType: 'LATEST',
StreamName: 'test-stream1'
}, function ( err, shardIteratordata ) {
if ( err ) {
// console.log( err, err.stack ); // an error occurred
} else {
//console.log(shardIteratordata); // successful response
kinesis.getRecords( {
ShardIterator: shardIteratordata.ShardIterator
}, function ( err, recordsData ) {
if ( err ) {
// console.log( err, err.stack ); // an error occurred
} else {
// console.log( JSON.stringify( recordsData ) ); // successful response
recordsData.Records.forEach( record => {
console.log( record.Data.toString(), shard.ShardId );
} );
}
} );
}
} );
} );
}
} );
}
setInterval( getRecord, 1000 * 1 );
我使用迭代器类型 LATEST
这样每个消费者都能从生产者那里得到最新的数据。
如果我没记错的话,您总是在阅读最近的记录之后。这是通过 ShardIteratorType: 'Latest'
配置的。
根据 documentation 它说
LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
这应该只用于获取第一个迭代器,之后您需要获取下一个迭代器,从与最后一个迭代器结束的完全相同的位置开始。
因此,您可以使用 GetIterator
请求的 NextShardIterator
(如果存在)来跟进补充记录。参见 doc。
目前您在每个间隔后丢弃迭代器并从最后开始。
例子
我获取了你的代码并将 setInterval
移动到仅使用下一个迭代器
getRecords
请求
function getRecord() {
kinesis.describeStream({ StreamName: 'test-stream1'}, function ( err, streamData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( streamData ); // successful response
streamData.StreamDescription.Shards.forEach( shard => {
kinesis.getShardIterator({
ShardId: shard.ShardId,
ShardIteratorType: 'LATEST',
StreamName: 'test-stream1'
}, function ( err, shardIteratordata ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
var shardIterator = shardIteratordata.ShardIterator;
setInterval(function() {
kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( JSON.stringify( recordsData ) ); // successful response
recordsData.Records.forEach(record => {
console.log( record.Data.toString(), shard.ShardId );
});
shardIterator = iterator = recordsData.NextShardIterator;
}
});
}, 1000 * 1 );
}
});
});
}
});
}