How do you transpose a dask dataframe (convert columns to rows) to approach tidy data principles

TLDR: I created a dask dataframe from a dask bag. The dask dataframe treats every observation (event) as a column. So, instead of having rows of data for each event, I have a column for each event. The goal is to transpose the columns into rows in the same way that pandas can transpose a dataframe using df.T.
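For reference, this is the behavior I'm after, shown on a toy pandas frame (hypothetical data, just to illustrate what df.T does):

import pandas as pd

# toy frame: one column per event, mirroring the problem
df = pd.DataFrame({'event1': ['id1', 'textA'], 'event2': ['id2', 'textB']})
df.T   # transpose: each event becomes a row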

Details: I have sample twitter data from my timeline here. To get to my starting point, here is the code for reading the json from disk into a dask.bag and then converting that into a dask.dataframe:
import dask.bag as db
import dask.dataframe as dd
import json


# read the raw json from disk into a bag, one parsed dict per line
b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()

The Problem: All of my individual events (i.e. tweets) are recorded as columns rather than rows. In keeping with tidy principles, I would like to have a row for each event. pandas has a transpose method for dataframes, and dask.array has a transpose method for arrays. My goal is to do the same transpose operation, but on a dask dataframe. How would I do that? (A stop-gap workaround is sketched after the list below.)

  1. Convert the columns to rows
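As a stop-gap, and assuming the frame is small enough to collect into memory after filtering, one workaround is to round-trip through pandas. This is a sketch, not the eventual solution below:

import dask.dataframe as dd

# sketch: collect to pandas, transpose there, rebuild a dask dataframe
pdf = df.compute()                           # pandas frame, events as columns
tidy = dd.from_pandas(pdf.T, npartitions=4)  # events are now rows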

EDIT: Solution

This code resolves the original transpose issue, cleans the Twitter json files by defining the columns to keep and dropping the rest, and creates a new column by applying a function to a series. Then, we write the much smaller, cleaned files to disk.

import glob
import os

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

# pull in all files; glob does not expand '~', so expand it first
filenames = glob.glob(os.path.expanduser('~/sampleTwitter*.json'))


# build the dataframe directly from delayed pandas reads
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)


# see all the fields of the dataframe
fields = list(df.columns)

# identify the fields we want to keep
keepers = ['coordinates', 'id', 'user', 'created_at', 'lang']

# remove the keepers from the column list, leaving only the columns to drop
for f in keepers:
    if f in fields:
        fields.remove(f)

# drop the fields we don't want and keep only what's necessary
df = df.drop(fields, axis=1)

# pull the (lon, lat) pair out of the coordinates dict;
# guard against tweets with no coordinates (null in the json)
clean = df.coordinates.apply(
    lambda x: (x['coordinates'][0], x['coordinates'][1]) if x else None,
    meta=('coords', object))
df['coords'] = clean

# making new filenames from old filenames to save cleaned files
newfilenames = []
for l in filenames:
    base = os.path.splitext(os.path.basename(l))[0]
    newfilenames.append(base + 'cleaned.json')

# custom saver function for dataframes using newfilenames
def saver(frame, filename):
    return frame.to_json('./' + filename)

# converting back to delayed objects, one per partition, and pairing
# each partition with its output filename
dfs = df.to_delayed()
writes = [delayed(saver)(df, fn) for df, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)
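As a quick sanity check, one of the cleaned files can be read straight back with pandas (a hypothetical spot check; the filename shown follows from the renaming scheme above):

import pandas as pd

# reload one cleaned file and eyeball the kept columns
check = pd.read_json('./sampleTwittercleaned.json')
check.head()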

I think you can get the result you want by bypassing bag altogether, with code like

import glob

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

# one delayed pandas read per json file, then stitch them into
# a single dask dataframe with one tweet per row
filenames = glob.glob('sampleTwitter*.json')
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
ddf = dd.from_delayed(dfs)
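If that works as expected, each tweet comes through as one row and each tweet field as a column (a minimal check; the exact columns depend on the json fields):

ddf.columns   # tweet fields as columns
ddf.head()    # one tweet per row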