如何摆脱类型错误?
How to get rid of TypeError?
目前正致力于在 Raspberry Pi 上实施 Google 云存储。每当我 运行 我的代码时,我都会收到错误消息:
TypeError: callback() takes 0 positional arguments but 1 was given
这是我的代码:
from google.cloud import pubsub_v1
import json
import datetime
import time
project_id = "projectid"
topic_name = "topic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
futures = dict()
def get_callback(f, data):
def callback():
try:
futures.pop(data)
except:
print("Please handle {} for {}.".format(f.exception(), data))
return callback
while True:
time.sleep(3)
text = "Hello Google"
data = {"text":text}
print(data)
future = publisher.publish(topic_path, data=(json.dumps(data)).encode("utf-8"))
future.add_done_callback(get_callback(future, data))
time.sleep(5)
while futures:
time.sleep(5)
print("Published message with error handler")
关于如何解决上述问题的任何建议? :)
“add_done_callback”设置的回调函数是用一个参数(未来本身)调用的,但它没有任何定义。
将您的回调创建更改为:
def get_callback(f, data):
def callback(future):
...
应该可以解决问题。
目前正致力于在 Raspberry Pi 上实施 Google 云存储。每当我 运行 我的代码时,我都会收到错误消息:
TypeError: callback() takes 0 positional arguments but 1 was given
这是我的代码:
from google.cloud import pubsub_v1
import json
import datetime
import time
project_id = "projectid"
topic_name = "topic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
futures = dict()
def get_callback(f, data):
def callback():
try:
futures.pop(data)
except:
print("Please handle {} for {}.".format(f.exception(), data))
return callback
while True:
time.sleep(3)
text = "Hello Google"
data = {"text":text}
print(data)
future = publisher.publish(topic_path, data=(json.dumps(data)).encode("utf-8"))
future.add_done_callback(get_callback(future, data))
time.sleep(5)
while futures:
time.sleep(5)
print("Published message with error handler")
关于如何解决上述问题的任何建议? :)
“add_done_callback”设置的回调函数是用一个参数(未来本身)调用的,但它没有任何定义。
将您的回调创建更改为:
def get_callback(f, data):
def callback(future):
...
应该可以解决问题。