如何将 25k 记录放入运动流和测试工具以确认它

How to put 25k record to kinesis stream and Test tool to acknowledge it

我开发了一款软件,可以将记录写入 Amazon kinesis Stream 网络服务。我想了解我们是否有任何软件工具可以让我测量我的代码在一秒钟内为 1 个分片生成到 Kinesis Stream 的最大吞吐量。 是的,我同意这也取决于硬件配置。但是首先我想知道通用机器然后我可能会看到水平可扩展性

有了这个,我试图实现每秒 25k 条记录来写入运动流

参考资料:Kinesis http://aws.amazon.com/kinesis/

我相信你可以使用 Apache JMeter 作为

  1. 下载并安装 JMeter
  2. 下载 Amazon Kinesis Java Client Library 并将 jars 拖放到 JMeter 类路径(您可以使用 JMeter 安装的 /lib 文件夹)
  3. 使用JSR223 Sampler, "groovy" as a language and AmazonKinesisRecordProducerSample作为参考实现将记录写入流的代码

有关安装 "groovy" 引擎支持和脚本最佳实践的说明,请参阅 Beanshell vs JSR223 vs Java JMeter Scripting: The Performance-Off You've Been Waiting For! 指南。

感谢您的提示。我已经找到了一种方法,可以让 groovy 中的工作代码使用 AWS-Java-SDK 使用 Kinesis Stream 发送记录: 这是示例代码:

/*
 * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.nio.ByteBuffer
import java.util.List
import java.util.concurrent.TimeUnit

import com.amazonaws.AmazonClientException
import com.amazonaws.AmazonServiceException
import com.amazonaws.auth.AWSCredentials
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.CreateStreamRequest
import com.amazonaws.services.kinesis.model.DescribeStreamRequest
import com.amazonaws.services.kinesis.model.DescribeStreamResult
import com.amazonaws.services.kinesis.model.ListStreamsRequest
import com.amazonaws.services.kinesis.model.ListStreamsResult
import com.amazonaws.services.kinesis.model.PutRecordRequest
import com.amazonaws.services.kinesis.model.PutRecordResult
import com.amazonaws.services.kinesis.model.ResourceNotFoundException
import com.amazonaws.services.kinesis.model.StreamDescription

 class AmazonKinesisRecordProducerSample {

    /*
     * Before running the code:
     *      Fill in your AWS access credentials in the provided credentials
     *      file template, and be sure to move the file to the default location
     *      (~/.aws/credentials) where the sample code will load the
     *      credentials from.
     *      https://console.aws.amazon.com/iam/home?#security_credential
     *
     * WARNING:
     *      To avoid accidental leakage of your credentials, DO NOT keep
     *      the credentials file in your source directory.
     */

     def kinesis

    def init() {
        /*
         * The ProfileCredentialsProvider will return your [default]
         * credential profile by reading from the credentials file located at
         * (~/.aws/credentials).
         */
            AWSCredentials credentials = null
            credentials = new ProfileCredentialsProvider().getCredentials()
             kinesis = new AmazonKinesisClient(credentials)         
    }    
}

 def amazonKinesisRecordProducerSample= new AmazonKinesisRecordProducerSample() 
amazonKinesisRecordProducerSample.init()

  def myStreamName="<KINESIS STREAM NAME>"

        println("Press CTRL-C to stop.")
        // Write records to the stream until this program is aborted.
        while (true) {
            def createTime = System.currentTimeMillis()
            def data='<Data IN STRING FORMAT>'
            def partitionkey="<PARTITION KEY>"
            def putRecordRequest = new PutRecordRequest()
            putRecordRequest.setStreamName(myStreamName)
           putRecordRequest.setData(ByteBuffer.wrap(String.valueOf(data).getBytes()))
          putRecordRequest.setPartitionKey(partitionkey)
            def putRecordResult = new PutRecordResult()
            putRecordResult = amazonKinesisRecordProducerSample.kinesis.putRecord(putRecordRequest)
            printf("Successfully put record, partition key : %s, ShardID : %s, SequenceNumber : %s.\n",
                    putRecordRequest.getPartitionKey(),
                    putRecordResult.getShardId(),
                    putRecordResult.getSequenceNumber())
        }

注意:仅当您已经创建了 Kinesis 流并且 enabled.If 您需要创建流然后使用它时,此代码才有效,请参考 aws-java 中给出的代码示例-sdk 源文件夹。