运行 每 N 分钟或如果项目与平均值不同

Run every N minutes or if item differs from average

我有一个 actor 接收 WeatherConditions 并将其推送(通过使用 OfferAsync)到 source。目前它被设置为 运行 它收到的每个项目(它将它存储到数据库)。

public class StoreConditionsActor : ReceiveActor
{
    public StoreConditionsActor(ITemperatureDataProvider temperatureDataProvider)
    {
        var materializer = Context.Materializer();
        var source = Source.Queue<WeatherConditions>(10, OverflowStrategy.DropTail);

        var graph = source
            .To(Sink.ForEach<WeatherConditions>(conditions => temperatureDataProvider.Store(conditions)))
            .Run(materializer);

        Receive<WeatherConditions>(i =>
        {
            graph.OfferAsync(i);
        });
    }
}

我想实现的是:

  1. 运行 它每 N 分钟只存储一次,并存储这 N 分钟内收到的所有项目的平均值 WeatherConditions window
  2. 如果收到的物品符合特定条件(即物品温度比上一件物品的温度高 30%)运行 尽管 "hidden" 及时 window。

我一直在尝试 ConflateWithSeedBufferThrottle,但似乎都不起作用(我是 Akka / Akka Streams 的新手,所以我可能遗漏了一些基本的东西)

此答案使用 Akka Streams 和 Scala,但也许它会激发您的 Akka.NET 解决方案。

groupedWithin方法可以满足您的第一个要求:

val queue =
  Source.queue[Int](10, OverflowStrategy.dropTail)
    .groupedWithin(10, 1 second)
    .map(group => group.sum / group.size)
    .toMat(Sink.foreach(println))(Keep.left)
    .run()

Source(1 to 10000)
  .throttle(10, 1 second)
  .mapAsync(1)(queue.offer(_))
  .runWith(Sink.ignore)

在上面的示例中,每秒最多向 SourceQueue 提供 10 个整数,它将传入的元素分组为一秒的包,并计算每个包的相应平均值。

至于您的第二个要求,您可以使用 sliding 将一个元素与前一个元素进行比较。以下示例仅在元素比前一个元素至少大 30% 时才向下游传递该元素:

val source: Source[Int, _] = ???

source
  .sliding(2, 1)
  .collect {
    case Seq(a, b) if b >= 1.3 * a => b
  }
  .runForeach(println)