从 Java SDK 创建的 Cloud Scheduler 作业出错

Error in Cloud Scheduler Job Created From Java SDK

我有一个自定义训练作业,通过 Cloud Scheduler 按固定的时间表运行。当我使用 Python 客户端或直接在 GCP 控制台中创建调度作业时,作业运行没有问题。但是,当我使用 Java SDK 创建 Cloud Scheduler 作业时,作业可以成功创建,但运行时失败。我在 Cloud Logging 中看到的错误消息摘要是:

{"@type":"type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished", "jobName":"projects/{my_project_id}/locations/us-central1/jobs/java_job", "status":"INVALID_ARGUMENT", "targetType":"HTTP", "url":"https://us-central1-aiplatform.googleapis.com/v1/projects/{my_project_id}/locations/us-central1/customJobs"}

我在 GCP 控制台中查看了这三个作业(一个使用 Python 客户端创建,一个使用 Java SDK 创建,一个直接在 GCP 控制台中创建),它们的所有字段看起来都相同。我不明白为什么使用 Java SDK 创建的作业总是失败。

Java SDK代码:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import com.google.cloud.scheduler.v1.Job;
import com.google.cloud.scheduler.v1.LocationName;
import com.google.cloud.scheduler.v1.OAuthToken;
import com.google.protobuf.ByteString;
import com.google.cloud.scheduler.v1.CloudSchedulerClient;
import com.google.cloud.scheduler.v1.HttpMethod;
import com.google.cloud.scheduler.v1.HttpTarget;


/**
 * Creates a Cloud Scheduler job whose HTTP target POSTs a custom training job
 * request to the Vertex AI {@code customJobs} endpoint on a cron schedule.
 *
 * <p>The static fields below are placeholders and must be filled in with real
 * project values before running.
 */
public class Temp 
{
    
    static String projectId = "...";
    static String location = "...";
    static String serviceAccountEmail = "...-compute@developer.gserviceaccount.com";
    static String outputUriPrefix = "gs://.../.../";
    static String imageUri = String.format("%s-docker.pkg.dev/%s/.../...", location, projectId);
    
    static String trainingJobName = "custom_training_job";
    static String schedulerJobName = String.format("projects/%s/locations/%s/jobs/java_job", projectId, location);
    static String scope = "https://www.googleapis.com/auth/cloud-platform";
    static String httpTargetUri = String.format("https://%s-aiplatform.googleapis.com/v1/projects/%s/locations/%s/customJobs", 
            location, projectId, location);
    static String machineType = "n1-standard-4";
    static long replicaCount = 1;
    
    
    /**
     * Builds the JSON request body sent by the scheduler to the
     * {@code customJobs} endpoint.
     *
     * <p>Resulting layout:
     * <pre>
     * {
     *   "display_name": ...,
     *   "job_spec": {
     *     "worker_pool_specs": [ { replica_count, machine_spec, container_spec } ],
     *     "base_output_directory": { "output_uri_prefix": ... }
     *   }
     * }
     * </pre>
     *
     * @return the request body serialized as a JSON string
     * @throws JSONException if a value cannot be stored in the JSON structure
     */
    static String getJobBody() throws JSONException {
        JSONObject machineSpec = new JSONObject();
        machineSpec.put("machine_type", machineType);

        JSONArray containerArgs = new JSONArray();
        containerArgs.put("--msg=hello!");

        JSONObject containerSpec = new JSONObject();
        containerSpec.put("image_uri", imageUri);
        containerSpec.put("args", containerArgs);

        JSONObject workerPoolSpec = new JSONObject();
        workerPoolSpec.put("replica_count", replicaCount);
        workerPoolSpec.put("machine_spec", machineSpec);
        workerPoolSpec.put("container_spec", containerSpec);

        JSONArray workerPoolSpecs = new JSONArray();
        workerPoolSpecs.put(workerPoolSpec);

        JSONObject baseOutputDirectory = new JSONObject();
        baseOutputDirectory.put("output_uri_prefix", outputUriPrefix);

        JSONObject jobSpec = new JSONObject();
        jobSpec.put("worker_pool_specs", workerPoolSpecs);
        // base_output_directory must be nested inside job_spec. When it was
        // placed at the top level of the body, the scheduled request was
        // rejected with INVALID_ARGUMENT.
        jobSpec.put("base_output_directory", baseOutputDirectory);

        JSONObject jobBody = new JSONObject();
        jobBody.put("display_name", trainingJobName);
        jobBody.put("job_spec", jobSpec);
        return jobBody.toString();
    }
    
