Spark Streaming + Kinesis:违反接收器 MaxRate
Spark Streaming + Kinesis : Receiver MaxRate is violated
我正在调用 spark-submit 传递 maxRate,我有一个 kinesis 接收器和 1s 的批次
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
然而,单个批次可以大大超过已建立的 maxRate。即:我得到 300 条记录。
我是否遗漏了任何设置?
我觉得这像是一个错误。从查看代码来看,Kinesis 似乎完全忽略了 spark.streaming.receiver.maxRate
配置。
如果你往里看 KinesisReceiver.onStart
,你会看到:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
这个构造函数最终调用了另一个构造函数,它有很多配置的默认值:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
你关心的是DEFAULT_MAX_RECORDS
,它一直设置为10,000条记录。 KinesisClientLibConfiguration
上有一个名为 withMaxRecords
的方法,您可以调用它来设置实际的记录数。这应该很容易修复。
但就目前而言,Kinesis 接收器似乎不遵守该参数。
供日后参考。
这是 bug 在 Spark 2.2.0
版本
中修复的已知问题
我正在调用 spark-submit 传递 maxRate,我有一个 kinesis 接收器和 1s 的批次
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
然而,单个批次可以大大超过已建立的 maxRate。即:我得到 300 条记录。
我是否遗漏了任何设置?
我觉得这像是一个错误。从查看代码来看,Kinesis 似乎完全忽略了 spark.streaming.receiver.maxRate
配置。
如果你往里看 KinesisReceiver.onStart
,你会看到:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
这个构造函数最终调用了另一个构造函数,它有很多配置的默认值:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
你关心的是DEFAULT_MAX_RECORDS
,它一直设置为10,000条记录。 KinesisClientLibConfiguration
上有一个名为 withMaxRecords
的方法,您可以调用它来设置实际的记录数。这应该很容易修复。
但就目前而言,Kinesis 接收器似乎不遵守该参数。
供日后参考。
这是 bug 在 Spark 2.2.0
版本