Return 来自 Ruby 方法,但将其 运行 保留在后台

Return from Ruby method but keep it running in background

我需要在 MongoDB 中创建一个包含超过 100,000 个联系人的大列表(将 List_Id 插入每个联系人记录)。所以我的解决方案是:先添加 100 个联系人,然后 return 到客户端,这样 UI 可以显示前 100 个联系人。然后剩下的联系人稍后再添加。

我的问题是我希望 process/method 在后台保持 运行 而我 return 到客户端。我的直觉告诉我线程会是一个很好的解决方案。

示例代码:

def add_contacts_to_list
    count = 0
    thread = Thread.new{
      @contacts.each do |contact|
         add_to_list(contact, list_id)
         count = count + 1
         #what I want
            when count = 100, return to client, and the thread is still running like nothing happens
         #what I want
      end
    }
    thread.join
end

就使用单独的线程作为较大进程的后台而言,听起来您走在了正确的轨道上。您可能希望获得最初的 100 个结果,然后将作业提交给 sidekiq 或 resque 到 运行 更大的操作。线程是一个复杂的话题,不同的服务器表现不同。受人尊敬的 Aaron Patterson (tenderlove) 在此线程中对此进行了讨论:http://tenderlovemaking.com/2012/06/18/removing-config-threadsafe.html - 这实际上是他的域的名称,几乎完全安全工作,物有所值。如果您要 运行 如此大的进程,您肯定需要将它们移出阻塞请求线程,因此我再次建议您查看上面提到的某种作业队列。

对于非常小的负载应用程序,您的方法似乎没问题...

...但是,如果您的应用程序将 运行 承受重负载,您应该解决通常使用作业队列解决的问题,例如:

  • 多线程可能会破坏数据(即使使用 MRI - 全局锁的保护是有限的)。

  • 创建太多线程可能会导致速度显着下降,从而使应用程序无响应。

话虽如此,我将尝试为这两种方法编写示例,将您的方法用于每个请求一个单独的线程,并使用一个非常简单的自制查询。

使用您的代码并稍微修改它以适应这样一个事实,即线程在创建后 return 会立即 运行 在后台运行,您的代码可能如下所示:

def add_contacts_to_list
    # create a proc, so the code doesn't repeat itself (DRY)
    the_job = Proc.new do |contact_list|
        contact_list.each {|c| add_to_list(c, @list_id)}
    end
    # get 100 contacts first
    the_job.call @contacts[0..99]
    # Send the rest to a thread
    thread = Thread.new { the_job.call @contacts[100..-1] }
    # that's it. we now return and the thread works in the background.
end

另一方面,这里有一个简单的 Que 模块(仅用于演示),它可以更好地工作:

module SimpleQue
    QUE = []
    QUE_LOCKER = Mutex.new
    @kill_thread = false

    def self.que_job *args, &job
        raise "Cannot que jobs after que was set to finish!" if @kill_thread
        raise "Missing a job to que!" unless job
        QUE_LOCKER.synchronize { QUE << [job, args] }
        true 
    end

    THREAD = Thread.new do
        begin
            until @kill_thread && QUE.empty?
                sleep 0.5 while QUE_LOCKER.synchronize { QUE.empty? }
                job, args = QUE_LOCKER.synchronize { QUE.shift }
                job.call(*args)
            end
        rescue => e
            # change this to handle errors
            puts e
            retry
        end
    end

    def self.join
        @kill_thread = true
        THREAD.join
    end

end

# test it:
SimpleQue.que_job("hi!") {|s| sleep 1; puts s}
SimpleQue.que_job("nice!") {|s| sleep 1; puts s}
SimpleQue.que_job("hi!") {|s| sleep 1; puts s}
SimpleQue.que_job("hi!") {|s| sleep 1; puts s}
SimpleQue.que_job("yo!") {|s| sleep 1; puts s}
SimpleQue.que_job("bye!") {|s| sleep 1; puts s}
puts "sent everything to the que, now about to wait using #join."
SimpleQue.join
SimpleQue.que_job("hi?") {|s| sleep 1; puts s}

# adjusting your code, ignoring multithreading issues:

def add_contacts_to_list
    # create a proc, so the code doesn't repeat itself (DRY)
    the_job = Proc.new do |contact_list|
        contact_list.each {|c| add_to_list(c, @list_id)}
    end
    # get 100 contacts first
    the_job.call @contacts[0..99]
    # Send the rest to the que
    SimpleQue.que_job(@contacts[100..-1], &the_job)
    # that's it. we now return and the que works in the background.
end


# adjusting your code, adding basic multithreading safety:

def add_contacts_to_list
    # sending a job to the que:
    SimpleQue.que_job(@contacts) do |contact_list|
        contact_list.each {|c| add_to_list(c, @list_id)}
    end
    # I removed: the_job.call @contacts[0..99]
    # it's better if you didn't even start the first 100 contacts...
    # ...it might cause data corruption when different threads do it.
end

我会将其安排到后台(sidekiq 或其他)并将客户端重定向到可以显示联系人的页面。在我看来,对此类功能进行线程管理是不必要的风险。

@contacts.take(100).each { |contact| add_to_list(contact) }
perform_async
return_to_client