将许多请求线程化发布到同一 url
Threaded posting of many requests to same url
我构建了一个这样的 json 块:
data = {
"url" : "http://www.example.com/post",
"postdata" : [
{ "key1" : "value1" },
{ "key2" : "value2" },
{ "key3" : "value3" },
{ "key4" : "value4" },
{ "key5" : "value5" },
{ "key6" : "value6" }
]
}
我正在尝试 post 'postdata' 中的每个块并行(比如 2 个池)到相同的 'url'。我曾尝试使用 Faraday、Typhoeus 和 Parallel,但在这样做时我没有遇到有效的用例。
理想情况下,我想使用 Typhoeus::Hydra 或法拉第,将 'data' 对象传递给它,并使用数据 ['url' 汇集数据 ['postdata'] ] 作为终点,但我空手而归。大多数情况下,我 运行 遇到需要像这样的数据数组的情况:
[
{ "url" : "http://...",
"data" : { "key1" : "value1" },
{ "url" : "http://...",
"data" : { "key2" : "value2" },
{ "url" : "http://...",
"data" : { "key3" : "value3" }
]
但我显然希望避免这种重复。
最终目标:Post 可能有 100 多个 json 实体并行到相同的 url,一次汇集有限的(比如 10 个)。谁能帮我指引正确的道路?
免责声明:这是一个内部端点,因此没有恶意。
基于 tadman 的解决方案:
class BatchWrapper
def initialize(data, offset)
@data = data
@offset = offset
end
def as_json
@data['postdata'][@offset]
end
end
q = Queue.new
data['postdata'].each_index do |i|
q << BatchWrapper.new(data, i)
end
t = Thread.new do
n = q.size
n.times do |i|
value = q.pop
res = http.post(data['url'], value.as_json)
puts res
puts "consumed #{value.as_json}"
end
end
t.join
通常的策略是将源数据重新映射为更易于消化的内容,然后将其拆分到多个工作线程或通过某种异步 event-driven 方法,例如您在 EventMachine 中使用。
线程更容易理解,然后可以使用Queue结构来批处理每个线程消费的工作。
将您的作业分解为一系列对象,将这些对象塞入队列,然后启动 N 个线程来处理这些队列。
既然你在使用线程,那么使用共享数据是没问题的。例如,您可以有一个薄包装对象,给定该格式的结构,捕获您发送的偏移量:
class BatchWrapper
def initialize(data, offset)
@data = data
@offset = offset
end
def as_json
@data['postdata'][@offset]
end
end
然后为您打算发出的每个请求填充这些对象之一:
q = Queue.new
data['postdata'].each_index do |i|
q << BatchWrapper.new(data, i)
end
然后你可以在 worker 中旋转队列:
Thread.new do
while (wrapper = q.pop)
# Send it...
make_request(wrapper.to_json)
end
end
包装器方法允许您准确组合从该主要对象共享的数据以及 request-specific 共享的数据,并对数据本身进行任何必要的重组。 as_json
方法 returns to_json
方法最终编码的内容,因此您可以完全控制那里。
我构建了一个这样的 json 块:
data = {
"url" : "http://www.example.com/post",
"postdata" : [
{ "key1" : "value1" },
{ "key2" : "value2" },
{ "key3" : "value3" },
{ "key4" : "value4" },
{ "key5" : "value5" },
{ "key6" : "value6" }
]
}
我正在尝试 post 'postdata' 中的每个块并行(比如 2 个池)到相同的 'url'。我曾尝试使用 Faraday、Typhoeus 和 Parallel,但在这样做时我没有遇到有效的用例。
理想情况下,我想使用 Typhoeus::Hydra 或法拉第,将 'data' 对象传递给它,并使用数据 ['url' 汇集数据 ['postdata'] ] 作为终点,但我空手而归。大多数情况下,我 运行 遇到需要像这样的数据数组的情况:
[
{ "url" : "http://...",
"data" : { "key1" : "value1" },
{ "url" : "http://...",
"data" : { "key2" : "value2" },
{ "url" : "http://...",
"data" : { "key3" : "value3" }
]
但我显然希望避免这种重复。
最终目标:Post 可能有 100 多个 json 实体并行到相同的 url,一次汇集有限的(比如 10 个)。谁能帮我指引正确的道路?
免责声明:这是一个内部端点,因此没有恶意。
基于 tadman 的解决方案:
class BatchWrapper
def initialize(data, offset)
@data = data
@offset = offset
end
def as_json
@data['postdata'][@offset]
end
end
q = Queue.new
data['postdata'].each_index do |i|
q << BatchWrapper.new(data, i)
end
t = Thread.new do
n = q.size
n.times do |i|
value = q.pop
res = http.post(data['url'], value.as_json)
puts res
puts "consumed #{value.as_json}"
end
end
t.join
通常的策略是将源数据重新映射为更易于消化的内容,然后将其拆分到多个工作线程或通过某种异步 event-driven 方法,例如您在 EventMachine 中使用。
线程更容易理解,然后可以使用Queue结构来批处理每个线程消费的工作。
将您的作业分解为一系列对象,将这些对象塞入队列,然后启动 N 个线程来处理这些队列。
既然你在使用线程,那么使用共享数据是没问题的。例如,您可以有一个薄包装对象,给定该格式的结构,捕获您发送的偏移量:
class BatchWrapper
def initialize(data, offset)
@data = data
@offset = offset
end
def as_json
@data['postdata'][@offset]
end
end
然后为您打算发出的每个请求填充这些对象之一:
q = Queue.new
data['postdata'].each_index do |i|
q << BatchWrapper.new(data, i)
end
然后你可以在 worker 中旋转队列:
Thread.new do
while (wrapper = q.pop)
# Send it...
make_request(wrapper.to_json)
end
end
包装器方法允许您准确组合从该主要对象共享的数据以及 request-specific 共享的数据,并对数据本身进行任何必要的重组。 as_json
方法 returns to_json
方法最终编码的内容,因此您可以完全控制那里。