Alpakka KinesisSink:无法将消息推送到 Stream

Alpakka KinesisSink : Can not push messages to Stream

我正在尝试使用 alpakka kinesis connector 向 Kinesis Stream 发送消息,但没有成功。我尝试了下面的代码,但我的流中没有任何内容。

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
    println(reqEntry)
    reqEntry
}

val entry = new PutRecordsRequestEntry()
    .withData(ByteBuffer.wrap("Hello World".getBytes))
    .withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()

我做错了什么?

我正在用 KinesisSource 检查我的流并且阅读正常(用另一个流测试)

AWS Kinesis 的监控仪表板也没有显示任何 PUT 请求。

注1:我尝试开启alpakka的调试日志,但没有效果

<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>

在我的 logback.xml + 根级别调试

下面要考虑的一些故障排除步骤 - 希望它们有所帮助。

我怀疑您可能缺少 Kinesis 客户端的凭据 and/or 区域配置。

Kinesis Firehose

Kinesis Producer Library(Alpakka 似乎正在使用的)不适用于 Kinesis Firehose。如果您尝试写信给 Firehose,这是行不通的。

应用程序日志记录

您可能希望为 Kinesis Producer Library 启用日志记录,而不仅仅是在 Alpakka 本身。相关文档可在此处获得:

Configuring the Kinesis Producer Library

Configuration Defaults for Kinesis Producer Library

AWS 端日志记录

AWS CloudTrail 开箱即用地自动启用 Kinesis 流,默认情况下 AWS 将为您保留 90 天的 CloudTrail 日志。

https://docs.aws.amazon.com/streams/latest/dev/logging-using-cloudtrail.html

您可以使用 CloudTrail 日志查看您的应用程序代表您对 Kinesis 进行的 API 调用。请求的显示通常会有适度的延迟 - 但这会让您知道请求是否因 IAM 权限不足或您的 AWS 资源配置的其他问题而失败。

检查SDK认证

Kinesis 客户端将使用 DefaultAWSCredentialsProviderChain 凭证提供程序向 AWS 发出请求。

您需要确保提供具有 IAM 权限的有效 AWS 凭证才能向 Kinesis 发出这些请求。如果您的代码在 AWS 上是 运行,则为您的应用程序提供凭据的首选方式是使用 IAM Roles(在实例启动时指定)。

在代码中构建客户端时,您还需要指定 AWS 区域。使用您的 application.properties 进行配置,或者如果您的应用程序是位于单个区域中的 CloudFormation 堆栈的一部分 - 当您的代码为 运行 时,使用 instance metadata 服务检索当前区域在 AWS 上。

问题是对流上的操作的访问被拒绝/权限。

我必须添加用于日志记录的 akka actor 配置

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = "30s"
}

查看调试行,我实际上 运行 在调试中并在每个阶段执行步骤。

它需要 IAM 角色"PutRecords"的权限