Create pairs from RDD by using nth element in the row

I have used this code:

def process_row(row):
    words = row.replace('"', '').split(' ')
    for i in range(len(words)):
        # if we find '-' we will replace it with '0'
        if words[i] == '-':
            words[i] = '0'
    return words
    # or, to return typed fields instead:
    # return [words[0], words[1], words[2], words[3], words[4], int(words[5])]

nasa = (
    nasa_raw.flatMap(process_row)
)
nasa.persist()
for row in nasa.take(10):
    print(row)

It transforms this data:

in24.inetnebr.com [01/Aug/1995:00:00:01] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt" 200 1839
 uplherc.upl.com [01/Aug/1995:00:00:07] "GET /" 304 0
uplherc.upl.com [01/Aug/1995:00:00:08] "GET /images/ksclogo-medium.gif" 304 0
uplherc.upl.com [01/Aug/1995:00:00:08] "GET /images/MOSAIC-logosmall.gif" 304 0
 uplherc.upl.com [01/Aug/1995:00:00:08] "GET /images/USA-logosmall.gif" 304 0
ix-esc-ca2-07.ix.netcom.com [01/Aug/1995:00:00:09] "GET /images/launch-logo.gif" 200 1713
uplherc.upl.com [01/Aug/1995:00:00:10] "GET /images/WORLD-logosmall.gif" 304 0
slppp6.intermind.net [01/Aug/1995:00:00:10] "GET /history/skylab/skylab.html" 200 1687
piweba4y.prodigy.com [01/Aug/1995:00:00:10] "GET /images/launchmedium.gif" 200 11853
slppp6.intermind.net [01/Aug/1995:00:00:11] "GET /history/skylab/skylab-small.gif" 200 9202

into this PipelinedRDD:

in24.inetnebr.com
[01/Aug/1995:00:00:01]
 GET
 /shuttle/missions/sts-68/news/sts-68-mcc-05.txt
 200
 1839
 uplherc.upl.com
 [01/Aug/1995:00:00:07]
 GET
 /

I want to create the frequency of addresses, e.g. uplherc.upl.com, by using pairs:

pairs = nasa.map(lambda x: (x, 1))
count_by_resource = pairs.reduceByKey(lambda x, y: x + y)
count_by_resource = count_by_resource.takeOrdered(10, key=lambda x: -x[1])
spark.createDataFrame(count_by_resource, ['Resource_location', 'Count']).show(10)

But the result is the frequency of every element:

+--------------------+-------+
|   Resource_location|  Count|
+--------------------+-------+
|                 GET|1551681|
|                 200|1398910|
|                   0| 225418|

How should I reference the element I am interested in?

Splitting each row on spaces and then flat-mapping all of those values, when you are mainly interested in counting domains, may introduce extra work, and will certainly introduce extra overhead and processing.
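To see why your version counts every token: flatMap flattens each six-field row into six separate RDD records, so the (x, 1) pairs are built per token rather than per log line. A minimal sketch of the contrast, assuming a Spark shell context `sc` and the process_row function above:

# one sample line from the data above
row = 'in24.inetnebr.com [01/Aug/1995:00:00:01] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt" 200 1839'
rdd = sc.parallelize([row])

# flatMap flattens the six fields into six independent records
print(rdd.flatMap(process_row).count())  # 6
# map keeps one record (a list of six fields) per line
print(rdd.map(process_row).count())      # 1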

Based on the sample data provided, the domain is the first item in each row. I also noticed that some of your rows begin with a blank space, which produces extra string fragments. You may consider using the strip function to trim each row before processing.
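As a quick illustration of that leading-space issue, using one of the sample lines above: splitting without strip() yields an empty string as the first fragment.

row = ' uplherc.upl.com [01/Aug/1995:00:00:07] "GET /" 304 0'
print(row.split(' ')[0])          # '' -- the leading space produces an empty fragment
print(row.strip().split(' ')[0])  # 'uplherc.upl.com'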

You may consider either modifying process_row to return only the first piece of the string, or creating another map operation.


def extract_domain_from_row(row):
    # if row is a string
    domain = row.strip().split(' ')[0]
    # if you send a list, you could always extract the first item from that list as the domain name
    # domain = row[0]
    return domain.lower()

# intermediary RDD
nasa_domains = nasa_raw.map(extract_domain_from_row)

# continue operations as desired with `nasa_domains`
pairs = nasa_domains.map(lambda x: (x, 1))
count_by_resource = pairs.reduceByKey(lambda x, y: x + y)
count_by_resource = count_by_resource.takeOrdered(10, key=lambda x: -x[1])
spark.createDataFrame(count_by_resource, ['Resource_location', 'Count']).show(10)

Output:

+--------------------+-----+
|   Resource_location|Count|
+--------------------+-----+
|     uplherc.upl.com|    5|
|slppp6.intermind.net|    2|
|   in24.inetnebr.com|    1|
|ix-esc-ca2-07.ix....|    1|
|piweba4y.prodigy.com|    1|
+--------------------+-----+

If the first item is not a domain, you may wish to filter your collection with a pattern that matches domains; see the domain regex suggestions here.
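For example, a minimal sketch of such a filter; the pattern below is a deliberately simplified hostname regex chosen for illustration, not a complete domain validator:

import re

# simplified hostname pattern -- an assumption for illustration only;
# see the domain regex suggestions linked above for more robust patterns
DOMAIN_RE = re.compile(r'^[a-z0-9-]+(\.[a-z0-9-]+)+$')

# keep only entries that actually look like a domain name
nasa_domains = nasa_domains.filter(lambda d: DOMAIN_RE.match(d) is not None)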