Google Cloud Pubsub 数据丢失
Google Cloud Pubsub Data lost
我遇到了 GCP pubsub 问题,在几秒钟内发布数千条消息时,一小部分数据丢失了。
我正在记录来自 pubsub 的 message_id
和对发布端和接收端的每条消息唯一的 session_id
,我看到的结果是一些接收端消息相同session_id
,但不同message_id
。此外,一些消息丢失了。
例如,在一次测试中,我向 pubsub 发送了 5,000 条消息,恰好收到了 5,000 条消息,丢失了 8 条消息。日志丢失消息如下所示:
MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)
messageId FOUND: messageId:108562396466545
API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)
Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
重复项看起来像:
======= Duplicates FOUND on sessionId: 730=======
sessionId: 730, messageId:108562396466545
sessionId: 730, messageId:108561339282318
(both are logs from pull request)
所有缺失的数据和重复的数据如下所示。
从上面的例子可以看出,有些消息已经占用了另一条消息的 message_id
,并且已经用两个不同的 message_id
发送了两次。
不知道有没有人帮我弄清楚这是怎么回事?提前致谢。
代码
我有一个 API 发送消息到 pubsub,看起来像这样:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
ps = pubsub.Client()
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
pubsub_topic = 'test_topic'
data = request.data
topic = ps.topic(pubsub_topic)
event = json.loads(data)
messageId = topic.publish(data)
return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
这是我用来从 pubsub 读取的代码:
从 google.cloud 导入 pubsub
重新进口
导入 json
ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')
max_messages = 1
stop = False
messages = []
class Message(object):
"""docstring for Message."""
def __init__(self, sessionId, messageId):
super(Message, self).__init__()
self.seesionId = sessionId
self.messageId = messageId
def pull_all():
while stop == False:
m = sub.pull(max_messages = max_messages, return_immediately = False)
for data in m:
ack_id = data[0]
message = data[1]
messageId = message.message_id
data = message.data
event = json.loads(data)
sessionId = str(event["sessionId"])
messages.append(Message(sessionId = sessionId, messageId = messageId))
print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"
sub.acknowledge(ack_ids = [ack_id])
pull_all()
用于生成 session_id、发送请求并记录来自 API 的响应:
// generate trackable sessionId
var sessionId = 0
var increment_session_id = function () {
sessionId++;
return sessionId;
}
var generate_data = function () {
var data = {};
// data.sessionId = faker.random.uuid();
data.sessionId = increment_session_id();
data.user = get_rand(userList);
data.device = get_rand(deviceList);
data.visitTime = new Date;
data.location = get_rand(locationList);
data.content = get_rand(contentList);
return data;
}
var sendData = function (url, payload) {
var request = $.ajax({
url: url,
contentType: 'application/json',
method: 'POST',
data: JSON.stringify(payload),
error: function (xhr, status, errorThrown) {
console.log(xhr, status, errorThrown);
$('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
$('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
$('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
}
}).done(function (xhr) {
console.log(xhr);
$('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
})
}
$(submit_button).click(function () {
var request_num = get_request_num();
var request_url = get_url();
for (var i = 0; i < request_num; i++) {
var data = generate_data();
var loadData = changeVerb(data, 'load');
sendData(request_url, loadData);
}
})
更新
我对 API 进行了更改,问题似乎消失了。我所做的更改不是对所有请求使用一个 pubsub.Client()
,而是为每个传入的请求初始化一个客户端。新的 API 看起来像:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
ps = pubsub.Client()
pubsub_topic = 'test_topic'
data = request.data
topic = ps.topic(pubsub_topic)
event = json.loads(data)
messageId = topic.publish(data)
return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
Google 云 Pub/Sub 消息 ID 是唯一的。 "some messages [to] taken the message_id
of another message." 应该不可能 消息 ID 108562396466545 似乎已收到,这意味着 Pub/Sub 确实将消息传递给了订阅者并且没有丢失。
我建议您检查一下您的 session_id
是如何生成的,以确保它们确实是独一无二的,并且每条消息只有一个。通过正则表达式搜索在 JSON 中搜索 sessionId 似乎有点奇怪。您最好将此 JSON 解析为实际对象并以这种方式访问字段。
一般来说,Cloud Pub/Sub 中的重复消息总是有可能的;该系统保证至少一次交付。如果重复发生在订阅端(例如,未及时处理 ack)或使用不同的消息 ID(例如,如果在出现类似错误后重试消息发布),则这些消息可以使用相同的消息 ID超过了最后期限)。
您不需要为每个发布操作都创建一个新的客户端。我敢打赌 "fixed the problem" 的原因是因为它缓解了发布者客户端中存在的竞争。我也不相信您在发布者端显示的日志行:
API: 200 **** sessionId: 731, messageId:108562396466545 ******
对应于 publish_test_topic() 成功发布 sessionId 731。在什么条件下打印该日志行?目前给出的代码没有显示这一点。
与 Google 的某个人交谈过,这似乎是 Python 客户的问题:
The consensus on our side is that there is a thread-safety problem in the current python client. The client library is being rewritten almost from scratch as we speak, so I don't want to pursue any fixes in the current version. We expect the new version to become available by end of June.
Running the current code with thread_safe: false in app.yaml or better yet just instantiating the client in every call should is the work around -- the solution you found.
详细解决方法请看问题更新
我遇到了 GCP pubsub 问题,在几秒钟内发布数千条消息时,一小部分数据丢失了。
我正在记录来自 pubsub 的 message_id
和对发布端和接收端的每条消息唯一的 session_id
,我看到的结果是一些接收端消息相同session_id
,但不同message_id
。此外,一些消息丢失了。
例如,在一次测试中,我向 pubsub 发送了 5,000 条消息,恰好收到了 5,000 条消息,丢失了 8 条消息。日志丢失消息如下所示:
MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)
messageId FOUND: messageId:108562396466545
API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)
Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
重复项看起来像:
======= Duplicates FOUND on sessionId: 730=======
sessionId: 730, messageId:108562396466545
sessionId: 730, messageId:108561339282318
(both are logs from pull request)
所有缺失的数据和重复的数据如下所示。
从上面的例子可以看出,有些消息已经占用了另一条消息的 message_id
,并且已经用两个不同的 message_id
发送了两次。
不知道有没有人帮我弄清楚这是怎么回事?提前致谢。
代码
我有一个 API 发送消息到 pubsub,看起来像这样:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
ps = pubsub.Client()
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
pubsub_topic = 'test_topic'
data = request.data
topic = ps.topic(pubsub_topic)
event = json.loads(data)
messageId = topic.publish(data)
return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
这是我用来从 pubsub 读取的代码:
从 google.cloud 导入 pubsub 重新进口 导入 json
ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')
max_messages = 1
stop = False
messages = []
class Message(object):
"""docstring for Message."""
def __init__(self, sessionId, messageId):
super(Message, self).__init__()
self.seesionId = sessionId
self.messageId = messageId
def pull_all():
while stop == False:
m = sub.pull(max_messages = max_messages, return_immediately = False)
for data in m:
ack_id = data[0]
message = data[1]
messageId = message.message_id
data = message.data
event = json.loads(data)
sessionId = str(event["sessionId"])
messages.append(Message(sessionId = sessionId, messageId = messageId))
print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"
sub.acknowledge(ack_ids = [ack_id])
pull_all()
用于生成 session_id、发送请求并记录来自 API 的响应:
// generate trackable sessionId
var sessionId = 0
var increment_session_id = function () {
sessionId++;
return sessionId;
}
var generate_data = function () {
var data = {};
// data.sessionId = faker.random.uuid();
data.sessionId = increment_session_id();
data.user = get_rand(userList);
data.device = get_rand(deviceList);
data.visitTime = new Date;
data.location = get_rand(locationList);
data.content = get_rand(contentList);
return data;
}
var sendData = function (url, payload) {
var request = $.ajax({
url: url,
contentType: 'application/json',
method: 'POST',
data: JSON.stringify(payload),
error: function (xhr, status, errorThrown) {
console.log(xhr, status, errorThrown);
$('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
$('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
$('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
}
}).done(function (xhr) {
console.log(xhr);
$('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
})
}
$(submit_button).click(function () {
var request_num = get_request_num();
var request_url = get_url();
for (var i = 0; i < request_num; i++) {
var data = generate_data();
var loadData = changeVerb(data, 'load');
sendData(request_url, loadData);
}
})
更新
我对 API 进行了更改,问题似乎消失了。我所做的更改不是对所有请求使用一个 pubsub.Client()
,而是为每个传入的请求初始化一个客户端。新的 API 看起来像:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
ps = pubsub.Client()
pubsub_topic = 'test_topic'
data = request.data
topic = ps.topic(pubsub_topic)
event = json.loads(data)
messageId = topic.publish(data)
return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
Google 云 Pub/Sub 消息 ID 是唯一的。 "some messages [to] taken the message_id
of another message." 应该不可能 消息 ID 108562396466545 似乎已收到,这意味着 Pub/Sub 确实将消息传递给了订阅者并且没有丢失。
我建议您检查一下您的 session_id
是如何生成的,以确保它们确实是独一无二的,并且每条消息只有一个。通过正则表达式搜索在 JSON 中搜索 sessionId 似乎有点奇怪。您最好将此 JSON 解析为实际对象并以这种方式访问字段。
一般来说,Cloud Pub/Sub 中的重复消息总是有可能的;该系统保证至少一次交付。如果重复发生在订阅端(例如,未及时处理 ack)或使用不同的消息 ID(例如,如果在出现类似错误后重试消息发布),则这些消息可以使用相同的消息 ID超过了最后期限)。
您不需要为每个发布操作都创建一个新的客户端。我敢打赌 "fixed the problem" 的原因是因为它缓解了发布者客户端中存在的竞争。我也不相信您在发布者端显示的日志行:
API: 200 **** sessionId: 731, messageId:108562396466545 ******
对应于 publish_test_topic() 成功发布 sessionId 731。在什么条件下打印该日志行?目前给出的代码没有显示这一点。
与 Google 的某个人交谈过,这似乎是 Python 客户的问题:
The consensus on our side is that there is a thread-safety problem in the current python client. The client library is being rewritten almost from scratch as we speak, so I don't want to pursue any fixes in the current version. We expect the new version to become available by end of June.
Running the current code with thread_safe: false in app.yaml or better yet just instantiating the client in every call should is the work around -- the solution you found.
详细解决方法请看问题更新