Getting errors in a Java program that writes to a Kinesis Firehose stream
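I'm trying to write some data from an API (the Google stocks/finance API) to my AWS Firehose stream. I've downloaded and installed the AWS plugin for Eclipse, set up my Firehose stream on AWS, and everything seems to be configured correctly. However, I'm running into a few problems. The following line appears to be deprecated. I've tried different variations from the Amazon SDK, but I can't seem to get the code right.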
AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
Next, I get the following errors. The specific error is "The method setRecord(Record) is undefined for the type PutRecordRequest", even though I took the call straight from Amazon's API reference.
request.setRecord(record);
firehoseClient.putRecord(request);
The second line above also reports an error: "The method putRecord(com.amazonaws.services.kinesisfirehose.model.PutRecordRequest) in the type AmazonKinesisFirehoseClient is not applicable for the arguments (com.amazonaws.services.kinesis.model.PutRecordRequest)"
package com.amazonaws.samples;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;

import org.apache.http.client.CredentialsProvider;

import com.amazonaws.*;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;

public class FirehoseExample {
    public static void main(String[] args) {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        }
        catch (Exception e) {
            throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                    + "Please make sure that your credentials file is at the correct "
                    + "location (/Users/elybenari/.aws/credentials), and is in valid format.", e);
        }
        AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName("project-stream");
        Record record = new Record();
        for (int i = 0; i < 20*60; i++){
            try {
                URL url = new URL("https://www.google.com/finance/info?q=NASDAQ:AMZN");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder response = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    response.append(line);
                }
                reader.close();
                System.out.println(response.toString().replace("\n", "").replaceAll(" ", ""));
                System.out.println("****\n");
                ByteBuffer buffer = ByteBuffer.wrap(response.toString().replace("\n", "").replaceAll(" ", "").getBytes());
                record.setData(buff);
                request.setRecord(record);
                firehoseClient.putRecord(request);
                Thread.sleep(2000);
            }
            catch(Exception e){
                e.printStackTrace();
            }
        }
    }
}
The problem is that you have imported some classes from the Kinesis Java package rather than the Kinesis Firehose one. For example, you used:
import com.amazonaws.services.kinesis.model.PutRecordRequest;
However, you should use:
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
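Kinesis, Kinesis Firehose, and Kinesis Analytics are distinct services, even though they fall under the same umbrella of streaming services on AWS. They therefore have separate package namespaces in the Java SDK. If you start from the official Firehose documentation, you'll reach the correct Java SDK reference.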
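The compile errors in the question can be reproduced without the SDK. The sketch below is only an illustration: `KinesisModel` and `FirehoseModel` are hypothetical stand-ins for the two SDK packages, and `putRecord` and `sendSample` are made-up helpers. The setter names are chosen to mirror the SDK's, where the Firehose request takes a delivery stream name and a `Record`, while the Kinesis request takes a stream name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class NamespaceClashDemo {

    // Hypothetical stand-in for the package com.amazonaws.services.kinesis.model.
    static final class KinesisModel {
        static final class PutRecordRequest {
            private String streamName;
            void setStreamName(String streamName) { this.streamName = streamName; }
            String getStreamName() { return streamName; }
        }
    }

    // Hypothetical stand-in for the package com.amazonaws.services.kinesisfirehose.model.
    static final class FirehoseModel {
        static final class Record {
            private ByteBuffer data;
            void setData(ByteBuffer data) { this.data = data; }
            ByteBuffer getData() { return data; }
        }

        static final class PutRecordRequest {
            private String deliveryStreamName;
            private Record record;
            void setDeliveryStreamName(String name) { this.deliveryStreamName = name; }
            void setRecord(Record record) { this.record = record; }
            String getDeliveryStreamName() { return deliveryStreamName; }
            Record getRecord() { return record; }
        }
    }

    // Stand-in for the client: it accepts only the Firehose request type.
    static String putRecord(FirehoseModel.PutRecordRequest request) {
        int size = request.getRecord().getData().remaining();
        return request.getDeliveryStreamName() + ":" + size;
    }

    // Builds a Firehose-style request and passes it to the stand-in client.
    static String sendSample(String streamName, String payload) {
        FirehoseModel.Record record = new FirehoseModel.Record();
        record.setData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

        FirehoseModel.PutRecordRequest request = new FirehoseModel.PutRecordRequest();
        request.setDeliveryStreamName(streamName);
        request.setRecord(record);
        return putRecord(request);
    }

    public static void main(String[] args) {
        // Same simple name, different enclosing namespace: unrelated types.
        System.out.println(KinesisModel.PutRecordRequest.class.getSimpleName());
        System.out.println(FirehoseModel.PutRecordRequest.class.getSimpleName());

        // putRecord(new KinesisModel.PutRecordRequest()); // would not compile
        System.out.println(sendSample("project-stream", "abc"));
    }
}
```

Uncommenting the marked line gives the same kind of "not applicable for the arguments" error as in the question. Note also that once the Firehose `PutRecordRequest` is imported, `request.setStreamName(...)` in the original program would need to become `request.setDeliveryStreamName(...)`.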
Edit: To answer your other question: yes, the following is deprecated:
AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
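You should use the following instead. The builder's build() method returns the AmazonKinesisFirehose interface, so declare the variable with that type (and import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose, AmazonKinesisFirehoseClientBuilder, and com.amazonaws.auth.AWSStaticCredentialsProvider):
AmazonKinesisFirehose firehoseClient = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).build();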
See the official documentation for how to initialize the Firehose client properly.
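Separately from the import fix, the payload step in the question's loop can be isolated and checked on its own. This is a minimal sketch, not part of the SDK: `PayloadSketch` and `toPayload` are hypothetical names. It makes two assumptions beyond the original code: it encodes explicitly as UTF-8 instead of relying on the JVM's default charset, and it appends a newline, because Firehose does not insert a delimiter between records on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadSketch {

    // Strips newlines and spaces as the question's code does, appends a
    // newline as a record delimiter (an assumption, see above), and wraps
    // the UTF-8 bytes in a ByteBuffer suitable for Record.setData(...).
    static ByteBuffer toPayload(String rawResponse) {
        String compact = rawResponse.replace("\n", "").replace(" ", "");
        return ByteBuffer.wrap((compact + "\n").getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer payload = toPayload("{ \"t\" : \"AMZN\" }\n");
        // Decode a duplicate so the original buffer's position is untouched.
        System.out.print(StandardCharsets.UTF_8.decode(payload.duplicate()));
    }
}
```

With this helper, the loop body would call `record.setData(toPayload(response.toString()))`, which also avoids the undefined `buff` variable in the original listing.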