pub_sub 来自 google 示例代码错误的操作,缺少 1 个必需的位置参数:'callback'

pub_sub action from google sample code errors with missing 1 required positional argument: 'callback'

我正在对大型查询 table 设置 google DLP 扫描,以查找可识别的个人信息。我一直在研究 google 示例代码,但遇到了代码 pub/sub 元素的问题

这是一个 python google 云函数调用 google dlp,使用 google 示例 here 使用方法 inspect_bigquery .

...

actions = [{
    'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)},
    'save_findings': {
        'output_config': {
                'table': {
                    'project_id': project,
                    'dataset_id': dataset_id,
                    'table_id': table_id + '_inspection_results',
                }
            }
    },
}]

...

subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_id)
#    subscription = subscriber.subscribe(subscription_path, callback)
subscription = subscriber.subscribe(subscription_path)

...

def callback(message):
    try:
        if (message.attributes['DlpJobName'] == operation.name):
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            if job.inspect_details.result.info_type_stats:
                for finding in job.inspect_details.result.info_type_stats:
                    print('Info type: {}; Count: {}'.format(
                        finding.info_type.name, finding.count))
            else:
                print('No findings.')

            # Signal to the main thread that we can exit.
            job_done.set()
        else:
            # This is not the message we're looking for.
            message.drop()
    except Exception as e:
        # Because this is executing in a thread, an exception won't be
        # noted unless we print it manually.
        print(e)
        raise

# Register the callback and wait on the event.
subscription.open(callback)
finished = job_done.wait(timeout=timeout)
if not finished:
    print('No event received before the timeout. Please verify that the '
          'subscription provided is subscribed to the topic provided.')

我遇到了两个错误,当我只使用订阅路径保留订阅方法时,它会出错并显示 TypeError: subscribe() missing 1 required positional argument: 'callback'.

当我将回调放入订阅方法时,它失败了 函数执行耗时 60002 毫秒,完成状态:'timeout' 超时前未收到任何事件。请验证提供的订阅是否订阅了提供的主题。

但是,保存结果操作确实有效,几秒钟后我能够在 bigquery 中看到结果。

谢谢

两件事: 1. 如您所知,如果您不想参与生成它们的业务,可以将 table_id 留空。

但是对于你的实际问题:

  1. 您 运行 是否偶然在具有执行截止日期的 Cloud Functions 中执行此操作? (https://cloud.google.com/functions/docs/concepts/exec#timeout)

如果是,您实际上希望云函数通过触发器订阅 pub/sub(https://cloud.google.com/functions/docs/calling/pubsub), not in your code to avoid the timeouts. There is a specific DLP solution guide here on that https://cloud.google.com/solutions/automating-classification-of-data-uploaded-to-cloud-storage#create_pubsub_topic_and_subscription

有帮助吗?