如何通过 pandas 编写 csv 文件并定期在 R 中读取它?

How to write a csv file via pandas and read it in R at regular intervals?

背景

我实验室中的驾驶模拟器 PC 生成我通过 python socket 接收的数据。数据每 1/60 秒生成一次。我不断地将它保存到一个名为 position.csv 的 csv 文件中。我还想阅读 R 中的 position.csv 以在 shiny 应用程序中使用。我每0.2秒读一次。

问题

当我 运行 R 中的 shiny 应用程序时,python 抛出 PermissionError: [Errno 13] Permission denied: 'position.csv'

Python 将数据保存到 csv 文件的脚本:

import socket
import struct
import pandas as pd

UDP_IP = "127.0.0.1"
UDP_PORT = 9000

sock = socket.socket(socket.AF_INET, # Internet
                     socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))

while True:
    data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
    fields = struct.unpack_from('=ddd', data)
    print(fields[0],fields[1],fields[2])
    
    
    dict = {'y': fields[0], 'x': fields[1], 'z': fields[2]}
    my_data = pd.DataFrame([dict], columns=dict.keys())
    open("position.csv", "w")
    my_data.to_csv("position.csv", index=False)

用于读取 csv 文件并在应用程序中使用的 R 脚本:

library(shinydashboard)
library(dplyr)
library(ggplot2)
library(shiny)
library(data.table)



# ui----
ui <- dashboardPage(skin = "black", 
  dashboardHeader(title = "Dashboard"),
  dashboardSidebar(
    sidebarMenu(
      menuItem("Maps", tabName = "navigation", icon = icon("compass"))
  )),
  dashboardBody(
    tabItems(
      # First tab content
      tabItem(tabName = "navigation",
              fluidRow(
                tags$style(type="text/css", ".recalculating {opacity: 1.0;}"),
                plotOutput("plot1")
              )
      )
    )
  )
)


# server----
server <- function(input, output, session) {
 
  

  position <- reactivePoll(200, session,
                       # This function returns the time that log_file was last modified
                       checkFunc = function() {
                         if (file.exists("position.csv"))
                           file.info("position.csv")$mtime[1]
                         else
                           ""
                       },
                       # This function returns the content of log_file
                       valueFunc = function() {
                         data.table::fread("position.csv")
                       }
  )
  

  
  
  xl1 <-  reactive({position()$x - 1000})
  xl2 <-  reactive({position()$x + 1000})
  
  yl1 <-  reactive({position()$y - 800})
  yl2 <-  reactive({position()$y + 800})
  
  
  
 output$plot1 <- renderPlot({
    
    
    ggplot() +
      geom_point(data = position(),
                 aes(x, y),
                 color = "red", size = 5) +
      coord_equal(    xlim = c(xl1(), xl2()),
                      ylim = c(yl1(), yl2())) +
      theme_void()
    
  })
  
  
  
  
  cancel.onSessionEnded <- session$onSessionEnded(function() {

    stopApp()
  })
  
  cancel.onSessionEnded()
  

  
  
}

shinyApp(ui, server)

问题

如何成功读取和写入 position.csv 文件?

即使不看这个闪亮的部分,每 0.2 秒访问一次 CSV 文件的文件系统肯定是一个巨大的瓶颈,不太可能是考虑性能的最佳方式。

您获得权限被拒绝的最可能原因可能(我尚未测试)是由于 file-locking,其中 pandas 暂时在写入文件时锁定了文件,R 试图过早地读取它。坦率地说,即使您没有收到“拒绝”错误,尝试在文件 mid-write 时读取文件也是可行的,这意味着数据不完整。写入和读取事件应该有一些协调,这样就不会发生这种情况。

一些想法,未经测试(但有mutual-file-access经验):

不是Filesystem-based

一种替代方法是使用某种形式的 streaming-data 机制,例如 Redis。这可以是一个简单的“主题”(fifo 队列)或经过更多思考(并取决于您的需要)Pub/Sub 设置。有了这个,pandas 会将其新数据推送到主题或 pubsub 主题,并且一个(如果是普通主题)或 one-or-more(如果是 pubsub)消费者将获得完整的数据。

优点:

  • 不使用文件系统,因此最大的瓶颈将是网络带宽,可能比文件系统的延迟低得多,并且写入和读取始终是原子的(这意味着没有 read-while-being-written 您面临的问题);
  • 使用 pub/sub,任何客户端都可以“延迟”启动并获取所有过去的数据(如果需要),而不会影响任何其他消费者。意识到“另一个消费者”可能只是您在监视事物,不一定是 full-time 处理程序。

缺点:

  • 需要 Redis(或 Apache Kafka 或 RabbitMQ 或类似的东西)作为网络上某处 的服务,越近(拓扑越好)。
  • 需要对 pandas 和 R 之间的合作架构进行更多思考。这将带来好处。

这实际上很容易在您的 dev-computer 上使用 Docker 完成:Redis 映像是免费的并且性能非常好,我经常将它用于与此类似的目的。 (Docker 不是必需的,没有它 Redis 安装也很好,交给你了。)

(Python 有 redis-py, R has redux.)

