Read/write 由 R 中的 2 个独立线程执行的脚本中的相同对象
Read/write the same object from scripts executed by 2 separate threads in R
问题描述:
我正在尝试在连接到 Oanda REST API 的 R 中构建一个自动交易系统。我的操作系统是 Windows 10。
该程序通过 "while (TRUE)" 有两个独立的无限循环组件:"trading engine" 和 "tick data streaming engine"。
该程序的组织方式使两个组件通过使用 "R6" 包创建的队列对象进行通信。
"tick data streaming engine" 从 Oanda 服务器接收报价外汇数据。
然后,它使用 "R6" 包创建的队列实例 class 从数据和 "push" 将滴答事件创建到队列。
"trading engine" "pop" 队列对象和分析出来的事件对象。
如果是Tick数据事件,则分析是否满足交易策略逻辑设定的条件。
如果弹出的报价事件满足条件,交易引擎创建订单事件对象
这是 "pushed" 使用 "R6" 包创建的队列 class 的相同实例到队列的后面。
为此,我想运行 "trading engine" 使用一个线程,运行 "tick data streaming engine" 使用另一个线程。
这两个独立的线程应该能够推送到同一个事件队列实例,并从中弹出。
我的理解是事件队列实例对象应该是两个独立线程访问它的共享对象。
问题:
我的问题是如何在两个单独的线程上通过代码文件 运行ning 实现可以动态修改 (write/read) 的共享对象
或任何其他有助于从两个或多个线程实现同一对象的 read/write 的构造?
我怎么可能使用其他包如 "mmap" 来实现共享内存或任何其他包来实现我的 objective?
尝试次数:
为了测试程序的可行性,我尝试了以下方法:
为了简单性和可重复性,我创建了一个名为 "sharedMatrix" 的共享对象。
它是一个 10 x 1 矩阵,它将在我的实际 Oanda API 程序中扮演事件队列实例的角色。
我使用 "bigmemory" R 包将初始矩阵 "m" 转换为 big.matrix 对象 "x" 并附加它以便它可以成为共享对象:"sharedMatrix" .
通过这样做,我期望 "sharedMatrix" 成为 "seen" 并由每个线程 运行 修改两个单独的代码文件。
#Codefile1
for(i in 1:5){
sharedMatrix[i,1] <- i
}
#Codefile2
for(j in 6:10){
sharedMatrix[j,1] <- j
}
我通过执行以下代码使用 "foreach" 和 "doParallel" R 包获取了两个代码文件:
library(doParallel)
library(bigmemory)
library(foreach)
m <- matrix(nrow = 10) # Create 10 x 1 matrix
x <-as.big.matrix(m) #convert m to bigmatrix
mdesc <- describe(x) # get a description of the matrix
cl <- makeCluster(2,outfile = "Log.txt") # Create a cluster of two threads
#with output file "Log.txt"
registerDoParallel(cl)
clusterExport(cl=cl,varlist=ls()) #export input data to all cores
fileList <-list("Codefile1.R","Codefile2.R") # a list of script files saved
#in current working directory
foreach(f=fileList, .packages = "bigmemory") %dopar% {
sharedMatrix <- attach.big.matrix(mdesc) # attach the matrix via shared
#memory
source(f) # Source the script files for parallel execution
}
令我惊讶的是,这是执行上述代码时的控制台输出:
Error in { : task 1 failed - "object 'sharedMatrix' not found"
在检查了 sharedMatrix 的内容之后,我期待看到这样的内容:
sharedMatrix[]
1 2 3 4 5 6 7 8 9 10
然而这是我看到的:
sharedMatrix[]
Error: object 'sharedMatrix' not found
在我看来,工作线程不 "see" 共享对象 "sharedMatrix"。
任何帮助将不胜感激。谢谢。
使用
library(doParallel)
library(bigmemory)
library(foreach)
m <- matrix(nrow = 10) # Create 10 x 1 matrix
x <-as.big.matrix(m) #convert m to bigmatrix
mdesc <- describe(x) # get a description of the matrix
cl <- makeCluster(2,outfile = "Log.txt") # Create a cluster of two threads
#with output file "Log.txt"
registerDoParallel(cl)
clusterExport(cl=cl,varlist=ls()) #export input data to all cores
fileList <-list("Codefile1.R","Codefile2.R") # a list of script files saved
#in current working directory
foreach(f=fileList, .packages = "bigmemory") %dopar% {
sharedMatrix <- attach.big.matrix(mdesc) # attach the matrix via shared
#memory
source(f, local = TRUE) # Source the script files for parallel execution
NULL
}
parallel::stopCluster(cl)
基本上,您需要 source()
函数中的选项 local = TRUE
。
PS:另外,确保停止集群。
问题描述:
我正在尝试在连接到 Oanda REST API 的 R 中构建一个自动交易系统。我的操作系统是 Windows 10。 该程序通过 "while (TRUE)" 有两个独立的无限循环组件:"trading engine" 和 "tick data streaming engine"。 该程序的组织方式使两个组件通过使用 "R6" 包创建的队列对象进行通信。 "tick data streaming engine" 从 Oanda 服务器接收报价外汇数据。 然后,它使用 "R6" 包创建的队列实例 class 从数据和 "push" 将滴答事件创建到队列。 "trading engine" "pop" 队列对象和分析出来的事件对象。 如果是Tick数据事件,则分析是否满足交易策略逻辑设定的条件。 如果弹出的报价事件满足条件,交易引擎创建订单事件对象 这是 "pushed" 使用 "R6" 包创建的队列 class 的相同实例到队列的后面。
为此,我想运行 "trading engine" 使用一个线程,运行 "tick data streaming engine" 使用另一个线程。 这两个独立的线程应该能够推送到同一个事件队列实例,并从中弹出。 我的理解是事件队列实例对象应该是两个独立线程访问它的共享对象。
问题:
我的问题是如何在两个单独的线程上通过代码文件 运行ning 实现可以动态修改 (write/read) 的共享对象 或任何其他有助于从两个或多个线程实现同一对象的 read/write 的构造? 我怎么可能使用其他包如 "mmap" 来实现共享内存或任何其他包来实现我的 objective?
尝试次数:
为了测试程序的可行性,我尝试了以下方法: 为了简单性和可重复性,我创建了一个名为 "sharedMatrix" 的共享对象。 它是一个 10 x 1 矩阵,它将在我的实际 Oanda API 程序中扮演事件队列实例的角色。 我使用 "bigmemory" R 包将初始矩阵 "m" 转换为 big.matrix 对象 "x" 并附加它以便它可以成为共享对象:"sharedMatrix" . 通过这样做,我期望 "sharedMatrix" 成为 "seen" 并由每个线程 运行 修改两个单独的代码文件。
#Codefile1
for(i in 1:5){
sharedMatrix[i,1] <- i
}
#Codefile2
for(j in 6:10){
sharedMatrix[j,1] <- j
}
我通过执行以下代码使用 "foreach" 和 "doParallel" R 包获取了两个代码文件:
library(doParallel)
library(bigmemory)
library(foreach)
m <- matrix(nrow = 10) # Create 10 x 1 matrix
x <-as.big.matrix(m) #convert m to bigmatrix
mdesc <- describe(x) # get a description of the matrix
cl <- makeCluster(2,outfile = "Log.txt") # Create a cluster of two threads
#with output file "Log.txt"
registerDoParallel(cl)
clusterExport(cl=cl,varlist=ls()) #export input data to all cores
fileList <-list("Codefile1.R","Codefile2.R") # a list of script files saved
#in current working directory
foreach(f=fileList, .packages = "bigmemory") %dopar% {
sharedMatrix <- attach.big.matrix(mdesc) # attach the matrix via shared
#memory
source(f) # Source the script files for parallel execution
}
令我惊讶的是,这是执行上述代码时的控制台输出:
Error in { : task 1 failed - "object 'sharedMatrix' not found"
在检查了 sharedMatrix 的内容之后,我期待看到这样的内容:
sharedMatrix[]
1 2 3 4 5 6 7 8 9 10
然而这是我看到的:
sharedMatrix[]
Error: object 'sharedMatrix' not found
在我看来,工作线程不 "see" 共享对象 "sharedMatrix"。 任何帮助将不胜感激。谢谢。
使用
library(doParallel)
library(bigmemory)
library(foreach)
m <- matrix(nrow = 10) # Create 10 x 1 matrix
x <-as.big.matrix(m) #convert m to bigmatrix
mdesc <- describe(x) # get a description of the matrix
cl <- makeCluster(2,outfile = "Log.txt") # Create a cluster of two threads
#with output file "Log.txt"
registerDoParallel(cl)
clusterExport(cl=cl,varlist=ls()) #export input data to all cores
fileList <-list("Codefile1.R","Codefile2.R") # a list of script files saved
#in current working directory
foreach(f=fileList, .packages = "bigmemory") %dopar% {
sharedMatrix <- attach.big.matrix(mdesc) # attach the matrix via shared
#memory
source(f, local = TRUE) # Source the script files for parallel execution
NULL
}
parallel::stopCluster(cl)
基本上,您需要 source()
函数中的选项 local = TRUE
。
PS:另外,确保停止集群。