influxdb-java:org.influxdb.InfluxDBIOException:java.net.SocketException:对等方重置连接:套接字写入错误
influxdb-java: org.influxdb.InfluxDBIOException: java.net.SocketException: Connection reset by peer: socket write error
我正在测试 InfluxDB 以存储传感器时间序列。我正在使用 influxdb-java 客户端库(版本 2.15)并且我 运行 在本地使用 InfluxDB 1.7.6 进行测试。
我所有的点都存储在 .csv 文件中(每个传感器一个),这些文件本身存储在 .zip 文件中(每个数据集一个)。我的代码 运行 通过每个 csv 文件的每一行。分批写入。
/**
* Get the connection to the database
*/
InfluxDB influxDB = InfluxDBFactory.connect("http://192.168.51.51:8086");
influxDB.query(new Query("CREATE DATABASE theia_in_situ"));
influxDB.setDatabase("theia_in_situ");
influxDB.enableBatch();
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
/**
* Create batch point to write each measure of the time serie more efficiently
*/
BatchPoints batchPoints = BatchPoints
.database("theia_in_situ")
.build();
对于每个 CSV 数据文件,执行以下方法:
public static void createAndImportTimeSeriesDocuments(InputStream txtFileIn, String observationId, String producerId,
InfluxDB influxDB, BatchPoints batchPoints) throws IOException, ParseException {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
/**
* Store the variable name
*/
String observedProperty = null;
try (BufferedReader br = new BufferedReader(new InputStreamReader(txtFileIn));) {
String line = null;
/**
* Read the headers
*/
while ((line = br.readLine()).substring(0, 1).equals("#")) {
if (line.substring(0, 15).equals("#Variable_name;")) {
observedProperty = line.split(";")[1];
}
}
/**
* Read the data
*/
while ((line = br.readLine()) != null) {
String[] lineSplitted = line.split(";", -1);
Point point = Point.measurement(observedProperty)
.tag("producerId", producerId)
.tag("observationId", observationId)
.time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
.addField("value", lineSplitted[5])
.addField("flag", lineSplitted[6])
.build();
batchPoints.point(point);
}
influxDB.write(batchPoints);
}
}
我可以写一个或几个测量值,但很快我就得到以下异常:
Exception in thread "main" org.influxdb.InfluxDBIOException: java.net.SocketException: Connection reset by peer: socket write error
at org.influxdb.impl.InfluxDBImpl.execute(InfluxDBImpl.java:812)
at org.influxdb.impl.InfluxDBImpl.write(InfluxDBImpl.java:463)
我已经禁用了 max-concurrent-write-limit、max-enqueued-write-limit、enqueued-write-timeout(在 /etc/influxdb/influxdb.conf
中将每个值设置为 0),如前所述 here .
尽管此问题在 Github 页面中作为常见问题解答提及,但我找不到任何问题重现我的问题。
如有任何帮助,我们将不胜感激。
尝试以批处理模式写入 BatchPoint
时似乎会出现此异常。
The influxdb-java client is storing your writes into an internal
buffer and flushes them asynchronously to InfluxDB at a fixed flush
interval to achieve good performance on both client and server side.
这是更新后的代码。
/**
* Read the data
*/
while ((line = br.readLine()) != null) {
String[] lineSplitted = line.split(";", -1);
Point point = Point.measurement(observedProperty)
.tag("producerId", producerId)
.tag("observationId", observationId)
.time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
.addField("value", lineSplitted[5])
.addField("flag", lineSplitted[6])
.build();
influxDB.write(point);
// batchPoints.point(point);
}
//influxDB.write(batchPoints);
我正在测试 InfluxDB 以存储传感器时间序列。我正在使用 influxdb-java 客户端库(版本 2.15)并且我 运行 在本地使用 InfluxDB 1.7.6 进行测试。
我所有的点都存储在 .csv 文件中(每个传感器一个),这些文件本身存储在 .zip 文件中(每个数据集一个)。我的代码 运行 通过每个 csv 文件的每一行。分批写入。
/**
* Get the connection to the database
*/
InfluxDB influxDB = InfluxDBFactory.connect("http://192.168.51.51:8086");
influxDB.query(new Query("CREATE DATABASE theia_in_situ"));
influxDB.setDatabase("theia_in_situ");
influxDB.enableBatch();
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
/**
* Create batch point to write each measure of the time serie more efficiently
*/
BatchPoints batchPoints = BatchPoints
.database("theia_in_situ")
.build();
对于每个 CSV 数据文件,执行以下方法:
public static void createAndImportTimeSeriesDocuments(InputStream txtFileIn, String observationId, String producerId,
InfluxDB influxDB, BatchPoints batchPoints) throws IOException, ParseException {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
/**
* Store the variable name
*/
String observedProperty = null;
try (BufferedReader br = new BufferedReader(new InputStreamReader(txtFileIn));) {
String line = null;
/**
* Read the headers
*/
while ((line = br.readLine()).substring(0, 1).equals("#")) {
if (line.substring(0, 15).equals("#Variable_name;")) {
observedProperty = line.split(";")[1];
}
}
/**
* Read the data
*/
while ((line = br.readLine()) != null) {
String[] lineSplitted = line.split(";", -1);
Point point = Point.measurement(observedProperty)
.tag("producerId", producerId)
.tag("observationId", observationId)
.time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
.addField("value", lineSplitted[5])
.addField("flag", lineSplitted[6])
.build();
batchPoints.point(point);
}
influxDB.write(batchPoints);
}
}
我可以写一个或几个测量值,但很快我就得到以下异常:
Exception in thread "main" org.influxdb.InfluxDBIOException: java.net.SocketException: Connection reset by peer: socket write error at org.influxdb.impl.InfluxDBImpl.execute(InfluxDBImpl.java:812) at org.influxdb.impl.InfluxDBImpl.write(InfluxDBImpl.java:463)
我已经禁用了 max-concurrent-write-limit、max-enqueued-write-limit、enqueued-write-timeout(在 /etc/influxdb/influxdb.conf
中将每个值设置为 0),如前所述 here .
尽管此问题在 Github 页面中作为常见问题解答提及,但我找不到任何问题重现我的问题。
如有任何帮助,我们将不胜感激。
尝试以批处理模式写入 BatchPoint
时似乎会出现此异常。
The influxdb-java client is storing your writes into an internal buffer and flushes them asynchronously to InfluxDB at a fixed flush interval to achieve good performance on both client and server side.
这是更新后的代码。
/**
* Read the data
*/
while ((line = br.readLine()) != null) {
String[] lineSplitted = line.split(";", -1);
Point point = Point.measurement(observedProperty)
.tag("producerId", producerId)
.tag("observationId", observationId)
.time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
.addField("value", lineSplitted[5])
.addField("flag", lineSplitted[6])
.build();
influxDB.write(point);
// batchPoints.point(point);
}
//influxDB.write(batchPoints);