仅当记录数超过 x 时才启动 Kinesis 消费者?
Start Kinesis consumer only when there's over x number of records?
有没有办法创建具有缓冲区限制的 Kinesis 消费者?点赞 here:
#Flush when buffer exceeds 100000 Amazon Kinesis records, 64 MB size limit or when time since last buffer exceeds 1 hour
bufferByteSizeLimit = 67108864
bufferRecordCountLimit = 100000
bufferMillisecondsLimit = 3600000
基本上,我只想在有大量数据时才开始 IRecordProcessor
。我无法使用上面的连接器代码,因为我需要 amazon-kinesis-client
.
的 latest 版本
我最终实现了自己的解决方案。
- 有一个
ConcurrentHashMap
来存储流数据
private val recsMap = new ConcurrentHashMap[String, List[RecordStore]]
private val currByteSize = new AtomicLong(0L)
private val currRecordCount = new AtomicLong(0L)
private val currSeconds = new AtomicLong(0L)
- 更新计数器(按 size/time/记录数)
- 达到计数器时清除数据
recsMap.foreach(write2File())
// clean up
recsMap.remove(writtenRecs())
- 检查点和重置计数器
// reset counters
currByteSize.getAndSet(value)
currRecordCount.getAndSet(value)
currSeconds.getAndSet(value)
有没有办法创建具有缓冲区限制的 Kinesis 消费者?点赞 here:
#Flush when buffer exceeds 100000 Amazon Kinesis records, 64 MB size limit or when time since last buffer exceeds 1 hour
bufferByteSizeLimit = 67108864
bufferRecordCountLimit = 100000
bufferMillisecondsLimit = 3600000
基本上,我只想在有大量数据时才开始 IRecordProcessor
。我无法使用上面的连接器代码,因为我需要 amazon-kinesis-client
.
我最终实现了自己的解决方案。
- 有一个
ConcurrentHashMap
来存储流数据private val recsMap = new ConcurrentHashMap[String, List[RecordStore]] private val currByteSize = new AtomicLong(0L) private val currRecordCount = new AtomicLong(0L) private val currSeconds = new AtomicLong(0L)
- 更新计数器(按 size/time/记录数)
- 达到计数器时清除数据
recsMap.foreach(write2File()) // clean up recsMap.remove(writtenRecs())
- 检查点和重置计数器
// reset counters currByteSize.getAndSet(value) currRecordCount.getAndSet(value) currSeconds.getAndSet(value)