在 python 中读取多个 gmail 收件箱
Read multiple gmail inbox in python
我有一个脚本可以读取 gmail 收件箱,但只能用于特定帐户。所以,我想要实现的是有一个列表来存储多个凭据并登录以读取多个帐户的收件箱。
我的脚本:
#email inbox declaration (receiver)
EMAIL_ACCOUNT = "xxx@gmail.com"
PASSWORD = "xxx"
#email
#Main function
class SendMail(threading.Thread):
def __init__(self, id_manager):
threading.Thread.__init__(self)
self.id_manager = int(id_manager)
def run(self):
while True:
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(EMAIL_ACCOUNT, PASSWORD)
mail.list()
mail.select('inbox')
result, data = mail.uid('search', None, "UNSEEN") # (ALL/UNSEEN)
i = len(data[0].split())
for x in range(i):
latest_email_uid = data[0].split()[x]
result, email_data = mail.uid('fetch', latest_email_uid, '(RFC822)')
# result, email_data = conn.store(num,'-FLAGS','\Seen')
# this might work to set flag to seen, if it doesn't already
raw_email = email_data[0][1]
raw_email_string = raw_email.decode('utf-8')
email_message = email.message_from_string(raw_email_string)
# Header Details
date_tuple = email.utils.parsedate_tz(email_message['Date'])
if date_tuple:
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
local_message_date = "%s" % (str(local_date.strftime("%a, %d %b %Y %H:%M:%S")))
email_from = str(email.header.make_header(email.header.decode_header(email_message['From'])))
email_to = str(email.header.make_header(email.header.decode_header(email_message['To'])))
global subject
subject = str(email.header.make_header(email.header.decode_header(email_message['Subject'])))
# Body details
for part in email_message.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True)
print("From:", email_from)
print("Email To:", email_to)
print("date:", local_message_date)
print("Subject:", subject)
print("body:", body.decode('utf-8'))
#condition
else:
continue
def mainSendEmail():
thread_id = ("0")
led_index = 0
thread_list = list()
for objs in thread_id:
thread = SendMail(led_index)
thread_list.append(thread)
led_index += 1
for thread in thread_list:
thread.start()
time.sleep(1)
if __name__ == "__main__":
mainSendEmail()
我的存储多个凭据的列表。 listA 是电子邮件,listB 是密码。 listA 的电子邮件和 listB 密码是相关的。例如listA的第一封邮件就是listB的第一个密码
listA = [('xxx@yahoo.com',), ('asd@gmail.com',), ('test@gmail.com',)]
listB = [('passwordxxx',), ('passwordForasd',), ('passwordFortest',)]
您可以使用 zip()
函数组合两个列表来创建元组列表。这个新列表中的每个元素都包含一个用户名及其密码。然后您可以迭代此列表以检查所有电子邮件。将这些行添加到 run
函数的顶部:
def run(self):
listA = [('xxx@yahoo.com',), ('asd@gmail.com',), ('test@gmail.com',)]
listB = [('passwordxxx',), ('passwordForasd',), ('passwordFortest',)]
for each_credential_info in tuple(zip(listA, listB)):
while True:
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(each_credential_info[0][0], each_credential_info[1][0])
mail.list()
...
但是,我想知道您为什么在 listA
和 listB
中使用元组。您可以使用简单的列表并编写更简单的代码:
def run(self):
listA = ['xxx@yahoo.com', 'asd@gmail.com', 'test@gmail.com']
listB = ['passwordxxx', 'passwordForasd', 'passwordFortest']
for each_credential_info in tuple(zip(listA, listB)):
while True:
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(each_credential_info[0], each_credential_info[1])
mail.list()
...
我还有一些建议可以让您的代码获得更好的性能并且....
如果您的无限循环 while True
没有中断,您将不会在新循环 for each_credential_info in tuple(zip(listA, listB))
上进行迭代。在这种情况下,您需要合并这两个循环:
def run(self):
listA = ['xxx@yahoo.com', 'asd@gmail.com', 'test@gmail.com']
listB = ['passwordxxx', 'passwordForasd', 'passwordFortest']
credential_info = tuple(zip(listA, listB))
credential_info_index = -1
while True:
credential_info_index = (credential_info_index + 1) % len(credential_info)
print("Processing email: " + credential_info[credential_info_index][0] + ", " + credential_info[credential_info_index][1])
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(credential_info[credential_info_index][0], credential_info[credential_info_index][1])
mail.list()
...
或者简单地制作并使用循环迭代器:
from itertools import cycle
def run(self):
listA = ['xxx@yahoo.com', 'asd@gmail.com', 'test@gmail.com']
listB = ['passwordxxx', 'passwordForasd', 'passwordFortest']
credential_info = cycle(tuple(zip(listA, listB)))
while True:
print("Processing email: " + next(credential_info)[0] + ", " + next(credential_info)[1])
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(next(credential_info)[0], next(credential_info)[1])
mail.list()
...
我有一个脚本可以读取 gmail 收件箱,但只能用于特定帐户。所以,我想要实现的是有一个列表来存储多个凭据并登录以读取多个帐户的收件箱。
我的脚本:
#email inbox declaration (receiver)
EMAIL_ACCOUNT = "xxx@gmail.com"
PASSWORD = "xxx"
#email
#Main function
class SendMail(threading.Thread):
def __init__(self, id_manager):
threading.Thread.__init__(self)
self.id_manager = int(id_manager)
def run(self):
while True:
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(EMAIL_ACCOUNT, PASSWORD)
mail.list()
mail.select('inbox')
result, data = mail.uid('search', None, "UNSEEN") # (ALL/UNSEEN)
i = len(data[0].split())
for x in range(i):
latest_email_uid = data[0].split()[x]
result, email_data = mail.uid('fetch', latest_email_uid, '(RFC822)')
# result, email_data = conn.store(num,'-FLAGS','\Seen')
# this might work to set flag to seen, if it doesn't already
raw_email = email_data[0][1]
raw_email_string = raw_email.decode('utf-8')
email_message = email.message_from_string(raw_email_string)
# Header Details
date_tuple = email.utils.parsedate_tz(email_message['Date'])
if date_tuple:
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
local_message_date = "%s" % (str(local_date.strftime("%a, %d %b %Y %H:%M:%S")))
email_from = str(email.header.make_header(email.header.decode_header(email_message['From'])))
email_to = str(email.header.make_header(email.header.decode_header(email_message['To'])))
global subject
subject = str(email.header.make_header(email.header.decode_header(email_message['Subject'])))
# Body details
for part in email_message.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True)
print("From:", email_from)
print("Email To:", email_to)
print("date:", local_message_date)
print("Subject:", subject)
print("body:", body.decode('utf-8'))
#condition
else:
continue
def mainSendEmail():
thread_id = ("0")
led_index = 0
thread_list = list()
for objs in thread_id:
thread = SendMail(led_index)
thread_list.append(thread)
led_index += 1
for thread in thread_list:
thread.start()
time.sleep(1)
if __name__ == "__main__":
mainSendEmail()
我的存储多个凭据的列表。 listA 是电子邮件,listB 是密码。 listA 的电子邮件和 listB 密码是相关的。例如listA的第一封邮件就是listB的第一个密码
listA = [('xxx@yahoo.com',), ('asd@gmail.com',), ('test@gmail.com',)]
listB = [('passwordxxx',), ('passwordForasd',), ('passwordFortest',)]
您可以使用 zip()
函数组合两个列表来创建元组列表。这个新列表中的每个元素都包含一个用户名及其密码。然后您可以迭代此列表以检查所有电子邮件。将这些行添加到 run
函数的顶部:
def run(self):
listA = [('xxx@yahoo.com',), ('asd@gmail.com',), ('test@gmail.com',)]
listB = [('passwordxxx',), ('passwordForasd',), ('passwordFortest',)]
for each_credential_info in tuple(zip(listA, listB)):
while True:
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(each_credential_info[0][0], each_credential_info[1][0])
mail.list()
...
但是,我想知道您为什么在 listA
和 listB
中使用元组。您可以使用简单的列表并编写更简单的代码:
def run(self):
listA = ['xxx@yahoo.com', 'asd@gmail.com', 'test@gmail.com']
listB = ['passwordxxx', 'passwordForasd', 'passwordFortest']
for each_credential_info in tuple(zip(listA, listB)):
while True:
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(each_credential_info[0], each_credential_info[1])
mail.list()
...
我还有一些建议可以让您的代码获得更好的性能并且....
如果您的无限循环 while True
没有中断,您将不会在新循环 for each_credential_info in tuple(zip(listA, listB))
上进行迭代。在这种情况下,您需要合并这两个循环:
def run(self):
listA = ['xxx@yahoo.com', 'asd@gmail.com', 'test@gmail.com']
listB = ['passwordxxx', 'passwordForasd', 'passwordFortest']
credential_info = tuple(zip(listA, listB))
credential_info_index = -1
while True:
credential_info_index = (credential_info_index + 1) % len(credential_info)
print("Processing email: " + credential_info[credential_info_index][0] + ", " + credential_info[credential_info_index][1])
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(credential_info[credential_info_index][0], credential_info[credential_info_index][1])
mail.list()
...
或者简单地制作并使用循环迭代器:
from itertools import cycle
def run(self):
listA = ['xxx@yahoo.com', 'asd@gmail.com', 'test@gmail.com']
listB = ['passwordxxx', 'passwordForasd', 'passwordFortest']
credential_info = cycle(tuple(zip(listA, listB)))
while True:
print("Processing email: " + next(credential_info)[0] + ", " + next(credential_info)[1])
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(next(credential_info)[0], next(credential_info)[1])
mail.list()
...