Spark Streaming:如何在流上加载管道?

Spark Streaming: How to load a Pipeline on a Stream?

我正在实现用于流处理的 lambda 架构系统。

我在 Spark Ba​​tch 中使用 GridSearch 创建管道没问题:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])

paramGrid = (
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6))
...etcetera
).build()

cv = CrossValidator(estimator=pipeline,
                estimatorParamMaps=paramGrid,
                evaluator=BinaryClassificationEvaluator(),
                numFolds=4)

pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")

但是,我似乎找不到如何在 Spark Streaming Process 中插入管道。我正在使用 kafka 作为 DStream 源,我现在的代码如下:

import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils

从 pyspark.streaming 导入 StreamingContext

ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc,  "localhost:2181", "spark-    streaming-consumer", {"kafka_topic": 1})

model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))

CODE MISSING GOES HERE    

ssc.start()
ssc.awaitTermination()

现在我需要想办法做

根据文档 here (even though it looks very very outdated) it seems like your model needs to implement the method predict 能够在 rdd 对象上使用它(并希望在 kafkastream 上使用它?)

如何在 Streaming 上下文中使用管道?重新加载的 PipelineModel 似乎只实现了 transform

这是否意味着在 Streaming 上下文中使用批处理模型的唯一方法是使用纯模型,而不是管道?

我找到了一种将 Spark Pipeline 加载到 Spark Streaming 的方法。

此解决方案适用于 Spark v2.0,因为后续版本可能会实现更好的解决方案。

我找到的解决方案使用 toDF() 方法将流式 RDD 转换为 Dataframes,然后您可以在其中应用 pipeline.transform 方法。

虽然这种做事方式效率低得可怕。

# we load the required libraries
from pyspark.sql.types import (
        StructType, StringType, StructField, LongType
        )
from pyspark.sql import Row
from pyspark.streaming.kafka import KafkaUtils

#we specify the dataframes schema, so spark does not have to do reflections on the data.

pipeline_schema = StructType(
    [
        StructField("field1",StringType(),True),
        StructField("field2",StringType(),True),
        StructField("field3", LongType(),True)
 ]
)

#We load the pipeline saved with spark batch
pipeline = PipelineModel.load('/pipeline')

#Setup usual spark context, and spark Streaming Context
sc = spark.sparkContext
ssc = StreamingContext(sc, 1)

#On my case I use kafka directKafkaStream as the DStream source
directKafkaStream = KafkaUtils.createDirectStream(ssc, suwanpos[QUEUE_NAME], {"metadata.broker.list": "localhost:9092"})

def handler(req_rdd):
    def process_point(p):
        #here goes the logic to  do after applying the pipeline
        print(p)   
    if req_rdd.count()  > 0:
        #Here is the gist of it, we turn the rdd into a Row, then into a df with the specified schema)
        req_df = req_rdd.map(lambda r: Row(**r)).toDF(schema=pipeline_schema)
        #Now we can apply the transform, yaaay
        pred = pipeline.transform(req_df)
        records = pred.rdd.map(lambda p: process_point(p)).collect()

希望对您有所帮助。