What happens in Spark when data is available on a worker node but there are no computation resources available to execute the task?

I am a beginner with Spark and I want to understand its architecture. I know that, in the ideal case, the cluster manager assigns tasks to executors running on the same node where the data needed for processing resides. But what happens if the node where the data resides does not have any available executors?

The rough idea behind how Spark handles colocated data is:

If you use the SparkSession provided tools to read a DataFrame (see DataFrameReader Documentation) then an execution graph is created which will try to read data node-local. I.e. each Spark executor will read data which resides on the local-to-this executor part of a distributed storage: For example local HDFS-blocks. This requires that you have partitioning information on the data store, and use this to create a DataFrameReader. This is the proper way to use Spark with big data, since it allows near-arbitrary scaling.

From Rick Moritz

In some use cases the data is not colocated with the Spark cluster, and Spark must figure out the best way to get the data to the executors. Another use case is exactly the one you describe: a node does not have enough resources to launch an executor. For all of these cases, Spark handles the problem according to a few rules:

Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.

Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:

  • PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible
  • NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
  • NO_PREF - data is accessed equally quickly from anywhere and has no locality preference
  • RACK_LOCAL - data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
  • ANY - data is elsewhere on the network and not in the same rack

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.

What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.

So the preferred ordering of data locality is: PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY.
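The per-level fallback timeouts mentioned above map to concrete settings in spark-defaults.conf. The keys below are the real configuration names; the values shown are the defaults and are given only for illustration:

```
# Global wait before falling back to the next-lower locality level
spark.locality.wait          3s

# Per-level overrides (fall back to spark.locality.wait when unset)
spark.locality.wait.process  3s
spark.locality.wait.node     3s
spark.locality.wait.rack     3s
```

Setting these to 0 makes the scheduler launch tasks immediately at any locality level; increasing them makes it wait longer for a local slot to free up.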

From here

If the data is not colocated with your Spark application, for example when you read data from S3:

The main problem with S3 is that the consumers no longer have data locality and all reads need to transfer data across the network, and S3 performance tuning itself is a black box.

When using HDFS and getting perfect data locality, it is possible to get ~3GB/node local read throughput on some of the instance types (e.g. i2.8xl, roughly 90MB/s per core). DBIO, our cloud I/O optimization module, provides optimized connectors to S3 and can sustain ~600MB/s read throughput on i2.8xl (roughly 20MB/s per core).

That is to say, on a per node basis, HDFS can yield 6X higher read throughput than S3. Thus, given that S3 is 10x cheaper than HDFS, we find that S3 is almost 2x better compared to HDFS on performance per dollar.

However, a big benefit with S3 is we can separate storage from compute, and as a result, we can just launch a larger cluster for a smaller period of time to increase throughput, up to allowable physical limits. This separation of compute and storage also allows different Spark applications (such as a data engineering ETL job and an ad-hoc data science model training cluster) to run on their own clusters, preventing concurrency issues that affect multi-user fixed-sized Hadoop clusters. This separation (and the flexible accommodation of disparate workloads) not only lowers cost but also improves the user experience.

From here
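When reading from S3 there is no node-locality to exploit, so tuning is mostly about the connector's network parallelism rather than scheduling. A sketch of relevant S3A settings in spark-defaults.conf (the keys are real S3A options, assuming hadoop-aws is on the classpath; the values are illustrative, not recommendations):

```
# Maximum simultaneous connections to S3
spark.hadoop.fs.s3a.connection.maximum   100

# Maximum threads for parallel S3 operations
spark.hadoop.fs.s3a.threads.max          64
```

With remote object storage, every task reads over the network (ANY locality), so throughput scales with cluster size and connector parallelism rather than with data placement.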