如何将 xgboost 集成到 spark 中? (Python)

How can I integrate xgboost in spark? (Python)

我正在尝试使用 XGBoost 对我在 hive 上的数据训练模型,数据太大,我无法将其转换为 pandas df,因此我必须将 XGBoost 与 spark df 结合使用。 创建 XGBoostEstimator 时出现错误:

TypeError: 'JavaPackage' object is not callable Exception AttributeError: "'NoneType' object has no attribute '_detach'" in ignored

我没有使用 xgboost for spark 的经验,我尝试了一些在线教程,但 none 有效。 我试图隐藏到 pandas df 但数据太大,我总是从 Java 包装器中得到 OutOfMemoryException (我也尝试查找它,但解决方案对我不起作用, 提高执行者内存)。

我关注的最新教程是:

https://towardsdatascience.com/pyspark-and-xgboost-integration-tested-on-the-kaggle-titanic-dataset-4e75a568bdb

放弃XGBoost模块后,我开始使用sparkxgb

spark = create_spark_session('shai', 'dna_pipeline')
# sparkxgboost files 
spark.sparkContext.addPyFile('resources/sparkxgb.zip')

def create_spark_session(username=None, app_name="pipeline"):
    if username is not None:
        os.environ['HADOOP_USER_NAME'] = username

    return SparkSession \
        .builder \
        .master("yarn") \
        .appName(app_name) \
        .config(...) \
        .config(...) \
        .getOrCreate()

def train():
    train_df = spark.table('dna.offline_features_train_full')
    test_df = spark.table('dna.offline_features_test_full')

    from sparkxgb import XGBoostEstimator

    vectorAssembler = VectorAssembler() \
        .setInputCols(train_df.columns) \
        .setOutputCol("features")

    # This is where the program fails
    xgboost = XGBoostEstimator(
        featuresCol="features",
        labelCol="label",
        predictionCol="prediction"
    )

    pipeline = Pipeline().setStages([xgboost])
    pipeline.fit(train_df)

完整的例外是:

Traceback (most recent call last):
  File "/home/elad/DNA/dna/dna/run.py", line 283, in <module>
    main()
  File "/home/elad/DNA/dna/dna/run.py", line 247, in main
    offline_model = train_model(True, home_dir=config['home_dir'], hdfs_client=client)
  File "/home/elad/DNA/dna/dna/run.py", line 222, in train_model
    model = train(offline_mode=offline, spark=spark)
  File "/home/elad/DNA/dna/dna/model/xgboost_train.py", line 285, in train
    predictionCol="prediction"
  File "/home/elad/.conda/envs/DNAenv/lib/python2.7/site-packages/pyspark/__init__.py", line 105, in wrapper
    return func(self, **kwargs)
  File "/tmp/spark-7781039b-6821-42be-96e0-ca4005107318/userFiles-70b3d1de-a78c-4fac-b252-2f99a6761b32/sparkxgb.zip/sparkxgb/xgboost.py", line 115, in __init__
  File "/home/elad/.conda/envs/DNAenv/lib/python2.7/site-packages/pyspark/ml/wrapper.py", line 63, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable
Exception AttributeError: "'NoneType' object has no attribute '_detach'" in <bound method XGBoostEstimator.__del__ of XGBoostEstimator_4f54b37156fb0a113233> ignored

我不知道为什么会出现此异常,也不知道如何将 sparkxgb 正确集成到我的代码中。

不胜感激。

谢谢

较新的 Apache Spark(2.3.0) 版本没有 XGBoost。你应该试试 Pyspark。您必须将 Spark 数据帧转换为 pandas 数据帧。

这是一篇很好的文章,提供了工作流程和解释 xgboost and spark

好的,我又看了一遍你的post,你说数据集太大了。也许你应该试试 Apache Arrow。检查这个 Speeding up Pyspark with Apache Arrow

您可以尝试使用 LightGBM,而不是使用 XGBoost,这是一种类似且可以说更好(至少更快)的算法。它在 pyspark 中开箱即用,您可以 read more here

在对这个模块进行了一天的调试之后,问题只是错误地提交了 jars。 我在本地下载了 jar 并使用 pyspark 提交它们:

PYSPARK_SUBMIT_ARGS=--jars resources/xgboost4j-0.72.jar,resources/xgboost4j-spark-0.72.jar

这解决了问题。