在 dask 和 pandas 数据帧中应用不兼容
Incompatibility of apply in dask and pandas dataframes
我的 Dask 数据帧 中 triggers
列的示例如下所示:
0 [Total Traffic, DNS, UDP]
1 [TCP RST]
2 [Total Traffic]
3 [IP Private]
4 [ICMP]
Name: triggers, dtype: object
我希望通过执行以下操作创建上述数组的单热编码版本(例如,将 1
放在第 1 行的 DNS
列上)。 pop_triggers
包含 triggers
.
的所有可能值
for trig in pop_triggers:
df[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0)
但是,Total Traffic
、DNS
等列的相关值都包含值 0 而不是 1。当我将它复制到 pandas 数据框并执行相同的操作时,它们会得到预期值。
a = df[[ 'Total Traffic', 'UDP', 'NTP Amplification', 'triggers', 'ICMP']].head()
for trig in pop_triggers:
a[trig] = a.triggers.apply(lambda x: 1 if trig in x else 0)
我在这里错过了什么?是不是因为 dask 懒惰,所以它没有按预期填写值?
编辑 1:
我调查了一些最初设置标志的地方(结果远远低于我的预期,并得到了一些非常奇怪的结果。见下文:
df2 = df[df['Total Traffic']==1]
df2[['triggers']+pop_triggers].head()
输出:
triggers Total Traffic UDP DNS
9380 [ICMP, IP null, IP Private, TCP null, TCP SYN,... 1 1 1
9388 [ICMP, IP null, IP Private, TCP null, TCP SYN,... 1 1 1
19714 [ICMP, IP null, IP Private, UDP, NTP Amplifica... 1 1 1
21556 [IP null] 1 1 1
21557 [IP null] 1 1 1
可能是错误?
编辑 2:
最小工作示例:
triggers = [['Total Traffic', 'DNS', 'UDP'],['TCP RST'],['Total Traffic'],['IP Private'],['ICMP']]*10
df2 = dd.from_pandas(pd.DataFrame({'triggers':triggers}), npartitions=16)
pop_triggers= ['Total Traffic', 'UDP', 'DNS', 'TCP SYN', 'TCP null', 'ICMP']
for trig in pop_triggers:
df2[trig] = df2.triggers.apply(lambda x: 1 if trig in x else 0)
df2.head()
输出:
triggers Total Traffic UDP DNS TCP SYN TCP null ICMP
0 [Total Traffic, DNS, UDP] 0 0 0 0 0 0
1 [TCP RST] 0 0 0 0 0 0
2 [Total Traffic] 0 0 0 0 0 0
3 [IP Private] 0 0 0 0 0 0
注意:我更关心事物的 Dask 方面而不是 Pandas
根据我的经验,dask
中的 apply
使用显式 metadata
效果更好。有一些功能让 dask
尝试猜测 metadata
但我发现它很慢而且并不总是可靠。指南还指定 meta
.
根据我的经验,还有一点是 assign
比 df[col] = ...
效果更好。不确定这是我这边的错误、限制还是误用(我前一段时间研究过,我认为这不是错误)。
编辑: 第一个模式不起作用,用于循环中前一列的 trig
值似乎用后来的值更新,因此在计算时, 这只给出了所有列的最后一个值的结果!
这不是错误,而是不立即计算而延迟计算的 lambda 结果对尚未评估的闭包的组合。请参阅 this discussion 了解它为何不起作用。
我给你的模式是:
cols = {}
for trig in pop_triggers:
meta = (trig, int)
cols[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0, meta=meta)
df = df.assign(**cols)
正确的模式:
(抱歉之前没有测试,因为我 运行 相同的模式,只是我没有在应用函数中使用循环值,所以没有遇到这种行为)
cols = {}
for trig in pop_triggers:
meta = (trig, int)
def fn(x, t):
return 1 if t in x else 0
cols[trig] = ddf.triggers.apply(fn, args=(trig,), meta=meta)
ddf = ddf.assign(**cols)
我的 Dask 数据帧 中 triggers
列的示例如下所示:
0 [Total Traffic, DNS, UDP]
1 [TCP RST]
2 [Total Traffic]
3 [IP Private]
4 [ICMP]
Name: triggers, dtype: object
我希望通过执行以下操作创建上述数组的单热编码版本(例如,将 1
放在第 1 行的 DNS
列上)。 pop_triggers
包含 triggers
.
for trig in pop_triggers:
df[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0)
但是,Total Traffic
、DNS
等列的相关值都包含值 0 而不是 1。当我将它复制到 pandas 数据框并执行相同的操作时,它们会得到预期值。
a = df[[ 'Total Traffic', 'UDP', 'NTP Amplification', 'triggers', 'ICMP']].head()
for trig in pop_triggers:
a[trig] = a.triggers.apply(lambda x: 1 if trig in x else 0)
我在这里错过了什么?是不是因为 dask 懒惰,所以它没有按预期填写值?
编辑 1: 我调查了一些最初设置标志的地方(结果远远低于我的预期,并得到了一些非常奇怪的结果。见下文:
df2 = df[df['Total Traffic']==1]
df2[['triggers']+pop_triggers].head()
输出:
triggers Total Traffic UDP DNS
9380 [ICMP, IP null, IP Private, TCP null, TCP SYN,... 1 1 1
9388 [ICMP, IP null, IP Private, TCP null, TCP SYN,... 1 1 1
19714 [ICMP, IP null, IP Private, UDP, NTP Amplifica... 1 1 1
21556 [IP null] 1 1 1
21557 [IP null] 1 1 1
可能是错误?
编辑 2: 最小工作示例:
triggers = [['Total Traffic', 'DNS', 'UDP'],['TCP RST'],['Total Traffic'],['IP Private'],['ICMP']]*10
df2 = dd.from_pandas(pd.DataFrame({'triggers':triggers}), npartitions=16)
pop_triggers= ['Total Traffic', 'UDP', 'DNS', 'TCP SYN', 'TCP null', 'ICMP']
for trig in pop_triggers:
df2[trig] = df2.triggers.apply(lambda x: 1 if trig in x else 0)
df2.head()
输出:
triggers Total Traffic UDP DNS TCP SYN TCP null ICMP
0 [Total Traffic, DNS, UDP] 0 0 0 0 0 0
1 [TCP RST] 0 0 0 0 0 0
2 [Total Traffic] 0 0 0 0 0 0
3 [IP Private] 0 0 0 0 0 0
注意:我更关心事物的 Dask 方面而不是 Pandas
根据我的经验,dask
中的 apply
使用显式 metadata
效果更好。有一些功能让 dask
尝试猜测 metadata
但我发现它很慢而且并不总是可靠。指南还指定 meta
.
根据我的经验,还有一点是 assign
比 df[col] = ...
效果更好。不确定这是我这边的错误、限制还是误用(我前一段时间研究过,我认为这不是错误)。
编辑: 第一个模式不起作用,用于循环中前一列的 trig
值似乎用后来的值更新,因此在计算时, 这只给出了所有列的最后一个值的结果!
这不是错误,而是不立即计算而延迟计算的 lambda 结果对尚未评估的闭包的组合。请参阅 this discussion 了解它为何不起作用。
我给你的模式是:
cols = {}
for trig in pop_triggers:
meta = (trig, int)
cols[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0, meta=meta)
df = df.assign(**cols)
正确的模式:
(抱歉之前没有测试,因为我 运行 相同的模式,只是我没有在应用函数中使用循环值,所以没有遇到这种行为)
cols = {}
for trig in pop_triggers:
meta = (trig, int)
def fn(x, t):
return 1 if t in x else 0
cols[trig] = ddf.triggers.apply(fn, args=(trig,), meta=meta)
ddf = ddf.assign(**cols)