如何以线程安全的方式从 BeforeSendRequest 和 AfterReceiveReply return XML 调用方法?

How can I return XML from BeforeSendRequest and AfterReceiveReply to the calling method in a thread-safe way?

我们有一个使用 Azure WebJob SDK 的控制台应用程序。 WebJob 依赖于使用 SOAP 的 WCF 服务,它通过我们编写的 DLL 访问该服务,该 DLL 以更友好的方式包装自动生成的 WCF 类型。

出于日志记录的目的,我们希望为我们发出的请求保存请求和响应 XML 主体。这些 XML 具尸体将保存在我们的数据库中。但是,因为 WCF 代码存在于低级 DLL 中,它对我们的数据库没有概念,无法保存到它。

DLL使用微软的DI扩展来注册类型,WebJob调用是这样的:

class WebJobClass
{
    IWCFWrapperClient _wcfWrapperClient;

    public WebJobClass(IWCFWrapperClient wcfWrapperClient)
    {
        _wcfWrapperClient = wcfWrapperClient;
    }

    public async Task DoThing()
    {
        var callResult = await _wcfWrapperClient.CallWCFService();
    }
}

IWCFWrapperClient 看起来像这样:

class WCFWrapperClient : IWCFWrapperClient
{
    IWCF _wcf;    // auto-generated by VS, stored in Reference.cs

    public async Task<object> CallWCFService()
    {
        return await _wcf.Call();    // another auto-generated method
    }
}

我已经实现了 IClientMessageInspector,它可以很好地为我提供 XML request/response,但我没有办法将它传回给 WCFWrapperClient.CallWCFService 以便它可以 returned 到 WebJobClass.DoThing(),然后谁可以将它保存到数据库。

问题是多线程。 WebJobs、IIRC 将 运行 并行请求多个,从多个线程调用 DLL。这意味着我们不能,比如说,共享一个静态的 属性 LastRequestXmlBody 因为多个线程可能会覆盖它。我们也不能,比如说,给每个调用一个 Guid 或其他东西,因为除了自动生成的内容之外,无法将 IWCFWrapperClient.CallWCFService 中的任何内容传递到自动生成的 IWCF.Call 中。

那么,我怎样才能以线程安全的方式 return XML 到 WebJobClass.DoThing

我找到了一个使用 ConcurrentDictionary<TKey, TValue> 的解决方案,但它有点难看。

首先,我用新的属性Guid InternalCorrelationId修改了Reference.cs中自动生成的类。由于自动生成的 类 是 partial,这可以在重新生成客户端时不会更改的单独文件中完成。

public partial class AutoGeneratedWCFType
{
    private Guid InternalCorrelationIdField;

    [System.Runtime.Serialization.DataMember()]
    public Guid InternalCorrelationId
    {
        get { return InternalCorrelationIdField; }
        set { InternalCorrelationIdField = value; }
    }
}

接下来,我将所有请求 DTO 类型派生自名为 RequestBase 的类型,并且我所有响应 DTO 类型都派生自名为 ResponseBase 的类型,因此我可以对它们进行通用处理:

public abstract class RequestBase
{
    public Guid InternalCorrelationId { get; set; }
}

public abstract class ResponseBase
{
    public string RequestXml { get; set; }
    public string ResponseXml { get; set; }
}

然后我添加了一个 RequestCorrelator 类型,它只保留 ConcurrentDictionary<Guid, XmlRequestResponse>:

public sealed class RequestCorrelator : IRequestCorrelator
{
    public ConcurrentDictionary<Guid, XmlRequestResponse> PendingCalls { get; }

    public RequestCorrelator() => PendingCalls = new ConcurrentDictionary<Guid, XmlRequestResponse>();
}

public sealed class XmlRequestResponse
{
    public string RequestXml { get; set; }
    public string ResponseXml { get; set; }
}

RequestCorrelator 是它自己的用于 DI 目的的类型 - 您可能只能直接使用 ConcurrentDictionary<TKey, TValue>

最后,我们有了实际获取 XML 的代码,一种实现 IClientMessageInspector:

的类型
public sealed class ClientMessageProvider : IClientMessageInspector
{
    private readonly IRequestCorrelator _requestCorrelator;

    public ClientMessageProvider(IRequestCorrelator requestCorrelator) =>
        _requestCorrelator = requestCorrelator;

    public object BeforeSendRequest(ref Message request, IClientChannel channel)
    {
        var requestXml = request.ToString();
        var internalCorrelationId = GetInternalCorrelationId(requestXml);

