XGBoost Spark 每个工人集成一个模型

XGBoost Spark One Model Per Worker Integration

正在尝试完成此笔记本 https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1526931011080774/3624187670661048/6320440561800420/latest.html

使用 spark 版本 2.4.3 和 xgboost 0.90

尝试执行时不断出现此错误 ValueError: bad input shape ()...

features = inputTrainingDF.select("features").collect()
lables = inputTrainingDF.select("label").collect()

X = np.asarray(map(lambda v: v[0].toArray(), features))
Y = np.asarray(map(lambda v: v[0], lables))

xgbClassifier = xgb.XGBClassifier(max_depth=3, seed=18238, objective='binary:logistic')

model = xgbClassifier.fit(X, Y)
ValueError: bad input shape () 

def trainXGbModel(partitionKey, labelAndFeatures):
  X = np.asarray(map(lambda v: v[1].toArray(), labelAndFeatures))
  Y = np.asarray(map(lambda v: v[0], labelAndFeatures))
  xgbClassifier = xgb.XGBClassifier(max_depth=3, seed=18238, objective='binary:logistic' )
  model =  xgbClassifier.fit(X, Y)
  return [partitionKey, model]

xgbModels = inputTrainingDF\
.select("education", "label", "features")\
.rdd\
.map(lambda row: [row[0], [row[1], row[2]]])\
.groupByKey()\
.map(lambda v: trainXGbModel(v[0], list(v[1])))

xgbModels.take(1)
ValueError: bad input shape ()

您可以在笔记本中看到它正在为发布它的人工作。我的猜测是它与 XY np.asarray() 映射有关,因为逻辑只是试图将标签和特征映射到函数,但形状是空的。使用此代码使其正常工作

pandasDF = inputTrainingDF.toPandas()
series = pandasDF['features'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
features = np.apply_along_axis(lambda x : x[0], 1, series)
target = pandasDF['label'].values
xgbClassifier = xgb.XGBClassifier(max_depth=3, seed=18238, objective='binary:logistic' )
model = xgbClassifier.fit(features, target)

不过想融入原来的函数调用&明白为什么原来的notebook不行。非常感谢能提供额外的帮助来解决此问题!

您可能正在使用 python3。问题是在 python3 map 函数中 returns 是一个迭代器对象,而不是一个集合。修复此示例所需要做的就是更改 map -> list(map(...)):

def trainXGbModel(partitionKey, labelAndFeatures):
  X = np.asarray(list(map(lambda v: v[1].toArray(), labelAndFeatures)))
  Y = np.asarray(list(map(lambda v: v[0], labelAndFeatures)))

或者您可以使用 np.fromiter 将可迭代对象转换为 numpy 数组。