Rx Twitter Stream 在输入第三个搜索主题时停止
Rx Twitter Stream stops on entering the third search topic
我试图从这个视频中重建 Jonathan Worthington 的 Twitter 示例:
https://www.youtube.com/watch?v=_VdIQTtRkb8
一开始它工作正常,但过了一会儿,流(Stream)就停止了:在输入第三个搜索主题后,应用程序不再收到任何推文。我不知道为什么。这似乎与时间无关,因为如果我让它一直运行而不改变搜索主题,它就会持续接收推文。有人可以帮忙吗?
这是主窗口(MainWindow)的代码:
// Backing collections for the two list boxes; the newest entry is always inserted at index 0.
public ObservableCollection<string> PositiveTweets = new ObservableCollection<string>();
public ObservableCollection<string> NegativeTweets = new ObservableCollection<string>();

/// <summary>
/// Wires the search box to a live tweet pipeline: each (throttled) text change starts a new
/// stream via Twitter.AllTweetsAbout, every tweet is scored, and the scored tweets are
/// shown in the positive list (score >= 0) or the negative list (score &lt; 0).
/// </summary>
public MainWindow()
{
    InitializeComponent();
    FocusManager.SetFocusedElement(this, SearchTextBox);
    PositiveListBox.ItemsSource = PositiveTweets;
    NegativeListBox.ItemsSource = NegativeTweets;

    var positiveWords = new List<string> {"cool"};
    var negativeWords = new List<string> {"sucks"};
    var sentiment = new SentimentAnalysis(Keywords.Create(positiveWords, negativeWords));

    // Emit the text box content once the user has paused typing for one second.
    var textChanges = Observable.FromEventPattern<TextChangedEventArgs>(SearchTextBox, "TextChanged");
    var topics = textChanges
        .Select(e => ((TextBox) e.Sender).Text)
        .Throttle(TimeSpan.FromSeconds(1));

    // Switch() drops the subscription to the previous topic's stream whenever a new topic arrives.
    var perTopicStreams = topics.Select(Twitter.AllTweetsAbout);
    var tweets = perTopicStreams
        .Switch()
        .Select(sentiment.Score)
        .Publish();
    tweets.Connect();

    Func<string, ScoredTweet, string> format = (topic, st) =>
        String.Format("[user: @{0} | topic: {1} | score: {3}]\r\n{2}\r\n", st.Tweet.User, topic, st.Tweet.Text, st.Score);

    // Keeps at most four entries per list: the oldest (last) one is dropped before inserting on top.
    Action<string, ObservableCollection<string>> addToList = (item, list) =>
    {
        if (list.Count == 4)
        {
            list.RemoveAt(3);
        }
        list.Insert(0, item);
    };

    // Shows at most one tweet per second from the given stream in the given list, on the dispatcher.
    Action<IObservable<ScoredTweet>, ObservableCollection<string>> show = (source, target) =>
        source
            .Sample(TimeSpan.FromSeconds(1))
            .ObserveOnDispatcher()
            .Subscribe(x => addToList(format(x.Tweet.Topic, x), target));

    show(tweets.Where(x => x.Score >= 0), PositiveTweets);
    show(tweets.Where(x => x.Score < 0), NegativeTweets);
}
这是 XAML 代码:
<!-- Window content: a search row on top, followed by the positive and negative tweet lists. -->
<StackPanel Margin="10">
<!-- Search row: the label is docked left, the text box takes the remaining width. -->
<DockPanel>
<Label DockPanel.Dock="Left" Content="Search:" Margin="0,0,10,0" FontSize="20"/>
<!-- The code-behind observes this box's TextChanged event to pick the search topic. -->
<TextBox Name="SearchTextBox" FontSize="20" Focusable="True"/>
</DockPanel>
<Label Content="positive" FontSize="20"/>
<!-- ItemsSource is assigned in the code-behind (PositiveTweets). -->
<ListBox Name="PositiveListBox" Height="250" FontSize="16"/>
<Label Content="negative" FontSize="20"/>
<!-- ItemsSource is assigned in the code-behind (NegativeTweets). -->
<ListBox Name="NegativeListBox" Height="250" FontSize="16"/>
</StackPanel>
IObservable 是这样创建的:
// Authorizer shared by every TwitterContext created in this class.
private static readonly SingleUserAuthorizer Auth = CreateAuthorizer();

