How to upload a file to Azure Blob Storage with Apache Beam?
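I want to upload a file to Azure Blob Storage with Apache Beam, but I can't get it to work. Why?
I have set the environment variables correctly.
The az command works fine: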
$ az storage blob upload \
-c $AZURE_STORAGE_CONTAINER_NAME \
-f example.json -n example.json \
--account-name $AZURE_STORAGE_ACCOUNT \
--account-key $AZURE_STORAGE_KEY
Finished[#############################################################] 100.0000%
{
"etag": "\"0x8D9B92C4C0BE870\"",
"lastModified": "2021-12-07T02:50:15+00:00"
}
However, when I run the following command:
$ mvn compile exec:java -Dexec.mainClass=jp.example.Indexer \
-Dexec.args="--runner=DirectRunner \
--destination=azfs://$AZURE_STORAGE_ACCOUNT/$AZURE_STORAGE_CONTAINER_NAME/example.json \
--source=example.json \
--azureConnectionString=$AZURE_CONNECTION_STRING \
--sasToken=$AZURE_STORAGE_SAS_TOKEN \
--accessKey=$AZURE_STORAGE_KEY \
--accountName=$AZURE_STORAGE_ACCOUNT"
I get the following error:
[WARNING]
java.lang.IllegalArgumentException: PipelineOptions specified failed to serialize to JSON.
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:171)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
at jp.example.Indexer.main (Indexer.java:24)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'azureCredentialsProvider' with value 'com.azure.identity.DefaultAzureCredential@3e88886c'
at com.fasterxml.jackson.databind.JsonMappingException.fromUnexpectedIOE (JsonMappingException.java:334)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes (ObjectMapper.java:3769)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:168)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
at jp.example.Indexer.main (Indexer.java:24)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.564 s
[INFO] Finished at: 2021-12-07T11:58:57+09:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project Indexer: An exception occured while executing the Java class. PipelineOptions specified failed to serialize to JSON.: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'azureCredentialsProvider' with value 'com.azure.identity.DefaultAzureCredential@3e88886c' -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Here is Indexer.java:
package jp.example;

import java.util.logging.Logger;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class Indexer {
    private static final Logger LOG = Logger.getLogger(Indexer.class.getName());

    public static void main(String[] args) {
        // Parse --source, --destination and the Azure options from the command line.
        ToAzurePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(ToAzurePipelineOptions.class);

        Pipeline p = Pipeline.create(options);

        // Read the source file line by line and write the lines to the destination.
        PCollection<String> lines = p.apply(TextIO.read().from(options.getSource()));
        lines.apply(TextIO.write().to(options.getDestination()));

        p.run();
    }
}
And here is ToAzurePipelineOptions.java:
package jp.example;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

public interface ToAzurePipelineOptions extends DataflowPipelineOptions, BlobstoreOptions {
    @Description("Root path of data files")
    @Default.String("file://hoge")
    String getSource();

    void setSource(String value);

    @Description("Address of Azure Storage")
    @Default.String("azfs://hoge")
    String getDestination();

    void setDestination(String value);
}
beam-sdks-java-io-azure and beam-sdks-java-core are both at version 2.31.0.
My guess is that this happens because TokenCredentialSerializer was only implemented in Beam 2.33.0. Could you upgrade your Beam dependencies to at least 2.33.0 and see whether that resolves the issue?
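If it helps, the "at least 2.33.0" condition can be checked mechanically against whatever version your build resolves (for example, the one reported by mvn dependency:tree). The following is only a rough sketch: BeamVersionCheck, parse and isAtLeast are hypothetical helper names, not part of Beam, and the version string is supplied by hand rather than read from the library.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Apache Beam: compares dotted version strings.
public class BeamVersionCheck {

    // Turns "2.33.0" or "2.33.0-SNAPSHOT" into {2, 33, 0}; missing parts count as 0.
    static int[] parse(String version) {
        String core = version.split("-", 2)[0]; // drop qualifiers such as -SNAPSHOT
        String[] parts = core.split("\\.");
        int[] nums = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            nums[i] = Integer.parseInt(parts[i]);
        }
        return nums;
    }

    // True if 'version' is the same as or newer than 'minimum'.
    static boolean isAtLeast(String version, String minimum) {
        // Arrays.compare orders int arrays lexicographically (available since Java 9).
        return Arrays.compare(parse(version), parse(minimum)) >= 0;
    }

    public static void main(String[] args) {
        // Defaults to the version mentioned in the question.
        String version = args.length > 0 ? args[0] : "2.31.0";
        System.out.println(version + " is at least 2.33.0: " + isAtLeast(version, "2.33.0"));
    }
}
```

With the 2.31.0 from the question the check comes back false, which is consistent with the guess above; it does not by itself prove that the missing serializer is the cause.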