如何扇出 AWS 运动流?

How to fanout an AWS kinesis stream?

我想fanout/chain/replicate一个输入AWS Kinesis流到N个新的Kinesis流,这样写入输入Kinesis的每条记录都会出现在每个N 条流。

是否有AWS 服务或开源解决方案

如果有现成的解决方案,我不想编写代码来执行此操作。 AWS Kinesis firehose 是一个没有解决方案,因为它不能输出到 kinesis。如果 AWS Lambda 解决方案不会太贵 运行?

您可以通过两种方式完成 fan-out Amazon Kinesis 流:

  • 使用 Amazon Kinesis Analytics 将记录复制到其他流
  • 触发 AWS Lambda 函数将记录复制到另一个流

选项 1:使用 Amazon Kinesis Analytics fan-out

您可以使用 Amazon Kinesis Analytics 从现有流生成新流。

来自Amazon Kinesis Analytics documentation:

Amazon Kinesis Analytics applications continuously read and process streaming data in real-time. You write application code using SQL to process the incoming streaming data and produce output. Then, Amazon Kinesis Analytics writes the output to a configured destination.

Fan-out在Application Code部分提到:

You can also write SQL queries that run independent of each other. For example, you can write two SQL statements that query the same in-application stream, but send output into different in-applications streams.

我设法实现如下:

  • 创建了三个流:输入、输出 1、输出 2
  • 创建了两个 Amazon Kinesis Analytics 应用程序:copy1、copy2

Amazon Kinesis Analytics SQL 应用程序如下所示:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

此代码创建一个 pump(将其视为连续的 select 语句),select 来自 input 流,并且输出到 output1 流。我创建了另一个输出到 output2 流的相同应用程序。

为了测试,我将数据发送到 input 流:

#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data={}
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

我让它 运行 一段时间,然后使用此代码显示输出:

from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]

输出是:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

我再次 运行 output2 并显示相同的输出。

选项 2:使用 AWS Lambda

如果您 fanning-out 许多流,更有效的方法可能是创建 AWS Lambda 函数:

  • 由 Amazon Kinesis 流记录触发
  • 将记录写入多个 Amazon Kinesis 'output' 流

您甚至可以使用 Lambda 函数 self-discover 基于命名约定的输出流(例如任何名为 app-output-* 的流)。

Amazon 实验室有一个 github 存储库,使用 lambda 提供扇出。 https://github.com/awslabs/aws-lambda-fanout . Also read "Transforming a synchronous Lambda invocation into an asynchronous one" on https://medium.com/retailmenot-engineering/building-a-high-throughput-data-pipeline-with-kinesis-lambda-and-dynamodb-7d78e992a02d ,这对于构建真正的异步处理至关重要。

有两种不需要 AWS Firehose 或 AWS Lambda 即可展开 Kinesis 流的 AWS 本机解决方案。

  1. 与Kafka消费者群体类似,Kinesis也有应用名称。流的每个消费者都可以提供唯一的应用程序名称。如果两个消费者具有相同的应用程序名称,则消息将在它们之间分发。要扇出流,请为您希望从流中接收相同消息的那些消费者提供不同的应用程序名称。 Kinesis 将在幕后创建新的 DynamoDB 表来跟踪每个新应用程序的每个消费者,以便他们可以以不同的速率消费消息等。
  2. 使用 Kinesis Enhanced Fan-Out 以获得更高的吞吐量(每秒高达 2MiB),这不计入您的全局读取限制。在撰写本文时,每个流有 20 个 "enhanced fan-out" 消费者的限制。

据我所知,关于这两个选项的一个警告是您需要使用 Kinesis Client Library (KCL) (and not the raw AWS SDK).