// Builds the single-user authorizer from the four credential entries in appSettings.
private static SingleUserAuthorizer CreateAuthorizer()
{
    var settings = ConfigurationManager.AppSettings;
    var credentials = new InMemoryCredentialStore
    {
        ConsumerKey = settings["consumerKey"],
        ConsumerSecret = settings["consumerSecret"],
        OAuthToken = settings["authtoken"],
        OAuthTokenSecret = settings["authtokensecret"]
    };
    return new SingleUserAuthorizer { CredentialStore = credentials };
}
/// <summary>
/// Creates an observable that, on each subscription, opens a Twitter filter stream tracking
/// <paramref name="topic"/> and pushes every received item to the observer as a parsed tweet.
/// </summary>
/// <param name="topic">Track term used for the streaming filter; also passed to Tweet.Parse.</param>
/// <returns>
/// An observable of tweets. Disposing a subscription cancels the underlying streaming query;
/// a failure of the streaming query is reported through OnError.
/// </returns>
public static IObservable<Tweet> AllTweetsAbout(string topic)
{
    return Observable.Create<Tweet>(o =>
    {
        // Disposing this (which Rx does when the subscriber unsubscribes, e.g. on a topic switch)
        // cancels the token right away. Previously disposal only set a flag that was checked when
        // the NEXT tweet arrived, so a quiet topic kept its connection open indefinitely.
        // NOTE(review): several lingering connections presumably hit Twitter's streaming
        // connection limit, which would explain the stream dying after a few topic changes - confirm.
        var cancellation = new CancellationDisposable();

        var twitterCtx = new TwitterContext(Auth);
        // NOTE(review): WithCancellation needs a LINQ to Twitter version that ships it - confirm.
        var query = from s in twitterCtx.Streaming
                        .WithCancellation(cancellation.Token)
                    where s.Type == StreamingType.Filter &&
                          s.Track == topic
                    select s;

        query
            .StartAsync(s =>
            {
                // Fallback for content that still arrives after cancellation was requested.
                if (cancellation.Token.IsCancellationRequested)
                    s.CloseStream();
                else
                    o.OnNext(Tweet.Parse(s.Content, topic));
                return Task.FromResult(true);
            })
            // The task used to be discarded, so a failed stream just went silent.
            // Forward the failure instead; after disposal Rx ignores the notification.
            .ContinueWith(
                t => o.OnError(t.Exception.GetBaseException()),
                TaskContinuationOptions.OnlyOnFaulted);

        return cancellation;
    });
}
最后是情感分析:
/// <summary>
/// Pairs a tweet with the sentiment score computed for it.
/// </summary>
public class ScoredTweet
{
    /// <summary>The tweet that was scored.</summary>
    public Tweet Tweet
    {
        get;
        set;
    }

    /// <summary>The sentiment score assigned to <see cref="Tweet"/>.</summary>
    public int Score
    {
        get;
        set;
    }
}
/// <summary>
/// Scores tweets by counting how many configured positive and negative keywords occur in the text.
/// </summary>
public class SentimentAnalysis
{
    private readonly Keywords _keywords;

    /// <summary>Creates an analyzer that uses the given keyword lists.</summary>
    /// <param name="keywords">Positive and negative keywords; must not be null.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="keywords"/> is null.</exception>
    public SentimentAnalysis(Keywords keywords)
    {
        // Fail here rather than with a NullReferenceException on the first scored tweet.
        if (keywords == null)
            throw new System.ArgumentNullException("keywords");
        _keywords = keywords;
    }

    /// <summary>
    /// Computes the score as (number of positive keywords contained in the tweet text) minus
    /// (number of negative keywords contained in it). Matching is case-sensitive.
    /// </summary>
    /// <param name="tweet">The tweet to score; must not be null.</param>
    /// <returns>A new <see cref="ScoredTweet"/> holding the tweet and its score.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="tweet"/> is null.</exception>
    public ScoredTweet Score(Tweet tweet)
    {
        if (tweet == null)
            throw new System.ArgumentNullException("tweet");

        // A tweet without text scores 0 instead of throwing.
        // NOTE(review): presumably non-status stream messages can parse to a tweet with null Text - confirm.
        var text = tweet.Text ?? string.Empty;

        return new ScoredTweet
        {
            Tweet = tweet,
            Score = CountMatches(_keywords.Positive, text) - CountMatches(_keywords.Negative, text)
        };
    }

    // Counts the keywords contained in the text; a missing keyword list counts as no matches.
    private static int CountMatches(IEnumerable<string> words, string text)
    {
        return words == null ? 0 : words.Count(w => text.Contains(w));
    }
}
/// <summary>
/// Holds the positive and negative keyword lists used for sentiment scoring.
/// </summary>
public class Keywords
{
    /// <summary>Keywords that count towards a positive score.</summary>
    public List<string> Positive { get; private set; }

