尝试 运行 在 Apache Spark 上进行 sklearn 文本分类。获取预期的序列或类似数组,在 PythonRDD.scala:43 处的 RDD 处获得 PythonRDD[1]

Trying to run sklearn text classification on Apache Spark..GETTING Expected sequence or array-like, got PythonRDD[1] at RDD at PythonRDD.scala:43

我正在尝试 运行 对 Twitter 数据进行 sklearn SDG 分类器,这些数据被手动标记为两个 类 0 和 1。

我是 spark 的新手,希望得到你的帮助。

我在网上看到了一些代码,并尝试为我的示例进行模拟,但不幸的是它似乎不起作用,我也不知道为什么。

非常感谢您的帮助。

import sys
sys.path.append('/home/userName/Downloads/spark-1.2.1/python')

from pyspark import SparkContext

import numpy as np

from sklearn.cross_validation import train_test_split, Bootstrap
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier


import numpy as np
from sklearn.metrics import hamming_loss
from sklearn import cross_validation
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier
from sklearn.svm import LinearSVC
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn import preprocessing
import pandas as pd;
from sklearn import metrics
from sklearn.utils import shuffle
from sklearn.linear_model import RidgeClassifier
from sklearn.linear_model import Perceptron
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.naive_bayes import BernoulliNB
from time import time
from sklearn.externals import joblib
import re
from HTMLParser import HTMLParser
from sklearn.grid_search import GridSearchCV
import pickle
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
%matplotlib inline


def run(sc):
  u_cols = ['CLASS','USER_RATING', 'REVIEW_TEXT']
df =
pd.read_csv('/home/userName/Desktop/input_file.csv',header=1,names=u_cols)

#Cleaning the data
lenn = len(df['REVIEW_TEXT'])
tag_remove = re.compile(r'<[^>]+>')
for i in range(0,lenn):
    #Removing code block
    df['REVIEW_TEXT'][i] = re.sub('<code>.*?</code>', '', df['REVIEW_TEXT'][i])
    #Removeing html tags
    df['REVIEW_TEXT'][i] = tag_remove.sub('', df['REVIEW_TEXT'][i])



X_train = df['REVIEW_TEXT']
y_train = df['CLASS']


X_train_final = X_train
y_train_final = y_train

#Validation Set Approach
X_train_final, X_test_final, y_train_final, y_test_final =     cross_validation.train_test_split(
X_train_final, y_train_final, test_size=0.05, random_state=15)

vectorizer = HashingVectorizer(decode_error='ignore', n_features=2 ** 20, 
                           non_negative=True, stop_words = 'english', ngram_range = (1,2))


X_train_final = vectorizer.transform(X_train_final)
X_test_final = vectorizer.transform(X_test_final)


model = (SGDClassifier(alpha=1e-05, class_weight=None, epsilon=0.1, eta0=0.0,fit_intercept=True, 
                                           l1_ratio=0.15, learning_rate='optimal',loss='hinge', n_iter=5, n_jobs=1, 
                                           penalty='l1', power_t=0.5,random_state=None, shuffle=False, verbose=0,
                                           warm_start=False))

samples = sc.parallelize(Bootstrap(y_train_final.shape[0]))

vote_tally = samples.map(lambda (index, _):
    model.fit(X[index], y[index]).predict(X_test)
)

return accuracy_score(y_test_final, vote_tally)


if __name__ == '__main__':
    print run(SparkContext("local", "Boost"))

出现以下错误:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-be25c966218e> in <module>()
    107 
    108 if __name__ == '__main__':
--> 109     print run(SparkContext("local", "Boost"))
    110 

<ipython-input-1-be25c966218e> in run(sc)
    102     )
    103 
--> 104     return accuracy_score(y_test_final, vote_tally)
    105     #print vote_tally.count()
    106     #return vote_tally

/usr/local/lib/python2.7/dist-packages/sklearn/metrics/metrics.pyc in accuracy_score(y_true, y_pred, normalize, sample_weight)
   1295 
   1296     # Compute accuracy for each possible representation
-> 1297     y_type, y_true, y_pred = _check_clf_targets(y_true, y_pred)
   1298     if y_type == 'multilabel-indicator':
   1299         score = (y_pred != y_true).sum(axis=1) == 0

/usr/local/lib/python2.7/dist-packages/sklearn/metrics/metrics.pyc in _check_clf_targets(y_true, y_pred)
    107     y_pred : array or indicator matrix
    108     """
--> 109     y_true, y_pred = check_arrays(y_true, y_pred, allow_lists=True)
    110     type_true = type_of_target(y_true)
    111     type_pred = type_of_target(y_pred)

/usr/local/lib/python2.7/dist-packages/sklearn/utils/validation.pyc in check_arrays(*arrays, **options)
    248             checked_arrays.append(array)
    249             continue
--> 250         size = _num_samples(array)
    251 
    252         if size != n_samples:

/usr/local/lib/python2.7/dist-packages/sklearn/utils/validation.pyc in _num_samples(x)
    172             x = np.asarray(x)
    173         else:
--> 174             raise TypeError("Expected sequence or array-like, got %r" % x)
    175     return x.shape[0] if hasattr(x, 'shape') else len(x)
    176 

**TypeError: Expected sequence or array-like, got PythonRDD[1] at RDD at PythonRDD.scala:43**

问题是 sklearn 组件需要 sequences/array-like/sparse/etc。要处理的数据,但您在 pyspark 中使用 RDD。
我们有一个图书馆可以帮助您解决问题。它被称为sparkit-learn
试一试。