在 Apache Zeppelin 中使用 pyspark 读取 DataFrame 时出现问题:对象中缺少某些方法时异常不明确
Problems when reading DataFrame with pyspark in Apache Zeppelin: very unclear exception on missing some methods in the objects
首先,加载 python 库以便与 Pyspark 一起使用并使用 bokeh 库:
%spark.pyspark
import bkzep
import numpy as np
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.layouts import gridplot
from pyspark.sql.functions import col, coalesce, lit, monotonically_increasing_id
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
output_notebook(notebook_type='zeppelin')
然后准备df:
%pyspark
from pyspark.sql.functions import *
def plot_summaries(sensor, dfName):
df = DataFrame(z.get(dfName), sqlContext)
pdf = df.toPandas()
#.select("*") \
#.orderBy("index") \
#.limit(1000) \
#.withColumn("id", col("index")) \
#sample = pdf.sample(50)
source = ColumnDataSource(pdf)
#print(pdf)
TOOLTIPS = [
("month", "@month"),
("day", "@day"),
("hour", "@hour"),
("min", "@{min}{0.3f}"),
("avg", "@{avg}{0.3f}"),
("max", "@{max}{0.3f}"),
("median", "@{median}{0.3f}"),
("stddev", "@{stddev}{0.3f}"),
]
TOOLTIPS2 = [
("month", "@month"),
("day", "@day"),
("count", "@{count}{0.3f}"),
]
fig = figure(title="Hourly summaries of '{}'".format(sensor), tooltips=TOOLTIPS)
#fig.line(x='id', y='avg', source=source, color="orange")
#fig.line(x='id', y='min', source=source, color="green")
#fig.line(x='id', y='max', source=source, color="red")
fig.line(x='id', y='median', source=source, color="blue")
#fig.line(x='id', y='stddev', source=source, color="aquamarine")
#fig2 = figure(title="Hourly summaries of '{}' counters".format(sensor), tooltips=TOOLTIPS2)
#fig2.line(x='id', y='count', source=source, color="orange")
show(gridplot([fig], ncols=1, plot_width=1000, plot_height=400))
#show(fig)
sensors = [
"Water_Level_Sensor_stddev",
"Water_Level_Sensor_mean"
]
然后调用该函数以获得bokeh
plot:
%pyspark
from pyspark.sql.functions import *
keyCol = "month_day_hour"
#for sensor in sensors:
plot_summaries("Water_Level_Sensor_stddev", "pivoted")
然后得到以下异常:
---------1------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-118-bda5385b9d44> in <module>
4
5 #for sensor in sensors:
----> 6 plot_summaries("Water_Level_Sensor_stddev", "resultIndexed")
<ipython-input-106-d6669aca8991> in plot_summaries(sensor, dfName)
3 def plot_summaries(sensor, dfName):
4 df = DataFrame(z.get(dfName), sqlContext)
----> 5 pdf = df.toPandas()
6 #.select("*") \
7 #.orderBy("index") \
/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py in toPandas(self)
136
137 # Below is toPandas without Arrow optimization.
--> 138 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
139 column_counter = Counter(self.columns)
140
/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py in collect(self)
594 """
595 with SCCallSiteSync(self._sc) as css:
--> 596 sock_info = self._jdf.collectToPython()
597 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
598
AttributeError: 'NoneType' object has no attribute 'collectToPython'
原因是“pivoted”是上述段落中提出的 scala DataFrame
。我错了,可以在不在某处序列化 Scala DataFrame 并将其作为 Pyspark 数据帧读取的情况下使用它。然而,对象似乎被拾取了,但是它有错误和意外的类型,pyspark 无法处理。
首先,加载 python 库以便与 Pyspark 一起使用并使用 bokeh 库:
%spark.pyspark
import bkzep
import numpy as np
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.layouts import gridplot
from pyspark.sql.functions import col, coalesce, lit, monotonically_increasing_id
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
output_notebook(notebook_type='zeppelin')
然后准备df:
%pyspark
from pyspark.sql.functions import *
def plot_summaries(sensor, dfName):
df = DataFrame(z.get(dfName), sqlContext)
pdf = df.toPandas()
#.select("*") \
#.orderBy("index") \
#.limit(1000) \
#.withColumn("id", col("index")) \
#sample = pdf.sample(50)
source = ColumnDataSource(pdf)
#print(pdf)
TOOLTIPS = [
("month", "@month"),
("day", "@day"),
("hour", "@hour"),
("min", "@{min}{0.3f}"),
("avg", "@{avg}{0.3f}"),
("max", "@{max}{0.3f}"),
("median", "@{median}{0.3f}"),
("stddev", "@{stddev}{0.3f}"),
]
TOOLTIPS2 = [
("month", "@month"),
("day", "@day"),
("count", "@{count}{0.3f}"),
]
fig = figure(title="Hourly summaries of '{}'".format(sensor), tooltips=TOOLTIPS)
#fig.line(x='id', y='avg', source=source, color="orange")
#fig.line(x='id', y='min', source=source, color="green")
#fig.line(x='id', y='max', source=source, color="red")
fig.line(x='id', y='median', source=source, color="blue")
#fig.line(x='id', y='stddev', source=source, color="aquamarine")
#fig2 = figure(title="Hourly summaries of '{}' counters".format(sensor), tooltips=TOOLTIPS2)
#fig2.line(x='id', y='count', source=source, color="orange")
show(gridplot([fig], ncols=1, plot_width=1000, plot_height=400))
#show(fig)
sensors = [
"Water_Level_Sensor_stddev",
"Water_Level_Sensor_mean"
]
然后调用该函数以获得bokeh
plot:
%pyspark
from pyspark.sql.functions import *
keyCol = "month_day_hour"
#for sensor in sensors:
plot_summaries("Water_Level_Sensor_stddev", "pivoted")
然后得到以下异常:
---------1------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-118-bda5385b9d44> in <module>
4
5 #for sensor in sensors:
----> 6 plot_summaries("Water_Level_Sensor_stddev", "resultIndexed")
<ipython-input-106-d6669aca8991> in plot_summaries(sensor, dfName)
3 def plot_summaries(sensor, dfName):
4 df = DataFrame(z.get(dfName), sqlContext)
----> 5 pdf = df.toPandas()
6 #.select("*") \
7 #.orderBy("index") \
/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py in toPandas(self)
136
137 # Below is toPandas without Arrow optimization.
--> 138 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
139 column_counter = Counter(self.columns)
140
/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py in collect(self)
594 """
595 with SCCallSiteSync(self._sc) as css:
--> 596 sock_info = self._jdf.collectToPython()
597 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
598
AttributeError: 'NoneType' object has no attribute 'collectToPython'
原因是“pivoted”是上述段落中提出的 scala DataFrame
。我错了,可以在不在某处序列化 Scala DataFrame 并将其作为 Pyspark 数据帧读取的情况下使用它。然而,对象似乎被拾取了,但是它有错误和意外的类型,pyspark 无法处理。