    /**
     * Creates the scheduler job under {@code projects/<projectId>/locations/<location>}.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if the Cloud Scheduler client cannot be created
     * @throws JSONException if the request body cannot be built
     */
    public static void main( String[] args ) throws IOException, JSONException
    {
        System.out.println(String.format("=======STARTING APPLICATION, version %s =======", "v5"));
        
        String parent = LocationName.of(projectId, location).toString();
        
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("User-Agent", "Google-Cloud-Scheduler");
        headers.put("Content-Type", "application/json; charset=utf-8");
        
        // The scheduler attaches an OAuth token for this service account to
        // each request it sends to the target URI.
        OAuthToken token = OAuthToken.newBuilder()
                .setServiceAccountEmail(serviceAccountEmail)
                .setScope(scope)
                .build();
                
        HttpTarget httpTarget = HttpTarget.newBuilder()
                .setUri(httpTargetUri)
                .setHttpMethod(HttpMethod.POST)
                .putAllHeaders(headers)
                .setBody(ByteString.copyFromUtf8(getJobBody()))
                .setOauthToken(token)
                .build();
        
        Job job = Job.newBuilder()
                .setName(schedulerJobName)
                .setDescription("test java job")
                .setSchedule("* * * * *")
                .setTimeZone("Africa/Abidjan")
                .setHttpTarget(httpTarget)
                .build();
        
        // try-with-resources closes the client even if createJob throws.
        try (CloudSchedulerClient client = CloudSchedulerClient.create()) {
            client.createJob(parent, job);
        }
    }
}

Python 客户端代码:

from google.cloud import scheduler
import json


# Deployment-specific placeholders; replace with real values before running.
project_id = "..."
location = "..."
service_account_email = "...-compute@developer.gserviceaccount.com"
output_uri_prefix = "gs://.../.../"
image_uri = f'{location}-docker.pkg.dev/{project_id}/.../...'

training_job_name = "custom_training_job"
scheduler_job_name = f'projects/{project_id}/locations/{location}/jobs/python_job'
scope = "https://www.googleapis.com/auth/cloud-platform"
http_target_uri = f'https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/customJobs'
machine_type = "n1-standard-4"
replica_count = 1

# The single worker pool: machine type, replica count and the container to run.
worker_pool = {
    "machine_spec": {"machine_type": machine_type},
    "replica_count": replica_count,
    "container_spec": {
        "image_uri": image_uri,
        "args": ["--msg=hello!"],
    },
}

# Request payload for the customJobs endpoint. Note that
# base_output_directory is nested inside job_spec, next to worker_pool_specs.
custom_job_request = {
    "display_name": training_job_name,
    "job_spec": {
        "worker_pool_specs": [worker_pool],
        "base_output_directory": {"output_uri_prefix": output_uri_prefix},
    },
}

# The HTTP target carries the payload as UTF-8 encoded JSON bytes.
request_body = json.dumps(custom_job_request).encode('utf-8')

http_target = {
    "uri": http_target_uri,
    "http_method": "POST",
    "headers": {
        "User-Agent": "Google-Cloud-Scheduler",
        "Content-Type": "application/json; charset=utf-8",
    },
    "body": request_body,
    "oauth_token": {
        "service_account_email": service_account_email,
        "scope": scope,
    },
}

# Scheduler job definition: the HTTP target plus its cron schedule and time zone.
job = {
    "name": scheduler_job_name,
    "description": "Created from Python client",
    "http_target": http_target,
    "schedule": "* * * * *",
    "time_zone": "Africa/Abidjan",
}

# Create the job under the project/location parent resource.
client = scheduler.CloudSchedulerClient()
parent = f'projects/{project_id}/locations/{location}'
response = client.create_job(parent=parent, job=job)

编辑

问题出在 getJobBody 函数中:我把 base_output_directory 设置成了顶级字段,而它应该是嵌套在 job_spec 中的字段。问题已经解决,但是有没有更好的方法?我知道有一个 CustomJobSpec 类,但找不到将其转换为 JSON 格式字符串的方法。

如编辑中所述,问题是在 getJobBody 函数中,base_output_directory 被设置为顶级字段,而它应该是 job_spec 中的嵌套字段。所以目前,据我所知,避免这个错误的方法是仔细设置 jobBody,我不知道有什么方法可以更结构化地做到这一点。