    /// <summary>Keywords that count towards a negative score.</summary>
    public List<string> Negative { get; private set; }

    /// <summary>
    /// Creates a keyword set that references (does not copy) the given lists.
    /// </summary>
    /// <param name="positive">List stored in <see cref="Positive"/>.</param>
    /// <param name="negative">List stored in <see cref="Negative"/>.</param>
    /// <returns>The new keyword set.</returns>
    public static Keywords Create(List<string> positive, List<string> negative)
    {
        var result = new Keywords();
        result.Positive = positive;
        result.Negative = negative;
        return result;
    }
}
我找到了问题的解决方案。
Twitter 类不能是静态的,并且 TwitterContext 只应在创建 Twitter 类的实例时创建一次。
在我发布的代码中,AllTweetsAbout 方法是静态的,每次调用该方法时都会创建一个新的 TwitterContext。这样行不通,可能是因为在短时间内发生多次登录操作时,Twitter API 会以某种方式将其阻止。
我最近为 LINQ to Twitter 添加了 Rx 支持。也许这会对你有所帮助:
/// <summary>
/// Demo: opens a filter stream tracking "twitter" as an observable, hands every item to
/// HandleStreamResponse, and requests cancellation while handling the sixth item.
/// </summary>
/// <param name="twitterCtx">Context used to build the streaming query.</param>
static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
{
    Console.WriteLine("\nStreamed Content: \n");

    var received = 0;
    var cts = new CancellationTokenSource();

    try
    {
        var filterQuery =
            from strm in twitterCtx.Streaming
                                   .WithCancellation(cts.Token)
            where strm.Type == StreamingType.Filter &&
                  strm.Track == "twitter"
            select strm;

        var stream = await filterQuery.ToObservableAsync();

        stream.Subscribe(
            item =>
            {
                HandleStreamResponse(item);

                // Items 1-5 pass through; the sixth one triggers cancellation.
                received++;
                if (received > 5)
                {
                    cts.Cancel();
                }
            },
            error => Console.WriteLine(error.ToString()),
            () => Console.WriteLine("Completed"));
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Stream cancelled.");
    }
}
您也可以下载此演示的源代码(download the source code)。
我试图从这个视频中重建 Jonathan Worthington 的 Twitter 示例: https://www.youtube.com/watch?v=_VdIQTtRkb8
一开始它工作正常,但过了一会儿,流(Stream)就停止了:在输入第三个搜索主题后,应用程序不再收到任何推文。我不知道为什么。这似乎与时间无关,因为如果我让它一直运行而不改变搜索主题,它就会持续接收推文。有人可以帮忙吗?
这是主窗口(MainWindow)的代码:
// Backing collections for the two list boxes; the newest entry is always inserted at index 0.
public ObservableCollection<string> PositiveTweets = new ObservableCollection<string>();
public ObservableCollection<string> NegativeTweets = new ObservableCollection<string>();

