asyncio 无法在 Google Cloud Functions 上运行

asyncio not working on Google Cloud Functions

我有这个函数,它在我本地的 Python 3.8 环境中运行良好,但在 Google Cloud Functions 上会抛出运行时错误(RuntimeError)。

def telegram_test(request):
    """HTTP entry point: read recent channel history and echo the request JSON.

    Opens a pyrogram ``Client`` from a session string, iterates up to 100
    history entries of ``channel_name`` with ``offset_date`` set to the Unix
    timestamp of seven days ago, loads them into a DataFrame and prints its
    first five rows.

    NOTE(review): this snippet is reported to fail on Google Cloud Functions
    with ``RuntimeError: There is no current event loop in thread ...``;
    presumably no asyncio event loop is set on the worker thread that runs
    the handler -- confirm.

    Args:
        request: Incoming request object; only ``request.get_json()`` is used.

    Returns:
        The string ``'it works!:'`` followed by the parsed request JSON.
    """
    request_json = request.get_json()
    import datetime
    import pandas as pd
    from pyrogram import Client

    session_string = "...............38Q8uTHG5gHwyWD8nW6h................."
    # The rest of the authentication data.
    api_id = 32494131641215
    # The backslash is escaped explicitly: "\l" is not a valid escape
    # sequence, so the unescaped form triggers an invalid-escape warning.
    # The resulting string value is unchanged.
    api_hash = "ioadsfsjnjksfgnfriuthg#qw]/zwq  ]w/\\lc ec,"

    # Telegram channel to read from.
    channel_name = 'pyrogram'

    # Unix timestamp (whole seconds) of the moment seven days ago, used as
    # the history offset date.
    seven_days = int(
        (datetime.datetime.now() - datetime.timedelta(days=7)).timestamp()
    )

    # Query Telegram with the limit and date parameters, then collect the
    # result into a DataFrame.
    with Client(session_string, api_id, api_hash, takeout=True, workers=2) as app:
        hist_iter = app.iter_history(
            channel_name, offset_date=seven_days, limit=100)
        msglist = [msg.__dict__ for msg in hist_iter]
        df = pd.DataFrame(msglist)
        print(df.head(5))

    return f'it works!:{request_json}'

我从 GCF 日志中得到的错误信息:

File "/opt/python3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop raise RuntimeError('There is no current event loop in thread %r.' RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

更新

我更新了代码,运行时错误消失了,但现在出现了超时错误。我已将超时时间设置为 180 秒,但测试该函数时,它仍然在 60 秒时超时。

这是更新后的代码。我做错了什么吗?

async def foo():
    """Collect up to 10 history entries of a channel into a DataFrame.

    Opens a pyrogram ``Client`` from a session string, iterates
    ``app.iter_history(channel_name, limit=10)`` and builds a DataFrame from
    each message's ``__dict__``.

    NOTE(review): the synchronous ``with`` / ``for`` forms are used inside a
    coroutine here; presumably the asynchronous forms (``async with``,
    ``async for``) are what this coroutine needs. Left as posted so the
    snippet keeps showing the behaviour being asked about -- confirm against
    the pyrogram documentation.

    Returns:
        The DataFrame built from the collected messages.
    """
    # Only the names actually used below are imported; the previously
    # imported datetime/timedelta/asyncio names were never referenced.
    from pandas import DataFrame
    from pyrogram import Client

    session_string = "********zNmkubA4ibjsdjhsdfjlhweruifnjkldfioY5DE*********"
    api_id = 325511548224831351
    api_hash = "jdffjgtrkjhfklmrtgjtrm;sesews;;wex"
    channel_name = 'cnn'

    with Client(session_string, api_id, api_hash, takeout=True) as app:
        hist_iter = app.iter_history(
            channel_name, limit=10)
        msglist = [msg.__dict__ for msg in hist_iter]
        df = DataFrame(msglist)
    return df
 
async def bar():
    """Await ``foo()`` and hand its result back unchanged."""
    result = await foo()
    return result

