从 spark-shell (pyspark) 查询 spark 流应用程序

Querying a spark streaming application from spark-shell (pyspark)

我在 pyspark 控制台中关注此 example,一切正常。

之后我将其编写为 PySpark 应用程序如下:

# -*- coding: utf-8 -*-

import sys

import click

import logging

from pyspark.sql import SparkSession

from pyspark.sql.types import *


@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
    spark = SparkSession \
            .builder \
            .master(master)\
            .appName("stream-test")\
            .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    some_schema = ....  # Schema removed 

    some_stream    = spark\
                     .readStream\
                     .option("sep", ",")\
                     .schema(some_schema)\
                     .option("maxFilesPerTrigger", 1)\
                     .csv("/data/some_stream", header=True)

    streaming_counts = (
        linkage_stream.groupBy(some_stream.field_1).count()
    )

    query = streaming_counts.writeStream\
                            .format("memory")\
                            .queryName("counts")\
                            .outputMode("complete")\
                            .start()



    query.awaitTermination()

if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    most_idiotic_bi_query()

应用执行为:

spark-submit test_stream.py --master spark://master:7077

现在,如果我在另一个终端打开一个新的 spark 驱动程序:

pyspark --master spark://master:7077

并尝试 运行:

spark.sql("select * from counts")

它失败了:

During handling of the above exception, another exception occurred:

AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()

/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    541         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    542         """
--> 543         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    544 
    545     @since(2.0)

/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Table or view not found: counts; line 1 pos 14'

我不明白发生了什么。

这是预期的行为。如果您检查 the documentation 内存槽:

The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.

如您所见,内存接收器不会创建持久性 table 或全局临时视图,而是仅限于驱动程序的本地结构。因此无法从另一个 Spark 应用程序查询它。

因此必须从驱动程序查询内存输出,并在其中写入。例如,您可以模仿 console 模式,如下所示。

虚拟作家:

import pandas as pd
import numpy as np
import tempfile
import shutil

def producer(path):
    temp_path = tempfile.mkdtemp()

    def producer(i):
        df = pd.DataFrame({
          "group": np.random.randint(10, size=1000)
        }) 
        df["val"] = (
            np.random.randn(1000) + 
            np.random.random(1000) * df["group"] + 
            np.random.random(1000) * i % 7
        )
        f = tempfile.mktemp(dir=temp_path)
        df.to_csv(f, index=False)
        shutil.move(f, path)
    return producer

Spark 应用程序:

from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField

schema = StructType([
   StructField("group", IntegerType()),
   StructField("val", DoubleType())
])

path = tempfile.mkdtemp()
query_name = "foo"

stream = (spark.readStream
    .schema(schema)
    .format("csv")
    .option("header", "true")
    .load(path))

query = (stream
    .groupBy("group")
    .avg("val")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .outputMode("complete")
    .start())

还有一些事件:

from rx import Observable

timer = Observable.timer(5000, 5000)
timer.subscribe(producer(path))
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())

query.awaitTermination()