如何通过 pandas 编写 csv 文件并定期在 R 中读取它?
How to write a csv file via pandas and read it in R at regular intervals?
背景
我实验室中的驾驶模拟器 PC 生成我通过 python socket
接收的数据。数据每 1/60 秒生成一次。我不断地将它保存到一个名为 position.csv
的 csv 文件中。我还想阅读 R 中的 position.csv
以在 shiny
应用程序中使用。我每0.2秒读一次。
问题
当我 运行 R 中的 shiny
应用程序时,python 抛出 PermissionError: [Errno 13] Permission denied: 'position.csv'
Python 将数据保存到 csv 文件的脚本:
import socket
import struct
import pandas as pd
UDP_IP = "127.0.0.1"
UDP_PORT = 9000
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
fields = struct.unpack_from('=ddd', data)
print(fields[0],fields[1],fields[2])
dict = {'y': fields[0], 'x': fields[1], 'z': fields[2]}
my_data = pd.DataFrame([dict], columns=dict.keys())
open("position.csv", "w")
my_data.to_csv("position.csv", index=False)
用于读取 csv 文件并在应用程序中使用的 R 脚本:
library(shinydashboard)
library(dplyr)
library(ggplot2)
library(shiny)
library(data.table)
# ui----
ui <- dashboardPage(skin = "black",
dashboardHeader(title = "Dashboard"),
dashboardSidebar(
sidebarMenu(
menuItem("Maps", tabName = "navigation", icon = icon("compass"))
)),
dashboardBody(
tabItems(
# First tab content
tabItem(tabName = "navigation",
fluidRow(
tags$style(type="text/css", ".recalculating {opacity: 1.0;}"),
plotOutput("plot1")
)
)
)
)
)
# server----
server <- function(input, output, session) {
position <- reactivePoll(200, session,
# This function returns the time that log_file was last modified
checkFunc = function() {
if (file.exists("position.csv"))
file.info("position.csv")$mtime[1]
else
""
},
# This function returns the content of log_file
valueFunc = function() {
data.table::fread("position.csv")
}
)
xl1 <- reactive({position()$x - 1000})
xl2 <- reactive({position()$x + 1000})
yl1 <- reactive({position()$y - 800})
yl2 <- reactive({position()$y + 800})
output$plot1 <- renderPlot({
ggplot() +
geom_point(data = position(),
aes(x, y),
color = "red", size = 5) +
coord_equal( xlim = c(xl1(), xl2()),
ylim = c(yl1(), yl2())) +
theme_void()
})
cancel.onSessionEnded <- session$onSessionEnded(function() {
stopApp()
})
cancel.onSessionEnded()
}
shinyApp(ui, server)
问题
如何成功读取和写入 position.csv
文件?
即使不看这个闪亮的部分,每 0.2 秒访问一次 CSV 文件的文件系统肯定是一个巨大的瓶颈,不太可能是考虑性能的最佳方式。
您获得权限被拒绝的最可能原因可能(我尚未测试)是由于 file-locking,其中 pandas 暂时在写入文件时锁定了文件,R 试图过早地读取它。坦率地说,即使您没有收到“拒绝”错误,尝试在文件 mid-write 时读取文件也是可行的,这意味着数据不完整。写入和读取事件应该有一些协调,这样就不会发生这种情况。
一些想法,未经测试(但有mutual-file-access经验):
不是Filesystem-based
一种替代方法是使用某种形式的 streaming-data 机制,例如 Redis。这可以是一个简单的“主题”(fifo 队列)或经过更多思考(并取决于您的需要)Pub/Sub 设置。有了这个,pandas 会将其新数据推送到主题或 pubsub 主题,并且一个(如果是普通主题)或 one-or-more(如果是 pubsub)消费者将获得完整的数据。
优点:
- 不使用文件系统,因此最大的瓶颈将是网络带宽,可能比文件系统的延迟低得多,并且写入和读取始终是原子的(这意味着没有 read-while-being-written 您面临的问题);
- 使用 pub/sub,任何客户端都可以“延迟”启动并获取所有过去的数据(如果需要),而不会影响任何其他消费者。意识到“另一个消费者”可能只是您在监视事物,不一定是 full-time 处理程序。
缺点:
- 需要 Redis(或 Apache Kafka 或 RabbitMQ 或类似的东西)作为网络上某处 的服务,越近(拓扑越好)。
- 需要对 pandas 和 R 之间的合作架构进行更多思考。这将带来好处。
这实际上很容易在您的 dev-computer 上使用 Docker 完成:Redis 映像是免费的并且性能非常好,我经常将它用于与此类似的目的。 (Docker 不是必需的,没有它 Redis 安装也很好,交给你了。)
(Python 有 redis-py
, R has redux
.)
Filesystem-based
如果您必须使用 file-based,那么您需要使用一种方法来完全降低 reading-while-writing 的风险。虽然 file-writing 不是原子的(这就是你遇到问题的原因),但 file-renaming 是。将文件写入临时文件(在同一个文件系统上,但不在 R 将读取的位置或名称),然后一旦 written/closed,重命名它以便 R 可以看到它。
例如,假设您的惯例是使用 /some/path/file1234.csv
,其中 1234
可能会随着每次写入而递增。 (你可能有时间,没关系。)让我们限制 R,让它只看到以文字 .csv
结尾的文件(不难)。在 pandas 中写入 /some/path/file1234.csv.temp
,完成后(你 close()
在 python 中!),将其重命名为 /some/path/file1234.csv
。一旦文件被重命名,R 应该能够毫无干扰地阅读它。
优点:
- 体系结构没有变化,可能是实施和测试最快的。
缺点:
- 仍然基于文件系统,这意味着复合延迟来自:网络(如果使用网络文件系统)、OS、HDD 等
如果您真的很好奇,MailDir 是我用于相同目的的目录结构,尽管它是在一个巨大的 GPFS (NFS-like) 上工作的,其中延迟为 file-creation 可以超过 10-15 秒, file-locking 不受支持(可靠),如果没有我上面提到的 file-renaming 原子性,我会沉没。当然,您不需要 maildir 结构的“复杂性”(不多,但相对更复杂)来在 pandas 和 R 之间传递文件,但是...... atomic file-renaming 的前提具有优先权并且已经有很多人在做这件事。 (Maildir 的规模 非常 嗯,afaict。我唯一没有尝试弄清楚的是 filesystem-based pubsub in maildir ...)
(Python 和 R 都以原子方式执行 file-renaming,不需要 non-standard modules/packages。)
可能更改为 python(未经测试):
+ import os
# ...
dict = {'y': fields[0], 'x': fields[1], 'z': fields[2]}
my_data = pd.DataFrame([dict], columns=dict.keys())
- open("position.csv", "w")
- my_data.to_csv("position.csv", index=False)
+ my_data.to_csv("position.csv.temp", index=False)
+ try:
+ os.remove("position.csv")
+ except:
+ pass
+ os.rename("position.csv.temp", "position.csv")
在重命名之前 os.remove("position.csv")
可能就足够了,而不是使用 .old
,我还没有测试过什么最有效。我不太关心 reading-processes,因为在大多数系统上,文件本身(不管文件系统上的索引节点如何)应该允许 R 继续读取,即使文件名已被删除。同样,没有经过很好的测试。
其他注意事项
文件格式:虽然 CSV 是标准且简单的格式,但您可能需要考虑读写速度更快的格式,例如 feather
。 python 和 R 都有 modules/packages 来支持这个。我没有这方面的经验,但也许https://rstudio-pubs-static.s3.amazonaws.com/207316_edcc0ea0a7c04ea5a63833aaea7051fb.html 是一个有用的开始。 (还有“镶木地板”,我也没有这方面的经验。)
Down-sampling:你真的需要每0.2秒读取一次数据吗? Shiny (R!) 并不能保证立即读取每个文件,所以无论如何都会有一些“溢出”。我建议 down-sampling 为每秒 1 次或每 2-3 秒一次,具体取决于您真正打算如何使用它。我发现有些 use-cases 与此建议不兼容,交给您了。
Redis 选项:
Redis 运行 在我的笔记本电脑上使用默认端口 6379。我正在使用 docker,所以我开始使用它。 (请注意,如果您有特殊的网络设置 and/or 无法使用 host-mode 监听端口,这可能需要调整。它应该可以正常工作,但不需要其他配置。)
$ docker run -p "6379:6379" --name some-redis -d redis
在python中:
import pandas as pd
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
df1 = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
df2 = pd.DataFrame(data={'col1': [11, 12], 'col2': [13, 14]})
r.rpush('carsim', df1.to_json(orient='records'))
r.rpush('carsim', df2.to_json(orient='records'))
在 R 中:
R <- redux::hiredis()
popped <- R$LPOP("carsim")
popped
# [1] "[{\"col1\":1,\"col2\":3},{\"col1\":2,\"col2\":4}]"
jsonlite::fromJSON(popped)
# col1 col2
# 1 1 3
# 2 2 4
popped <- R$LPOP("carsim")
jsonlite::fromJSON(popped)
# col1 col2
# 1 11 13
# 2 12 14
popped <- R$LPOP("carsim")
popped
# NULL
改编 maildir-like 个目录
这个技巧使用了 maildirs 的前提并做了一点妥协(只要只有一个 file-writer,car-sim 应该没问题)。
在python中:
- my_data.to_csv("position.csv", index=False)
+ filename = '{:.3f}.csv'.format(time.time())
+ my_data.to_csv('tmp/' + filename, index=False)
+ os.rename('tmp/' + filename, 'new/' + filename)
在 R 中:
files <- list.files("new/", full.names = TRUE)
dat <- rbindlist(lapply(files, fread))
file.rename(files, "cur/")
背景
我实验室中的驾驶模拟器 PC 生成我通过 python socket
接收的数据。数据每 1/60 秒生成一次。我不断地将它保存到一个名为 position.csv
的 csv 文件中。我还想阅读 R 中的 position.csv
以在 shiny
应用程序中使用。我每0.2秒读一次。
问题
当我 运行 R 中的 shiny
应用程序时,python 抛出 PermissionError: [Errno 13] Permission denied: 'position.csv'
Python 将数据保存到 csv 文件的脚本:
import socket
import struct
import pandas as pd
UDP_IP = "127.0.0.1"
UDP_PORT = 9000
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
fields = struct.unpack_from('=ddd', data)
print(fields[0],fields[1],fields[2])
dict = {'y': fields[0], 'x': fields[1], 'z': fields[2]}
my_data = pd.DataFrame([dict], columns=dict.keys())
open("position.csv", "w")
my_data.to_csv("position.csv", index=False)
用于读取 csv 文件并在应用程序中使用的 R 脚本:
library(shinydashboard)
library(dplyr)
library(ggplot2)
library(shiny)
library(data.table)
# ui----
ui <- dashboardPage(skin = "black",
dashboardHeader(title = "Dashboard"),
dashboardSidebar(
sidebarMenu(
menuItem("Maps", tabName = "navigation", icon = icon("compass"))
)),
dashboardBody(
tabItems(
# First tab content
tabItem(tabName = "navigation",
fluidRow(
tags$style(type="text/css", ".recalculating {opacity: 1.0;}"),
plotOutput("plot1")
)
)
)
)
)
# server----
server <- function(input, output, session) {
position <- reactivePoll(200, session,
# This function returns the time that log_file was last modified
checkFunc = function() {
if (file.exists("position.csv"))
file.info("position.csv")$mtime[1]
else
""
},
# This function returns the content of log_file
valueFunc = function() {
data.table::fread("position.csv")
}
)
xl1 <- reactive({position()$x - 1000})
xl2 <- reactive({position()$x + 1000})
yl1 <- reactive({position()$y - 800})
yl2 <- reactive({position()$y + 800})
output$plot1 <- renderPlot({
ggplot() +
geom_point(data = position(),
aes(x, y),
color = "red", size = 5) +
coord_equal( xlim = c(xl1(), xl2()),
ylim = c(yl1(), yl2())) +
theme_void()
})
cancel.onSessionEnded <- session$onSessionEnded(function() {
stopApp()
})
cancel.onSessionEnded()
}
shinyApp(ui, server)
问题
如何成功读取和写入 position.csv
文件?
即使不看这个闪亮的部分,每 0.2 秒访问一次 CSV 文件的文件系统肯定是一个巨大的瓶颈,不太可能是考虑性能的最佳方式。
您获得权限被拒绝的最可能原因可能(我尚未测试)是由于 file-locking,其中 pandas 暂时在写入文件时锁定了文件,R 试图过早地读取它。坦率地说,即使您没有收到“拒绝”错误,尝试在文件 mid-write 时读取文件也是可行的,这意味着数据不完整。写入和读取事件应该有一些协调,这样就不会发生这种情况。
一些想法,未经测试(但有mutual-file-access经验):
不是Filesystem-based
一种替代方法是使用某种形式的 streaming-data 机制,例如 Redis。这可以是一个简单的“主题”(fifo 队列)或经过更多思考(并取决于您的需要)Pub/Sub 设置。有了这个,pandas 会将其新数据推送到主题或 pubsub 主题,并且一个(如果是普通主题)或 one-or-more(如果是 pubsub)消费者将获得完整的数据。
优点:
- 不使用文件系统,因此最大的瓶颈将是网络带宽,可能比文件系统的延迟低得多,并且写入和读取始终是原子的(这意味着没有 read-while-being-written 您面临的问题);
- 使用 pub/sub,任何客户端都可以“延迟”启动并获取所有过去的数据(如果需要),而不会影响任何其他消费者。意识到“另一个消费者”可能只是您在监视事物,不一定是 full-time 处理程序。
缺点:
- 需要 Redis(或 Apache Kafka 或 RabbitMQ 或类似的东西)作为网络上某处 的服务,越近(拓扑越好)。
- 需要对 pandas 和 R 之间的合作架构进行更多思考。这将带来好处。
这实际上很容易在您的 dev-computer 上使用 Docker 完成:Redis 映像是免费的并且性能非常好,我经常将它用于与此类似的目的。 (Docker 不是必需的,没有它 Redis 安装也很好,交给你了。)
(Python 有 redis-py
, R has redux
.)
Filesystem-based
如果您必须使用 file-based,那么您需要使用一种方法来完全降低 reading-while-writing 的风险。虽然 file-writing 不是原子的(这就是你遇到问题的原因),但 file-renaming 是。将文件写入临时文件(在同一个文件系统上,但不在 R 将读取的位置或名称),然后一旦 written/closed,重命名它以便 R 可以看到它。
例如,假设您的惯例是使用 /some/path/file1234.csv
,其中 1234
可能会随着每次写入而递增。 (你可能有时间,没关系。)让我们限制 R,让它只看到以文字 .csv
结尾的文件(不难)。在 pandas 中写入 /some/path/file1234.csv.temp
,完成后(你 close()
在 python 中!),将其重命名为 /some/path/file1234.csv
。一旦文件被重命名,R 应该能够毫无干扰地阅读它。
优点:
- 体系结构没有变化,可能是实施和测试最快的。
缺点:
- 仍然基于文件系统,这意味着复合延迟来自:网络(如果使用网络文件系统)、OS、HDD 等
如果您真的很好奇,MailDir 是我用于相同目的的目录结构,尽管它是在一个巨大的 GPFS (NFS-like) 上工作的,其中延迟为 file-creation 可以超过 10-15 秒, file-locking 不受支持(可靠),如果没有我上面提到的 file-renaming 原子性,我会沉没。当然,您不需要 maildir 结构的“复杂性”(不多,但相对更复杂)来在 pandas 和 R 之间传递文件,但是...... atomic file-renaming 的前提具有优先权并且已经有很多人在做这件事。 (Maildir 的规模 非常 嗯,afaict。我唯一没有尝试弄清楚的是 filesystem-based pubsub in maildir ...)
(Python 和 R 都以原子方式执行 file-renaming,不需要 non-standard modules/packages。)
可能更改为 python(未经测试):
+ import os
# ...
dict = {'y': fields[0], 'x': fields[1], 'z': fields[2]}
my_data = pd.DataFrame([dict], columns=dict.keys())
- open("position.csv", "w")
- my_data.to_csv("position.csv", index=False)
+ my_data.to_csv("position.csv.temp", index=False)
+ try:
+ os.remove("position.csv")
+ except:
+ pass
+ os.rename("position.csv.temp", "position.csv")
在重命名之前 os.remove("position.csv")
可能就足够了,而不是使用 .old
,我还没有测试过什么最有效。我不太关心 reading-processes,因为在大多数系统上,文件本身(不管文件系统上的索引节点如何)应该允许 R 继续读取,即使文件名已被删除。同样,没有经过很好的测试。
其他注意事项
文件格式:虽然 CSV 是标准且简单的格式,但您可能需要考虑读写速度更快的格式,例如
feather
。 python 和 R 都有 modules/packages 来支持这个。我没有这方面的经验,但也许https://rstudio-pubs-static.s3.amazonaws.com/207316_edcc0ea0a7c04ea5a63833aaea7051fb.html 是一个有用的开始。 (还有“镶木地板”,我也没有这方面的经验。)Down-sampling:你真的需要每0.2秒读取一次数据吗? Shiny (R!) 并不能保证立即读取每个文件,所以无论如何都会有一些“溢出”。我建议 down-sampling 为每秒 1 次或每 2-3 秒一次,具体取决于您真正打算如何使用它。我发现有些 use-cases 与此建议不兼容,交给您了。
Redis 选项:
Redis 运行 在我的笔记本电脑上使用默认端口 6379。我正在使用 docker,所以我开始使用它。 (请注意,如果您有特殊的网络设置 and/or 无法使用 host-mode 监听端口,这可能需要调整。它应该可以正常工作,但不需要其他配置。)
$ docker run -p "6379:6379" --name some-redis -d redis
在python中:
import pandas as pd
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
df1 = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
df2 = pd.DataFrame(data={'col1': [11, 12], 'col2': [13, 14]})
r.rpush('carsim', df1.to_json(orient='records'))
r.rpush('carsim', df2.to_json(orient='records'))
在 R 中:
R <- redux::hiredis()
popped <- R$LPOP("carsim")
popped
# [1] "[{\"col1\":1,\"col2\":3},{\"col1\":2,\"col2\":4}]"
jsonlite::fromJSON(popped)
# col1 col2
# 1 1 3
# 2 2 4
popped <- R$LPOP("carsim")
jsonlite::fromJSON(popped)
# col1 col2
# 1 11 13
# 2 12 14
popped <- R$LPOP("carsim")
popped
# NULL
改编 maildir-like 个目录
这个技巧使用了 maildirs 的前提并做了一点妥协(只要只有一个 file-writer,car-sim 应该没问题)。
在python中:
- my_data.to_csv("position.csv", index=False)
+ filename = '{:.3f}.csv'.format(time.time())
+ my_data.to_csv('tmp/' + filename, index=False)
+ os.rename('tmp/' + filename, 'new/' + filename)
在 R 中:
files <- list.files("new/", full.names = TRUE)
dat <- rbindlist(lapply(files, fread))
file.rename(files, "cur/")