使用Jmeter的HTTP Request将记录放入Amazon Kinesis

Using HTTP Request of Jmeter to put records into Amazon Kinesis

我使用 jmeter 为我的 web 服务 REST 创建 HTTP 请求。 现在我想使用 PutRecords 方法将此请求发送到 Amazon kinesis,但我不知道如何创建请求,特别是如何在 kinesis 中设置 Headers 字段以进行签名和身份验证。 有人使用过休息请求吗? 谢谢

根据 PutRecords API reference 示例请求应如下所示

POST / HTTP/1.1
Host: kinesis.<region>.<domain>
x-amz-Date: <Date>
Authorization: AWS4-HMAC-SHA256 Credential=<Credential>,     SignedHeaders=content-type;date;host;user-agent;x-amz-date;x-amz-target;x-  amzn-requestid, Signature=<Signature>
User-Agent: <UserAgentString>
Content-Type: application/x-amz-json-1.1
Content-Length: <PayloadSizeBytes>
Connection: Keep-Alive 
X-Amz-Target: Kinesis_20131202.PutRecords

所以至少需要满足以下条件:

  • 内容类型
  • X-Amz-目标
  • 授权
  • x-amz-日期

您可以添加 HTTP Header Manager 以将它们添加到您的请求中。

JMeter 应该自己填充 Content-Length、Connection 和 Host。

这个问题很老了,我不记得了,但如果有人需要,这是代码: BeanShell 采样器

import org.MyKinesisClient;

//Create a controller object every time Jmeter starts 
MyKinesisClient controller=new MyKinesisClient(vars.get("accessKey"),vars.get("secretKey"),vars.get("endpoint"),vars.get("serviceName"),vars.get("regionId"));
bsh.shared.controller=controller;

这是最后一个代码:

import com.amazonaws.util.json.JSONArray;
import com.amazonaws.util.json.JSONObject;
import org.MyKinesisClient;

//Variables
int timestampValue=(${i});
String idValue=${__threadNum}+"_"+"1";
JSONObject part = new JSONObject();

//Inserimento campi Json
part.put("updated",timestampValue);
part.put("parent","${__threadNum}");
part.put("id",idValue);
part.put("thingClass","CosyInverter");
part.put("mac_address_w","${mac_address_w_1}");
//Other put
.... 

//Send Json to kinesis
MyKinesisClient controller=bsh.shared.controller;
controller.sendJson(part, ${__Random(0,${__threadNum})},vars.get("streamName"));

我的代码比上面的更复杂(数据库查询等...),但我希望这是你需要的。

这是关于 MyKinesisClient

的 java 代码
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.util.json.JSONArray;
import com.amazonaws.util.json.JSONException;
import com.amazonaws.util.json.JSONObject;

/**
 * Class useful to send Json to Amazon Kinesis.
 * @author l.calicchio
 *
 */
public class MyKinesisClient {
    private AmazonKinesisClient kinesisClient;

    /**
     * Class constructor. Allow all parameter setting
     * @param accessKey: access key for kinesis connection
     * @param secretKey: secret key for kinesis connection
     * @param endpoint: Kinesis endpoint 
     * @param serviceName: Amazon service name
     * @param regionId: region id of kinesis endpoint 
     */
    public MyKinesisClient(String accessKey, String secretKey, String endpoint, String serviceName, String regionId ) {
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        kinesisClient = new AmazonKinesisClient(credentials);
        kinesisClient.setEndpoint(endpoint,serviceName,regionId);
    }

    /**
     * Method useful to send the json to Kinesis
     * @param json: com.amazonaws.util.json.JSONObject to be sent
     * @param partitionKey: partition key for Kinesis stream 
     * @param streamName: name of Kinesis stream 
     * @throws UnsupportedEncodingException
     * @throws JSONException
     */
    public void sendJson(JSONObject json, int partitionKey, String streamName) throws UnsupportedEncodingException, JSONException {
        try{
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setData(ByteBuffer.wrap(json.toString().getBytes("utf-8")));
        putRecordRequest.setPartitionKey(String.format("partitionKey-%d", partitionKey));
        kinesisClient.putRecord(putRecordRequest);
        }catch(Exception e){
            System.out.println(e.getMessage());
        }
    }   

    /**
     * Method useful to send the json Array to Kinesis
     * @param json: com.amazonaws.util.json.JSONObject Array to be sent
     * @param partitionKey: partition key for Kinesis stream
     * @param streamName: Kinesis stream name
     */
    public void sendJsonArray(JSONArray json,int partitionKey, String streamName) {
        try{
        PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);            
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<PutRecordsRequestEntry>();
        for(int i=0;i<json.length();i++){           
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(json.getJSONObject(i).toString().getBytes("utf-8")));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", partitionKey));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }
        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        kinesisClient.putRecords(putRecordsRequest);
        }catch(Exception e){
            System.out.println(e.getMessage());
        }
    }
}