Influxdb Alpakka 连接器不写入数据库
Influxdb Alpakka connector not writing to database
我正在尝试写入 Influxdb(运行 在 docker 容器中,版本为 2.0)。
我正在使用 Scala 和 Reactive Streams。因此 Alpakka 连接器 (https://doc.akka.io/docs/alpakka/current/influxdb.html)
因为 Scala Reactive Client (https://github.com/influxdata/influxdb-client-java/tree/master/client-scala) 不支持写入数据库。
无论我如何尝试写入数据库,数据都没有写入。
Source
.tick(1.seconds, 1.seconds,
Seq(
InfluxDbWriteMessage.create(
Point
.measurement("cpu_load_short")
.addField("host", "server01")
.addField("value", 0.64)
.tag("region", "us-west")
.time(DateTime.now.getMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
.build,
).withDatabaseName("database"),
)
)
.toMat(
InfluxDbSink.create()(
InfluxDBFactory.connect("http://localhost:9091", "admin", "admin123")
)
)(Keep.right)
.run.andThen { case Success(posts) => print("done") }
此外,“完成”从未打印出来,所以我假设未来永远不会完成,因此某个地方出现了问题。
唯一打印的是
Pong{version=2.1.1, responseTime=68}
我错过了什么,所以无法写作。是否因为 Alpakka 连接器是为 InfluxDB 之前的版本 2 编写的,因此它不起作用?
虽然我没有自己尝试,但官方InfluxDB Alpakka connector可能无法将记录写入InfluxDB 2.x,所以我想你的观察是正确的。
对我有用的是:
最新 influxdb 2.x Docker 图像和 sbt 导入:
"org.influxdb" % "influxdb-java" % "2.22",
"com.influxdb" %% "influxdb-client-scala" % "4.3.0",
"com.influxdb" % "flux-dsl" % "4.3.0",
用于从 influxdb-client-java, because of this discussion about write performance 写入同步 Java API WriteApiBlocking,诸如此类:
public CompletionStage<Done> writeTestPoints(int nPoints, String sensorID) {
List<Integer> range = IntStream.rangeClosed(1, nPoints).boxed().collect(Collectors.toList());
Source<Integer, NotUsed> source = Source.from(range);
CompletionStage<Done> done = source
.groupedWithin(10, Duration.ofMillis(100))
.mapAsyncUnordered(10, each -> this.eventHandlerPointBatch(each, influxDBClient.getWriteApiBlocking(), nPoints, sensorID))
.runWith(Sink.ignore(), system);
return done;
}
private CompletionStage<Done> eventHandlerPointBatch(List<Integer> hrs, WriteApiBlocking writeApi, int nPoints, String sensorID) {
LOGGER.info("Writing points: {}-{} ", sensorID, hrs);
List<Point> points = hrs.stream().map(each -> createPoint(nPoints, sensorID, System.nanoTime(), each)).collect(Collectors.toList());
writeApi.writePoints(points);
return CompletableFuture.completedFuture(Done.done());
}
完整示例:InfluxdbWriter
用于阅读您提到的 influxdb-client-scala 库作为“Scala Reactive Client”
完整示例:InfluxdbReader
集成测试 InfluxdbIT 通过测试容器引导 InfluxDB 2.x Docker 图像并运行上面的 类。
希望对您有所帮助
保罗
我正在尝试写入 Influxdb(运行 在 docker 容器中,版本为 2.0)。 我正在使用 Scala 和 Reactive Streams。因此 Alpakka 连接器 (https://doc.akka.io/docs/alpakka/current/influxdb.html) 因为 Scala Reactive Client (https://github.com/influxdata/influxdb-client-java/tree/master/client-scala) 不支持写入数据库。
无论我如何尝试写入数据库,数据都没有写入。
Source
.tick(1.seconds, 1.seconds,
Seq(
InfluxDbWriteMessage.create(
Point
.measurement("cpu_load_short")
.addField("host", "server01")
.addField("value", 0.64)
.tag("region", "us-west")
.time(DateTime.now.getMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
.build,
).withDatabaseName("database"),
)
)
.toMat(
InfluxDbSink.create()(
InfluxDBFactory.connect("http://localhost:9091", "admin", "admin123")
)
)(Keep.right)
.run.andThen { case Success(posts) => print("done") }
此外,“完成”从未打印出来,所以我假设未来永远不会完成,因此某个地方出现了问题。
唯一打印的是
Pong{version=2.1.1, responseTime=68}
我错过了什么,所以无法写作。是否因为 Alpakka 连接器是为 InfluxDB 之前的版本 2 编写的,因此它不起作用?
虽然我没有自己尝试,但官方InfluxDB Alpakka connector可能无法将记录写入InfluxDB 2.x,所以我想你的观察是正确的。
对我有用的是:
最新 influxdb 2.x Docker 图像和 sbt 导入:
"org.influxdb" % "influxdb-java" % "2.22",
"com.influxdb" %% "influxdb-client-scala" % "4.3.0",
"com.influxdb" % "flux-dsl" % "4.3.0",
用于从 influxdb-client-java, because of this discussion about write performance 写入同步 Java API WriteApiBlocking,诸如此类:
public CompletionStage<Done> writeTestPoints(int nPoints, String sensorID) {
List<Integer> range = IntStream.rangeClosed(1, nPoints).boxed().collect(Collectors.toList());
Source<Integer, NotUsed> source = Source.from(range);
CompletionStage<Done> done = source
.groupedWithin(10, Duration.ofMillis(100))
.mapAsyncUnordered(10, each -> this.eventHandlerPointBatch(each, influxDBClient.getWriteApiBlocking(), nPoints, sensorID))
.runWith(Sink.ignore(), system);
return done;
}
private CompletionStage<Done> eventHandlerPointBatch(List<Integer> hrs, WriteApiBlocking writeApi, int nPoints, String sensorID) {
LOGGER.info("Writing points: {}-{} ", sensorID, hrs);
List<Point> points = hrs.stream().map(each -> createPoint(nPoints, sensorID, System.nanoTime(), each)).collect(Collectors.toList());
writeApi.writePoints(points);
return CompletableFuture.completedFuture(Done.done());
}
完整示例:InfluxdbWriter
用于阅读您提到的 influxdb-client-scala 库作为“Scala Reactive Client”
完整示例:InfluxdbReader
集成测试 InfluxdbIT 通过测试容器引导 InfluxDB 2.x Docker 图像并运行上面的 类。
希望对您有所帮助
保罗