如何在推特上引发新事件,以便客户可以收到事件通知?
How to Raise new event on twitter so that clients can receive notification of the event?
我正在使用 LINQ to Twitter 4.1.0 检索推文,搜索特定的标签 eg.#abc。现在,只要有人在我的帐户中使用相同的标签 eg.#abc 发推文,我就会收到通知。谁能建议我怎么做?
如果您使用的是 REST API,您可以执行定期运行的搜索查询,使用 SinceID
/MaxID
确保您不会搜索你已经看过的推文。它是这样工作的:
static async Task DoPagedSearchAsync(TwitterContext twitterCtx)
{
const int MaxSearchEntriesToReturn = 100;
string searchTerm = "twitter";
// oldest id you already have for this search term
ulong sinceID = 1;
// used after the first query to track current session
ulong maxID;
var combinedSearchResults = new List<Status>();
List<Status> searchResponse =
await
(from search in twitterCtx.Search
where search.Type == SearchType.Search &&
search.Query == searchTerm &&
search.Count == MaxSearchEntriesToReturn &&
search.SinceID == sinceID
select search.Statuses)
.SingleOrDefaultAsync();
combinedSearchResults.AddRange(searchResponse);
ulong previousMaxID = ulong.MaxValue;
do
{
// one less than the newest id you've just queried
maxID = searchResponse.Min(status => status.StatusID) - 1;
Debug.Assert(maxID < previousMaxID);
previousMaxID = maxID;
searchResponse =
await
(from search in twitterCtx.Search
where search.Type == SearchType.Search &&
search.Query == searchTerm &&
search.Count == MaxSearchEntriesToReturn &&
search.MaxID == maxID &&
search.SinceID == sinceID
select search.Statuses)
.SingleOrDefaultAsync();
combinedSearchResults.AddRange(searchResponse);
} while (searchResponse.Any());
combinedSearchResults.ForEach(tweet =>
Console.WriteLine(
"\n User: {0} ({1})\n Tweet: {2}",
tweet.User.ScreenNameResponse,
tweet.User.UserIDResponse,
tweet.Text));
}
在此示例中,SinceID
设置为 1 以获取自 Twitter 开始以来的所有推文。但是,您应该将其作为参数传递,之前保留了先前查询中最旧的推文 ID。
由于您没有 post 任何显示您想要的代码,我假设您在 REST API 上使用某种形式的轮询,与示例相差不大多于。但是,您也可以使用流式传输 API,其中 returns 在发布推文后几秒钟内匹配推文。下面是使用 Filter
流的示例:
static async Task DoFilterStreamAsync(TwitterContext twitterCtx)
{
Console.WriteLine("\nStreamed Content: \n");
int count = 0;
var cancelTokenSrc = new CancellationTokenSource();
try
{
await
(from strm in twitterCtx.Streaming
.WithCancellation(cancelTokenSrc.Token)
where strm.Type == StreamingType.Filter &&
strm.Track == "twitter"
select strm)
.StartAsync(async strm =>
{
await HandleStreamResponse(strm);
if (count++ >= 5)
cancelTokenSrc.Cancel();
});
}
catch (IOException ex)
{
// Twitter might have closed the stream,
// which they do sometimes. You should
// restart the stream, but be sure to
// read Twitter documentation on stream
// back-off strategies to prevent your
// app from being blocked.
Console.WriteLine(ex.ToString());
}
catch (OperationCanceledException)
{
Console.WriteLine("Stream cancelled.");
}
}
HandleStreamResponse()
是您要编写的处理通知的方法。我的建议是你使用消息队列或其他东西,这样你就不会在你使用的术语流行或获得高流量时阻止流。请快速处理消息。
如果你更喜欢响应式编程,你可以这样做:
static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
{
Console.WriteLine("\nStreamed Content: \n");
int count = 0;
var cancelTokenSrc = new CancellationTokenSource();
try
{
var observable =
await
(from strm in twitterCtx.Streaming
.WithCancellation(cancelTokenSrc.Token)
where strm.Type == StreamingType.Filter &&
strm.Track == "twitter"
select strm)
.ToObservableAsync();
observable.Subscribe(
async strm =>
{
await HandleStreamResponse(strm);
if (count++ >= 5)
cancelTokenSrc.Cancel();
},
ex => Console.WriteLine(ex.ToString()),
() => Console.WriteLine("Completed"));
}
catch (OperationCanceledException)
{
Console.WriteLine("Stream cancelled.");
}
}
在回调和反应场景中,监控 steam 以查看它是否已关闭(例如 OperationCancelledException
)。然后你必须创建一个全新的流实例到 re-start。如果发生这种情况,请跟踪您最后看到的 TweetID
并使用 REST API 搜索从那时到新流开始之间的所有推文。
更新
在每个demo中,都有一个HandleStreamResponse
方法,也就是async
。您可以下载 demo source code 并逐步了解它的工作原理。本质上,StreamContent
类型有一个 EntityType
属性 告诉响应是什么类型。由于源代码中的演示使用不同类型的流,因此 switch
语句说明了所有可能的消息类型。但是,在 Search
流的情况下,唯一的响应将是 StreamEntityType.Status
,这将简化您的代码,因为您可以消除其他情况。知道类型后,您可以使用 as
运算符对 Entity
属性 进行转换,例如 var status = strm.Entity as Status
,然后查询 status
变量属性你需要的信息。
static async Task<int> HandleStreamResponse(StreamContent strm)
{
switch (strm.EntityType)
{
case StreamEntityType.Control:
var control = strm.Entity as Control;
Console.WriteLine("Control URI: {0}", control.URL);
break;
case StreamEntityType.Delete:
var delete = strm.Entity as Delete;
Console.WriteLine("Delete - User ID: {0}, Status ID: {1}", delete.UserID, delete.StatusID);
break;
case StreamEntityType.DirectMessage:
var dm = strm.Entity as DirectMessage;
Console.WriteLine("Direct Message - Sender: {0}, Text: {1}", dm.Sender, dm.Text);
break;
case StreamEntityType.Disconnect:
var disconnect = strm.Entity as Disconnect;
Console.WriteLine("Disconnect - {0}", disconnect.Reason);
break;
case StreamEntityType.Event:
var evt = strm.Entity as Event;
Console.WriteLine("Event - Event Name: {0}", evt.EventName);
break;
case StreamEntityType.ForUser:
var user = strm.Entity as ForUser;
Console.WriteLine("For User - User ID: {0}, # Friends: {1}", user.UserID, user.Friends.Count);
break;
case StreamEntityType.FriendsList:
var friends = strm.Entity as FriendsList;
Console.WriteLine("Friends List - # Friends: {0}", friends.Friends.Count);
break;
case StreamEntityType.GeoScrub:
var scrub = strm.Entity as GeoScrub;
Console.WriteLine("GeoScrub - User ID: {0}, Up to Status ID: {1}", scrub.UserID, scrub.UpToStatusID);
break;
case StreamEntityType.Limit:
var limit = strm.Entity as Limit;
Console.WriteLine("Limit - Track: {0}", limit.Track);
break;
case StreamEntityType.Stall:
var stall = strm.Entity as Stall;
Console.WriteLine("Stall - Code: {0}, Message: {1}, % Full: {2}", stall.Code, stall.Message, stall.PercentFull);
break;
case StreamEntityType.Status:
var status = strm.Entity as Status;
Console.WriteLine("Status - @{0}: {1}", status.User.ScreenNameResponse, status.Text);
break;
case StreamEntityType.StatusWithheld:
var statusWithheld = strm.Entity as StatusWithheld;
Console.WriteLine("Status Withheld - Status ID: {0}, # Countries: {1}", statusWithheld.StatusID, statusWithheld.WithheldInCountries.Count);
break;
case StreamEntityType.TooManyFollows:
var follows = strm.Entity as TooManyFollows;
Console.WriteLine("Too Many Follows - Message: {0}", follows.Message);
break;
case StreamEntityType.UserWithheld:
var userWithheld = strm.Entity as UserWithheld;
Console.WriteLine("User Withheld - User ID: {0}, # Countries: {1}", userWithheld.UserID, userWithheld.WithheldInCountries.Count);
break;
case StreamEntityType.ParseError:
var unparsedJson = strm.Entity as string;
Console.WriteLine("Parse Error - {0}", unparsedJson);
break;
case StreamEntityType.Unknown:
default:
Console.WriteLine("Unknown - " + strm.Content + "\n");
break;
}
return await Task.FromResult(0);
}
我正在使用 LINQ to Twitter 4.1.0 检索推文,搜索特定的标签 eg.#abc。现在,只要有人在我的帐户中使用相同的标签 eg.#abc 发推文,我就会收到通知。谁能建议我怎么做?
如果您使用的是 REST API,您可以执行定期运行的搜索查询,使用 SinceID
/MaxID
确保您不会搜索你已经看过的推文。它是这样工作的:
static async Task DoPagedSearchAsync(TwitterContext twitterCtx)
{
const int MaxSearchEntriesToReturn = 100;
string searchTerm = "twitter";
// oldest id you already have for this search term
ulong sinceID = 1;
// used after the first query to track current session
ulong maxID;
var combinedSearchResults = new List<Status>();
List<Status> searchResponse =
await
(from search in twitterCtx.Search
where search.Type == SearchType.Search &&
search.Query == searchTerm &&
search.Count == MaxSearchEntriesToReturn &&
search.SinceID == sinceID
select search.Statuses)
.SingleOrDefaultAsync();
combinedSearchResults.AddRange(searchResponse);
ulong previousMaxID = ulong.MaxValue;
do
{
// one less than the newest id you've just queried
maxID = searchResponse.Min(status => status.StatusID) - 1;
Debug.Assert(maxID < previousMaxID);
previousMaxID = maxID;
searchResponse =
await
(from search in twitterCtx.Search
where search.Type == SearchType.Search &&
search.Query == searchTerm &&
search.Count == MaxSearchEntriesToReturn &&
search.MaxID == maxID &&
search.SinceID == sinceID
select search.Statuses)
.SingleOrDefaultAsync();
combinedSearchResults.AddRange(searchResponse);
} while (searchResponse.Any());
combinedSearchResults.ForEach(tweet =>
Console.WriteLine(
"\n User: {0} ({1})\n Tweet: {2}",
tweet.User.ScreenNameResponse,
tweet.User.UserIDResponse,
tweet.Text));
}
在此示例中,SinceID
设置为 1 以获取自 Twitter 开始以来的所有推文。但是,您应该将其作为参数传递,之前保留了先前查询中最旧的推文 ID。
由于您没有 post 任何显示您想要的代码,我假设您在 REST API 上使用某种形式的轮询,与示例相差不大多于。但是,您也可以使用流式传输 API,其中 returns 在发布推文后几秒钟内匹配推文。下面是使用 Filter
流的示例:
static async Task DoFilterStreamAsync(TwitterContext twitterCtx)
{
Console.WriteLine("\nStreamed Content: \n");
int count = 0;
var cancelTokenSrc = new CancellationTokenSource();
try
{
await
(from strm in twitterCtx.Streaming
.WithCancellation(cancelTokenSrc.Token)
where strm.Type == StreamingType.Filter &&
strm.Track == "twitter"
select strm)
.StartAsync(async strm =>
{
await HandleStreamResponse(strm);
if (count++ >= 5)
cancelTokenSrc.Cancel();
});
}
catch (IOException ex)
{
// Twitter might have closed the stream,
// which they do sometimes. You should
// restart the stream, but be sure to
// read Twitter documentation on stream
// back-off strategies to prevent your
// app from being blocked.
Console.WriteLine(ex.ToString());
}
catch (OperationCanceledException)
{
Console.WriteLine("Stream cancelled.");
}
}
HandleStreamResponse()
是您要编写的处理通知的方法。我的建议是你使用消息队列或其他东西,这样你就不会在你使用的术语流行或获得高流量时阻止流。请快速处理消息。
如果你更喜欢响应式编程,你可以这样做:
static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
{
Console.WriteLine("\nStreamed Content: \n");
int count = 0;
var cancelTokenSrc = new CancellationTokenSource();
try
{
var observable =
await
(from strm in twitterCtx.Streaming
.WithCancellation(cancelTokenSrc.Token)
where strm.Type == StreamingType.Filter &&
strm.Track == "twitter"
select strm)
.ToObservableAsync();
observable.Subscribe(
async strm =>
{
await HandleStreamResponse(strm);
if (count++ >= 5)
cancelTokenSrc.Cancel();
},
ex => Console.WriteLine(ex.ToString()),
() => Console.WriteLine("Completed"));
}
catch (OperationCanceledException)
{
Console.WriteLine("Stream cancelled.");
}
}
在回调和反应场景中,监控 steam 以查看它是否已关闭(例如 OperationCancelledException
)。然后你必须创建一个全新的流实例到 re-start。如果发生这种情况,请跟踪您最后看到的 TweetID
并使用 REST API 搜索从那时到新流开始之间的所有推文。
更新
在每个demo中,都有一个HandleStreamResponse
方法,也就是async
。您可以下载 demo source code 并逐步了解它的工作原理。本质上,StreamContent
类型有一个 EntityType
属性 告诉响应是什么类型。由于源代码中的演示使用不同类型的流,因此 switch
语句说明了所有可能的消息类型。但是,在 Search
流的情况下,唯一的响应将是 StreamEntityType.Status
,这将简化您的代码,因为您可以消除其他情况。知道类型后,您可以使用 as
运算符对 Entity
属性 进行转换,例如 var status = strm.Entity as Status
,然后查询 status
变量属性你需要的信息。
static async Task<int> HandleStreamResponse(StreamContent strm)
{
switch (strm.EntityType)
{
case StreamEntityType.Control:
var control = strm.Entity as Control;
Console.WriteLine("Control URI: {0}", control.URL);
break;
case StreamEntityType.Delete:
var delete = strm.Entity as Delete;
Console.WriteLine("Delete - User ID: {0}, Status ID: {1}", delete.UserID, delete.StatusID);
break;
case StreamEntityType.DirectMessage:
var dm = strm.Entity as DirectMessage;
Console.WriteLine("Direct Message - Sender: {0}, Text: {1}", dm.Sender, dm.Text);
break;
case StreamEntityType.Disconnect:
var disconnect = strm.Entity as Disconnect;
Console.WriteLine("Disconnect - {0}", disconnect.Reason);
break;
case StreamEntityType.Event:
var evt = strm.Entity as Event;
Console.WriteLine("Event - Event Name: {0}", evt.EventName);
break;
case StreamEntityType.ForUser:
var user = strm.Entity as ForUser;
Console.WriteLine("For User - User ID: {0}, # Friends: {1}", user.UserID, user.Friends.Count);
break;
case StreamEntityType.FriendsList:
var friends = strm.Entity as FriendsList;
Console.WriteLine("Friends List - # Friends: {0}", friends.Friends.Count);
break;
case StreamEntityType.GeoScrub:
var scrub = strm.Entity as GeoScrub;
Console.WriteLine("GeoScrub - User ID: {0}, Up to Status ID: {1}", scrub.UserID, scrub.UpToStatusID);
break;
case StreamEntityType.Limit:
var limit = strm.Entity as Limit;
Console.WriteLine("Limit - Track: {0}", limit.Track);
break;
case StreamEntityType.Stall:
var stall = strm.Entity as Stall;
Console.WriteLine("Stall - Code: {0}, Message: {1}, % Full: {2}", stall.Code, stall.Message, stall.PercentFull);
break;
case StreamEntityType.Status:
var status = strm.Entity as Status;
Console.WriteLine("Status - @{0}: {1}", status.User.ScreenNameResponse, status.Text);
break;
case StreamEntityType.StatusWithheld:
var statusWithheld = strm.Entity as StatusWithheld;
Console.WriteLine("Status Withheld - Status ID: {0}, # Countries: {1}", statusWithheld.StatusID, statusWithheld.WithheldInCountries.Count);
break;
case StreamEntityType.TooManyFollows:
var follows = strm.Entity as TooManyFollows;
Console.WriteLine("Too Many Follows - Message: {0}", follows.Message);
break;
case StreamEntityType.UserWithheld:
var userWithheld = strm.Entity as UserWithheld;
Console.WriteLine("User Withheld - User ID: {0}, # Countries: {1}", userWithheld.UserID, userWithheld.WithheldInCountries.Count);
break;
case StreamEntityType.ParseError:
var unparsedJson = strm.Entity as string;
Console.WriteLine("Parse Error - {0}", unparsedJson);
break;
case StreamEntityType.Unknown:
default:
Console.WriteLine("Unknown - " + strm.Content + "\n");
break;
}
return await Task.FromResult(0);
}