Submit Spark job in Azure Synapse from Java
Azure Synapse provides managed Spark pools to which Spark jobs can be submitted.
1. How do I submit a spark-job (as a jar), together with its dependencies, to the pool using Java?
2. If multiple jobs are submitted (each with its own set of dependencies), are those dependencies shared between the jobs, or are the jobs unaware of each other?
For (1):
Add the following dependencies:
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-analytics-synapse-spark</artifactId>
    <version>1.0.0-beta.4</version>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
</dependency>
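Note that azure-identity is declared without a version above; unless a version is supplied elsewhere in your build (for example via the azure-sdk-bom or a parent POM), Maven will require an explicit <version> for it.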
Use the following sample code:
import com.azure.analytics.synapse.spark.SparkBatchClient;
import com.azure.analytics.synapse.spark.SparkClientBuilder;
import com.azure.analytics.synapse.spark.models.SparkBatchJob;
import com.azure.analytics.synapse.spark.models.SparkBatchJobOptions;
import com.azure.identity.DefaultAzureCredentialBuilder;

import java.util.*;

public class SynapseService {

    private final SparkBatchClient batchClient;

    public SynapseService() {
        batchClient = new SparkClientBuilder()
                .endpoint("https://xxxx.dev.azuresynapse.net/")
                .sparkPoolName("TestPool")
                .credential(new DefaultAzureCredentialBuilder().build())
                .buildSparkBatchClient();
    }

    public SparkBatchJob submitSparkJob(String name, String mainFile, String mainClass, List<String> arguments, List<String> jars) {
        SparkBatchJobOptions options = new SparkBatchJobOptions()
                .setName(name)
                .setFile(mainFile)
                .setClassName(mainClass)
                .setArguments(arguments)
                .setJars(jars)
                .setExecutorCount(3)
                .setExecutorCores(4)
                .setDriverCores(4)
                .setDriverMemory("6G")
                .setExecutorMemory("6G");
        return batchClient.createSparkBatchJob(options);
    }

    /**
     * All possible Livy states: https://docs.microsoft.com/en-us/rest/api/synapse/data-plane/spark-batch/get-spark-batch-jobs#livystates
     *
     * Some of the values: busy, dead, error, idle, killed, not_started, recovering, running, shutting_down, starting, success
     *
     * @param id       id of the Synapse job
     * @param detailed whether to include detailed job information in the response
     * @return the Spark batch job, including its current Livy state
     */
    public SparkBatchJob getSparkJob(int id, boolean detailed) {
        return batchClient.getSparkBatchJob(id, detailed);
    }

    /**
     * Cancels the ongoing Synapse Spark job.
     *
     * @param jobId id of the Synapse job
     */
    public void cancelSparkJob(int jobId) {
        batchClient.cancelSparkBatchJob(jobId);
    }
}
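The sample above relies on DefaultAzureCredential, which resolves credentials from the environment (environment variables, managed identity, Azure CLI login, and so on). If you prefer to authenticate explicitly as a service principal, here is a minimal sketch using ClientSecretCredential from the same azure-identity library (the placeholder tenant/client values are assumptions to be replaced):

import com.azure.identity.ClientSecretCredentialBuilder;

// Hypothetical service-principal values; substitute your own.
SparkBatchClient batchClient = new SparkClientBuilder()
        .endpoint("https://xxxx.dev.azuresynapse.net/")
        .sparkPoolName("TestPool")
        .credential(new ClientSecretCredentialBuilder()
                .tenantId("<tenant-id>")
                .clientId("<client-id>")
                .clientSecret("<client-secret>")
                .build())
        .buildSparkBatchClient();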
Then submit the spark-job:
SynapseService synapse = new SynapseService();
synapse.submitSparkJob("TestJob",
        "abfss://builds@xxxx.dfs.core.windows.net/core/jars/main-module_2.12-1.0.jar",
        "com.xx.Main",
        Collections.emptyList(),
        Arrays.asList("abfss://builds@xxxx.dfs.core.windows.net/core/jars/*"));
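If the caller needs to block until the job finishes, the Livy state can be polled via getSparkJob until it reaches a terminal value. A minimal sketch (the helper name waitForCompletion and the 30-second interval are my own choices; the terminal states follow the Livy documentation linked above):

/** Polls the job until it leaves the non-terminal Livy states; a sketch, not production code. */
static SparkBatchJob waitForCompletion(SynapseService synapse, int jobId) throws InterruptedException {
    // Terminal Livy states per the REST docs linked above.
    Set<String> terminal = new HashSet<>(Arrays.asList("success", "dead", "error", "killed"));
    while (true) {
        SparkBatchJob job = synapse.getSparkJob(jobId, false);
        // String.valueOf keeps this working whether getState() returns a String or an enum-like type.
        String state = String.valueOf(job.getState());
        if (terminal.contains(state)) {
            return job;
        }
        Thread.sleep(30_000); // poll every 30 seconds; tune for your workload
    }
}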
Finally, you need to grant the necessary roles:
- Open Synapse Analytics Studio
- Manage -> Access control
- Grant the caller the Synapse Compute Operator role
Answering (2):
When jobs are submitted to Synapse as jars, each submission is equivalent to a spark-submit. All jobs are therefore unaware of one another and do not share each other's dependencies.