Filesystem-based

如果您必须使用 file-based,那么您需要使用一种方法来完全降低 reading-while-writing 的风险。虽然 file-writing 不是原子的(这就是你遇到问题的原因),但 file-renaming 是。将文件写入临时文件(在同一个文件系统上,但不在 R 将读取的位置或名称),然后一旦 written/closed,重命名它以便 R 可以看到它。

例如,假设您的惯例是使用 /some/path/file1234.csv,其中 1234 可能会随着每次写入而递增。 (你可能有时间,没关系。)让我们限制 R,让它只看到以文字 .csv 结尾的文件(不难)。在 pandas 中写入 /some/path/file1234.csv.temp,完成后(你 close() 在 python 中!),将其重命名为 /some/path/file1234.csv。一旦文件被重命名,R 应该能够毫无干扰地阅读它。

优点:

  • 体系结构没有变化,可能是实施和测试最快的。

缺点:

  • 仍然基于文件系统,这意味着复合延迟来自:网络(如果使用网络文件系统)、OS、HDD 等

如果您真的很好奇,MailDir 是我用于相同目的的目录结构,尽管它是在一个巨大的 GPFS (NFS-like) 上工作的,其中延迟为 file-creation 可以超过 10-15 秒, file-locking 不受支持(可靠),如果没有我上面提到的 file-renaming 原子性,我会沉没。当然,您不需要 maildir 结构的“复杂性”(不多,但相对更复杂)来在 pandas 和 R 之间传递文件,但是...... atomic file-renaming 的前提具有优先权并且已经有很多人在做这件事。 (Maildir 的规模 非常 嗯,afaict。我唯一没有尝试弄清楚的是 filesystem-based pubsub in maildir ...)

(Python 和 R 都以原子方式执行 file-renaming,不需要 non-standard modules/packages。)

可能更改为 python(未经测试):

+ import os

  # ...

      dict = {'y': fields[0], 'x': fields[1], 'z': fields[2]}
      my_data = pd.DataFrame([dict], columns=dict.keys())
-     open("position.csv", "w")
-     my_data.to_csv("position.csv", index=False)
+     my_data.to_csv("position.csv.temp", index=False)
+     try:
+       os.remove("position.csv")
+     except:
+       pass
+     os.rename("position.csv.temp", "position.csv")

在重命名之前 os.remove("position.csv") 可能就足够了,而不是使用 .old,我还没有测试过什么最有效。我不太关心 reading-processes,因为在大多数系统上,文件本身(不管文件系统上的索引节点如何)应该允许 R 继续读取,即使文件名已被删除。同样,没有经过很好的测试。

其他注意事项

  • 文件格式:虽然 CSV 是标准且简单的格式,但您可能需要考虑读写速度更快的格式,例如 feather。 python 和 R 都有 modules/packages 来支持这个。我没有这方面的经验,但也许https://rstudio-pubs-static.s3.amazonaws.com/207316_edcc0ea0a7c04ea5a63833aaea7051fb.html 是一个有用的开始。 (还有“镶木地板”,我也没有这方面的经验。)

  • Down-sampling:你真的需要每0.2秒读取一次数据吗? Shiny (R!) 并不能保证立即读取每个文件,所以无论如何都会有一些“溢出”。我建议 down-sampling 为每秒 1 次或每 2-3 秒一次,具体取决于您真正打算如何使用它。我发现有些 use-cases 与此建议不兼容,交给您了。


Redis 选项:

Redis 运行 在我的笔记本电脑上使用默认端口 6379。我正在使用 docker,所以我开始使用它。 (请注意,如果您有特殊的网络设置 and/or 无法使用 host-mode 监听端口,这可能需要调整。它应该可以正常工作,但不需要其他配置。)

$ docker run -p "6379:6379" --name some-redis -d redis

在python中:

import pandas as pd
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
df1 = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
df2 = pd.DataFrame(data={'col1': [11, 12], 'col2': [13, 14]})
r.rpush('carsim', df1.to_json(orient='records'))
r.rpush('carsim', df2.to_json(orient='records'))

在 R 中:

R <- redux::hiredis()
popped <- R$LPOP("carsim")
popped
# [1] "[{\"col1\":1,\"col2\":3},{\"col1\":2,\"col2\":4}]"
jsonlite::fromJSON(popped)
#   col1 col2
# 1    1    3
# 2    2    4
popped <- R$LPOP("carsim")
jsonlite::fromJSON(popped)
#   col1 col2
# 1   11   13
# 2   12   14
popped <- R$LPOP("carsim")
popped
# NULL

改编 maildir-like 个目录

这个技巧使用了 maildirs 的前提并做了一点妥协(只要只有一个 file-writer,car-sim 应该没问题)。

在python中:

-     my_data.to_csv("position.csv", index=False)
+     filename = '{:.3f}.csv'.format(time.time())
+     my_data.to_csv('tmp/' + filename, index=False)
+     os.rename('tmp/' + filename, 'new/' + filename)

在 R 中:

files <- list.files("new/", full.names = TRUE)
dat <- rbindlist(lapply(files, fread))
file.rename(files, "cur/")