处理大量记录会出现 OutOfMemoryException - Kafka REST 代理
Processing huge records gives OutOfMemoryException - Kafka REST proxy
我正在使用融合的 REST API 代理调用 Kafka。我正在读取一个 CSV 文件,从那里的所有记录(大约 400 万条记录)中创建一个对象,然后向 REST 代理发送请求。我不断收到 OutOfMemory
异常。
确切的异常消息是:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"
我只有一个 REST 代理服务器实例,作为 docker 容器托管。环境变量设置为:
JAVA_OPTIONS=-Xmx1g
其他配置:
CPU - 1
Memory - 1024
它在崩溃前处理了大约 1,00,000 个。
我已经尝试将它扩展到 4 个实例,将 CPU 增加到 3,内存也增加到 2046 mb。然后它处理大约 5,00,000 条记录。
读取 csv 后,我以 5k 条记录为一组调用 Kafka 端点。那是用 Node.js 写的。这是节点代码
fs.createReadStream(inputFile)
.pipe(parser({skip_lines_with_error: true}))
.on('data', (records) => {
country.push({ 'value' : {
country: records[0],
capital: records[1]
}
});
if (country.length > 5000) {
batch++;
callKafkaProxy(country).then((rec) => {
console.log(`'Batch done!'`);
}).catch((reason) => {
console.log(reason);
});
country = [];
}
})
.on('end', () => {
console.log('All done!');
});
function callKafkaProxy(records) {
const urlAndRequestOptions = {
url: 'http://kafka-rest-proxy.com/topics/test-topic',
headers: {
'content-type' : 'application/vnd.kafka.json.v2+json',
'Accept' : 'application/vnd.kafka.v2+json'
}
};
let recordsObject = {records: records};
//request here is a wrapper on the http package.
return request.post(urlAndRequestOptions, recordsObject);
我觉得我缺少一些配置,这些配置应该有助于解决这个问题,而不会增加 > 1 的实例数。
任何帮助将不胜感激。
.on('data', () => {}); ...
1。它不处理背压。创建可写流,它将处理您的批处理过程。然后使用管道。
inputStream
.pipe(parser)
.pipe(kafka)
然后分析这些行:
if (country.length > 5000) {
batch++;
callKafkaProxy(country).then((rec) => {
console.log(`'Batch done!'`);
).catch((reason) => {
console.log(reason);
});
country = [];
}
- 你的 callKafkaProxy 是异步的,这就是为什么你的国家数组总是被填充,不管 callKafkaProxy 函数的结果如何。 Country 数组不断填充并不断发出请求。您可以在 batch++ 之后通过控制台日志记录来确保。你会看到你发起了很多请求,而 Kafka 的响应速度比你发出请求要慢得多。
解决方案:
- 创建可写流。
- 从你的解析器向它传输数据。 input.pipe(解析器).pipe(yourJustCreatedKafkaWritableStream)
- 当您准备好接收其他记录时,让您的可写流将国家推入数组并回调。当你到达边缘时(如果 countries.length > 5000)然后向 kafka 发出请求并等待响应,然后才给出回调。通过这种方式,您的流将具有自适应性。您应该阅读有关节点流及其功能的更多信息。但请记住,能力越大,责任越大,在这种情况下,您必须仔细设计代码以避免此类内存泄漏。
在 Zilvinas 的回答的帮助下,我了解了如何利用流来批量发送数据。这是一个解决方案:
var stream = fs.createReadStream(file)
.pipe(es.split())
.pipe(es.mapSync(function (line) {
if (line.length) {
//read your line and create a record message
}
//put 5000 in a config constant
if (records.length === 5000) {
stream.pause();
logger.debug(`Got ${records.length} messages. Pushing to Kafka...`);
postChunkToKafka(records).then((response) => {
records = [];
stream.resume();
});
}
我正在使用融合的 REST API 代理调用 Kafka。我正在读取一个 CSV 文件,从那里的所有记录(大约 400 万条记录)中创建一个对象,然后向 REST 代理发送请求。我不断收到 OutOfMemory
异常。
确切的异常消息是:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"
我只有一个 REST 代理服务器实例,作为 docker 容器托管。环境变量设置为:
JAVA_OPTIONS=-Xmx1g
其他配置:
CPU - 1
Memory - 1024
它在崩溃前处理了大约 1,00,000 个。 我已经尝试将它扩展到 4 个实例,将 CPU 增加到 3,内存也增加到 2046 mb。然后它处理大约 5,00,000 条记录。
读取 csv 后,我以 5k 条记录为一组调用 Kafka 端点。那是用 Node.js 写的。这是节点代码
fs.createReadStream(inputFile)
.pipe(parser({skip_lines_with_error: true}))
.on('data', (records) => {
country.push({ 'value' : {
country: records[0],
capital: records[1]
}
});
if (country.length > 5000) {
batch++;
callKafkaProxy(country).then((rec) => {
console.log(`'Batch done!'`);
}).catch((reason) => {
console.log(reason);
});
country = [];
}
})
.on('end', () => {
console.log('All done!');
});
function callKafkaProxy(records) {
const urlAndRequestOptions = {
url: 'http://kafka-rest-proxy.com/topics/test-topic',
headers: {
'content-type' : 'application/vnd.kafka.json.v2+json',
'Accept' : 'application/vnd.kafka.v2+json'
}
};
let recordsObject = {records: records};
//request here is a wrapper on the http package.
return request.post(urlAndRequestOptions, recordsObject);
我觉得我缺少一些配置,这些配置应该有助于解决这个问题,而不会增加 > 1 的实例数。
任何帮助将不胜感激。
.on('data', () => {}); ...
1。它不处理背压。创建可写流,它将处理您的批处理过程。然后使用管道。
inputStream
.pipe(parser)
.pipe(kafka)
然后分析这些行:
if (country.length > 5000) {
batch++;
callKafkaProxy(country).then((rec) => {
console.log(`'Batch done!'`);
).catch((reason) => {
console.log(reason);
});
country = [];
}
- 你的 callKafkaProxy 是异步的,这就是为什么你的国家数组总是被填充,不管 callKafkaProxy 函数的结果如何。 Country 数组不断填充并不断发出请求。您可以在 batch++ 之后通过控制台日志记录来确保。你会看到你发起了很多请求,而 Kafka 的响应速度比你发出请求要慢得多。
解决方案:
- 创建可写流。
- 从你的解析器向它传输数据。 input.pipe(解析器).pipe(yourJustCreatedKafkaWritableStream)
- 当您准备好接收其他记录时,让您的可写流将国家推入数组并回调。当你到达边缘时(如果 countries.length > 5000)然后向 kafka 发出请求并等待响应,然后才给出回调。通过这种方式,您的流将具有自适应性。您应该阅读有关节点流及其功能的更多信息。但请记住,能力越大,责任越大,在这种情况下,您必须仔细设计代码以避免此类内存泄漏。
在 Zilvinas 的回答的帮助下,我了解了如何利用流来批量发送数据。这是一个解决方案:
var stream = fs.createReadStream(file)
.pipe(es.split())
.pipe(es.mapSync(function (line) {
if (line.length) {
//read your line and create a record message
}
//put 5000 in a config constant
if (records.length === 5000) {
stream.pause();
logger.debug(`Got ${records.length} messages. Pushing to Kafka...`);
postChunkToKafka(records).then((response) => {
records = [];
stream.resume();
});
}