如何在推特上引发新事件,以便客户可以收到事件通知?

How to Raise new event on twitter so that clients can receive notification of the event?

我正在使用 LINQ to Twitter 4.1.0 检索推文,搜索特定的标签 eg.#abc。现在,只要有人在我的帐户中使用相同的标签 eg.#abc 发推文,我就会收到通知。谁能建议我怎么做?

如果您使用的是 REST API,您可以执行定期运行的搜索查询,使用 SinceID/MaxID 确保您不会搜索你已经看过的推文。它是这样工作的:

    static async Task DoPagedSearchAsync(TwitterContext twitterCtx)
    {
        const int MaxSearchEntriesToReturn = 100;

        string searchTerm = "twitter";

        // oldest id you already have for this search term
        ulong sinceID = 1;

        // used after the first query to track current session
        ulong maxID; 

        var combinedSearchResults = new List<Status>();

        List<Status> searchResponse =
            await
            (from search in twitterCtx.Search
             where search.Type == SearchType.Search &&
                   search.Query == searchTerm &&
                   search.Count == MaxSearchEntriesToReturn &&
                   search.SinceID == sinceID
             select search.Statuses)
            .SingleOrDefaultAsync();

        combinedSearchResults.AddRange(searchResponse);
        ulong previousMaxID = ulong.MaxValue;
        do
        {
            // one less than the newest id you've just queried
            maxID = searchResponse.Min(status => status.StatusID) - 1;

            Debug.Assert(maxID < previousMaxID);
            previousMaxID = maxID;

            searchResponse =
                await
                (from search in twitterCtx.Search
                 where search.Type == SearchType.Search &&
                       search.Query == searchTerm &&
                       search.Count == MaxSearchEntriesToReturn &&
                       search.MaxID == maxID &&
                       search.SinceID == sinceID
                 select search.Statuses)
                .SingleOrDefaultAsync();

            combinedSearchResults.AddRange(searchResponse);
        } while (searchResponse.Any());

        combinedSearchResults.ForEach(tweet =>
            Console.WriteLine(
                "\n  User: {0} ({1})\n  Tweet: {2}",
                tweet.User.ScreenNameResponse,
                tweet.User.UserIDResponse,
                tweet.Text));
    }

在此示例中,SinceID 设置为 1 以获取自 Twitter 开始以来的所有推文。但是,您应该将其作为参数传递,之前保留了先前查询中最旧的推文 ID。

由于您没有 post 任何显示您想要的代码,我假设您在 REST API 上使用某种形式的轮询,与示例相差不大多于。但是,您也可以使用流式传输 API,其中 returns 在发布推文后几秒钟内匹配推文。下面是使用 Filter 流的示例:

    static async Task DoFilterStreamAsync(TwitterContext twitterCtx)
    {
        Console.WriteLine("\nStreamed Content: \n");
        int count = 0;
        var cancelTokenSrc = new CancellationTokenSource();

        try
        {
            await
                (from strm in twitterCtx.Streaming
                                        .WithCancellation(cancelTokenSrc.Token)
                 where strm.Type == StreamingType.Filter &&
                       strm.Track == "twitter"
                 select strm)
                .StartAsync(async strm =>
                {
                    await HandleStreamResponse(strm);

                    if (count++ >= 5)
                        cancelTokenSrc.Cancel();
                });
        }
        catch (IOException ex)
        {
            // Twitter might have closed the stream,
            // which they do sometimes. You should
            // restart the stream, but be sure to
            // read Twitter documentation on stream
            // back-off strategies to prevent your
            // app from being blocked.
            Console.WriteLine(ex.ToString());
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stream cancelled.");
        }
    }

HandleStreamResponse() 是您要编写的处理通知的方法。我的建议是你使用消息队列或其他东西,这样你就不会在你使用的术语流行或获得高流量时阻止流。请快速处理消息。

如果你更喜欢响应式编程,你可以这样做:

    static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
    {
        Console.WriteLine("\nStreamed Content: \n");
        int count = 0;
        var cancelTokenSrc = new CancellationTokenSource();

        try
        {
            var observable =
                await
                    (from strm in twitterCtx.Streaming
                                            .WithCancellation(cancelTokenSrc.Token)
                     where strm.Type == StreamingType.Filter &&
                           strm.Track == "twitter"
                     select strm)
                    .ToObservableAsync();

            observable.Subscribe(
                async strm =>
                {
                    await HandleStreamResponse(strm);

                    if (count++ >= 5)
                        cancelTokenSrc.Cancel();
                },
                ex => Console.WriteLine(ex.ToString()),
                () => Console.WriteLine("Completed"));
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stream cancelled.");
        }
    }

