PubSub 模拟器 -(支持 Proto Buffer publish/receive msg)
PubSub Emulator - ( Support Proto Buffer publish/receive msg)
我正在开发一种解决方案,使用通用 Proto Buffer 库发送和接收消息,使用直接序列化的 Proto 缓冲区 (ByteString) 和从 (ByteString) 直接反序列化到同一个 Proto Buffer Class。到目前为止,我的解决方案不起作用。就在我使用真正的 PubSub 时。
基于The doc: Testing apps locally with the emulator information and more specific in the section knowing limitations:
- 模拟器不为协议缓冲区提供架构支持。
不过,我没有使用 Topic/Subscription 中的任何架构定义。只需以编程方式使用通用原型缓冲区库。恐怕存在 Pubsub 仿真限制,因此我的解决方案不适用于仿真器。
下面是我的测试Class我们非常欢迎任何澄清。
package com.example.pubsubgcpspringapplications;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.alpian.common.pubsub.messages.OnfidoVerificationEvent;
import com.example.pubsubgcpspringapplications.config.PubSubTestConfig;
import com.example.pubsubgcpspringapplications.services.MessageRealGcpService;
import com.example.pubsubgcpspringapplications.util.DataGenerationUtils;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
//@ActiveProfiles("test")
public class EmulatorPubSubWithSpringTest {
@BeforeAll
static void startUpTests() throws IOException {
PubSubTestConfig.setupPubSubEmulator();
}
@SneakyThrows
@Test
void successfulTest() throws InterruptedException {
var status = DataGenerationUtils.STATUS_COMPLETE;
var result = DataGenerationUtils.RESULT_CLEAR;
var subResult = DataGenerationUtils.SUB_RESULT_CLEAR;
var documentReport = DataGenerationUtils.generateOnfidoDocumentReport(status, result, subResult);
var facialSimilarityReport = DataGenerationUtils
.generateOnfidoFacialSimiliratyVideoReport(status, result, subResult);
OnfidoVerificationEvent.Builder builder = OnfidoVerificationEvent.newBuilder();
builder.setCheckId(DataGenerationUtils.FAKE_CHECK_ID);
builder.setApplicantId(DataGenerationUtils.FAKE_APPLICANT_ID);
builder.setDocument(documentReport);
builder.setFacialSimilarityVideo(facialSimilarityReport);
OnfidoVerificationEvent onfidoVerificationEvent = builder.build();
publishProtoMessageTest(onfidoVerificationEvent);
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
ByteString data = message.getData();
// Get the schema encoding type.
String encoding = message.getAttributesMap().get("googclient_schemaencoding");
block:
try {
switch (encoding) {
case "BINARY":
// Obtain an object of the generated proto class.
OnfidoVerificationEvent state = OnfidoVerificationEvent.parseFrom(data);
System.out.println("Received a BINARY-formatted message: " + state);
break;
case "JSON":
OnfidoVerificationEvent.Builder stateBuilder = OnfidoVerificationEvent.newBuilder();
JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
break;
default:
break block;
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
consumer.ack();
System.out.println("Ack'ed the message");
};
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(PubSubTestConfig.PROJECT_ID, PubSubTestConfig.SUBSCRIPTION_NAME);
// Create subscriber client.
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
Thread.sleep(15000);
}
public static void publishProtoMessageTest(OnfidoVerificationEvent onfidoVerificationEvent)
throws IOException, ExecutionException, InterruptedException {
Publisher publisher = null;
block:
try {
publisher = Publisher.newBuilder("projects/my-project-id/topics/topic-one").build();
PubsubMessage.Builder message = PubsubMessage.newBuilder();
// Prepare an appropriately formatted message based on topic encoding.
message.setData(onfidoVerificationEvent.toByteString());
System.out.println("Publishing a BINARY-formatted message:\n" + message);
// Publish the message.
ApiFuture<String> future = publisher.publish(message.build());
//System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
注意:拜托,我刚刚从 google 教程中复制了一些截图代码并进行了修改。我不想使用 JSON 只是使用 proto 文件发布和接收消息。
非常感谢!
编辑: 在评论和另一个发布的答案中更好地说明模拟器。
正如您所指出的,PubSub 模拟器目前 not support the use os protobuffer messages, and that's what you are using in your code (Snippets from Publish / Receive messages of protobuf schema type), and its not supported currently. You can try to use Avro schema type or open a feature request
on Google issue tracker 可以在 PubSub 模拟器中使用 protobuffer 模式。
“找不到资源”问题与 Pub/Sub 模拟器不支持 Protocol Buffer 架构没有任何关系。如果您尝试以不受支持的方式使用 Protocol Buffers(这将创建一个使用 PROTCOL_BUFFER
作为其 type
的 Schema
对象),那么您会返回一个错误,特别是关于模拟器中不支持 Protocol Buffer 模式。
您的问题更像是以下问题之一:
- 订阅名称与您创建的订阅名称不匹配。
- 您实际上并没有在模拟器中创建订阅,而是在实际的 Pub/Sub 服务中创建的。
- 您没有通过设置
PUBSUB_EMULATOR_HOST
环境变量将订阅者指向模拟器。
您应该验证模拟器中是否存在订阅。您可以通过 运行 对它使用 gcloud
工具来做到这一点。假设您使用以下命令启动了模拟器:
gcloud beta emulators pubsub start --project=my-test-project
如果这会在端口 8085 上启动您的模拟器,您可以通过 运行ning 检查您的订阅是否存在:
> CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=http://localhost:8085/ gcloud --project my-test-topic pubsub subscriptions list
如果在您 运行 该命令时您的订阅不存在,那么这意味着您可能没有在模拟器中创建订阅,而是在实际服务中创建了它。如果您确实看到了它,那么这可能意味着您的订阅者没有向模拟器发送请求,而是实际上向 Pub/Sub 服务本身发送请求。
我正在开发一种解决方案,使用通用 Proto Buffer 库发送和接收消息,使用直接序列化的 Proto 缓冲区 (ByteString) 和从 (ByteString) 直接反序列化到同一个 Proto Buffer Class。到目前为止,我的解决方案不起作用。就在我使用真正的 PubSub 时。
基于The doc: Testing apps locally with the emulator information and more specific in the section knowing limitations:
- 模拟器不为协议缓冲区提供架构支持。
不过,我没有使用 Topic/Subscription 中的任何架构定义。只需以编程方式使用通用原型缓冲区库。恐怕存在 Pubsub 仿真限制,因此我的解决方案不适用于仿真器。
下面是我的测试Class我们非常欢迎任何澄清。
package com.example.pubsubgcpspringapplications;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.alpian.common.pubsub.messages.OnfidoVerificationEvent;
import com.example.pubsubgcpspringapplications.config.PubSubTestConfig;
import com.example.pubsubgcpspringapplications.services.MessageRealGcpService;
import com.example.pubsubgcpspringapplications.util.DataGenerationUtils;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
//@ActiveProfiles("test")
public class EmulatorPubSubWithSpringTest {
@BeforeAll
static void startUpTests() throws IOException {
PubSubTestConfig.setupPubSubEmulator();
}
@SneakyThrows
@Test
void successfulTest() throws InterruptedException {
var status = DataGenerationUtils.STATUS_COMPLETE;
var result = DataGenerationUtils.RESULT_CLEAR;
var subResult = DataGenerationUtils.SUB_RESULT_CLEAR;
var documentReport = DataGenerationUtils.generateOnfidoDocumentReport(status, result, subResult);
var facialSimilarityReport = DataGenerationUtils
.generateOnfidoFacialSimiliratyVideoReport(status, result, subResult);
OnfidoVerificationEvent.Builder builder = OnfidoVerificationEvent.newBuilder();
builder.setCheckId(DataGenerationUtils.FAKE_CHECK_ID);
builder.setApplicantId(DataGenerationUtils.FAKE_APPLICANT_ID);
builder.setDocument(documentReport);
builder.setFacialSimilarityVideo(facialSimilarityReport);
OnfidoVerificationEvent onfidoVerificationEvent = builder.build();
publishProtoMessageTest(onfidoVerificationEvent);
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
ByteString data = message.getData();
// Get the schema encoding type.
String encoding = message.getAttributesMap().get("googclient_schemaencoding");
block:
try {
switch (encoding) {
case "BINARY":
// Obtain an object of the generated proto class.
OnfidoVerificationEvent state = OnfidoVerificationEvent.parseFrom(data);
System.out.println("Received a BINARY-formatted message: " + state);
break;
case "JSON":
OnfidoVerificationEvent.Builder stateBuilder = OnfidoVerificationEvent.newBuilder();
JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
break;
default:
break block;
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
consumer.ack();
System.out.println("Ack'ed the message");
};
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(PubSubTestConfig.PROJECT_ID, PubSubTestConfig.SUBSCRIPTION_NAME);
// Create subscriber client.
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
Thread.sleep(15000);
}
public static void publishProtoMessageTest(OnfidoVerificationEvent onfidoVerificationEvent)
throws IOException, ExecutionException, InterruptedException {
Publisher publisher = null;
block:
try {
publisher = Publisher.newBuilder("projects/my-project-id/topics/topic-one").build();
PubsubMessage.Builder message = PubsubMessage.newBuilder();
// Prepare an appropriately formatted message based on topic encoding.
message.setData(onfidoVerificationEvent.toByteString());
System.out.println("Publishing a BINARY-formatted message:\n" + message);
// Publish the message.
ApiFuture<String> future = publisher.publish(message.build());
//System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
注意:拜托,我刚刚从 google 教程中复制了一些截图代码并进行了修改。我不想使用 JSON 只是使用 proto 文件发布和接收消息。
非常感谢!
编辑: 在评论和另一个发布的答案中更好地说明模拟器。
正如您所指出的,PubSub 模拟器目前 not support the use os protobuffer messages, and that's what you are using in your code (Snippets from Publish / Receive messages of protobuf schema type), and its not supported currently. You can try to use Avro schema type or open a feature request
on Google issue tracker 可以在 PubSub 模拟器中使用 protobuffer 模式。
“找不到资源”问题与 Pub/Sub 模拟器不支持 Protocol Buffer 架构没有任何关系。如果您尝试以不受支持的方式使用 Protocol Buffers(这将创建一个使用 PROTCOL_BUFFER
作为其 type
的 Schema
对象),那么您会返回一个错误,特别是关于模拟器中不支持 Protocol Buffer 模式。
您的问题更像是以下问题之一:
- 订阅名称与您创建的订阅名称不匹配。
- 您实际上并没有在模拟器中创建订阅,而是在实际的 Pub/Sub 服务中创建的。
- 您没有通过设置
PUBSUB_EMULATOR_HOST
环境变量将订阅者指向模拟器。
您应该验证模拟器中是否存在订阅。您可以通过 运行 对它使用 gcloud
工具来做到这一点。假设您使用以下命令启动了模拟器:
gcloud beta emulators pubsub start --project=my-test-project
如果这会在端口 8085 上启动您的模拟器,您可以通过 运行ning 检查您的订阅是否存在:
> CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=http://localhost:8085/ gcloud --project my-test-topic pubsub subscriptions list
如果在您 运行 该命令时您的订阅不存在,那么这意味着您可能没有在模拟器中创建订阅,而是在实际服务中创建了它。如果您确实看到了它,那么这可能意味着您的订阅者没有向模拟器发送请求,而是实际上向 Pub/Sub 服务本身发送请求。