Ruby Google Cloud PubSub 异步拉取 returns GRPC::DeadlineExceeded 错误

Ruby Google Cloud PubSub async pull returns GRPC::DeadlineExceeded error

您好,我在 Google PubSub 中设置了订阅,我正在尝试使用“官方”google-cloud-ruby 库异步提取消息。这是我的代码,它将从传入 subscription_name:

的 rake 任务中执行
def pull!
  creds = Google::Cloud::PubSub::Credentials.new(
    GCP_CREDENTIALS_KEYFILE_PATH,
    scope: "https://www.googleapis.com/auth/pubsub"
  )
  messages = []
  pubsub = Google::Cloud::PubSub.new(
    project_id: GOOGLE_PROJECT_ID,
    credentials: creds
  )

  subscription = pubsub.subscription(subscription_name)

  subscription.pull(immediate: true).each do |received_message|
    puts "Received message: #{received_message.data}"
    received_message.acknowledge!
    messages.push(received_message)
  end

  # Return the collected messages
  messages
  
rescue => error
  Rails.logger error

  messages.presence
end

Google::Cloud::PubSub::Credentials 部分引用了一个有效的密钥文件。我知道 JSON 密钥文件很好,因为我可以使用它来使用 oauth2l 生成工作的 Bearer 令牌,并使用 cURL、postman、Net::HTTP 等从 PubSub 中提取。使用相同的 JSON 凭据用于单独的 Google::Cloud::Storage 服务并且工作正常。

但由于某些原因,使用 Google::Cloud::PubSub 它只是挂起并且不会响应。大约 60 秒后,我收到以下错误:

GRPC::DeadlineExceeded: 4:Deadline Exceeded. debug_error_string:{"created":"@1602610740.445195000","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/grpc-1.32.0-universal-darwin/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/grpc-1.32.0-universal-darwin/src/ruby/lib/grpc/generic/active_call.rb:180:in `attach_status_results_and_complete_call'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/grpc-1.32.0-universal-darwin/src/ruby/lib/grpc/generic/active_call.rb:376:in `request_response'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/grpc-1.32.0-universal-darwin/src/ruby/lib/grpc/generic/client_stub.rb:172:in `block (2 levels) in request_response'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/grpc-1.32.0-universal-darwin/src/ruby/lib/grpc/generic/interceptors.rb:170:in `intercept!'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/grpc-1.32.0-universal-darwin/src/ruby/lib/grpc/generic/client_stub.rb:171:in `block in request_response'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/gapic-common-0.3.4/lib/gapic/grpc/service_stub/rpc_call.rb:121:in `call'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/gapic-common-0.3.4/lib/gapic/grpc/service_stub.rb:156:in `call_rpc'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/google-cloud-pubsub-v1-0.1.2/lib/google/cloud/pubsub/v1/subscriber/client.rb:503:in `get_subscription'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/google-cloud-pubsub-2.1.0/lib/google/cloud/pubsub/service.rb:154:in `get_subscription'
/Users/bbulpet/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/google-cloud-pubsub-2.1.0/lib/google/cloud/pubsub/project.rb:286:in `subscription'

添加调试器显示以下行挂起并导致错误:

subscription = pubsub.subscription(subscription_name)

我已经根据 documentation 尝试了所有我能想到的方法。更新了所有相关的 gem,甚至绝望地尝试使用已弃用的语法。如果有人至少知道从哪里开始,将不胜感激,谢谢!

更新

所以事实证明,出于某种原因 运行 这在本地无法连接,但是当运送到部署环境时,上述代码完美地连接到 pubsub 并且能够拉取和确认消息。此外,在已部署环境中建立初始连接后,我现在也可以使用相同的凭据进行本地连接。

Link to Github issue 讨论有关故障排除过程的上下文和 quartzmo 尝试部署到另一个环境的建议。

你还有这个问题吗?我只是尝试使用 Ruby 2.6.5p114、google-cloud-pubsub 2.1.0 和 grpc 1.32.0(与您相同的版本)重现它,但我无法重现它。这是我的代码(在 Minitest 规范上下文中稍微修改为 运行)以供比较:

  GCP_CREDENTIALS_KEYFILE_PATH = "/Users/quartzmo/my-project.json"
  GOOGLE_PROJECT_ID = "my-project-id"

  def pull! topic_name, subscription_name
    creds = Google::Cloud::PubSub::Credentials.new(
      GCP_CREDENTIALS_KEYFILE_PATH,
      scope: "https://www.googleapis.com/auth/pubsub"
    )
    messages = []
    pubsub = Google::Cloud::PubSub.new(
      project_id: GOOGLE_PROJECT_ID,
      credentials: creds
    )
    topic = pubsub.create_topic topic_name
    topic.subscribe subscription_name

    topic.publish "A test message from #{topic_name} to #{subscription_name}"
  
    subscription = pubsub.subscription(subscription_name)
  
    subscription.pull(immediate: true).each do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
      messages.push(received_message)
    end
  
    # Return the collected messages
    messages
  end

  focus
  it "pull!" do
    topic_name = random_topic_name
    subscription_name = random_subscription_name
    messages = pull! topic_name, subscription_name
    assert_equal 1, messages.count
    assert_equal "A test message from #{topic_name} to #{subscription_name}", messages[0].data
  end

这是输出:

% bundle exec rake test
Run options: --junit --junit-filename=sponge_log.xml --seed 30984

# Running:

Received message: A test message from ruby-pubsub-samples-test-topic-7cb10bde to ruby-pubsub-samples-test-subscription-f47f2eaa
.

Finished in 6.529219s, 0.1532 runs/s, 0.3063 assertions/s.

1 runs, 2 assertions, 0 failures, 0 errors, 0 skips

更新(2020-10-20):在不同环境下执行代码时,该问题已解决,但原因不明。参见 comment on GitHub issue