具有共享资源的观察者模式中的 C# 竞争条件预防

C# Race-condition prevention in observer pattern with shared resource

我有一个线程 WebListener,它使用最新的传入消息更新 public 变量。我已经用观察者模式实现了 class,所以我可以在收到新消息时通知订阅者。每个订阅者然后可以决定是否从 public 变量中检索消息。

我担心以下情况:

  1. WebListner 收到消息 'a'
  2. WebListener 写入消息'a'
  3. WebListener 通知订阅者已收到消息'a'
  4. 订阅者收到消息通知'a'
  5. WebListener 收到另一条消息'b'
  6. WebListener 用 'b'
  7. 覆盖消息 'a'
  8. WebListener 通知订阅者收到消息'b'
  9. 订阅者阅读了消息 'a' 但实际上收到了 'b' 注意 - 这 100% 没问题 - 我们想收到最新的消息 (b)
  10. 订阅者阅读消息 'b' 并再次获得 'b'。 注意 - 这不应该 return b - 它应该 return 没有或抛出异常

**编辑:订阅者在收到通知后每次召回消息时都应该收到一条新消息。这就是我试图使用随通知传递的令牌来解决的问题。如果先前通知的消息已被覆盖,则结果应该是消息未被检索到。在任何阶段我都不想检索当前消息以外的消息。 **

以最有效的方式防止这种情况的最佳方法是什么?

我想生成一个唯一的令牌并将其与通知一起发送给订阅者以进行比较令牌 -> 读取值类型的操作,但这会导致 WebListener 覆盖比较之间的值的竞争条件问题并阅读操作。

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System;
using System.Net;
using System.Threading;

// Observer design pattern allows objects created from classes that inherit from 
// IObserver to be notified when an event is triggered when subscribed. 
// In this case, the event is when a HTTP message is received on the listenport endpoint.

public interface IObserver
{
    //Receive update from subject
    void Update(ITopic topic);
}

public interface ITopic
{
    //Subscribe an observer to the topic
    void Subscribe(IObserver observer);

    //Unsubscribe a subscriber from the topic
    void Unsubscribe(IObserver observer);

    //Notify all subscribers of an event
    void Notify();
}

// WebListener listens for incoming http messages on the listenport endpoint and notifies subscribers.
// Subscribers can access the message through the public method GetMessage.
public class WebListener : MonoBehaviour, ITopic
{
    string listenport = "http://192.168.1.3:8080";

    private HttpListener listener;
    private Thread listenerThread;

    HttpListenerRequest message;

    void Start()
    {
        listener = new HttpListener();
        listener.Prefixes.Add(listenport);
        listener.Start();

        listenerThread = new Thread(startListener);
        listenerThread.Start();

        Debug.Log("WebListener: Listener started");
    }

    private void startListener()
    {
        while (true)
        {
            var result = listener.BeginGetContext(ListenerCallback, listener);
            result.AsyncWaitHandle.WaitOne();
        }
    }

    private void ListenerCallback(IAsyncResult result)
    {
        var context = listener.EndGetContext(result);
        ReceiveMessage(context);
        context.Response.Close();
    }

    private void ReceiveMessage(HttpListenerContext context)
    {
        message = context.Request;
    }

    // SUBSCRIBERS WILL CALL THIS TO RETRIEVE MESSAGE
    public HttpListenerRequest GetMessage()
    {
        return message;
    }

    private List<IObserver> _observers = new List<IObserver>();

    // Implement ITopic interface methods
    public void Subscribe(IObserver observer)
    {
        this._observers.Add(observer);
        Debug.Log("WebListener: Added Subscriber");
    }

    public void Unsubscribe(IObserver observer)
    {
        this._observers.Remove(observer);
        Debug.Log("WebListener: Removed Subscriber");
    }

    public void Notify()
    {
        //WAS GOING TO GENERATE TOKEN HERE AND PASS IN THE UPDATE FUNCTION BUT THIS WILL CREATE RACE-CONDITION.
        Debug.Log("WebListener: Notifying subscribers of incoming message");
        foreach (var observer in _observers)
        {
            observer.Update(this);

        }
    }
}

您无法真正更新全局状态并期望侦听器神奇地获得一致的历史值而不以某种方式存储它们。

选项:

  • 将听众的期望更改为 "something changed in global state, need to check"
  • 在事件中向监听器发送更改记录
  • 每个侦听器都有更改队列
  • 使状态不可变并将当前状态发送给事件中的侦听器
  • 虽然它可能对您的特定情况没有用,但在允许下一个 "receive" 之前同步接收和通知调用以等待所有 "notify" 处理程序完成是一个选项(由 [=10= 建议])