PySpark with Joblib giving me ambiguous results

I am trying to fetch data from Teradata --

select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3

Note: one or more of the selects may fail, but a single failure should not cause the entire union to fail.

from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                return None
        
        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)

As you can see, the split_query_and_run_individually method splits the query on union all and then runs all of the sub-queries in parallel threads with n_jobs=10.
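
For context, here is roughly how I call the class. This is only a minimal usage sketch; the connection details and the query below are placeholders, not my real ones.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("teradata-extract").getOrCreate()

# Placeholder connection details
td = TeradataWithSpark(spark, host="td-host", database="my_db",
                       username="user", password="secret")

# Placeholder query; the real one unions many selects of this shape
query = """
select 'table_1' as src_tbl, count(*) as total_count, max(load_dt) as data_date from table_1
union all
select 'table_2' as src_tbl, count(*) as total_count, max(load_dt) as data_date from table_2
"""

result = td.split_query_and_run_individually(query, separator='union all')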

But the problem I am facing is that the data gets corrupted, like this:

n_jobs = 1
                               src_tbl  total_count   data_date
0     dsl_dim_mdm_....................           61  2022-03-17
1     dsl_agg_call....................      3992202  2022-03-27
2      dsl_call_ac....................       924719  2022-03-27
3      dsl_dim_acc....................         4762  2022-03-31
4                 ....................         6821  2022-03-31
5     dsl_dim_geo_....................      8610038  2022-04-05
6              dsl....................        67116  2022-03-31
7           dsl_rl....................      2087669  2022-04-06
8             dsl_....................          154  2022-04-01
9             dsl_....................        85630  2022-03-27
10  dsl_selling_da....................           53  2021-03-03
11  dsl_speaker_ev....................        17765  2022-03-31
12       dsl_speak....................        26269  2022-08-24
13   dsl_speaker_e....................         4202  2022-04-05
14              ds....................          268  2022-03-31
15      dsl_rltn_r....................       255794  2022-03-18
16     dsl_rltn_nr....................        12088  2022-03-18
17        dsl_rapp....................        81182  2022-01-01
18   dsl_dim_physi....................       109299  2022-03-31
19             dsl....................         4265  2022-02-01
20         dsl_fac....................       117978  2022-04-03
21      dsl_coachi....................          242  2022-03-31
22   dsl_speaker_e....................        16653  2022-03-31
23     dsl_dim_cal....................        17817  2099-12-31
24    dsl_rltn_nrt....................         3304  2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
                              src_tbl  total_count   data_date
0             dsl_sel................        85630  2022-03-27
1             dsl_sel................        85630  2022-03-27
2             dsl_sel................        85630  2022-03-27
3             dsl_sel................        85630  2022-03-27
4             dsl_sel................        85630  2022-03-27
5             dsl_sel................        85630  2022-03-27
6             dsl_sel................        85630  2022-03-27
7             dsl_sel................        85630  2022-03-27
8             dsl_sel................        85630  2022-03-27
9             dsl_sel................        85630  2022-03-27
10  dsl_speaker_event................        17765  2022-03-31
11   dsl_speaker_even................         4202  2022-04-05
12   dsl_speaker_even................         4202  2022-04-05
13              dsl_s................          268  2022-03-31
14        dsl_rapper_................        81182  2022-01-01
15        dsl_rapper_................        81182  2022-01-01
16     dsl_rltn_nrtl_................        12088  2022-03-18
17        dsl_rapper_................        81182  2022-01-01
18   dsl_dim_physicia................       109299  2022-03-31
19             dsl_cu................         4265  2022-02-01
20         dsl_fact_f................       117978  2022-04-03
21      dsl_coaching_................          242  2022-03-31
22   dsl_speaker_even................        16653  2022-03-31
23     dsl_dim_call_c................        17817  2099-12-31
24    dsl_rltn_nrtl_r................         3304  2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
                            src_tbl  total_count   data_date
0   dsl_dim_acc....................         4762  2022-03-31
1   dsl_dim_acc....................         4762  2022-03-31
2   dsl_dim_acc....................         4762  2022-03-31
3   dsl_dim_acc....................         4762  2022-03-31
4   dsl_dim_acc....................         4762  2022-03-31
5   dsl_dim_acc....................         4762  2022-03-31
6   dsl_dim_acc....................         4762  2022-03-31
7   dsl_dim_acc....................         4762  2022-03-31
8   dsl_dim_acc....................         4762  2022-03-31
9   dsl_dim_acc....................         4762  2022-03-31
10  dsl_dim_acc....................         4762  2022-03-31
11  dsl_dim_acc....................         4762  2022-03-31
12  dsl_dim_acc....................         4762  2022-03-31
13  dsl_dim_acc....................         4762  2022-03-31
14  dsl_dim_acc....................         4762  2022-03-31
15  dsl_dim_acc....................         4762  2022-03-31
16  dsl_dim_acc....................         4762  2022-03-31
17  dsl_dim_acc....................         4762  2022-03-31
18  dsl_dim_acc....................         4762  2022-03-31
19  dsl_dim_acc....................         4762  2022-03-31
20  dsl_dim_acc....................         4762  2022-03-31
21  dsl_dim_acc....................         4762  2022-03-31
22  dsl_dim_acc....................         4762  2022-03-31
23  dsl_dim_acc....................         4762  2022-03-31
24  dsl_dim_acc....................         4762  2022-03-31
25  dsl_dim_acc....................         4762  2022-03-31
-----------

As you can see, as the number of threads increases the results become ambiguous: the result of one sub-query overwrites the results of the others.

I have also implemented the same class using the teradatasql library, and that version works perfectly fine with n_jobs=-1. My suspicion is that self._reader.option('dbtable', f"({query}) as tbl").load() gets clobbered across threads. I have tried ThreadPoolExecutor as well, with similar results. Does anyone know how to fix this?

Versions:

Python 3.6.8
Spark 2.4.0-cdh6.3.4
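
To illustrate the suspicion: as far as I understand, DataFrameReader.option() mutates the reader in place and returns the same object, so every thread in run_queries_and_union_all is setting dbtable on one shared reader and racing with the others. Below is a minimal sketch of that assumption, plus the untested workaround I would try (building a fresh reader per query inside the worker); the queries are placeholders.

# Sketch of the suspected race: .option() returns the *same* reader object,
# so the last dbtable set before .load() wins for every thread.
reader = spark.read.format("jdbc")
r1 = reader.option("dbtable", "(select 'a' as c) as tbl")
r2 = reader.option("dbtable", "(select 'b' as c) as tbl")
print(r1 is r2 is reader)   # True -> r1.load() would also read query 'b'

# Untested workaround sketch: a drop-in for the inner run() in
# run_queries_and_union_all that gives each worker its own reader
# instead of sharing self._reader across threads.
def run(query):
    try:
        reader = self._spark.read.format("jdbc") \
            .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
            .option("user", self._username) \
            .option("password", self._password) \
            .option("driver", "com.teradata.jdbc.TeraDriver") \
            .option("dbtable", f"({query}) as tbl")
        return reader.load().toPandas()
    except Exception:
        # a failed sub-query should not fail the whole union
        return None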

Thanks to @pltc, here is a solution. It is a lot slower than the multithreaded teradatasql implementation, though, even with the FAIR scheduler enabled:

from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        # if return_pandasDF:
        #     return spark_df.toPandas()
        # else:
        #     return spark_df
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)

    def run_queries_and_union_all(self, queries, return_pandasDF=True):     
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # simply ignore the failing query, per the requirement above
                print(f'Error while reading the query {each_query}: {e}')
            
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
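
For reference, the FAIR scheduler mentioned above has to be configured when the SparkSession is created; the snippet below is the standard Spark setting, shown as a sketch with a placeholder app name and pool name.

from pyspark.sql import SparkSession

# spark.scheduler.mode defaults to FIFO; FAIR lets jobs submitted from
# different threads share cluster resources instead of queueing.
spark = SparkSession.builder \
    .appName("teradata-extract") \
    .config("spark.scheduler.mode", "FAIR") \
    .getOrCreate()

# Optionally, pin the jobs submitted from the current thread to a named pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "teradata_pool")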