在回调和反应场景中,监控 steam 以查看它是否已关闭(例如 OperationCancelledException)。然后你必须创建一个全新的流实例到 re-start。如果发生这种情况,请跟踪您最后看到的 TweetID 并使用 REST API 搜索从那时到新流开始之间的所有推文。

更新

在每个demo中,都有一个HandleStreamResponse方法,也就是async。您可以下载 demo source code 并逐步了解它的工作原理。本质上,StreamContent 类型有一个 EntityType 属性 告诉响应是什么类型。由于源代码中的演示使用不同类型的流,因此 switch 语句说明了所有可能的消息类型。但是,在 Search 流的情况下,唯一的响应将是 StreamEntityType.Status,这将简化您的代码,因为您可以消除其他情况。知道类型后,您可以使用 as 运算符对 Entity 属性 进行转换,例如 var status = strm.Entity as Status,然后查询 status 变量属性你需要的信息。

    static async Task<int> HandleStreamResponse(StreamContent strm)
    {
        switch (strm.EntityType)
        {
            case StreamEntityType.Control:
                var control = strm.Entity as Control;
                Console.WriteLine("Control URI: {0}", control.URL);
                break;
            case StreamEntityType.Delete:
                var delete = strm.Entity as Delete;
                Console.WriteLine("Delete - User ID: {0}, Status ID: {1}", delete.UserID, delete.StatusID);
                break;
            case StreamEntityType.DirectMessage:
                var dm = strm.Entity as DirectMessage;
                Console.WriteLine("Direct Message - Sender: {0}, Text: {1}", dm.Sender, dm.Text);
                break;
            case StreamEntityType.Disconnect:
                var disconnect = strm.Entity as Disconnect;
                Console.WriteLine("Disconnect - {0}", disconnect.Reason);
                break;
            case StreamEntityType.Event:
                var evt = strm.Entity as Event;
                Console.WriteLine("Event - Event Name: {0}", evt.EventName);
                break;
            case StreamEntityType.ForUser:
                var user = strm.Entity as ForUser;
                Console.WriteLine("For User - User ID: {0}, # Friends: {1}", user.UserID, user.Friends.Count);
                break;
            case StreamEntityType.FriendsList:
                var friends = strm.Entity as FriendsList;
                Console.WriteLine("Friends List - # Friends: {0}", friends.Friends.Count);
                break;
            case StreamEntityType.GeoScrub:
                var scrub = strm.Entity as GeoScrub;
                Console.WriteLine("GeoScrub - User ID: {0}, Up to Status ID: {1}", scrub.UserID, scrub.UpToStatusID);
                break;
            case StreamEntityType.Limit:
                var limit = strm.Entity as Limit;
                Console.WriteLine("Limit - Track: {0}", limit.Track);
                break;
            case StreamEntityType.Stall:
                var stall = strm.Entity as Stall;
                Console.WriteLine("Stall - Code: {0}, Message: {1}, % Full: {2}", stall.Code, stall.Message, stall.PercentFull);
                break;
            case StreamEntityType.Status:
                var status = strm.Entity as Status;
                Console.WriteLine("Status - @{0}: {1}", status.User.ScreenNameResponse, status.Text);
                break;
            case StreamEntityType.StatusWithheld:
                var statusWithheld = strm.Entity as StatusWithheld;
                Console.WriteLine("Status Withheld - Status ID: {0}, # Countries: {1}", statusWithheld.StatusID, statusWithheld.WithheldInCountries.Count);
                break;
            case StreamEntityType.TooManyFollows:
                var follows = strm.Entity as TooManyFollows;
                Console.WriteLine("Too Many Follows - Message: {0}", follows.Message);
                break;
            case StreamEntityType.UserWithheld:
                var userWithheld = strm.Entity as UserWithheld;
                Console.WriteLine("User Withheld - User ID: {0}, # Countries: {1}", userWithheld.UserID, userWithheld.WithheldInCountries.Count);
                break;
            case StreamEntityType.ParseError:
                var unparsedJson = strm.Entity as string;
                Console.WriteLine("Parse Error - {0}", unparsedJson);
                break;
            case StreamEntityType.Unknown:
            default:
                Console.WriteLine("Unknown - " + strm.Content + "\n");
                break;
        }

        return await Task.FromResult(0);
    }