spring boot API - 文档处理和在文档上并行执行 python 脚本
spring boot API - document processing and executing python script on documents in parallel
场景:
- 在我的应用程序中,有 3 个进程正在将共享驱动器上的文档复制到各自的文件夹中。
只要(通过任何进程)将任何文档复制到共享驱动器上,目录观察程序 (Java) 代码就会拾取该文档并调用 Python 脚本使用"Process" 并对文档进行一些处理。代码片段如下:
Process pr = Runtime.getRuntime().exec(pythonCommand);
// retrieve output from python script
BufferedReader bfr = new BufferedReader(new InputStreamReader(pr.getInputStream()));
String line = "";
while ((line = bfr.readLine()) != null) {
// display each output line from python script
logger.info(line);
}
pr.waitFor();
目前我的代码等待 python 文档上的代码执行完成。只有在那之后,它才会拿起下一个文件。 Python 代码需要 30 秒才能完成。
- 处理文档后,文档从当前文件夹移动到存档或错误文件夹。
- 请在下面找到该场景的屏幕截图:
问题是什么?
- 我的代码按顺序处理文档,我需要并行处理文档。
- 由于 Python 代码大约需要 30 秒,目录观察程序创建的一些事件也会丢失。
- 如果短时间内收到大约 400 个文档,文档处理将停止。
我在找什么?
- 并行处理文档的设计解决方案。
- 在任何文档处理失败的情况下,必须自动处理待处理的文档。
- 我也尝试了 spring 启动计划,但仍然只能按顺序处理文档。
- 是否可以作为后台进程并行调用Python代码。
抱歉问了这么长的问题,但我已经坚持了很多天,并且已经看过很多类似的问题。
谢谢!
您可以尝试 python here
中的多处理模块
由于 GIL,Python 的线程不会加速计算
CPU 绑定。
这个问题可能重复Solving embarassingly parallel problems using Python multiprocessing
创建两个队列(阻塞队列):
执行队列
错误队列
创建两个线程(可以根据需要创建任意多个线程):
第一线程
SecondThread
概念:
生产者-消费者
生产者(目录观察线程):
目录观察者
消费者:
第一线程
SecondThread
详情:
- 两个队列的增删方法必须同步。一瞬间只有一个线程会访问 method.If 一个线程正在访问关键区域(生产者或消费者),其余线程将等待轮到他们。
首先Producer会开始工作,最初,consumer处于休眠阶段。
为什么?同步 运行 整个系统。
你将如何得到它?处理后休眠生产者线程,如果消费者在作业开始时休眠。
第一个生产者或消费者将获取队列中的锁,处理工作并释放它。在此期间,如果任何线程(生产者或消费者)来获取数据,他们将等待轮到他们(使用线程池的概念)。
只要(通过任何进程)将任何文档复制到共享驱动器上,目录观察程序(生产者)代码就会获取该文档的路径并同步存储在执行队列中。
现在Consumer会来取数据,FirstThread先被唤醒,去executionQueue取数据。 FirstThread会获取锁定的executionQueue,然后取数据释放其中的锁。如果在 SecondThread 之间来获取数据,它将等待轮到他。
从执行队列中获取数据后,FirstThread 将从位置获取文档并使用获取的文档调用 Python 脚本。
在 SecondThread 获取锁和获取路径之间,开始处理与 FirstThread 相同的概念。
几秒钟后,FirstThread 将完成他的工作,然后它将转到 executionQueue 并再次获取锁并获取文件路径并释放锁并开始处理相同的工作和休息SecondThread 也一样...
在该文件的处理中,如果发生任何错误,则将该路径信息发送到 errorQueue 方法,并在一天结束时或当您的系统空闲时使用相同的概念或手动分析该 errorQueue 信息.
如果executionQueue中没有可用数据,此时producer threads(Directory watcher)已经处于休眠阶段。然后consumer线程会来executionQueue取数据,取不到数据就进入休眠状态,比如1分钟,1分钟后又会醒来去取数据等等...
在每个步骤日志中,信息将帮助您更好地分析。
使用这个概念你可以运行整个系统并行。
一个选项是使用 JDK 提供的 ExecutorService
,它可以执行 Runnable
和 Callable
任务。您将需要创建一个实现 Runnable
的 class,它将执行您的 Python 脚本,并且在收到新文档后,您需要创建此 [=35= 的新实例] 并将其传递给 ExecutorService
.
为了展示这是如何工作的,我们将使用一个简单的 Python 脚本,该脚本将线程名称作为参数,打印其执行的开始时间,休眠 10 秒并打印结束时间:
import time
import sys
print "%s start : %s" % (sys.argv[1], time.ctime())
time.sleep(10)
print "%s end : %s" % (sys.argv[1], time.ctime())
首先,我们实现 class ,即 运行 脚本,并将在构造函数中获得的名称传递给它:
class ScriptRunner implements Runnable {
private String thread;
ScriptRunner(String thread) {
this.thread = thread;
}
@Override
public void run() {
try {
ProcessBuilder ps = new ProcessBuilder("py", "test.py", thread);
ps.redirectErrorStream(true);
Process pr = ps.start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
String line;
while ((line = in.readLine()) != null) {
System.out.println(line);
}
}
pr.waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后我们创建 main
方法,该方法创建 ExecutorService
具有固定数量的 5 个并行线程,并向其传递 10 个 ScriptRunner
实例,中断时间为 1 秒:
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 1; i <= 10; i++) {
executor.submit(new ScriptRunner("Thread_" + i));
Thread.sleep(1000);
}
executor.shutdown();
}
如果我们运行这个方法,我们会看到服务,由于指定的限制,最多有5个parallel-运行ning任务,其余进入队列并从释放的线程开始:
Thread_1 start : Sat Nov 23 11:40:14 2019
Thread_1 end : Sat Nov 23 11:40:24 2019 // the first task is completed..
Thread_2 start : Sat Nov 23 11:40:15 2019
...
Thread_5 end : Sat Nov 23 11:40:28 2019
Thread_6 start : Sat Nov 23 11:40:24 2019 // ..and the sixth is started
...
Thread_10 end : Sat Nov 23 11:40:38 2019
场景:
- 在我的应用程序中,有 3 个进程正在将共享驱动器上的文档复制到各自的文件夹中。
只要(通过任何进程)将任何文档复制到共享驱动器上,目录观察程序 (Java) 代码就会拾取该文档并调用 Python 脚本使用"Process" 并对文档进行一些处理。代码片段如下:
Process pr = Runtime.getRuntime().exec(pythonCommand); // retrieve output from python script BufferedReader bfr = new BufferedReader(new InputStreamReader(pr.getInputStream())); String line = ""; while ((line = bfr.readLine()) != null) { // display each output line from python script logger.info(line); } pr.waitFor();
目前我的代码等待 python 文档上的代码执行完成。只有在那之后,它才会拿起下一个文件。 Python 代码需要 30 秒才能完成。
- 处理文档后,文档从当前文件夹移动到存档或错误文件夹。
- 请在下面找到该场景的屏幕截图:
问题是什么?
- 我的代码按顺序处理文档,我需要并行处理文档。
- 由于 Python 代码大约需要 30 秒,目录观察程序创建的一些事件也会丢失。
- 如果短时间内收到大约 400 个文档,文档处理将停止。
我在找什么?
- 并行处理文档的设计解决方案。
- 在任何文档处理失败的情况下,必须自动处理待处理的文档。
- 我也尝试了 spring 启动计划,但仍然只能按顺序处理文档。
- 是否可以作为后台进程并行调用Python代码。
抱歉问了这么长的问题,但我已经坚持了很多天,并且已经看过很多类似的问题。 谢谢!
您可以尝试 python here
中的多处理模块由于 GIL,Python 的线程不会加速计算 CPU 绑定。
这个问题可能重复Solving embarassingly parallel problems using Python multiprocessing
创建两个队列(阻塞队列):
执行队列
错误队列创建两个线程(可以根据需要创建任意多个线程):
第一线程
SecondThread概念:
生产者-消费者生产者(目录观察线程):
目录观察者消费者:
第一线程
SecondThread
详情:
- 两个队列的增删方法必须同步。一瞬间只有一个线程会访问 method.If 一个线程正在访问关键区域(生产者或消费者),其余线程将等待轮到他们。
首先Producer会开始工作,最初,consumer处于休眠阶段。
为什么?同步 运行 整个系统。
你将如何得到它?处理后休眠生产者线程,如果消费者在作业开始时休眠。第一个生产者或消费者将获取队列中的锁,处理工作并释放它。在此期间,如果任何线程(生产者或消费者)来获取数据,他们将等待轮到他们(使用线程池的概念)。
只要(通过任何进程)将任何文档复制到共享驱动器上,目录观察程序(生产者)代码就会获取该文档的路径并同步存储在执行队列中。
现在Consumer会来取数据,FirstThread先被唤醒,去executionQueue取数据。 FirstThread会获取锁定的executionQueue,然后取数据释放其中的锁。如果在 SecondThread 之间来获取数据,它将等待轮到他。
从执行队列中获取数据后,FirstThread 将从位置获取文档并使用获取的文档调用 Python 脚本。
在 SecondThread 获取锁和获取路径之间,开始处理与 FirstThread 相同的概念。
几秒钟后,FirstThread 将完成他的工作,然后它将转到 executionQueue 并再次获取锁并获取文件路径并释放锁并开始处理相同的工作和休息SecondThread 也一样...
在该文件的处理中,如果发生任何错误,则将该路径信息发送到 errorQueue 方法,并在一天结束时或当您的系统空闲时使用相同的概念或手动分析该 errorQueue 信息.
如果executionQueue中没有可用数据,此时producer threads(Directory watcher)已经处于休眠阶段。然后consumer线程会来executionQueue取数据,取不到数据就进入休眠状态,比如1分钟,1分钟后又会醒来去取数据等等...
在每个步骤日志中,信息将帮助您更好地分析。
使用这个概念你可以运行整个系统并行。
一个选项是使用 JDK 提供的 ExecutorService
,它可以执行 Runnable
和 Callable
任务。您将需要创建一个实现 Runnable
的 class,它将执行您的 Python 脚本,并且在收到新文档后,您需要创建此 [=35= 的新实例] 并将其传递给 ExecutorService
.
为了展示这是如何工作的,我们将使用一个简单的 Python 脚本,该脚本将线程名称作为参数,打印其执行的开始时间,休眠 10 秒并打印结束时间:
import time
import sys
print "%s start : %s" % (sys.argv[1], time.ctime())
time.sleep(10)
print "%s end : %s" % (sys.argv[1], time.ctime())
首先,我们实现 class ,即 运行 脚本,并将在构造函数中获得的名称传递给它:
class ScriptRunner implements Runnable {
private String thread;
ScriptRunner(String thread) {
this.thread = thread;
}
@Override
public void run() {
try {
ProcessBuilder ps = new ProcessBuilder("py", "test.py", thread);
ps.redirectErrorStream(true);
Process pr = ps.start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
String line;
while ((line = in.readLine()) != null) {
System.out.println(line);
}
}
pr.waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后我们创建 main
方法,该方法创建 ExecutorService
具有固定数量的 5 个并行线程,并向其传递 10 个 ScriptRunner
实例,中断时间为 1 秒:
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 1; i <= 10; i++) {
executor.submit(new ScriptRunner("Thread_" + i));
Thread.sleep(1000);
}
executor.shutdown();
}
如果我们运行这个方法,我们会看到服务,由于指定的限制,最多有5个parallel-运行ning任务,其余进入队列并从释放的线程开始:
Thread_1 start : Sat Nov 23 11:40:14 2019
Thread_1 end : Sat Nov 23 11:40:24 2019 // the first task is completed..
Thread_2 start : Sat Nov 23 11:40:15 2019
...
Thread_5 end : Sat Nov 23 11:40:28 2019
Thread_6 start : Sat Nov 23 11:40:24 2019 // ..and the sixth is started
...
Thread_10 end : Sat Nov 23 11:40:38 2019