Splunk 模块化输入更新参数
Splunk modular input update parameter
有没有办法用 Splunks Python SDK 更新模块化输入脚本的输入参数?
def stream_events(self, inputs, ew):
twitter = OAuth1Session(api_key, api_secret, access_token, access_token_secret)
for input_name, input_item in inputs.inputs.iteritems():
hashtag = input_item["hashtag"]
since_id = input_item["since_id"]
if since_id == "0":
url = "https://api.twitter.com/1.1/search/tweets.json?q=%%23%s" % hashtag
else:
url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
r = twitter.get(url)
output = json.loads(r.content)
if len(output["statuses"]) != 0:
for tweet in output["statuses"]:
print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
我把since_id
参数设置为0,然后我第一次调用推特API。得到结果后,我想用 Twitter 搜索元数据 (output["search_metadata"]["max_id"]
)
中的 max_id
字段更新 since_id
参数
感谢您的帮助
使用检查点存储 since_id
在这种情况下您的最佳选择。这是您的代码大致的外观差异。我为文件 IO 部分留下了 TODO。对于 JavaScript 和 C# SDK 中的 GitHub 提交示例,我们在检查点文件中每行存储 1 个提交 sha。对于您的情况,您只需要将 since_id
存储在文件中。请注意,您输入的每个实例都应该有自己的检查点文件(即:每个输入都映射到一个主题标签,因此您应该为要索引的每个主题标签创建一个检查点文件)。
diff --git a/program.py b/program.py
index 95f69de..defb535 100644
--- a/program.py
+++ b/program.py
@@ -1,14 +1,17 @@
def stream_events(self, inputs, ew):
twitter = OAuth1Session(api_key, api_secret, access_token, access_token_secret)
+ checkpoint_dir = inputs.metadata['checkpoint_dir']
+
for input_name, input_item in inputs.inputs.iteritems():
hashtag = input_item["hashtag"]
since_id = input_item["since_id"]
- if since_id == "0":
- url = "https://api.twitter.com/1.1/search/tweets.json?q=%%23%s" % hashtag
- else:
- url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
+ checkpoint_file_path = os.path.join(checkpoint_dir, hashtag + ".txt")
+
+ since_id = "" # TODO: read from the file
+
+ url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
r = twitter.get(url)
@@ -16,4 +19,6 @@ def stream_events(self, inputs, ew):
if len(output["statuses"]) != 0:
for tweet in output["statuses"]:
- print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
\ No newline at end of file
+ print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
+
+ # TODO: on success, update the contents of the checkpoint file
\ No newline at end of file
有没有办法用 Splunks Python SDK 更新模块化输入脚本的输入参数?
def stream_events(self, inputs, ew):
twitter = OAuth1Session(api_key, api_secret, access_token, access_token_secret)
for input_name, input_item in inputs.inputs.iteritems():
hashtag = input_item["hashtag"]
since_id = input_item["since_id"]
if since_id == "0":
url = "https://api.twitter.com/1.1/search/tweets.json?q=%%23%s" % hashtag
else:
url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
r = twitter.get(url)
output = json.loads(r.content)
if len(output["statuses"]) != 0:
for tweet in output["statuses"]:
print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
我把since_id
参数设置为0,然后我第一次调用推特API。得到结果后,我想用 Twitter 搜索元数据 (output["search_metadata"]["max_id"]
)
max_id
字段更新 since_id
参数
感谢您的帮助
使用检查点存储 since_id
在这种情况下您的最佳选择。这是您的代码大致的外观差异。我为文件 IO 部分留下了 TODO。对于 JavaScript 和 C# SDK 中的 GitHub 提交示例,我们在检查点文件中每行存储 1 个提交 sha。对于您的情况,您只需要将 since_id
存储在文件中。请注意,您输入的每个实例都应该有自己的检查点文件(即:每个输入都映射到一个主题标签,因此您应该为要索引的每个主题标签创建一个检查点文件)。
diff --git a/program.py b/program.py
index 95f69de..defb535 100644
--- a/program.py
+++ b/program.py
@@ -1,14 +1,17 @@
def stream_events(self, inputs, ew):
twitter = OAuth1Session(api_key, api_secret, access_token, access_token_secret)
+ checkpoint_dir = inputs.metadata['checkpoint_dir']
+
for input_name, input_item in inputs.inputs.iteritems():
hashtag = input_item["hashtag"]
since_id = input_item["since_id"]
- if since_id == "0":
- url = "https://api.twitter.com/1.1/search/tweets.json?q=%%23%s" % hashtag
- else:
- url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
+ checkpoint_file_path = os.path.join(checkpoint_dir, hashtag + ".txt")
+
+ since_id = "" # TODO: read from the file
+
+ url = "https://api.twitter.com/1.1/search/tweets.json?since_id=%s&q=%%23%s" % (since_id, hashtag)
r = twitter.get(url)
@@ -16,4 +19,6 @@ def stream_events(self, inputs, ew):
if len(output["statuses"]) != 0:
for tweet in output["statuses"]:
- print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
\ No newline at end of file
+ print_xml_stream(input_name, json.dumps(tweet), tweet["created_at"])
+
+ # TODO: on success, update the contents of the checkpoint file
\ No newline at end of file