Tail Recursion Use Case Solution in Scala
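I am trying to solve a problem using tail recursion.
The use case is:
I have a list of folders; each folder contains a list of files, and each file contains several records.
I want to apply some transformations to the records and then write them to Kinesis in batches.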
val listOfFolders = Folder1(File1(RF11, RF12, RF13), File2(RF21,RF22))
I want to write two records at a time to Kinesis.
So far I have tried:
listOfFolders.map { folder =>
  val files = fetchAllFilesFromFolder(folder)
  if (files.nonEmpty) {
    sendBatch(files, Seq.empty[(ByteBuffer, String)], 2)
  } else {
    logger.info(s"No files are present in folder")
  }
}
@scala.annotation.tailrec
def sendBatch(
    files: Seq[Files],
    buffer: Seq[(ByteBuffer, String)],
    numberOfRecordsToSend: Int
): Unit =
  files match {
    case Nil => {
      if (buffer.nonEmpty) {
        sendToKinesis(streamName, buffer) map { putDataResult =>
          val putDataList = putDataResult.getRecords.asScala.toList
          logger.info(
            s"Successfully Sent"
          )
        }
      } else {
        logger.info(s"Successfully sent")
      }
    }
    case head :: tail => {
      val fileData = readFileData()
      val byteData: Seq[(ByteBuffer, String)] = transformDataAndConvertToByteBuffer(fileData)
      val currentBatch = buffer ++ byteData
      if (currentBatch.size >= numberOfRecordsToSend) {
        sendToKinesis(streamName, buffer) map { putRecordRes =>
          val putDataList = putRecordRes.getRecords.asScala.toList
          logger.info(
            s"Sent successfully"
          )
        }
        sendBatch(tail, Seq.empty[(ByteBuffer, String)], 2)
      } else {
        sendBatch(tail, currentBatch, 2)
      }
    }
  }
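sendToKinesis uses KCL putRecords.
The problems with the code above are:
- It reads all the data from a file at once, so if a file has 5 records, all 5 are sent to Kinesis even though the batch size is 2.
- I am unable to call the tail-recursive method from map.
- Batches should also span files: if file1 has 3 records, it should send RF11 and RF12 together, then RF13 and RF21 together, and finally RF22.
I don't want to use any variables in my code. Can this be solved with tail recursion?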
You have two sub-problems:
- How to send batches of a fixed size
@scala.annotation.tailrec
def sendBatch(file: Option[File], buffer: Seq[(ByteBuffer, String)], numbersOfRecrodsToSend: Int): Seq[(ByteBuffer, String)] = {
  if (buffer.length < numbersOfRecrodsToSend) {
    // case 1: too few records to be sent
    file match {
      // case 1.1: file was not yet read
      case Some(f) => sendBatch(None, buffer ++ getByteData(f), numbersOfRecrodsToSend)
      // case 1.2: too few records, file was already read, return leftover records
      case None => buffer
    }
  } else {
    // case 2: we can send numbersOfRecrodsToSend to Kinesis
    val (toSend, newBuffer) = buffer.splitAt(numbersOfRecrodsToSend)
    sendToKinesis(streamName, toSend)
    sendBatch(file, newBuffer, numbersOfRecrodsToSend)
  }
}
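To see how the leftover buffer is carried from one file to the next, here is a minimal, self-contained sketch of the same function. It assumes records are plain strings, a hypothetical `File` case class that already holds its records, and a stub `sendToKinesis` that only records the batches it receives. The mutable `ListBuffer` exists purely so the demo can inspect what was sent; none of this is the real Kinesis API.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

object SendBatchDemo {
  // Hypothetical stand-in for a file: it simply carries its records.
  final case class File(records: Seq[String])

  // Demo-only log of the batches "sent"; real code would call Kinesis instead.
  val sent: ListBuffer[Seq[String]] = ListBuffer.empty[Seq[String]]

  // Stub sender (assumption): records the batch rather than calling putRecords.
  def sendToKinesis(batch: Seq[String]): Unit = { sent += batch; () }

  // Assumes n > 0; with n <= 0 the "send" branch would never shrink the buffer.
  @tailrec
  def sendBatch(file: Option[File], buffer: Seq[String], n: Int): Seq[String] =
    if (buffer.length < n) {
      file match {
        // File not read yet: append its records and try again.
        case Some(f) => sendBatch(None, buffer ++ f.records, n)
        // File already consumed: hand the leftover back to the caller.
        case None => buffer
      }
    } else {
      // Enough records buffered: send exactly n and continue with the rest.
      val (toSend, rest) = buffer.splitAt(n)
      sendToKinesis(toSend)
      sendBatch(file, rest, n)
    }

  // File1 has three records: (RF11, RF12) is sent, RF13 is left over.
  val leftover1: Seq[String] =
    sendBatch(Some(File(Seq("RF11", "RF12", "RF13"))), Seq.empty, 2)

  // The leftover becomes the starting buffer for File2:
  // (RF13, RF21) is sent, RF22 is left over for a final flush.
  val leftover2: Seq[String] =
    sendBatch(Some(File(Seq("RF21", "RF22"))), leftover1, 2)
}
```

Because the function returns whatever it could not send, the caller decides what happens to the remainder: pass it to the next file, or flush it once no files are left.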
- How to iterate over the lists and send fixed-size batches
// start with an empty buffer of records to send and, for each folder,
// add its files' records to the buffer and send as many full batches as you can
// the leftover is passed on to the next iteration, across both files and folders
val partial = listOfFolders.foldLeft(Seq.empty[(ByteBuffer, String)]) { case (acc, folder) =>
  fetchAllFilesFromFolder(folder).foldLeft(acc) { case (acc2, file) =>
    sendBatch(Some(file), acc2, numbersOfRecrodsToSend)
  }
}
// if any records are left over, send them too
if (partial.nonEmpty) {
  sendToKinesis(streamName, partial)
}
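The nested `foldLeft` can be checked end to end against the example data from the question. The sketch below is illustrative only: `Folder`, `File`, string records, the `sendAll` wrapper, and the batch-recording `sendToKinesis` stub are assumptions made for the demo, not the original types or the Kinesis API.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

object FoldBatchDemo {
  // Hypothetical data model: folders hold files, files hold records.
  final case class File(records: Seq[String])
  final case class Folder(files: Seq[File])

  // Demo-only log of sent batches (stands in for the real Kinesis call).
  val sent: ListBuffer[Seq[String]] = ListBuffer.empty[Seq[String]]
  def sendToKinesis(batch: Seq[String]): Unit = { sent += batch; () }

  // Sends full batches of n records and returns the unsent remainder.
  @tailrec
  def sendBatch(file: Option[File], buffer: Seq[String], n: Int): Seq[String] =
    if (buffer.length < n) {
      file match {
        case Some(f) => sendBatch(None, buffer ++ f.records, n)
        case None    => buffer
      }
    } else {
      val (toSend, rest) = buffer.splitAt(n)
      sendToKinesis(toSend)
      sendBatch(file, rest, n)
    }

  // Threads the leftover buffer through every file of every folder,
  // then flushes whatever remains as a final, possibly smaller, batch.
  def sendAll(folders: Seq[Folder], n: Int): Unit = {
    val partial = folders.foldLeft(Seq.empty[String]) { (acc, folder) =>
      folder.files.foldLeft(acc) { (acc2, file) =>
        sendBatch(Some(file), acc2, n)
      }
    }
    if (partial.nonEmpty) sendToKinesis(partial)
  }

  // The example from the question: File1 has three records, File2 has two.
  val listOfFolders: Seq[Folder] = Seq(
    Folder(Seq(
      File(Seq("RF11", "RF12", "RF13")),
      File(Seq("RF21", "RF22"))
    ))
  )

  sendAll(listOfFolders, 2)
  // sent now holds three batches: (RF11, RF12), (RF13, RF21), (RF22)
}
```

No `var` is needed in the batching logic itself: the accumulator of the fold plays the role of the buffer, and the only mutable state here is the stub's log.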
Hope you get the idea.