How to get PySpark working on Google Cloud Composer

I find Google Cloud Composer to be a very promising managed Apache Airflow service, but I can't figure out how to execute a pipeline with PySpark code through Cloud Composer. I was able to install other Python packages such as Pandas, and they work fine with Cloud Composer.

Any pointers are greatly appreciated.

Cloud Composer is used to schedule pipelines.

So, to run PySpark code from Cloud Composer, you need to create a Dataproc cluster and run the PySpark code on it as a Dataproc job. In the DAG, you can use DataprocCreateClusterOperator to schedule the creation of the Dataproc cluster. After the cluster is created, you can submit your PySpark job to it with DataprocSubmitJobOperator. To submit a job to the cluster, you need to provide the job's source file in GCS. You can refer to the code below.

PySpark code:


import pyspark
from operator import add

sc = pyspark.SparkContext()

# Distribute the characters of "Hello World" across the cluster
data = sc.parallelize(list("Hello World"))

# Count each character, then sort the counts in descending order
counts = (
    data.map(lambda x: (x, 1))
    .reduceByKey(add)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
)

for (word, count) in counts:
    print("{}: {}".format(word, count))


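The job source file referenced by PYSPARK_URI has to be in GCS before the DAG runs. One way to upload it is with the google-cloud-storage Python client; the sketch below uses hypothetical bucket and file names that are not part of the original example, so substitute your own:


from google.cloud import storage

# Placeholders -- replace with your own bucket, object path, and local file name
BUCKET_NAME = "your-bucket-name"
GCS_PATH = "jobs/wordcount.py"     # object path inside the bucket
LOCAL_FILE = "wordcount.py"        # local copy of the PySpark script above

client = storage.Client()          # uses your default GCP credentials
bucket = client.bucket(BUCKET_NAME)
bucket.blob(GCS_PATH).upload_from_filename(LOCAL_FILE)

# PYSPARK_URI in the DAG below would then be "gs://your-bucket-name/jobs/wordcount.py"
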
DAG code:


import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
   DataprocCreateClusterOperator,
   DataprocSubmitJobOperator,
)

PROJECT_ID = "give your project id"
CLUSTER_NAME = "the Dataproc cluster name you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark code, e.g. gs://[input file]"

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    'start_date': YESTERDAY,
}

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]

CLUSTER_CONFIG = {
   "master_config": {
       "num_instances": 1,
       "machine_type_uri": "n1-standard-4",
       "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
   },
   "worker_config": {
       "num_instances": 2,
       "machine_type_uri": "n1-standard-4",
       "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
   },
}

with models.DAG(
    "dataproc",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args) as dag:

   # [START how_to_cloud_dataproc_create_cluster_operator]
   create_cluster = DataprocCreateClusterOperator(
       task_id="create_cluster",
       project_id=PROJECT_ID,
       cluster_config=CLUSTER_CONFIG,
       region=REGION,
       cluster_name=CLUSTER_NAME,
   )

   PYSPARK_JOB = {
       "reference": {"project_id": PROJECT_ID},
       "placement": {"cluster_name": CLUSTER_NAME},
       "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
   }

   pyspark_task = DataprocSubmitJobOperator(
       task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
   )

   create_cluster >> pyspark_task
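
If you do not need the cluster once the job has finished, you can extend the same DAG with a DataprocDeleteClusterOperator task so the cluster is torn down automatically. A rough sketch (the task id and trigger rule here are just one reasonable choice, not part of the original example):


# Additional imports at the top of the DAG file
from airflow.providers.google.cloud.operators.dataproc import DataprocDeleteClusterOperator
from airflow.utils.trigger_rule import TriggerRule

# Inside the same `with models.DAG(...)` block:
delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    # run even if the PySpark task fails, so the cluster always gets cleaned up
    trigger_rule=TriggerRule.ALL_DONE,
)

create_cluster >> pyspark_task >> delete_cluster


With TriggerRule.ALL_DONE the cleanup task runs regardless of whether the PySpark job succeeds, so you are not left paying for an idle cluster.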