        if (internalCorrelationId != null)
        {
            if (_requestCorrelator.PendingCalls.TryGetValue(internalCorrelationId.Value,
                out var requestResponse))
            {
                requestResponse.RequestXml = requestXml;
            }

            request = RemoveInternalCorrelationId(request);
        }
        
        return internalCorrelationId;
    }

    public void AfterReceiveReply(ref Message reply, object correlationState)
    {
        // WCF can internally correlate a request between BeforeSendRequest and
        // AfterReceiveReply. We reuse the same correlation ID we added to the
        // XML as our correlation state.
    
        var responseXml = reply.ToString();
        var internalCorrelationId = (correlationState is Guid guid)
            ? guid
            : default;

        if (_requestCorrelator.PendingCalls.TryGetValue(internalCorrelationId,
            out var requestResponse))
        {
            requestResponse.ResponseXml = responseXml;
        }
    }

    private static Guid? GetInternalCorrelationId(string requestXml)
    {
        var document = XDocument.Parse(requestXml);
        var internalCorrelationIdElement = /* You'll have to write this yourself;
        every WCF XML request is different. */

        return internalCorrelationIdElement != null
            ? Guid.Parse(internalCorrelationIdElement.Value)
            : null;
    }
    
    private static Message RemoveInternalCorrelationId(Message oldMessage)
    {
        // 
        var buffer = oldMessage.CreateBufferedCopy(2 * 1024 * 1024);
        var tempMessage = buffer.CreateMessage();
        var dictionaryReader = tempMessage.GetReaderAtBodyContents();
        var document = new XmlDocument();
        
        document.Load(dictionaryReader);
        dictionaryReader.Close();

        var internalCorrelationIdNode = /* You'll also have to write this yourself. */
        
        var parent = internalCorrelationIdNode.ParentNode;
        parent.RemoveChild(internalCorrelationIdNode);

        var memoryStream = new MemoryStream();
        var xmlWriter = XmlWriter.Create(memoryStream);
        document.Save(xmlWriter);
        xmlWriter.Flush();
        xmlWriter.Close();

        memoryStream.Position = 0;
        var xmlReader = XmlReader.Create(memoryStream);

        var newMessage = Message.CreateMessage(oldMessage.Version, null, xmlReader);
        newMessage.Headers.CopyHeadersFrom(oldMessage);
        newMessage.Properties.CopyProperties(oldMessage.Properties);

        return newMessage;
    }
}

简而言之,这种类型:

  1. 在 XML 请求中查找关联 ID。
  2. 找到具有相同关联 ID 的 XmlRequestResponse 并将请求添加到它。
  3. 删除相关 ID 元素,这样服务就不会得到他们不期望的元素。
  4. 收到回复后,使用 correlationState 找到 XmlRequestResponse 并将回复 XML 写入其中。

现在我们要做的就是改变IWCFWrapperClient:

private async Task<TDtoResult> ExecuteCallWithLogging<TDtoRequest,
    TWcfRequest,
    TWcfResponse,
    TDtoResult>(TDtoRequest request,
    Func<TDtoRequest, TWcfRequest> dtoToWcfConverter,
    Func<TWcfRequest, Task<TWcfResponse>> wcfCall,
    Func<TWcfResponse, TDtoResult> wcfToDtoConverter)
    where TDtoRequest : CorrelationBase
    where TDtoResult : WcfBase
{
    request.InternalCorrelationId = Guid.NewGuid();
    var xmlRequestResponse = new XmlRequestResponse();
    _requestCorrelator.PendingCalls.GetOrAdd(request.InternalCorrelationId,
        xmlRequestResponse);

    var response = await contractingCall(dtoToWcfConverter(request));
    _requestCorrelator.PendingCalls.TryRemove(request.InternalCorrelationId, out _);
    return wcfToDtoConverter(response).WithRequestResponse(xmlRequestResponse);
}

public async Task<DoThingResponseDto> DoThing(DoThingRequestDto request) =>
    await ExecuteCallWithLogging(request,
        r => r.ToWcfModel(),
        async d => await _wcf.Call(d),
        d => d.ToDtoModel());

WithRequestResponse实现如下:

public static T WithRequestResponse<T>(this T item, XmlRequestResponse requestResponse)
    where T : ResponseBase
{
    item.RequestXml = requestResponse?.RequestXml;
    item.ResponseXml = requestResponse?.ResponseXml;

    return item;
}

我们开始了。 WCF 在响应对象中调用 return 它们的 XML 而不仅仅是您可以打印到控制台或记录到文件的内容。