def test(request):
    """HTTP entry point: drive ``bar()`` to completion with ``asyncio.run``.

    ``request`` is accepted but not read. Returns whatever ``bar()`` returns.
    """
    import asyncio

    return asyncio.run(bar())
  1. bar() 是多余的。
  2. 您正在尝试返回一个 DataFrame。它是有效的 HTTP 响应吗?
  3. with -> async with
  4. hist_iter = app.iter_history() -> hist_iter = await app.iter_history()
  5. 也许它在等待输入?

最终的解决方案是从 Pyrogram 换成 Telethon,并在创建客户端之前手动创建 asyncio 事件循环。

# Create a new event loop explicitly, then register it as the current loop
# so that later code asking asyncio for the current loop receives this one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

注意:您需要有效的会话字符串,否则在测试该函数时,它会一直等待您使用手机号码进行身份验证。因此,请先在本地运行此代码完成身份验证,然后将会话字符串复制到云函数中。

完整代码如下:

from telethon.sessions import StringSession
from telethon import TelegramClient
from pandas import DataFrame
import datetime
import asyncio

# Build a dedicated event loop and register it as the current one before the
# Telegram client is constructed.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Telegram API credentials and the serialized login session.
api_id = 101010101
api_hash = "jhafcgahagfbahgdbw17171736456gerf"
session_string = "hjksdhjbdsfhgbdsabeyitrgdsbfsdbdiyfhsbddasbdjdksf="

# Channel whose messages are read.
channel_name = 'bbcuzbek'

# Unix timestamp (whole seconds) of the moment one week before this
# statement ran, i.e. it is evaluated once when the module is loaded.
seven_days = int(
    (datetime.datetime.now() - datetime.timedelta(weeks=1)).timestamp()
)

client = TelegramClient(
    StringSession(session_string), api_id, api_hash, loop=loop
)

# strftime layout used for every formatted date: day/month/year, H:M:S.
time_format = "%d/%m/%Y, %H:%M:%S"

# UTC time at which this statement ran, rendered with time_format; like
# seven_days it is evaluated once when the module is loaded.
download_date = datetime.datetime.now(datetime.timezone.utc).strftime(
    time_format
)

# Column names, in row order, for the DataFrame built from the messages.
cols = ["id", "date", "text", "views", "download_date"]


async def foo():
    """Load recent channel messages into a DataFrame and report its size.

    Iterates up to 10 messages of ``channel_name`` through the module-level
    ``client``, passing the Unix timestamp of seven days before the call as
    ``offset_date``. Each message becomes one row of id, formatted date,
    text, view count and download stamp, under the column names in ``cols``.
    Prints a progress marker and the number of rows.

    Returns:
        None. The DataFrame is not returned; persisting it is still to do.
    """
    # Compute the time-dependent values on every call. Module-level values
    # are evaluated only once, when the module is loaded, so a long-lived
    # process would keep reusing the cutoff and stamp of its first load.
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    cutoff = int((now - datetime.timedelta(days=7)).timestamp())
    downloaded_at = now.strftime(time_format)

    all_msgs = [
        [
            message.id,
            message.date.strftime(time_format),
            message.text,
            message.views,
            downloaded_at,
        ]
        async for message in client.iter_messages(
            entity=channel_name, offset_date=cutoff, limit=10
        )
    ]
    df = DataFrame(data=all_msgs, columns=cols)
    # TODO: write it to BQ
    print("it runs")

    print(len(df))


def test(request):
    """HTTP entry point: run ``foo()`` on the client's loop and reply.

    Enters the module-level ``client`` as a context manager (presumably
    connecting on entry and disconnecting on exit -- confirm against the
    Telethon documentation) and runs ``foo()`` to completion on
    ``client.loop``.

    Args:
        request: Incoming request object; not read.

    Returns:
        Whatever ``foo()`` returns, or the string ``"OK"`` when it returns
        ``None``, so the caller always receives a usable response body.
    """
    with client:
        result = client.loop.run_until_complete(foo())
    # An HTTP handler has to return something that can be turned into a
    # response; None cannot, so substitute a short plain-text body.
    return "OK" if result is None else result