Splunk 模块化输入更新参数

Splunk modular input update parameter

有没有办法用 Splunks Python SDK 更新模块化输入脚本的输入参数?

def stream_events(self, inputs, ew):
    twitter = OAuth1Session(api_key, api_secret, access_token, access_token_secret)

    for input_name, input_item in inputs.inputs.iteritems():
        hashtag = input_item["hashtag"]
        since_id = input_item["since_id"]

        if since_id == "0":
            url = "https://api.twitter.com/1.1/search/tweets.json?q=%%23%s" % hashtag
        else:
            url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)

        r = twitter.get(url)

        output = json.loads(r.content)
        if len(output["statuses"]) != 0:

            for tweet in output["statuses"]:
                print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])

我把since_id参数设置为0,然后我第一次调用推特API。得到结果后,我想用 Twitter 搜索元数据 (output["search_metadata"]["max_id"])

中的 max_id 字段更新 since_id 参数

感谢您的帮助

使用检查点存储 since_id 在这种情况下您的最佳选择。这是您的代码大致的外观差异。我为文件 IO 部分留下了 TODO。对于 JavaScript 和 C# SDK 中的 GitHub 提交示例,我们在检查点文件中每行存储 1 个提交 sha。对于您的情况,您只需要将 since_id 存储在文件中。请注意,您输入的每个实例都应该有自己的检查点文件(即:每个输入都映射到一个主题标签,因此您应该为要索引的每个主题标签创建一个检查点文件)。

diff --git a/program.py b/program.py
index 95f69de..defb535 100644
--- a/program.py
+++ b/program.py
@@ -1,14 +1,17 @@
 def stream_events(self, inputs, ew):
     twitter = OAuth1Session(api_key, api_secret, access_token, access_token_secret)

+    checkpoint_dir = inputs.metadata['checkpoint_dir']
+
     for input_name, input_item in inputs.inputs.iteritems():
         hashtag = input_item["hashtag"]
         since_id = input_item["since_id"]

-        if since_id == "0":
-            url = "https://api.twitter.com/1.1/search/tweets.json?q=%%23%s" % hashtag
-        else:
-            url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
+        checkpoint_file_path = os.path.join(checkpoint_dir, hashtag + ".txt")
+
+        since_id = "" # TODO: read from the file
+        
+        url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)

         r = twitter.get(url)

@@ -16,4 +19,6 @@ def stream_events(self, inputs, ew):
         if len(output["statuses"]) != 0:

             for tweet in output["statuses"]:
-                print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
\ No newline at end of file
+                print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
+
+        # TODO: on success, update the contents of the checkpoint file
\ No newline at end of file