/// <summary>
/// Wires the search box to a live tweet pipeline: each (throttled) text change starts a new
/// stream via Twitter.AllTweetsAbout, every tweet is scored, and the scored tweets are
/// shown in the positive list (score >= 0) or the negative list (score &lt; 0).
/// </summary>
public MainWindow()
{
    InitializeComponent();
    FocusManager.SetFocusedElement(this, SearchTextBox);
    PositiveListBox.ItemsSource = PositiveTweets;
    NegativeListBox.ItemsSource = NegativeTweets;

    var positiveWords = new List<string> {"cool"};
    var negativeWords = new List<string> {"sucks"};
    var sentiment = new SentimentAnalysis(Keywords.Create(positiveWords, negativeWords));

    // Emit the text box content once the user has paused typing for one second.
    var textChanges = Observable.FromEventPattern<TextChangedEventArgs>(SearchTextBox, "TextChanged");
    var topics = textChanges
        .Select(e => ((TextBox) e.Sender).Text)
        .Throttle(TimeSpan.FromSeconds(1));

    // Switch() drops the subscription to the previous topic's stream whenever a new topic arrives.
    var perTopicStreams = topics.Select(Twitter.AllTweetsAbout);
    var tweets = perTopicStreams
        .Switch()
        .Select(sentiment.Score)
        .Publish();
    tweets.Connect();

    Func<string, ScoredTweet, string> format = (topic, st) =>
        String.Format("[user: @{0} | topic: {1} | score: {3}]\r\n{2}\r\n", st.Tweet.User, topic, st.Tweet.Text, st.Score);

    // Keeps at most four entries per list: the oldest (last) one is dropped before inserting on top.
    Action<string, ObservableCollection<string>> addToList = (item, list) =>
    {
        if (list.Count == 4)
        {
            list.RemoveAt(3);
        }
        list.Insert(0, item);
    };

    // Shows at most one tweet per second from the given stream in the given list, on the dispatcher.
    Action<IObservable<ScoredTweet>, ObservableCollection<string>> show = (source, target) =>
        source
            .Sample(TimeSpan.FromSeconds(1))
            .ObserveOnDispatcher()
            .Subscribe(x => addToList(format(x.Tweet.Topic, x), target));

    show(tweets.Where(x => x.Score >= 0), PositiveTweets);
    show(tweets.Where(x => x.Score < 0), NegativeTweets);
}
这是 XAML 代码:
<!-- Window content: a search row on top, followed by the positive and negative tweet lists. -->
<StackPanel Margin="10">
<!-- Search row: the label is docked left, the text box takes the remaining width. -->
<DockPanel>
<Label DockPanel.Dock="Left" Content="Search:" Margin="0,0,10,0" FontSize="20"/>
<!-- The code-behind observes this box's TextChanged event to pick the search topic. -->
<TextBox Name="SearchTextBox" FontSize="20" Focusable="True"/>
</DockPanel>
<Label Content="positive" FontSize="20"/>
<!-- ItemsSource is assigned in the code-behind (PositiveTweets). -->
<ListBox Name="PositiveListBox" Height="250" FontSize="16"/>
<Label Content="negative" FontSize="20"/>
<!-- ItemsSource is assigned in the code-behind (NegativeTweets). -->
<ListBox Name="NegativeListBox" Height="250" FontSize="16"/>
</StackPanel>
IObservable 是这样创建的:
// Authorizer shared by every TwitterContext created in this class.
private static readonly SingleUserAuthorizer Auth = CreateAuthorizer();

// Builds the single-user authorizer from the four credential entries in appSettings.
private static SingleUserAuthorizer CreateAuthorizer()
{
    var settings = ConfigurationManager.AppSettings;
    var credentials = new InMemoryCredentialStore
    {
        ConsumerKey = settings["consumerKey"],
        ConsumerSecret = settings["consumerSecret"],
        OAuthToken = settings["authtoken"],
        OAuthTokenSecret = settings["authtokensecret"]
    };
    return new SingleUserAuthorizer { CredentialStore = credentials };
}
/// <summary>
/// Creates an observable that, on each subscription, opens a Twitter filter stream tracking
/// <paramref name="topic"/> and pushes every received item to the observer as a parsed tweet.
/// </summary>
/// <param name="topic">Track term used for the streaming filter; also passed to Tweet.Parse.</param>
/// <returns>
/// An observable of tweets. Disposing a subscription cancels the underlying streaming query;
/// a failure of the streaming query is reported through OnError.
/// </returns>
public static IObservable<Tweet> AllTweetsAbout(string topic)
{
    return Observable.Create<Tweet>(o =>
    {
        // Disposing this (which Rx does when the subscriber unsubscribes, e.g. on a topic switch)
        // cancels the token right away. Previously disposal only set a flag that was checked when
        // the NEXT tweet arrived, so a quiet topic kept its connection open indefinitely.
        // NOTE(review): several lingering connections presumably hit Twitter's streaming
        // connection limit, which would explain the stream dying after a few topic changes - confirm.
        var cancellation = new CancellationDisposable();

        var twitterCtx = new TwitterContext(Auth);
        // NOTE(review): WithCancellation needs a LINQ to Twitter version that ships it - confirm.
        var query = from s in twitterCtx.Streaming
                        .WithCancellation(cancellation.Token)
                    where s.Type == StreamingType.Filter &&
                          s.Track == topic
                    select s;

        query
            .StartAsync(s =>
            {
                // Fallback for content that still arrives after cancellation was requested.
                if (cancellation.Token.IsCancellationRequested)
                    s.CloseStream();
                else
                    o.OnNext(Tweet.Parse(s.Content, topic));
                return Task.FromResult(true);
            })
            // The task used to be discarded, so a failed stream just went silent.
            // Forward the failure instead; after disposal Rx ignores the notification.
            .ContinueWith(
                t => o.OnError(t.Exception.GetBaseException()),
                TaskContinuationOptions.OnlyOnFaulted);

        return cancellation;
    });
}
最后是情感分析:
/// <summary>
/// Pairs a tweet with the sentiment score computed for it.
/// </summary>
public class ScoredTweet
{
    /// <summary>The tweet that was scored.</summary>
    public Tweet Tweet
    {
        get;
        set;
    }

