运行 每 N 分钟或如果项目与平均值不同
Run every N minutes or if item differs from average
我有一个 actor 接收 WeatherConditions
并将其推送(通过使用 OfferAsync
)到 source
。目前它被设置为 运行 它收到的每个项目(它将它存储到数据库)。
public class StoreConditionsActor : ReceiveActor
{
public StoreConditionsActor(ITemperatureDataProvider temperatureDataProvider)
{
var materializer = Context.Materializer();
var source = Source.Queue<WeatherConditions>(10, OverflowStrategy.DropTail);
var graph = source
.To(Sink.ForEach<WeatherConditions>(conditions => temperatureDataProvider.Store(conditions)))
.Run(materializer);
Receive<WeatherConditions>(i =>
{
graph.OfferAsync(i);
});
}
}
我想实现的是:
- 运行 它每 N 分钟只存储一次,并存储这 N 分钟内收到的所有项目的平均值
WeatherConditions
window
- 如果收到的物品符合特定条件(即物品温度比上一件物品的温度高 30%)运行 尽管 "hidden" 及时 window。
我一直在尝试 ConflateWithSeed
、Buffer
、Throttle
,但似乎都不起作用(我是 Akka / Akka Streams 的新手,所以我可能遗漏了一些基本的东西)
此答案使用 Akka Streams 和 Scala,但也许它会激发您的 Akka.NET 解决方案。
groupedWithin
方法可以满足您的第一个要求:
val queue =
Source.queue[Int](10, OverflowStrategy.dropTail)
.groupedWithin(10, 1 second)
.map(group => group.sum / group.size)
.toMat(Sink.foreach(println))(Keep.left)
.run()
Source(1 to 10000)
.throttle(10, 1 second)
.mapAsync(1)(queue.offer(_))
.runWith(Sink.ignore)
在上面的示例中,每秒最多向 SourceQueue
提供 10 个整数,它将传入的元素分组为一秒的包,并计算每个包的相应平均值。
至于您的第二个要求,您可以使用 sliding
将一个元素与前一个元素进行比较。以下示例仅在元素比前一个元素至少大 30% 时才向下游传递该元素:
val source: Source[Int, _] = ???
source
.sliding(2, 1)
.collect {
case Seq(a, b) if b >= 1.3 * a => b
}
.runForeach(println)
我有一个 actor 接收 WeatherConditions
并将其推送(通过使用 OfferAsync
)到 source
。目前它被设置为 运行 它收到的每个项目(它将它存储到数据库)。
public class StoreConditionsActor : ReceiveActor
{
public StoreConditionsActor(ITemperatureDataProvider temperatureDataProvider)
{
var materializer = Context.Materializer();
var source = Source.Queue<WeatherConditions>(10, OverflowStrategy.DropTail);
var graph = source
.To(Sink.ForEach<WeatherConditions>(conditions => temperatureDataProvider.Store(conditions)))
.Run(materializer);
Receive<WeatherConditions>(i =>
{
graph.OfferAsync(i);
});
}
}
我想实现的是:
- 运行 它每 N 分钟只存储一次,并存储这 N 分钟内收到的所有项目的平均值
WeatherConditions
window - 如果收到的物品符合特定条件(即物品温度比上一件物品的温度高 30%)运行 尽管 "hidden" 及时 window。
我一直在尝试 ConflateWithSeed
、Buffer
、Throttle
,但似乎都不起作用(我是 Akka / Akka Streams 的新手,所以我可能遗漏了一些基本的东西)
此答案使用 Akka Streams 和 Scala,但也许它会激发您的 Akka.NET 解决方案。
groupedWithin
方法可以满足您的第一个要求:
val queue =
Source.queue[Int](10, OverflowStrategy.dropTail)
.groupedWithin(10, 1 second)
.map(group => group.sum / group.size)
.toMat(Sink.foreach(println))(Keep.left)
.run()
Source(1 to 10000)
.throttle(10, 1 second)
.mapAsync(1)(queue.offer(_))
.runWith(Sink.ignore)
在上面的示例中,每秒最多向 SourceQueue
提供 10 个整数,它将传入的元素分组为一秒的包,并计算每个包的相应平均值。
至于您的第二个要求,您可以使用 sliding
将一个元素与前一个元素进行比较。以下示例仅在元素比前一个元素至少大 30% 时才向下游传递该元素:
val source: Source[Int, _] = ???
source
.sliding(2, 1)
.collect {
case Seq(a, b) if b >= 1.3 * a => b
}
.runForeach(println)