如何以线程安全的方式从 BeforeSendRequest 和 AfterReceiveReply return XML 调用方法?
How can I return XML from BeforeSendRequest and AfterReceiveReply to the calling method in a thread-safe way?
我们有一个使用 Azure WebJob SDK 的控制台应用程序。 WebJob 依赖于使用 SOAP 的 WCF 服务,它通过我们编写的 DLL 访问该服务,该 DLL 以更友好的方式包装自动生成的 WCF 类型。
出于日志记录的目的,我们希望为我们发出的请求保存请求和响应 XML 主体。这些 XML 具尸体将保存在我们的数据库中。但是,因为 WCF 代码存在于低级 DLL 中,它对我们的数据库没有概念,无法保存到它。
DLL使用微软的DI扩展来注册类型,WebJob调用是这样的:
class WebJobClass
{
IWCFWrapperClient _wcfWrapperClient;
public WebJobClass(IWCFWrapperClient wcfWrapperClient)
{
_wcfWrapperClient = wcfWrapperClient;
}
public async Task DoThing()
{
var callResult = await _wcfWrapperClient.CallWCFService();
}
}
IWCFWrapperClient
看起来像这样:
class WCFWrapperClient : IWCFWrapperClient
{
IWCF _wcf; // auto-generated by VS, stored in Reference.cs
public async Task<object> CallWCFService()
{
return await _wcf.Call(); // another auto-generated method
}
}
我已经实现了 IClientMessageInspector
,它可以很好地为我提供 XML request/response,但我没有办法将它传回给 WCFWrapperClient.CallWCFService
以便它可以 returned 到 WebJobClass.DoThing()
,然后谁可以将它保存到数据库。
问题是多线程。 WebJobs、IIRC 将 运行 并行请求多个,从多个线程调用 DLL。这意味着我们不能,比如说,共享一个静态的 属性 LastRequestXmlBody
因为多个线程可能会覆盖它。我们也不能,比如说,给每个调用一个 Guid 或其他东西,因为除了自动生成的内容之外,无法将 IWCFWrapperClient.CallWCFService
中的任何内容传递到自动生成的 IWCF.Call
中。
那么,我怎样才能以线程安全的方式 return XML 到 WebJobClass.DoThing
?
我找到了一个使用 ConcurrentDictionary<TKey, TValue>
的解决方案,但它有点难看。
首先,我用新的属性Guid InternalCorrelationId
修改了Reference.cs
中自动生成的类。由于自动生成的 类 是 partial
,这可以在重新生成客户端时不会更改的单独文件中完成。
public partial class AutoGeneratedWCFType
{
private Guid InternalCorrelationIdField;
[System.Runtime.Serialization.DataMember()]
public Guid InternalCorrelationId
{
get { return InternalCorrelationIdField; }
set { InternalCorrelationIdField = value; }
}
}
接下来,我将所有请求 DTO 类型派生自名为 RequestBase
的类型,并且我所有响应 DTO 类型都派生自名为 ResponseBase
的类型,因此我可以对它们进行通用处理:
public abstract class RequestBase
{
public Guid InternalCorrelationId { get; set; }
}
public abstract class ResponseBase
{
public string RequestXml { get; set; }
public string ResponseXml { get; set; }
}
然后我添加了一个 RequestCorrelator
类型,它只保留 ConcurrentDictionary<Guid, XmlRequestResponse>
:
public sealed class RequestCorrelator : IRequestCorrelator
{
public ConcurrentDictionary<Guid, XmlRequestResponse> PendingCalls { get; }
public RequestCorrelator() => PendingCalls = new ConcurrentDictionary<Guid, XmlRequestResponse>();
}
public sealed class XmlRequestResponse
{
public string RequestXml { get; set; }
public string ResponseXml { get; set; }
}
RequestCorrelator
是它自己的用于 DI 目的的类型 - 您可能只能直接使用 ConcurrentDictionary<TKey, TValue>
。
最后,我们有了实际获取 XML 的代码,一种实现 IClientMessageInspector
:
的类型
public sealed class ClientMessageProvider : IClientMessageInspector
{
private readonly IRequestCorrelator _requestCorrelator;
public ClientMessageProvider(IRequestCorrelator requestCorrelator) =>
_requestCorrelator = requestCorrelator;
public object BeforeSendRequest(ref Message request, IClientChannel channel)
{
var requestXml = request.ToString();
var internalCorrelationId = GetInternalCorrelationId(requestXml);
if (internalCorrelationId != null)
{
if (_requestCorrelator.PendingCalls.TryGetValue(internalCorrelationId.Value,
out var requestResponse))
{
requestResponse.RequestXml = requestXml;
}
request = RemoveInternalCorrelationId(request);
}
return internalCorrelationId;
}
public void AfterReceiveReply(ref Message reply, object correlationState)
{
// WCF can internally correlate a request between BeforeSendRequest and
// AfterReceiveReply. We reuse the same correlation ID we added to the
// XML as our correlation state.
var responseXml = reply.ToString();
var internalCorrelationId = (correlationState is Guid guid)
? guid
: default;
if (_requestCorrelator.PendingCalls.TryGetValue(internalCorrelationId,
out var requestResponse))
{
requestResponse.ResponseXml = responseXml;
}
}
private static Guid? GetInternalCorrelationId(string requestXml)
{
var document = XDocument.Parse(requestXml);
var internalCorrelationIdElement = /* You'll have to write this yourself;
every WCF XML request is different. */
return internalCorrelationIdElement != null
? Guid.Parse(internalCorrelationIdElement.Value)
: null;
}
private static Message RemoveInternalCorrelationId(Message oldMessage)
{
//
var buffer = oldMessage.CreateBufferedCopy(2 * 1024 * 1024);
var tempMessage = buffer.CreateMessage();
var dictionaryReader = tempMessage.GetReaderAtBodyContents();
var document = new XmlDocument();
document.Load(dictionaryReader);
dictionaryReader.Close();
var internalCorrelationIdNode = /* You'll also have to write this yourself. */
var parent = internalCorrelationIdNode.ParentNode;
parent.RemoveChild(internalCorrelationIdNode);
var memoryStream = new MemoryStream();
var xmlWriter = XmlWriter.Create(memoryStream);
document.Save(xmlWriter);
xmlWriter.Flush();
xmlWriter.Close();
memoryStream.Position = 0;
var xmlReader = XmlReader.Create(memoryStream);
var newMessage = Message.CreateMessage(oldMessage.Version, null, xmlReader);
newMessage.Headers.CopyHeadersFrom(oldMessage);
newMessage.Properties.CopyProperties(oldMessage.Properties);
return newMessage;
}
}
简而言之,这种类型:
- 在 XML 请求中查找关联 ID。
- 找到具有相同关联 ID 的
XmlRequestResponse
并将请求添加到它。
- 删除相关 ID 元素,这样服务就不会得到他们不期望的元素。
- 收到回复后,使用
correlationState
找到 XmlRequestResponse
并将回复 XML 写入其中。
现在我们要做的就是改变IWCFWrapperClient
:
private async Task<TDtoResult> ExecuteCallWithLogging<TDtoRequest,
TWcfRequest,
TWcfResponse,
TDtoResult>(TDtoRequest request,
Func<TDtoRequest, TWcfRequest> dtoToWcfConverter,
Func<TWcfRequest, Task<TWcfResponse>> wcfCall,
Func<TWcfResponse, TDtoResult> wcfToDtoConverter)
where TDtoRequest : CorrelationBase
where TDtoResult : WcfBase
{
request.InternalCorrelationId = Guid.NewGuid();
var xmlRequestResponse = new XmlRequestResponse();
_requestCorrelator.PendingCalls.GetOrAdd(request.InternalCorrelationId,
xmlRequestResponse);
var response = await contractingCall(dtoToWcfConverter(request));
_requestCorrelator.PendingCalls.TryRemove(request.InternalCorrelationId, out _);
return wcfToDtoConverter(response).WithRequestResponse(xmlRequestResponse);
}
public async Task<DoThingResponseDto> DoThing(DoThingRequestDto request) =>
await ExecuteCallWithLogging(request,
r => r.ToWcfModel(),
async d => await _wcf.Call(d),
d => d.ToDtoModel());
WithRequestResponse
实现如下:
public static T WithRequestResponse<T>(this T item, XmlRequestResponse requestResponse)
where T : ResponseBase
{
item.RequestXml = requestResponse?.RequestXml;
item.ResponseXml = requestResponse?.ResponseXml;
return item;
}
我们开始了。 WCF 在响应对象中调用 return 它们的 XML 而不仅仅是您可以打印到控制台或记录到文件的内容。
我们有一个使用 Azure WebJob SDK 的控制台应用程序。 WebJob 依赖于使用 SOAP 的 WCF 服务,它通过我们编写的 DLL 访问该服务,该 DLL 以更友好的方式包装自动生成的 WCF 类型。
出于日志记录的目的,我们希望为我们发出的请求保存请求和响应 XML 主体。这些 XML 具尸体将保存在我们的数据库中。但是,因为 WCF 代码存在于低级 DLL 中,它对我们的数据库没有概念,无法保存到它。
DLL使用微软的DI扩展来注册类型,WebJob调用是这样的:
class WebJobClass
{
IWCFWrapperClient _wcfWrapperClient;
public WebJobClass(IWCFWrapperClient wcfWrapperClient)
{
_wcfWrapperClient = wcfWrapperClient;
}
public async Task DoThing()
{
var callResult = await _wcfWrapperClient.CallWCFService();
}
}
IWCFWrapperClient
看起来像这样:
class WCFWrapperClient : IWCFWrapperClient
{
IWCF _wcf; // auto-generated by VS, stored in Reference.cs
public async Task<object> CallWCFService()
{
return await _wcf.Call(); // another auto-generated method
}
}
我已经实现了 IClientMessageInspector
,它可以很好地为我提供 XML request/response,但我没有办法将它传回给 WCFWrapperClient.CallWCFService
以便它可以 returned 到 WebJobClass.DoThing()
,然后谁可以将它保存到数据库。
问题是多线程。 WebJobs、IIRC 将 运行 并行请求多个,从多个线程调用 DLL。这意味着我们不能,比如说,共享一个静态的 属性 LastRequestXmlBody
因为多个线程可能会覆盖它。我们也不能,比如说,给每个调用一个 Guid 或其他东西,因为除了自动生成的内容之外,无法将 IWCFWrapperClient.CallWCFService
中的任何内容传递到自动生成的 IWCF.Call
中。
那么,我怎样才能以线程安全的方式 return XML 到 WebJobClass.DoThing
?
我找到了一个使用 ConcurrentDictionary<TKey, TValue>
的解决方案,但它有点难看。
首先,我用新的属性Guid InternalCorrelationId
修改了Reference.cs
中自动生成的类。由于自动生成的 类 是 partial
,这可以在重新生成客户端时不会更改的单独文件中完成。
public partial class AutoGeneratedWCFType
{
private Guid InternalCorrelationIdField;
[System.Runtime.Serialization.DataMember()]
public Guid InternalCorrelationId
{
get { return InternalCorrelationIdField; }
set { InternalCorrelationIdField = value; }
}
}
接下来,我将所有请求 DTO 类型派生自名为 RequestBase
的类型,并且我所有响应 DTO 类型都派生自名为 ResponseBase
的类型,因此我可以对它们进行通用处理:
public abstract class RequestBase
{
public Guid InternalCorrelationId { get; set; }
}
public abstract class ResponseBase
{
public string RequestXml { get; set; }
public string ResponseXml { get; set; }
}
然后我添加了一个 RequestCorrelator
类型,它只保留 ConcurrentDictionary<Guid, XmlRequestResponse>
:
public sealed class RequestCorrelator : IRequestCorrelator
{
public ConcurrentDictionary<Guid, XmlRequestResponse> PendingCalls { get; }
public RequestCorrelator() => PendingCalls = new ConcurrentDictionary<Guid, XmlRequestResponse>();
}
public sealed class XmlRequestResponse
{
public string RequestXml { get; set; }
public string ResponseXml { get; set; }
}
RequestCorrelator
是它自己的用于 DI 目的的类型 - 您可能只能直接使用 ConcurrentDictionary<TKey, TValue>
。
最后,我们有了实际获取 XML 的代码,一种实现 IClientMessageInspector
:
public sealed class ClientMessageProvider : IClientMessageInspector
{
private readonly IRequestCorrelator _requestCorrelator;
public ClientMessageProvider(IRequestCorrelator requestCorrelator) =>
_requestCorrelator = requestCorrelator;
public object BeforeSendRequest(ref Message request, IClientChannel channel)
{
var requestXml = request.ToString();
var internalCorrelationId = GetInternalCorrelationId(requestXml);
if (internalCorrelationId != null)
{
if (_requestCorrelator.PendingCalls.TryGetValue(internalCorrelationId.Value,
out var requestResponse))
{
requestResponse.RequestXml = requestXml;
}
request = RemoveInternalCorrelationId(request);
}
return internalCorrelationId;
}
public void AfterReceiveReply(ref Message reply, object correlationState)
{
// WCF can internally correlate a request between BeforeSendRequest and
// AfterReceiveReply. We reuse the same correlation ID we added to the
// XML as our correlation state.
var responseXml = reply.ToString();
var internalCorrelationId = (correlationState is Guid guid)
? guid
: default;
if (_requestCorrelator.PendingCalls.TryGetValue(internalCorrelationId,
out var requestResponse))
{
requestResponse.ResponseXml = responseXml;
}
}
private static Guid? GetInternalCorrelationId(string requestXml)
{
var document = XDocument.Parse(requestXml);
var internalCorrelationIdElement = /* You'll have to write this yourself;
every WCF XML request is different. */
return internalCorrelationIdElement != null
? Guid.Parse(internalCorrelationIdElement.Value)
: null;
}
private static Message RemoveInternalCorrelationId(Message oldMessage)
{
//
var buffer = oldMessage.CreateBufferedCopy(2 * 1024 * 1024);
var tempMessage = buffer.CreateMessage();
var dictionaryReader = tempMessage.GetReaderAtBodyContents();
var document = new XmlDocument();
document.Load(dictionaryReader);
dictionaryReader.Close();
var internalCorrelationIdNode = /* You'll also have to write this yourself. */
var parent = internalCorrelationIdNode.ParentNode;
parent.RemoveChild(internalCorrelationIdNode);
var memoryStream = new MemoryStream();
var xmlWriter = XmlWriter.Create(memoryStream);
document.Save(xmlWriter);
xmlWriter.Flush();
xmlWriter.Close();
memoryStream.Position = 0;
var xmlReader = XmlReader.Create(memoryStream);
var newMessage = Message.CreateMessage(oldMessage.Version, null, xmlReader);
newMessage.Headers.CopyHeadersFrom(oldMessage);
newMessage.Properties.CopyProperties(oldMessage.Properties);
return newMessage;
}
}
简而言之,这种类型:
- 在 XML 请求中查找关联 ID。
- 找到具有相同关联 ID 的
XmlRequestResponse
并将请求添加到它。 - 删除相关 ID 元素,这样服务就不会得到他们不期望的元素。
- 收到回复后,使用
correlationState
找到XmlRequestResponse
并将回复 XML 写入其中。
现在我们要做的就是改变IWCFWrapperClient
:
private async Task<TDtoResult> ExecuteCallWithLogging<TDtoRequest,
TWcfRequest,
TWcfResponse,
TDtoResult>(TDtoRequest request,
Func<TDtoRequest, TWcfRequest> dtoToWcfConverter,
Func<TWcfRequest, Task<TWcfResponse>> wcfCall,
Func<TWcfResponse, TDtoResult> wcfToDtoConverter)
where TDtoRequest : CorrelationBase
where TDtoResult : WcfBase
{
request.InternalCorrelationId = Guid.NewGuid();
var xmlRequestResponse = new XmlRequestResponse();
_requestCorrelator.PendingCalls.GetOrAdd(request.InternalCorrelationId,
xmlRequestResponse);
var response = await contractingCall(dtoToWcfConverter(request));
_requestCorrelator.PendingCalls.TryRemove(request.InternalCorrelationId, out _);
return wcfToDtoConverter(response).WithRequestResponse(xmlRequestResponse);
}
public async Task<DoThingResponseDto> DoThing(DoThingRequestDto request) =>
await ExecuteCallWithLogging(request,
r => r.ToWcfModel(),
async d => await _wcf.Call(d),
d => d.ToDtoModel());
WithRequestResponse
实现如下:
public static T WithRequestResponse<T>(this T item, XmlRequestResponse requestResponse)
where T : ResponseBase
{
item.RequestXml = requestResponse?.RequestXml;
item.ResponseXml = requestResponse?.ResponseXml;
return item;
}
我们开始了。 WCF 在响应对象中调用 return 它们的 XML 而不仅仅是您可以打印到控制台或记录到文件的内容。