HTTP500 操作 APPEND 失败?
Operation APPEND failed with HTTP500?
package org.apache.spark.examples.kafkaToflink;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.microsoft.azure.datalake.store.ADLException;
import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import scala.util.parsing.combinator.testing.Str;
public class App {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.72:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(
new FlinkKafkaConsumer010<String>("tenant", new SimpleStringSchema(), properties), "Kafka_Source");
stream.addSink(new ADLSink()).name("Custom_Sink").setParallelism(128);
env.execute("App");
}
}
class ADLSink<String> extends RichSinkFunction<String> {
private java.lang.String clientId = "***********";
private java.lang.String authTokenEndpoint = "***************";
private java.lang.String clientKey = "*****************";
private java.lang.String accountFQDN = "****************";
private java.lang.String filename = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK.json";
@Override
public void invoke(String value) {
AccessTokenProvider provider = new ClientCredsTokenProvider(authTokenEndpoint, clientId, clientKey);
ADLStoreClient client = ADLStoreClient.createClient(accountFQDN, provider);
try {
client.setPermission(filename, "744");
ADLFileOutputStream stream = client.getAppendStream(filename);
System.out.println(value);
stream.write(value.toString().getBytes());
stream.close();
} catch (ADLException e) {
System.out.println(e.requestId);
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println(e.getCause());
}
}
}
我一直在尝试使用 while 循环附加一个位于 Azure 数据湖存储中的文件。但有时它会给出这个,操作附加失败,HTTP500,启动错误或有时在 10 分钟后。我正在使用 java
Anubhav,Azure Data Lake 流是单写入器流 - 也就是说,您不能从多个线程写入同一流,除非您在这些线程之间进行某种形式的同步。这是因为每次写入都指定了它写入的偏移量,而对于多个线程,偏移量并不一致。
您似乎是从多个线程编写的(.setParallelism(128)
在您的代码中调用)
对于你的情况,你有两个选择:
- 在每个线程中写入不同的文件。我不知道你的用例,但我们发现在很多情况下,不同线程的自然使用 - 写入不同的文件。
- 如果让所有线程写入同一个文件很重要,那么您将需要稍微重构接收器,以便所有实例都引用同一个
ADLFileOutputStream
,并且您将需要确保对 write()
和 close()
的调用是同步的。
现在,这里还有一个问题 - 您得到的错误应该是 HTTP 4xx 错误(表示租约冲突,因为 ADLFileOutputStream
s 获取租约),而不是 HTTP 500,它说服务器端出现问题。要解决此问题,我需要知道您的帐户名和访问时间。该信息在 Whosebug 上共享是不安全的,因此请为此打开支持票并参考这个 SO 问题,这样问题最终会转交给我。
package org.apache.spark.examples.kafkaToflink;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.microsoft.azure.datalake.store.ADLException;
import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import scala.util.parsing.combinator.testing.Str;
public class App {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.72:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(
new FlinkKafkaConsumer010<String>("tenant", new SimpleStringSchema(), properties), "Kafka_Source");
stream.addSink(new ADLSink()).name("Custom_Sink").setParallelism(128);
env.execute("App");
}
}
class ADLSink<String> extends RichSinkFunction<String> {
private java.lang.String clientId = "***********";
private java.lang.String authTokenEndpoint = "***************";
private java.lang.String clientKey = "*****************";
private java.lang.String accountFQDN = "****************";
private java.lang.String filename = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK.json";
@Override
public void invoke(String value) {
AccessTokenProvider provider = new ClientCredsTokenProvider(authTokenEndpoint, clientId, clientKey);
ADLStoreClient client = ADLStoreClient.createClient(accountFQDN, provider);
try {
client.setPermission(filename, "744");
ADLFileOutputStream stream = client.getAppendStream(filename);
System.out.println(value);
stream.write(value.toString().getBytes());
stream.close();
} catch (ADLException e) {
System.out.println(e.requestId);
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println(e.getCause());
}
}
}
我一直在尝试使用 while 循环附加一个位于 Azure 数据湖存储中的文件。但有时它会给出这个,操作附加失败,HTTP500,启动错误或有时在 10 分钟后。我正在使用 java
Anubhav,Azure Data Lake 流是单写入器流 - 也就是说,您不能从多个线程写入同一流,除非您在这些线程之间进行某种形式的同步。这是因为每次写入都指定了它写入的偏移量,而对于多个线程,偏移量并不一致。
您似乎是从多个线程编写的(.setParallelism(128)
在您的代码中调用)
对于你的情况,你有两个选择:
- 在每个线程中写入不同的文件。我不知道你的用例,但我们发现在很多情况下,不同线程的自然使用 - 写入不同的文件。
- 如果让所有线程写入同一个文件很重要,那么您将需要稍微重构接收器,以便所有实例都引用同一个
ADLFileOutputStream
,并且您将需要确保对write()
和close()
的调用是同步的。
现在,这里还有一个问题 - 您得到的错误应该是 HTTP 4xx 错误(表示租约冲突,因为 ADLFileOutputStream
s 获取租约),而不是 HTTP 500,它说服务器端出现问题。要解决此问题,我需要知道您的帐户名和访问时间。该信息在 Whosebug 上共享是不安全的,因此请为此打开支持票并参考这个 SO 问题,这样问题最终会转交给我。