无法在 AWS Glue PySpark Dev Endpoint 中正确 运行 脚本

Unable to run scripts properly in AWS Glue PySpark Dev Endpoint

我已经配置了一个 AWS Glue 开发端点并且可以在 pyspark REPL 中成功连接到它 shell - 像这样 https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

与 AWS 文档中给出的示例不同,我在开始会话时收到警告,后来对 AWS Glue DynamicFrame 结构的各种操作失败。这是启动会话的完整日志 - 请注意有关 spark.yarn.jars 和 PyGlue.zip:

的错误
Python 2.7.12 (default, Sep  1 2016, 22:14:00)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/03/02 14:18:58 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
18/03/02 14:19:03 WARN Client: Same path resource file:/usr/share/aws/glue/etl/python/PyGlue.zip added multiple times to distributed cache.
18/03/02 14:19:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Sep  1 2016 22:14:00)
SparkSession available as 'spark'.
>>>

许多操作都按我的预期工作,但我也收到一些不受欢迎的异常,例如,我可以从我的 Glue 目录加载数据,检查其结构和其中的数据,但我无法对其应用 Map 或转换它到一个DF。这是我的完整执行 运行 日志(除了最长的错误消息)。前几个命令和设置都运行良好,但最后两个操作失败:

>>> import sys
>>> from awsglue.transforms import *
>>> from awsglue.utils import getResolvedOptions
>>> from pyspark.context import SparkContext
>>> from awsglue.context import GlueContext
>>> from awsglue.job import Job
>>>
>>> glueContext = GlueContext(spark)
>>> # Receives a string of the format yyyy-mm-dd hh:mi:ss.nnn and returns the first 10 characters: yyyy-mm-dd
... def TruncateTimestampString(ts):
...   ts = ts[:10]
...   return ts
...
>>> TruncateTimestampString('2017-03-05 06:12:08.376')
'2017-03-05'
>>>
>>> # Given a record with a timestamp property returns a record with a new property, day, containing just the date portion of the timestamp string, expected to be yyyy-mm-dd.
... def TruncateTimestamp(rec):
...   rec[day] = TruncateTimestampString(rec[timestamp])
...   return rec
...
>>> # Get the history datasource - WORKS WELL BUT LOGS log4j2 ERROR
>>> datasource_history_1 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "history", transformation_ctx = "datasource_history_1")
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>>> # Tidy the history datasource - WORKS WELL
>>> history_tidied = datasource_history_1.drop_fields(['etag', 'jobmaxid', 'jobminid', 'filename']).rename_field('id', 'history_id')
>>> history_tidied.printSchema()
root
|-- jobid: string
|-- spiderid: long
|-- timestamp: string
|-- history_id: long

>>> # Trivial observation of the SparkSession objects
>>> SparkSession
<class 'pyspark.sql.session.SparkSession'>
>>> spark
<pyspark.sql.session.SparkSession object at 0x7f8668f3b650>
>>> 
>>> 
>>> # Apply a mapping to the tidied history datasource. FAILS
>>> history_mapped = history_tidied.map(TruncateTimestamp)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 101, in map
File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 105, in mapPartitionsWithIndex
File "/usr/lib/spark/python/pyspark/rdd.py", line 2419, in __init__
    self._jrdd_deserializer = self.ctx.serializer
AttributeError: 'SparkSession' object has no attribute 'serializer'
>>> history_tidied.toDF()
ERROR
Huge error log and stack trace follows, longer than my console can remember. Here's how it finishes:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 128, in toDF
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"

我想我正在按照亚马逊在其 Dev Endpoint REPL 说明中给出的说明进行操作,但是这些相当简单的操作(DynamicFrame.join 和 DynamicFrame.toDF)失败了,我在黑暗中工作当我想要 运行 真正的工作时(这似乎成功了,但我的 DynamicFrame.printSchema() 和 DynamicFrame.show() 命令没有出现在 CloudWatch 日志中以供执行) .

有谁知道我需要做什么来修复我的 REPL 环境,以便我可以正确测试 pyspark AWS Glue 脚本?

开发端点是在Windows上配置的还是Unix环境,貌似下载的文件没有复制到linux的正确位置,或者代码片段找不到路径罐子。请确保文件在那里。

我在我的 windows 机器上安装了 Zeppelin notebook,并且能够成功连接到 glue catalog。也许你也可以试试,如果你需要任何帮助,请告诉我。

AWS Support 终于回复了我关于这个问题的查询。这是回复:

On researching further, I found that this is a known issue with the PySpark shell and glue service team is already on it. The fix should be deployed soon, however currently there's no ETA that I can share with you.

Meanwhile here's a workaround: before initializing Glue context, you can do

>> newconf = sc._conf.set("spark.sql.catalogImplementation", "in-memory")
>> sc.stop()
>> sc = sc.getOrCreate(newconf)

and then instantiate glueContext from that sc.

我可以确认这对我有用,这是我能够使用的脚本 运行:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# New recommendation from AWS Support 2018-03-22
newconf = sc._conf.set("spark.sql.catalogImplementation", "in-memory")
sc.stop()
sc = sc.getOrCreate(newconf)
# End AWS Support Workaround

glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

datasource_history_1 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "history", transformation_ctx = "datasource_history_1")

def DoNothingMap(rec):
  return rec

history_mapped = datasource_history_1.map(DoNothingMap)
history_df = history_mapped.toDF()
history_df.show()
history_df.printSchema()

以前 .map().toDF() 调用会失败。

我已要求 AWS Support 在解决此问题时通知我,以便不再需要解决方法。