Twitter Streaming API - urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead
Twitter Streaming API - urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead
运行 一个使用 tweepy 的 python 脚本,它在英语推文的随机样本中流式传输(使用推特流 API)一分钟,然后交替搜索(使用Twitter 搜索 API) 一分钟,然后搜索 returns。我发现的问题是,在大约 40 多秒后,流媒体崩溃并出现以下错误:
完整错误:
urllib3.exceptions.ProtocolError: ('Connection broken:
IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
读取的字节数可以从 0 到 1000 不等。
这是第一次看到流式传输过早中断并且搜索功能提早启动,搜索功能完成后它再次返回流式传输,第二次出现此错误时代码崩溃。
我 运行 的代码是:
# Handles date time calculation
def calculateTweetDateTime(tweet):
tweetDateTime = str(tweet.created_at)
tweetDateTime = ciso8601.parse_datetime(tweetDateTime)
time.mktime(tweetDateTime.timetuple())
return tweetDateTime
# Checks to see whether that permitted time has past.
def hasTimeThresholdPast():
global startTime
if time.clock() - startTime > 60:
return True
else:
return False
#override tweepy.StreamListener to add logic to on_status
class StreamListener(StreamListener):
def on_status(self, tweet):
if hasTimeThresholdPast():
return False
if hasattr(tweet, 'lang'):
if tweet.lang == 'en':
try:
tweetText = tweet.extended_tweet["full_text"]
except AttributeError:
tweetText = tweet.text
tweetDateTime = calculateTweetDateTime(tweet)
entityList = DataProcessing.identifyEntities(True, tweetText)
DataStorage.storeHotTerm(entityList, tweetDateTime)
DataStorage.storeTweet(tweet)
def on_error(self, status_code):
def on_error(self, status_code):
if status_code == 420:
# returning False in on_data disconnects the stream
return False
def startTwitterStream():
searchTerms = []
myStreamListener = StreamListener()
twitterStream = Stream(auth=api.auth, listener=StreamListener())
global geoGatheringTag
if geoGatheringTag == False:
twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an'], async=True, stall_warnings=True)
if geoGatheringTag == True:
twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an', 'they\'re'],
async=False, locations=[-4.5091, 55.7562, -3.9814, 55.9563], stall_warnings=True)
# ----------------------- Twitter API Functions ------------------------
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# --------------------------- Main Function ----------------------------
startTime = 0
def main():
global startTime
userInput = ""
userInput.lower()
while userInput != "-1":
userInput = input("Type ACTiVATE to activate the Crawler, or DATABASE to access data analytic option (-1 to exit): \n")
if userInput.lower() == 'activate':
while(True):
startTime = time.clock()
startTwitterStream()
startTime = time.clock()
startTwitterSearchAPI()
if __name__ == '__main__':
main()
我已经删除了搜索功能和数据库处理方面,因为它们是分开的,以避免代码混乱。
如果有人知道为什么会发生这种情况以及我如何解决它,请告诉我,我很想知道任何见解。
我尝试过的解决方案:
Try/Except 块与 http.client.IncompleteRead:
根据
设置 Stall_Warning = 为真:
根据 Incompleteread-error-when-retrieving-twitter-data-using-python
正在删除英语过滤器。
已解决。
对于那些好奇或遇到类似问题的人:经过一些实验,我发现传入推文的积压是问题所在。每次系统收到一条推文时,我的系统 运行 一个实体识别和存储的过程花费了一小段时间,并且随着收集数百到数千条推文的时间,这个积压变得越来越大,直到 API 无法处理并抛出了那个错误。
解决方案: 将您的 "on_status/on_data/on_success" 函数剥离到最基本的部分,并在流式传输会话关闭后单独处理任何计算,即存储或实体识别。或者,您可以使计算效率更高,并使时间差距变得微不足道,由您决定。
我只是根据关注用户 Chris Cookman 的结果分享我的经验。按照他的建议去做后,我遇到的同样问题就消失了。但就我而言,我将它与 discord.py 一起使用。所以我所做的是创建一个通用列表 (status_list),每当 tweepy on_status 启动时,它就会附加到该通用列表中。
然后我设置了一个@tasks.loop(seconds=10) using discord.py 来监控每隔几秒status_list是否不为空,然后如果它检测到它有一个内容,它将遍历它,然后在每个列表上启动该过程。
运行 一个使用 tweepy 的 python 脚本,它在英语推文的随机样本中流式传输(使用推特流 API)一分钟,然后交替搜索(使用Twitter 搜索 API) 一分钟,然后搜索 returns。我发现的问题是,在大约 40 多秒后,流媒体崩溃并出现以下错误:
完整错误:
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
读取的字节数可以从 0 到 1000 不等。
这是第一次看到流式传输过早中断并且搜索功能提早启动,搜索功能完成后它再次返回流式传输,第二次出现此错误时代码崩溃。
我 运行 的代码是:
# Handles date time calculation
def calculateTweetDateTime(tweet):
tweetDateTime = str(tweet.created_at)
tweetDateTime = ciso8601.parse_datetime(tweetDateTime)
time.mktime(tweetDateTime.timetuple())
return tweetDateTime
# Checks to see whether that permitted time has past.
def hasTimeThresholdPast():
global startTime
if time.clock() - startTime > 60:
return True
else:
return False
#override tweepy.StreamListener to add logic to on_status
class StreamListener(StreamListener):
def on_status(self, tweet):
if hasTimeThresholdPast():
return False
if hasattr(tweet, 'lang'):
if tweet.lang == 'en':
try:
tweetText = tweet.extended_tweet["full_text"]
except AttributeError:
tweetText = tweet.text
tweetDateTime = calculateTweetDateTime(tweet)
entityList = DataProcessing.identifyEntities(True, tweetText)
DataStorage.storeHotTerm(entityList, tweetDateTime)
DataStorage.storeTweet(tweet)
def on_error(self, status_code):
def on_error(self, status_code):
if status_code == 420:
# returning False in on_data disconnects the stream
return False
def startTwitterStream():
searchTerms = []
myStreamListener = StreamListener()
twitterStream = Stream(auth=api.auth, listener=StreamListener())
global geoGatheringTag
if geoGatheringTag == False:
twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an'], async=True, stall_warnings=True)
if geoGatheringTag == True:
twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an', 'they\'re'],
async=False, locations=[-4.5091, 55.7562, -3.9814, 55.9563], stall_warnings=True)
# ----------------------- Twitter API Functions ------------------------
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# --------------------------- Main Function ----------------------------
startTime = 0
def main():
global startTime
userInput = ""
userInput.lower()
while userInput != "-1":
userInput = input("Type ACTiVATE to activate the Crawler, or DATABASE to access data analytic option (-1 to exit): \n")
if userInput.lower() == 'activate':
while(True):
startTime = time.clock()
startTwitterStream()
startTime = time.clock()
startTwitterSearchAPI()
if __name__ == '__main__':
main()
我已经删除了搜索功能和数据库处理方面,因为它们是分开的,以避免代码混乱。
如果有人知道为什么会发生这种情况以及我如何解决它,请告诉我,我很想知道任何见解。
我尝试过的解决方案:
Try/Except 块与 http.client.IncompleteRead:
根据
设置 Stall_Warning = 为真:
根据 Incompleteread-error-when-retrieving-twitter-data-using-python
正在删除英语过滤器。
已解决。
对于那些好奇或遇到类似问题的人:经过一些实验,我发现传入推文的积压是问题所在。每次系统收到一条推文时,我的系统 运行 一个实体识别和存储的过程花费了一小段时间,并且随着收集数百到数千条推文的时间,这个积压变得越来越大,直到 API 无法处理并抛出了那个错误。
解决方案: 将您的 "on_status/on_data/on_success" 函数剥离到最基本的部分,并在流式传输会话关闭后单独处理任何计算,即存储或实体识别。或者,您可以使计算效率更高,并使时间差距变得微不足道,由您决定。
我只是根据关注用户 Chris Cookman 的结果分享我的经验。按照他的建议去做后,我遇到的同样问题就消失了。但就我而言,我将它与 discord.py 一起使用。所以我所做的是创建一个通用列表 (status_list),每当 tweepy on_status 启动时,它就会附加到该通用列表中。
然后我设置了一个@tasks.loop(seconds=10) using discord.py 来监控每隔几秒status_list是否不为空,然后如果它检测到它有一个内容,它将遍历它,然后在每个列表上启动该过程。