Influxdb Alpakka 连接器不写入数据库

Influxdb Alpakka connector not writing to database

我正在尝试写入 Influxdb(运行 在 docker 容器中,版本为 2.0)。 我正在使用 Scala 和 Reactive Streams。因此 Alpakka 连接器 (https://doc.akka.io/docs/alpakka/current/influxdb.html) 因为 Scala Reactive Client (https://github.com/influxdata/influxdb-client-java/tree/master/client-scala) 不支持写入数据库。

无论我如何尝试写入数据库,数据都没有写入。

Source
    .tick(1.seconds, 1.seconds,
      Seq(
        InfluxDbWriteMessage.create(
          Point
            .measurement("cpu_load_short")
            .addField("host", "server01")
            .addField("value", 0.64)
            .tag("region", "us-west")
            .time(DateTime.now.getMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
            .build,
        ).withDatabaseName("database"),
      )
    )
    .toMat(
      InfluxDbSink.create()(
        InfluxDBFactory.connect("http://localhost:9091", "admin", "admin123")
      )
    )(Keep.right)
    .run.andThen { case Success(posts) => print("done") }
  

此外,“完成”从未打印出来,所以我假设未来永远不会完成,因此某个地方出现了问题。

唯一打印的是

Pong{version=2.1.1, responseTime=68}

我错过了什么,所以无法写作。是否因为 Alpakka 连接器是为 InfluxDB 之前的版本 2 编写的,因此它不起作用?

虽然我没有自己尝试,但官方InfluxDB Alpakka connector可能无法将记录写入InfluxDB 2.x,所以我想你的观察是正确的。

对我有用的是:

最新 influxdb 2.x Docker 图像和 sbt 导入:

  "org.influxdb" % "influxdb-java" % "2.22",
  "com.influxdb" %% "influxdb-client-scala" % "4.3.0",
  "com.influxdb" % "flux-dsl" % "4.3.0",

用于从 influxdb-client-java, because of this discussion about write performance 写入同步 Java API WriteApiBlocking,诸如此类:

public CompletionStage<Done> writeTestPoints(int nPoints, String sensorID) {
    List<Integer> range = IntStream.rangeClosed(1, nPoints).boxed().collect(Collectors.toList());
    Source<Integer, NotUsed> source = Source.from(range);
    CompletionStage<Done> done = source
            .groupedWithin(10, Duration.ofMillis(100))
            .mapAsyncUnordered(10, each -> this.eventHandlerPointBatch(each, influxDBClient.getWriteApiBlocking(), nPoints, sensorID))
            .runWith(Sink.ignore(), system);
    return done;
}


private CompletionStage<Done> eventHandlerPointBatch(List<Integer> hrs, WriteApiBlocking writeApi, int nPoints, String sensorID) {
    LOGGER.info("Writing points: {}-{} ", sensorID, hrs);
    List<Point> points = hrs.stream().map(each -> createPoint(nPoints, sensorID, System.nanoTime(), each)).collect(Collectors.toList());
    writeApi.writePoints(points);
    return CompletableFuture.completedFuture(Done.done());
}

完整示例:InfluxdbWriter

用于阅读您提到的 influxdb-client-scala 库作为“Scala Reactive Client”

完整示例:InfluxdbReader

集成测试 InfluxdbIT 通过测试容器引导 InfluxDB 2.x Docker 图像并运行上面的 类。

希望对您有所帮助

保罗