kafka:'soTimeout'、'bufferSize' 和 'minBytes' 对 SimpleConsumer 意味着什么?
kafka: what do 'soTimeout', 'bufferSize' and 'minBytes' mean for SimpleConsumer?
我正在使用卡夫卡
0.8.2.1 简单消费者。有人可以阐明 SimpleConsumer 和 FetchRequestBuilder 的一些配置参数的含义吗?由于没有阅读 KAfka 的源代码,我当时找不到任何文档。 (我尝试将这个问题发布到 kafka 用户组——但没有成功):
-- Q1:在 SimpleConsumer 构造函数的签名中,我看到了 Int 'soTimeout' 参数 -
这个超时是什么意思?这是连接到 Kafka 代理的超时吗?从对 Kafka 的任何 [或特定??] 请求(如 FetchRequest)获得响应时超时?还有别的吗?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
-- Q2:另外,SimpleConsumer 构造函数采用 Int 'bufferSize' 参数。它的意义是什么?这是发出 fetchRequest 时 SimpleConsumer 将读取多少字节?或者它是从 Kafka 每次提取读取的最大字节数 - 如果有更多数据可用,将发生多次提取?
-- 通过 FetchRequestBuilder 构建 FetchRequest 时(见下文),我还需要指定 'fetchSize':
FetchRequest req= newFetchRequestBuilder ()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
查看 FetchRequestBuilder 的源代码,我认为(我不是 Scala 专家)那些调用
转换为以下方法调用 - 传递给 FetchRequest 的最终参数称为“minBytes”,暗示这可能不是准确的提取大小? .这是否意味着除非至少 'minBytes' 可用数据,否则它甚至不会获取任何内容?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
**minBytes: Int = FetchRequest.DefaultMinBytes**,
...)
所以,我的最后一个问题是:
-- Q3:'bufferSize'和'fetchSize/minBytes'有什么关系?他们到底定义了什么?我必须让
确定一个比另一个小还是大?
谢谢,
玛丽娜
soTimeout 是等待连接到给定代理的时间(以毫秒为单位)。我不知道连接会发生什么特别的事情,除非您确认那里有一个代理已准备好执行一些后续操作。
我认为构造函数中使用的bufferSize是客户端套接字接收broker发送的数据所使用的缓冲区大小。
关于你的最后一个问题,如果无论出于何种原因,获取请求返回的字节总数大于请求的套接字缓冲区大小,则需要不止一个较低级别的调用来检索所有数据,即使只有一个更高级别的提取调用。
我正在使用卡夫卡 0.8.2.1 简单消费者。有人可以阐明 SimpleConsumer 和 FetchRequestBuilder 的一些配置参数的含义吗?由于没有阅读 KAfka 的源代码,我当时找不到任何文档。 (我尝试将这个问题发布到 kafka 用户组——但没有成功):
-- Q1:在 SimpleConsumer 构造函数的签名中,我看到了 Int 'soTimeout' 参数 - 这个超时是什么意思?这是连接到 Kafka 代理的超时吗?从对 Kafka 的任何 [或特定??] 请求(如 FetchRequest)获得响应时超时?还有别的吗?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
-- Q2:另外,SimpleConsumer 构造函数采用 Int 'bufferSize' 参数。它的意义是什么?这是发出 fetchRequest 时 SimpleConsumer 将读取多少字节?或者它是从 Kafka 每次提取读取的最大字节数 - 如果有更多数据可用,将发生多次提取?
-- 通过 FetchRequestBuilder 构建 FetchRequest 时(见下文),我还需要指定 'fetchSize':
FetchRequest req= newFetchRequestBuilder ()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
查看 FetchRequestBuilder 的源代码,我认为(我不是 Scala 专家)那些调用 转换为以下方法调用 - 传递给 FetchRequest 的最终参数称为“minBytes”,暗示这可能不是准确的提取大小? .这是否意味着除非至少 'minBytes' 可用数据,否则它甚至不会获取任何内容?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
**minBytes: Int = FetchRequest.DefaultMinBytes**,
...)
所以,我的最后一个问题是:
-- Q3:'bufferSize'和'fetchSize/minBytes'有什么关系?他们到底定义了什么?我必须让 确定一个比另一个小还是大?
谢谢,
玛丽娜
soTimeout 是等待连接到给定代理的时间(以毫秒为单位)。我不知道连接会发生什么特别的事情,除非您确认那里有一个代理已准备好执行一些后续操作。
我认为构造函数中使用的bufferSize是客户端套接字接收broker发送的数据所使用的缓冲区大小。
关于你的最后一个问题,如果无论出于何种原因,获取请求返回的字节总数大于请求的套接字缓冲区大小,则需要不止一个较低级别的调用来检索所有数据,即使只有一个更高级别的提取调用。