未记录的约束?发布到主题 *from* pubsub 触发器
Undocumented Constraint? publishing to topic *from* pubsub trigger
我不知道我是不是疯了,或者这是一个没有记录的限制(我已经搜索了 GCP API 文档):
是否可以在 'topic A' 上使用带有 pubsub 触发器的云函数,并在该云函数内向 'topic B' 发布消息。
我已经用相同的代码尝试了所有其他触发器 运行(云函数作为 HTTP 触发器、云存储触发器、Firebase 触发器),并且它们都成功发布 话题。
但是当我(几乎是字面上)将我的代码复制粘贴到 pubsub 触发器中时,在使用消息之后,当它尝试将自己的消息发布到下一个主题时,它只是 挂起 。该函数在尝试发布时超时。
回顾一下,在 GCP 中是否可以实现以下功能?
PubSub 主题 A --> 云函数 --> Pubsub 主题 B
提前感谢您的澄清!这些都在 Java 11 中。这是代码:
...<bunch of imports>
public class SignedURLGenerator implements BackgroundFunction<PubSubMessage> {
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
private static final Logger logger = Logger.getLogger(SignedURLGenerator.class.getName());
/**
* Handle the incoming PubsubMessage
**/
@Override
public void accept(PubSubMessage message, Context context) throws IOException, InterruptedException {
String data = new String(Base64.getDecoder().decode(message.data));
System.out.println("The input message is: " + data.toString());
//Do a bunch of other stuff not relevant to the issue at hand...
publishSignedURL(url.toString());
}
//Here's the interesting part
public static void publishSignedURL(String message) throws IOException, InterruptedException {
String topicName = "url-ready-notifier";
String responseMessage;
Publisher publisher = null;
try {
// Create the PubsubMessage object
ByteString byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
System.out.println("Message Constructed:" + message);
//This part works fine, the message gets constructed
publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).build();
System.out.println("Publisher Created.");
//This part also works fine, the publisher gets created
publisher.publish(pubsubApiMessage).get();
responseMessage = "Message published.";
//The code NEVER GETS HERE. The message is never published. And eventually the cloud function time's out :(
} catch (InterruptedException | ExecutionException e) {
System.out.println("Something went wrong with publishing: " + e.getMessage());
}
System.out.println("Everything wrapped up.");
}
编辑
根据要求,这是我当前的 POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cloudfunctions</groupId>
<artifactId>pubsub-function</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>20.6.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.0.1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.117.1</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.113.4</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>1.66.0</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax-grpc</artifactId>
<version>1.66.0</version>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
<version>0.7.2</version>
</dependency>
</dependencies>
</project>
您可以尝试在您的发布客户端中显式设置流量控制参数吗?像那样
publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).setBatchingSettings(BatchingSettings.newBuilder()
.setDelayThreshold(Duration.of(10, ChronoUnit.SECONDS))
.setElementCountThreshold(1L)
.setIsEnabled(true)
.build()).build();
我不知道会发生什么,可能是 PubSub 的默认全局配置。如果不是那样,我会删除这个答案。
编辑 1
这里是发布者父 classe
上构建器 class 的屏幕截图
您拥有所有库的默认值。但是,您观察到的行为并不正常。即使您处于 PubSub 触发器中,默认值也必须保持默认值。我会开个issue直接转发给团队
我不知道我是不是疯了,或者这是一个没有记录的限制(我已经搜索了 GCP API 文档):
是否可以在 'topic A' 上使用带有 pubsub 触发器的云函数,并在该云函数内向 'topic B' 发布消息。
我已经用相同的代码尝试了所有其他触发器 运行(云函数作为 HTTP 触发器、云存储触发器、Firebase 触发器),并且它们都成功发布 话题。 但是当我(几乎是字面上)将我的代码复制粘贴到 pubsub 触发器中时,在使用消息之后,当它尝试将自己的消息发布到下一个主题时,它只是 挂起 。该函数在尝试发布时超时。
回顾一下,在 GCP 中是否可以实现以下功能?
PubSub 主题 A --> 云函数 --> Pubsub 主题 B
提前感谢您的澄清!这些都在 Java 11 中。这是代码:
...<bunch of imports>
public class SignedURLGenerator implements BackgroundFunction<PubSubMessage> {
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
private static final Logger logger = Logger.getLogger(SignedURLGenerator.class.getName());
/**
* Handle the incoming PubsubMessage
**/
@Override
public void accept(PubSubMessage message, Context context) throws IOException, InterruptedException {
String data = new String(Base64.getDecoder().decode(message.data));
System.out.println("The input message is: " + data.toString());
//Do a bunch of other stuff not relevant to the issue at hand...
publishSignedURL(url.toString());
}
//Here's the interesting part
public static void publishSignedURL(String message) throws IOException, InterruptedException {
String topicName = "url-ready-notifier";
String responseMessage;
Publisher publisher = null;
try {
// Create the PubsubMessage object
ByteString byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
System.out.println("Message Constructed:" + message);
//This part works fine, the message gets constructed
publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).build();
System.out.println("Publisher Created.");
//This part also works fine, the publisher gets created
publisher.publish(pubsubApiMessage).get();
responseMessage = "Message published.";
//The code NEVER GETS HERE. The message is never published. And eventually the cloud function time's out :(
} catch (InterruptedException | ExecutionException e) {
System.out.println("Something went wrong with publishing: " + e.getMessage());
}
System.out.println("Everything wrapped up.");
}
编辑 根据要求,这是我当前的 POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cloudfunctions</groupId>
<artifactId>pubsub-function</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>20.6.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.0.1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.117.1</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.113.4</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>1.66.0</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax-grpc</artifactId>
<version>1.66.0</version>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
<version>0.7.2</version>
</dependency>
</dependencies>
</project>
您可以尝试在您的发布客户端中显式设置流量控制参数吗?像那样
publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).setBatchingSettings(BatchingSettings.newBuilder()
.setDelayThreshold(Duration.of(10, ChronoUnit.SECONDS))
.setElementCountThreshold(1L)
.setIsEnabled(true)
.build()).build();
我不知道会发生什么,可能是 PubSub 的默认全局配置。如果不是那样,我会删除这个答案。
编辑 1
这里是发布者父 classe
上构建器 class 的屏幕截图您拥有所有库的默认值。但是,您观察到的行为并不正常。即使您处于 PubSub 触发器中,默认值也必须保持默认值。我会开个issue直接转发给团队