如何在 F# 中使用 Akka.Streams.*.ConcatMany?

How to use Akka.Streams.*.ConcatMany in F#?

我想创建一个流,从传入的元素中创建一个新的源(它将是一个持久性查询),然后将结果展平。像这个简化的例子:

var z = Source.Single(1).ConcatMany(i => Source.Single(i));

此代码按预期编译和工作。我的问题是当我将它翻译成 F#:

let z = Source.Single(1).ConcatMany(fun i -> Source.Single(i))

我收到一条错误消息

This expression was expected to have type
    'IGraph<SourceShape<'a>,Akka.NotUsed>'    
but here has type
    'Source<int,Akka.NotUsed>'    

我认为这是因为 F# 处理 co/contravariance 的方式与 C# 不同,不能简单地转换这些通用特化 (https://github.com/fsharp/fslang-suggestions/issues/162),但我想不出进行转换的方法在 intSourceShape<int> 之间。是否可以将此示例转换为 F#?

我发现的一种解决方法是使用 Akkling.Streams 包装器库:

open Akkling.Streams

let x =
    Source.singleton 1
    |> Source.collectMap(fun x -> Source.singleton x)

如何在没有 Akkling 的情况下执行此操作的问题仍然悬而未决。

查看 the code on GitHub,似乎 Source<TOut, TMat>IGraph 的直接实现,因此您应该能够转换它:

public sealed class Source<TOut, TMat> : IFlow<TOut, TMat>, IGraph<SourceShape<TOut>, TMat>

let z = Source.Single(1).ConcatMany(fun i -> Source.Single(i) :> IGraph<SourceShape<int>,Akka.NotUsed>)

我认为 C# 和 F# 用法的最大区别在于 C# 会自动为您进行向上转换。