Rx Twitter Stream 在输入第三个搜索主题时停止

Rx Twitter Stream stops on entering the third search topic

我试图从这个视频中重建 Jonathan Worthington 的 Twitter 示例: https://www.youtube.com/watch?v=_VdIQTtRkb8

它开始工作正常。但过了一会儿,Stream 停止了。在输入第三个搜索主题时,应用程序不再收到任何推文。我不知道为什么。它似乎不依赖于时间。因为如果我让它 运行 而不改变搜索主题,它就会继续。有人可以帮忙吗?

这里是主要代码 window:

public ObservableCollection<string> PositiveTweets = new ObservableCollection<string>();
public ObservableCollection<string> NegativeTweets = new ObservableCollection<string>();

public MainWindow()
{
    InitializeComponent();
    FocusManager.SetFocusedElement(this, SearchTextBox);

    PositiveListBox.ItemsSource = PositiveTweets;
    NegativeListBox.ItemsSource = NegativeTweets;

    var keywords = Keywords.Create(new List<string> {"cool"}, new List<string> {"sucks"});
    var sa = new SentimentAnalysis(keywords);

    var topics = Observable
        .FromEventPattern<TextChangedEventArgs>(SearchTextBox, "TextChanged")
        .Select(e => ((TextBox) e.Sender).Text)
        .Throttle(TimeSpan.FromSeconds(1));

    var tweets = topics
        .Select(Twitter.AllTweetsAbout)
        .Switch()
        .Select(sa.Score)
        .Publish();

    tweets.Connect();

    var format = new Func<string, ScoredTweet, string>((topic, st) => String.Format("[user: @{0} | topic: {1} | score: {3}]\r\n{2}\r\n", st.Tweet.User, topic, st.Tweet.Text, st.Score));

    var addToList = new Action<string, ObservableCollection<string>>((item, list) =>
    {
        if (list.Count == 4)
            list.RemoveAt(3);
        list.Insert(0, item);
    });

    tweets
        .Where(x => x.Score >= 0)
        .Sample(TimeSpan.FromSeconds(1))
        .ObserveOnDispatcher()
        .Subscribe(x => addToList(format(x.Tweet.Topic, x), PositiveTweets));

    tweets
        .Where(x => x.Score < 0)
        .Sample(TimeSpan.FromSeconds(1))
        .ObserveOnDispatcher()
        .Subscribe(x => addToList(format(x.Tweet.Topic, x), NegativeTweets));
}

这是 XAML 代码:

<StackPanel Margin="10">
    <DockPanel>
        <Label DockPanel.Dock="Left" Content="Search:" Margin="0,0,10,0" FontSize="20"/>
        <TextBox Name="SearchTextBox" FontSize="20" Focusable="True"/>
    </DockPanel>
    <Label Content="positive" FontSize="20"/>
    <ListBox Name="PositiveListBox" Height="250" FontSize="16"/>
    <Label Content="negative" FontSize="20"/>
    <ListBox Name="NegativeListBox" Height="250" FontSize="16"/>
</StackPanel>

IObservable 是这样创建的:

readonly static SingleUserAuthorizer Auth = new SingleUserAuthorizer
{
    CredentialStore = new InMemoryCredentialStore
    {
        ConsumerKey = ConfigurationManager.AppSettings["consumerKey"],
        ConsumerSecret = ConfigurationManager.AppSettings["consumerSecret"],
        OAuthToken = ConfigurationManager.AppSettings["authtoken"],
        OAuthTokenSecret = ConfigurationManager.AppSettings["authtokensecret"],
    }
};

public static IObservable<Tweet> AllTweetsAbout(string topic)
{
    return Observable.Create<Tweet>(o =>
    {
        var twitterCtx = new TwitterContext(Auth);

        var query = from s in twitterCtx.Streaming
            where s.Type == StreamingType.Filter &&
                    s.Track == topic
            select s;

        var disposed = false;

        query.StartAsync(s =>
        {
            if(disposed)
                s.CloseStream();
            else
                o.OnNext(Tweet.Parse(s.Content, topic));

            return Task.FromResult(true);
        });

        return Disposable.Create(() => disposed = true);
    });
}

最后是情感分析:

public class ScoredTweet
{
    public Tweet Tweet { get; set; }
    public int Score { get; set; }
}

public class SentimentAnalysis
{
    private readonly Keywords _keywords;

    public SentimentAnalysis(Keywords keywords)
    {
        _keywords = keywords;
    }

    public ScoredTweet Score(Tweet tweet)
    {
        return new ScoredTweet
        {
            Tweet = tweet,
            Score = _keywords.Positive.Count(x => tweet.Text.Contains(x)) - _keywords.Negative.Count(x => tweet.Text.Contains(x))
        };
    }
}

public class Keywords
{
    public List<string> Positive { get; private set; }
    public List<string> Negative { get; private set; }

    public static Keywords Create(List<string> positive, List<string> negative)
    {
       return new Keywords
        {
            Positive = positive,
            Negative = negative
        }; 
    }
}

我找到了问题的解决方案。 推特 class 必须不是静态的,推特上下文应该只在创建推特 class.

时创建

在我发布的代码中,AllTweetsAbout 方法是静态的,每次调用该方法时都会创建 Twitter 上下文。这行不通,可能是因为在给定时间或类似情况下发生许多登录操作时,它会被 Twitter Api 以某种方式阻止。

我最近为 LINQ to Twitter 添加了 Rx 支持。也许这会对你有所帮助:

    static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
    {
        Console.WriteLine("\nStreamed Content: \n");
        int count = 0;
        var cancelTokenSrc = new CancellationTokenSource();

        try
        {
            var observable =
                await
                    (from strm in twitterCtx.Streaming
                                            .WithCancellation(cancelTokenSrc.Token)
                     where strm.Type == StreamingType.Filter &&
                           strm.Track == "twitter"
                     select strm)
                    .ToObservableAsync();

            observable.Subscribe(
                strm =>
                {
                    HandleStreamResponse(strm);

                    if (count++ >= 5)
                        cancelTokenSrc.Cancel();
                },
                ex => Console.WriteLine(ex.ToString()),
                () => Console.WriteLine("Completed"));
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stream cancelled.");
        }
    }

您也可以 download the source code 此演示。