Spark on Fargate can't find local IP
I have a build job that I'm trying to set up in a 1-node AWS Fargate cluster. When I try to run Spark to build my data, I get an error that seems to be about Java not being able to find "localHost".
I set up the configuration with a script that adds a spark-env.sh file, updates the /etc/hosts file, and updates the spark-defaults.conf file.
In the $SPARK_HOME/conf/spark-env.sh file, I add:
SPARK_LOCAL_IP
SPARK_MASTER_HOST
In $SPARK_HOME/conf/spark-defaults.conf, I add:
spark.jars.packages <comma separated jars>
spark.master <ip or URL>
spark.driver.bindAddress <IP or URL>
spark.driver.host <IP or URL>
In the /etc/hosts file, I append:
<IP I get from http://169.254.170.2/v2/metadata> master
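For illustration, a minimal sketch of what those three files might end up containing; the IP 10.0.41.190 and the package list are placeholders taken from the log further down, not prescriptive values:

```
# $SPARK_HOME/conf/spark-env.sh
SPARK_LOCAL_IP=10.0.41.190
SPARK_MASTER_HOST=10.0.41.190

# $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages       com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
spark.master              spark://10.0.41.190:7077
spark.driver.bindAddress  10.0.41.190
spark.driver.host         10.0.41.190

# /etc/hosts (appended line)
10.0.41.190 master
```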
Invoking the spark-submit script with a --master <IP or URL> argument, passing in the IP or URL, doesn't seem to help. I've tried the local[*], spark://<ip from metadata>:<port from metadata>, <ip>, and <ip>:<port> variations, to no avail.
Using 127.0.0.1 and localhost doesn't seem to make any difference compared to using something like master with the IP returned from the metadata.
On the AWS side, the Fargate cluster runs in a private subnet with a NAT gateway attached, so as far as I can tell it does have egress and ingress network routes. I've also tried a public network with the ECS setting ENABLED so that a public IP is automatically attached to the container.
All the standard ports from the Spark documentation are open on the container as well.
It seems to run fine right up to the point where it tries to gather its own IP.
The error I get back is this, in the stack:
spark.jars.packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
spark.master spark://10.0.41.190:7077
Spark Command: /docker-java-home/bin/java -cp /usr/spark/conf/:/usr/spark/jars/* -Xmx1gg org.apache.spark.deploy.SparkSubmit --master spark://10.0.41.190:7077 --verbose --jars lib/RedshiftJDBC42-1.2.12.1017.jar --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4,com.upplication:s3fs:2.2.1 ./build_phase.py
========================================
Using properties file: /usr/spark/conf/spark-defaults.conf
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.util.Utils$.redact(Utils.scala:2653)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties.apply(SparkSubmitArguments.scala:93)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties.apply(SparkSubmitArguments.scala:86)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties$lzycompute(SparkSubmitArguments.scala:86)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties(SparkSubmitArguments.scala:82)
at org.apache.spark.deploy.SparkSubmitArguments.mergeDefaultSparkProperties(SparkSubmitArguments.scala:126)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: d4771b650361: d4771b650361: Name or service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
at org.apache.spark.util.Utils$.findLocalInetAddress(Utils.scala:891)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress$lzycompute(Utils.scala:884)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress(Utils.scala:884)
at org.apache.spark.util.Utils$$anonfun$localHostName.apply(Utils.scala:941)
at org.apache.spark.util.Utils$$anonfun$localHostName.apply(Utils.scala:941)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.util.Utils$.localHostName(Utils.scala:941)
at org.apache.spark.internal.config.package$.<init>(package.scala:204)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
... 10 more
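The root cause in that trace is that the container's hostname (d4771b650361) doesn't resolve to an IP. Java's InetAddress.getLocalHost() needs roughly what this Python sketch checks, so the failure mode can be reproduced outside the JVM:

```python
import socket

def resolvable(name):
    """True if `name` resolves to an IP address; socket.gethostbyname
    is roughly the lookup that InetAddress.getLocalHost() performs on
    the local hostname."""
    try:
        socket.gethostbyname(name)
        return True
    except socket.gaierror:
        return False

# In the failing container, socket.gethostname() returns something like
# "d4771b650361" with no /etc/hosts or DNS entry, so the lookup fails
# just like the UnknownHostException above.
print(resolvable(socket.gethostname()))
```

Either appending `<task-ip> <hostname>` to /etc/hosts at container start, or setting SPARK_LOCAL_IP so Spark never performs the hostname lookup, sidesteps this.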
The container has no issues when run locally, so I believe it's something to do with the nature of Fargate.
Any help or pointers would be greatly appreciated!
Edit
Since posting, I've tried a few different things. I'm running an image with Spark 2.3, Hadoop 2.7 and Python 3, and I've tried adding OS packages and different variations of the configuration I've already mentioned.
This all smells like I'm doing spark-defaults.conf and friends wrong, but I'm so new to this stuff that it could just be a bad alignment of Jupiter and Mars...
Current stack trace:
Getting Spark Context...
2018-06-08 22:39:40 INFO SparkContext:54 - Running Spark version 2.3.0
2018-06-08 22:39:40 INFO SparkContext:54 - Submitted application: SmashPlanner
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing view acls to: root
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing modify acls to: root
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing view acls groups to:
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing modify acls groups to:
2018-06-08 22:39:41 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
2018-06-08 22:39:41 ERROR SparkContext:91 - Error initializing SparkContext.
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
at io.netty.bootstrap.AbstractBootstrap.run(AbstractBootstrap.java:364)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2018-06-08 22:39:41 INFO SparkContext:54 - Successfully stopped SparkContext
Traceback (most recent call last):
File "/usr/local/smash_planner/build_phase.py", line 13, in <module>
main()
File "/usr/local/smash_planner/build_phase.py", line 9, in main
build_all_data(pred_date)
File "/usr/local/smash_planner/DataPiping/build_data.py", line 25, in build_all_data
save_keyword(pred_date)
File "/usr/local/smash_planner/DataPiping/build_data.py", line 52, in save_keyword
df = get_dataframe(query)
File "/usr/local/smash_planner/SparkUtil/data_piping.py", line 15, in get_dataframe
sc = SparkCtx.get_sparkCtx()
File "/usr/local/smash_planner/SparkUtil/context.py", line 20, in get_sparkCtx
sc = SparkContext(conf=conf).getOrCreate()
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 118, in __init__
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 180, in _do_init
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 270, in _initialize_context
File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/java_gateway.py", line 1428, in __call__
answer, self._gateway_client, None, self._fqn)
File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
at io.netty.bootstrap.AbstractBootstrap.run(AbstractBootstrap.java:364)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2018-06-08 22:39:41 INFO ShutdownHookManager:54 - Shutdown hook called
2018-06-08 22:39:41 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-80488ba8-2367-4fa6-8bb7-194b5ebf08cc
Traceback (most recent call last):
File "bin/smash_planner.py", line 76, in <module>
raise RuntimeError("Spark hated your config and/or invocation...")
RuntimeError: Spark hated your config and/or invocation...
SparkConf runtime configuration:
def get_dataframe(query):
    ...
    sc = SparkCtx.get_sparkCtx()
    sql_context = SQLContext(sc)
    df = sql_context.read \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", os.getenv('JDBC_URL')) \
        .option("user", os.getenv('REDSHIFT_USER')) \
        .option("password", os.getenv('REDSHIFT_PASSWORD')) \
        .option("dbtable", "( " + query + " ) tmp ") \
        .load()
    return df
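SparkCtx.get_sparkCtx itself isn't shown above; as a purely hypothetical sketch (the helper name and values are my assumptions, not the actual code), the conf it builds would need to pin every address Spark binds or advertises to an explicit IP rather than the container hostname:

```python
def bind_conf_pairs(local_ip, master_url):
    """Build the (key, value) pairs for SparkConf.setAll that address
    the bind/resolve problem. `local_ip` and `master_url` are assumed
    inputs, e.g. fetched from the Fargate task metadata endpoint."""
    return [
        ("spark.master", master_url),
        ("spark.driver.bindAddress", local_ip),
        ("spark.driver.host", local_ip),
    ]

pairs = bind_conf_pairs("10.0.41.190", "spark://10.0.41.190:7077")
# feed into pyspark with:  SparkConf().setAll(pairs)
print(dict(pairs)["spark.driver.bindAddress"])
```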
Edit 2
Using only the spark-env configuration and running the defaults from the gettyimages/docker-spark image produces this error in the browser:
java.util.NoSuchElementException
at java.util.Collections$EmptyIterator.next(Collections.java:4189)
at org.apache.spark.util.kvstore.InMemoryStore$InMemoryIterator.next(InMemoryStore.java:281)
at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:38)
at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:273)
at org.apache.spark.ui.WebUI$$anonfun.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:534)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:320)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.spark_project.jetty.io.SelectChannelEndPoint.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
Go to the AWS console and, under your security group configuration, allow all inbound traffic to the instance.
The fix was avoiding user error...
This is a total face-palm, but I hope my misunderstanding of the Spark system helps some poor fool like me who has sunk too much time into the same kind of problem.
The answer for the last iteration (the gettyimages/docker-spark Docker image) was that I was trying to run the spark-submit command without a master or worker started.
In the gettyimages/docker-spark repository you can find a docker-compose file which shows that it creates the master and worker nodes before doing any Spark work. The way the image creates a master or worker is with the spark-class script, passing in the org.apache.spark.deploy.<master|worker>.<Master|Worker> class, respectively.
So, putting it all together, I could use the configuration I was already using, but I had to create the master and worker(s) first, and then execute the same spark-submit command I was already running.
This is a quick and dirty implementation, though I promise there are better ones, done by people who actually know what they're doing:
The first 3 steps happen in a cluster-bootstrap script. I do this in AWS Lambda, triggered by API Gateway.
1. Create the cluster and a queue or some kind of message-broker system, e.g. zookeeper/kafka. (I use API Gateway -> Lambda for this.)
2. Pick a master node (logic in the Lambda).
3. Create a message with some basic info, like the master's IP or domain, and put it on the queue from step 1 (happens in the Lambda).
Everything below happens in the startup script on the Spark nodes.
4. A step in the startup script that has the node check the message queue for the message from step 3.
5. Add SPARK_MASTER_HOST and SPARK_LOCAL_IP to the $SPARK_HOME/conf/spark-env.sh file, using the info from the message picked up in step 4.
6. Add spark.driver.bindAddress to the $SPARK_HOME/conf/spark-defaults.conf file, using the info from the message picked up in step 4.
7. Some logic in the startup script to decide whether "this" node is a master or a worker.
8. Start the master or worker. In the gettyimages/docker-spark image, you can start a master with $SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master -h <the master's IP or domain> and a worker with $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker -h spark://<master's domain or IP>:7077.
9. Now you can run the spark-submit command, which deploys the work to the cluster.
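Step 8 can be sketched as a tiny helper that builds the spark-class invocation. The /usr/spark path follows the gettyimages/docker-spark layout, and note one assumption of mine: this sketch passes the master URL to the Worker as a positional argument, which the standalone Worker also accepts.

```python
import os

def spark_class_cmd(role, master_host, spark_home="/usr/spark"):
    """Build the argv for launching a standalone master or worker via
    the spark-class script (paths assume the gettyimages/docker-spark
    image layout)."""
    spark_class = os.path.join(spark_home, "bin", "spark-class")
    if role == "master":
        # the master binds to the host we pass with -h
        return [spark_class, "org.apache.spark.deploy.master.Master",
                "-h", master_host]
    # the worker takes the master URL to connect to
    return [spark_class, "org.apache.spark.deploy.worker.Worker",
            "spark://{}:7077".format(master_host)]

print(spark_class_cmd("master", "10.0.41.190"))
```

In the startup script this list would then be handed to subprocess.call once the node knows its role.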
Edit: (some of the code, for reference)
This is the addition to the Lambda:
def handler(event, context):
    config = BuildConfig(event)
    res = create_job(config)
    return build_response(res)
After the edit:
import json
import boto3

def handler(event, context):
    config = BuildConfig(event)
    coordination_queue = config.cluster + '-coordination'
    sqs = boto3.client('sqs')
    message_for_master_node = {'type': 'master', 'count': config.count}
    # list_queues omits 'QueueUrls' entirely when nothing matches
    queue_urls = sqs.list_queues(QueueNamePrefix=coordination_queue).get('QueueUrls', [])
    if not queue_urls:
        queue_url = sqs.create_queue(QueueName=coordination_queue)['QueueUrl']
    else:
        queue_url = queue_urls[0]
    # MessageBody must be a string, so serialize the dict
    sqs.send_message(QueueUrl=queue_url,
                     MessageBody=json.dumps(message_for_master_node))
    res = create_job(config)
    return build_response(res)
Then I added some bits to the script that the nodes in the Spark cluster run at startup:
# addition to the "main" in the Spark node's startup script
# (assumes boto3, json, os and subprocess are imported, and self_url is set earlier)
sqs = boto3.client('sqs')
boot_info_message = sqs.receive_message(
    QueueUrl=os.getenv('COORDINATIN_QUEUE_URL'),
    MaxNumberOfMessages=1)['Messages'][0]
boot_info = json.loads(boot_info_message['Body'])  # body arrives as a JSON string
message_for_worker = {'type': 'worker', 'master': self_url}
if boot_info['type'] == 'master':
    for i in range(int(boot_info['count'])):
        sqs.send_message(QueueUrl=os.getenv('COORDINATIN_QUEUE_URL'),
                         MessageBody=json.dumps(message_for_worker))
sqs.delete_message(QueueUrl=os.getenv('COORDINATIN_QUEUE_URL'),
                   ReceiptHandle=boot_info_message['ReceiptHandle'])
...
# starts a master or worker node by handing the deploy class to spark-class
startup_class = "org.apache.spark.deploy.{}.{}".format(
    boot_info['type'], boot_info['type'].title())
subprocess.call([os.path.join(os.environ['SPARK_HOME'], 'bin', 'spark-class'),
                 startup_class])