Pyspark 同时从数据框中的 2 个列表中删除

Pyspark simultaneously remove from 2 lists in dataframe

我在处理庞大的数据集时遇到了问题,我正在寻找从 2 个列表中删除相同索引项的方法。

让我描述一个例子。

想象一下 Google 搜索 - 页面中包含 12 个 URL 的列表。第一个是广告,最后一个也是,第二个和第七个是图片link。现在我只想要 有机 link

类型可以在列表中随机定位。我正在检查 array_remove 这非常好,但它只能从 1 个列表中删除特定项目,而且我还不够先进,无法弄清楚如何同时为 2 个列表执行此操作。遗憾的是,数据集真的很大,恐怕 posexplode 不适合我。

请记住,这是一列 列表,而不是一列单个项目。

我正在寻找类似

的东西
if "adlink" or "picture" in typelist:
   remove it from typelist and remove same indexed item from urls list
  urls  |  type 
-----------------
[url1,  | [adlink, 
 url2,  |  picture,
 url3,  |  link,
 url4,  |  link,
 url5,  |  link, 
 url6,  |  link,
 url7,  |  picture,
 url8,  |  link,
 url9,  |  link,
 url10, |  link,
 url11, |  link,
 url12] |  adlink]

期望的输出:

  urls  |  type 
-----------------
[url3,  | [link,
 url4,  |  link,
 url5,  |  link, 
 url6,  |  link,
 url8,  |  link,
 url9,  |  link,
 url10, |  link,
 url11] |  link]
df.show()#your dataframe
+---------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|urls                                                                       |type                                                                              |
+---------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|[url1, url2, url3, url4, url5, url6, url7, url8, url9, url10, url11, url12]|[adlink, picture, link, link, link, link, picture, link, link, link, link, adlink]|
+---------------------------------------------------------------------------+----------------------------------------------------------------------------------+ 

您可以像使用 spark2.4 一样使用 higher order functions(我看出来是因为您使用了 array_remove)。首先,您可以使用 arrays_zip zip 数组,然后使用 filterzipped array(type_urls) 过滤掉 type is 'adlink' and 'picture' ,然后使用 columname.arrayname 从压缩列中 select 您想要的列。

Filter(高阶函数),基本上允许您将过滤器应用于高阶数据,而不必分解它(因为您提到 posexplode)Higher order functions

arrays_zip Returns 合并的结构数组,其中第 N 个结构包含输入的所有第 N 个值数组。 arrays_zip Pyspark API docs

from pyspark.sql import functions as F
df.withColumn("type_urls", F.arrays_zip(F.col("urls"),F.col("type"))).select("type_urls")\
  .withColumn("urls1", F.expr("""filter(type_urls, x-> x.type!='adlink' and x.type!='picture')"""))\
  .select(F.col("urls1.urls"), F.col("urls1.type")).show(truncate=False)

+--------------------------------------------------+------------------------------------------------+
|urls                                              |type                                            |
+--------------------------------------------------+------------------------------------------------+
|[url3, url4, url5, url6, url8, url9, url10, url11]|[link, link, link, link, link, link, link, link]|
+--------------------------------------------------+------------------------------------------------+