Pyspark with Joblib giving me ambiguous result
I am trying to fetch data from Teradata --
select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3
Note: one or more of the SELECTs may fail, but a failure should not cause the entire union to fail.
from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd


class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
            .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
            .option("user", self._username) \
            .option("password", self._password) \
            .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                return None

        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
As you can see, the split_query_and_run_individually method splits the query on union all and then runs all the sub-queries in parallel threads with n_jobs=10.
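For context, a typical call would look roughly like this (the SparkSession and the connection details are placeholders, not real values):

# Hypothetical usage; host, database and credentials are placeholders.
td = TeradataWithSpark(
    spark,                     # an existing SparkSession
    host='teradata-host',      # placeholder
    database='my_db',          # placeholder
    username='my_user',        # placeholder
    password='my_password',    # placeholder
)

query = """
select ... from table_1
union all
select ... from table_2
union all
select ... from table_3
"""

# Splits the query on 'union all' and runs each piece in its own thread.
result_df = td.split_query_and_run_individually(query)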
But the problem I am facing is that the data gets corrupted, like this:
n_jobs = 1
src_tbl total_count data_date
0 dsl_dim_mdm_.................... 61 2022-03-17
1 dsl_agg_call.................... 3992202 2022-03-27
2 dsl_call_ac.................... 924719 2022-03-27
3 dsl_dim_acc.................... 4762 2022-03-31
4 .................... 6821 2022-03-31
5 dsl_dim_geo_.................... 8610038 2022-04-05
6 dsl.................... 67116 2022-03-31
7 dsl_rl.................... 2087669 2022-04-06
8 dsl_.................... 154 2022-04-01
9 dsl_.................... 85630 2022-03-27
10 dsl_selling_da.................... 53 2021-03-03
11 dsl_speaker_ev.................... 17765 2022-03-31
12 dsl_speak.................... 26269 2022-08-24
13 dsl_speaker_e.................... 4202 2022-04-05
14 ds.................... 268 2022-03-31
15 dsl_rltn_r.................... 255794 2022-03-18
16 dsl_rltn_nr.................... 12088 2022-03-18
17 dsl_rapp.................... 81182 2022-01-01
18 dsl_dim_physi.................... 109299 2022-03-31
19 dsl.................... 4265 2022-02-01
20 dsl_fac.................... 117978 2022-04-03
21 dsl_coachi.................... 242 2022-03-31
22 dsl_speaker_e.................... 16653 2022-03-31
23 dsl_dim_cal.................... 17817 2099-12-31
24 dsl_rltn_nrt.................... 3304 2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
src_tbl total_count data_date
0 dsl_sel................ 85630 2022-03-27
1 dsl_sel................ 85630 2022-03-27
2 dsl_sel................ 85630 2022-03-27
3 dsl_sel................ 85630 2022-03-27
4 dsl_sel................ 85630 2022-03-27
5 dsl_sel................ 85630 2022-03-27
6 dsl_sel................ 85630 2022-03-27
7 dsl_sel................ 85630 2022-03-27
8 dsl_sel................ 85630 2022-03-27
9 dsl_sel................ 85630 2022-03-27
10 dsl_speaker_event................ 17765 2022-03-31
11 dsl_speaker_even................ 4202 2022-04-05
12 dsl_speaker_even................ 4202 2022-04-05
13 dsl_s................ 268 2022-03-31
14 dsl_rapper_................ 81182 2022-01-01
15 dsl_rapper_................ 81182 2022-01-01
16 dsl_rltn_nrtl_................ 12088 2022-03-18
17 dsl_rapper_................ 81182 2022-01-01
18 dsl_dim_physicia................ 109299 2022-03-31
19 dsl_cu................ 4265 2022-02-01
20 dsl_fact_f................ 117978 2022-04-03
21 dsl_coaching_................ 242 2022-03-31
22 dsl_speaker_even................ 16653 2022-03-31
23 dsl_dim_call_c................ 17817 2099-12-31
24 dsl_rltn_nrtl_r................ 3304 2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
src_tbl total_count data_date
0 dsl_dim_acc.................... 4762 2022-03-31
1 dsl_dim_acc.................... 4762 2022-03-31
2 dsl_dim_acc.................... 4762 2022-03-31
3 dsl_dim_acc.................... 4762 2022-03-31
4 dsl_dim_acc.................... 4762 2022-03-31
5 dsl_dim_acc.................... 4762 2022-03-31
6 dsl_dim_acc.................... 4762 2022-03-31
7 dsl_dim_acc.................... 4762 2022-03-31
8 dsl_dim_acc.................... 4762 2022-03-31
9 dsl_dim_acc.................... 4762 2022-03-31
10 dsl_dim_acc.................... 4762 2022-03-31
11 dsl_dim_acc.................... 4762 2022-03-31
12 dsl_dim_acc.................... 4762 2022-03-31
13 dsl_dim_acc.................... 4762 2022-03-31
14 dsl_dim_acc.................... 4762 2022-03-31
15 dsl_dim_acc.................... 4762 2022-03-31
16 dsl_dim_acc.................... 4762 2022-03-31
17 dsl_dim_acc.................... 4762 2022-03-31
18 dsl_dim_acc.................... 4762 2022-03-31
19 dsl_dim_acc.................... 4762 2022-03-31
20 dsl_dim_acc.................... 4762 2022-03-31
21 dsl_dim_acc.................... 4762 2022-03-31
22 dsl_dim_acc.................... 4762 2022-03-31
23 dsl_dim_acc.................... 4762 2022-03-31
24 dsl_dim_acc.................... 4762 2022-03-31
25 dsl_dim_acc.................... 4762 2022-03-31
-----------
As you can see, as the number of threads increases the results become ambiguous: the results of the individual queries end up overwriting each other. I also implemented the same class using the teradatasql library, and that version works fine even with n_jobs=-1. I think self._reader.option('dbtable', f"({query}) as tbl").load() gets messed up across threads. I have also tried ThreadPoolExecutor, with similar results. Does anyone know how to fix this?
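My working theory is that all the threads share the single self._reader instance, so concurrent .option('dbtable', ...) calls overwrite each other before .load() runs. A minimal sketch of the variant I have in mind (my own guess, untested, not a confirmed fix) builds a fresh reader inside each call instead of reusing self._reader:

# A drop-in replacement for run_queries_and_union_all inside TeradataWithSpark.
def run_queries_and_union_all(self, queries, return_pandasDF=True):
    def run(query):
        try:
            # Build a new DataFrameReader per call so threads do not stomp
            # on each other's 'dbtable' option.
            reader = (self._spark.read.format("jdbc")
                      .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP')
                      .option("user", self._username)
                      .option("password", self._password)
                      .option("driver", "com.teradata.jdbc.TeraDriver")
                      .option("dbtable", f"({query}) as tbl"))
            return reader.load().toPandas()
        except Exception:
            return None  # pd.concat silently drops None entries

    dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
    concat_df = pd.concat(dfs).reset_index(drop=True)
    return concat_df if return_pandasDF else self._spark.createDataFrame(concat_df)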
Versions
Python 3.6.8
Spark 2.4.0-cdh6.3.4
Thanks to @pltc, here is a solution. It is very slow compared to the multi-threaded teradatasql library, though, even with the FAIR scheduler in place.
from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce


class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
            .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
            .option("user", self._username) \
            .option("password", self._password) \
            .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # Previously:
        # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        # if return_pandasDF:
        #     return spark_df.toPandas()
        # else:
        #     return spark_df
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # simply ignoring the failed query
                print(f'Error while reading the query {each_query}')
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
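For reference, the "FAIR scheduler" mentioned above is Spark's fair job scheduling mode; a rough sketch of how it is typically enabled (the app and pool names here are made up), assuming it is configured before any jobs are submitted:

from pyspark.sql import SparkSession

# Enable FAIR scheduling when the session is built (or via spark-defaults.conf /
# --conf spark.scheduler.mode=FAIR on spark-submit).
spark = (SparkSession.builder
         .appName("teradata-union")               # hypothetical app name
         .config("spark.scheduler.mode", "FAIR")
         .getOrCreate())

# A thread submitting jobs can optionally target its own pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "teradata_pool")  # hypothetical pool name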