    /// <summary>The sentiment score assigned to <see cref="Tweet"/>.</summary>
    public int Score
    {
        get;
        set;
    }
}
/// <summary>
/// Scores tweets by counting how many configured positive and negative keywords occur in the text.
/// </summary>
public class SentimentAnalysis
{
    private readonly Keywords _keywords;

    /// <summary>Creates an analyzer that uses the given keyword lists.</summary>
    /// <param name="keywords">Positive and negative keywords; must not be null.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="keywords"/> is null.</exception>
    public SentimentAnalysis(Keywords keywords)
    {
        // Fail here rather than with a NullReferenceException on the first scored tweet.
        if (keywords == null)
            throw new System.ArgumentNullException("keywords");
        _keywords = keywords;
    }

    /// <summary>
    /// Computes the score as (number of positive keywords contained in the tweet text) minus
    /// (number of negative keywords contained in it). Matching is case-sensitive.
    /// </summary>
    /// <param name="tweet">The tweet to score; must not be null.</param>
    /// <returns>A new <see cref="ScoredTweet"/> holding the tweet and its score.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="tweet"/> is null.</exception>
    public ScoredTweet Score(Tweet tweet)
    {
        if (tweet == null)
            throw new System.ArgumentNullException("tweet");

        // A tweet without text scores 0 instead of throwing.
        // NOTE(review): presumably non-status stream messages can parse to a tweet with null Text - confirm.
        var text = tweet.Text ?? string.Empty;

        return new ScoredTweet
        {
            Tweet = tweet,
            Score = CountMatches(_keywords.Positive, text) - CountMatches(_keywords.Negative, text)
        };
    }

    // Counts the keywords contained in the text; a missing keyword list counts as no matches.
    private static int CountMatches(IEnumerable<string> words, string text)
    {
        return words == null ? 0 : words.Count(w => text.Contains(w));
    }
}
/// <summary>
/// Holds the positive and negative keyword lists used for sentiment scoring.
/// </summary>
public class Keywords
{
    /// <summary>Keywords that count towards a positive score.</summary>
    public List<string> Positive { get; private set; }

    /// <summary>Keywords that count towards a negative score.</summary>
    public List<string> Negative { get; private set; }

    /// <summary>
    /// Creates a keyword set that references (does not copy) the given lists.
    /// </summary>
    /// <param name="positive">List stored in <see cref="Positive"/>.</param>
    /// <param name="negative">List stored in <see cref="Negative"/>.</param>
    /// <returns>The new keyword set.</returns>
    public static Keywords Create(List<string> positive, List<string> negative)
    {
        var result = new Keywords();
        result.Positive = positive;
        result.Negative = negative;
        return result;
    }
}
我找到了问题的解决方案。Twitter 类不能是静态的,并且 TwitterContext 只应在创建 Twitter 类的实例时创建一次。
在我发布的代码中,AllTweetsAbout 方法是静态的,每次调用该方法时都会创建一个新的 TwitterContext。这样行不通,可能是因为在短时间内发生多次登录操作时,Twitter API 会以某种方式将其阻止。
我最近为 LINQ to Twitter 添加了 Rx 支持。也许这会对你有所帮助:
/// <summary>
/// Demo: opens a filter stream tracking "twitter" as an observable, hands every item to
/// HandleStreamResponse, and requests cancellation while handling the sixth item.
/// </summary>
/// <param name="twitterCtx">Context used to build the streaming query.</param>
static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
{
    Console.WriteLine("\nStreamed Content: \n");

    var received = 0;
    var cts = new CancellationTokenSource();

    try
    {
        var filterQuery =
            from strm in twitterCtx.Streaming
                                   .WithCancellation(cts.Token)
            where strm.Type == StreamingType.Filter &&
                  strm.Track == "twitter"
            select strm;

        var stream = await filterQuery.ToObservableAsync();

        stream.Subscribe(
            item =>
            {
                HandleStreamResponse(item);

                // Items 1-5 pass through; the sixth one triggers cancellation.
                received++;
                if (received > 5)
                {
                    cts.Cancel();
                }
            },
            error => Console.WriteLine(error.ToString()),
            () => Console.WriteLine("Completed"));
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Stream cancelled.");
    }
}
您也可以下载此演示的源代码(download the source code)。