我如何 运行 脚本一分钟,停止执行,然后再次启动?
How do I run a script for a minute, stop its execution, and then start it again?
我目前正在编写一个脚本,该脚本将 运行 持续特定的时间(比如说一分钟),然后必须停止执行大约 5 秒钟,然后再开始执行再次(这个循环不断重复),没有失败。该脚本位于 Python 3.x 和 运行 环境中的 Ubuntu 中。因此创建一个 Service/Daemon 也可以(尽管脚本的执行必须停止几秒钟)。
它基本上是在捕获 Live 时使用 Scapy 模块进行数据包嗅探,然后在将数据插入数据库之前对捕获的数据包进行一些分析。当我通过按 Ctrl+C
停止执行脚本时,它停止,然后将数据插入数据库而不是并行。虽然如果这个过程并行进行并且脚本永远不必停止执行会更好,但在那之前我需要一个相同的解决方法。
我的做法:
import scapy
def main():
capture = LiveCapture(interface = "<some interface>", filter="<some filter>")
count = 0
for pkt in capture:
#DO SOMETHING
insert_in_DB() #--------This happens only when I stop the execution.
if count == 100:
count = 0
#back to main()
所以,你
大致了解我的代码要做什么,对吗?但我希望每 1 分钟后发生一次,在 运行ning 1 分钟后,代码执行停止,以便数据可以输入数据库,然后在 5 秒或更短的时间内再次开始。
提前致谢:)
您必须使用 MySQLdb 才能在 python 和 Twisted adbapi 中使用 MySQL 来执行异步连接。
MySQLdb:
sudo apt-get install python-dev
sudo apt-get install libmysqlclient-dev
pip3 install mysql-python
扭曲的 adbapi:
pip3 install twisted
Spider.py
def parse(self, response):
yield {
'item_id' : ...
'item_name': ...
...
}
在pipelines.py
中添加一个MySQLStorePipeline
并在settings.py
中声明:
pipelines.py
from twisted.enterprise import adbapi #pip3 install twisted
from scrapy import log
from scrapy.conf import settings
import MySQLdb.cursors
class MySQLStorePipeline(object):
#A pipeline to store the item in a MySQL database.
#This implementation uses Twisted's asynchronous database API.
def __init__(self):
dbargs = settings.get('DB_CONN')
self.dbpool = adbapi.ConnectionPool(
"MySQLdb",
cursorclass=MySQLdb.cursors.DictCursor,
**dbargs
)
def process_item(self, item, spider):
# run db query in thread pool
query = self.dbpool.runInteraction(self._conditional_insert, item)
query.addErrback(self.handle_error)
return item
def _conditional_insert(self, tx, item):
# create record if doesn't exist.
# all this block run on it's own thread
item_id = item['item_id']
db_table = 'your_table_name'
try:
tx.execute("SELECT 1 FROM " + db_table + " WHERE item_id = %s", (item_id, ))
except:
print("## Query Failed:" + str(tx._last_executed))
result = tx.fetchone()
if result:
log.msg("Item already stored in db: %s" % item, level=log.DEBUG)
else:
try:
tx.execute(\
"INSERT INTO " + db_table + " (item_id, item_name) "
"values (%s, %s)",
(item_id, item['item_name'])
)
log.msg("Item stored in db: %s" % item, level=log.DEBUG)
except:
print("## Query Failed:" + str(tx._last_executed))
def handle_error(self, e):
log.err(e)
Settings.py
ITEM_PIPELINES = {
'your_project.pipelines.your_projectPipeline': 300,
'your_project.pipelines.MySQLStorePipeline': 600,
} #note:
DB_CONN = {
'db': 'your_db',
'user': 'your_username',
'passwd': 'your_password',
'host': 'your_host',
'charset': 'utf8',
'use_unicode': True,
}
笔记:
用您的 SQL 凭据替换所有 your_*****。
上面的代码假设你的SQLtable只有2列:'item_id'、'item_name',当然你可以在INSERT INTO查询中修改.
如果您有任何问题,请发表评论。
我目前正在编写一个脚本,该脚本将 运行 持续特定的时间(比如说一分钟),然后必须停止执行大约 5 秒钟,然后再开始执行再次(这个循环不断重复),没有失败。该脚本位于 Python 3.x 和 运行 环境中的 Ubuntu 中。因此创建一个 Service/Daemon 也可以(尽管脚本的执行必须停止几秒钟)。
它基本上是在捕获 Live 时使用 Scapy 模块进行数据包嗅探,然后在将数据插入数据库之前对捕获的数据包进行一些分析。当我通过按 Ctrl+C
停止执行脚本时,它停止,然后将数据插入数据库而不是并行。虽然如果这个过程并行进行并且脚本永远不必停止执行会更好,但在那之前我需要一个相同的解决方法。
我的做法:
import scapy
def main():
capture = LiveCapture(interface = "<some interface>", filter="<some filter>")
count = 0
for pkt in capture:
#DO SOMETHING
insert_in_DB() #--------This happens only when I stop the execution.
if count == 100:
count = 0
#back to main()
所以,你 大致了解我的代码要做什么,对吗?但我希望每 1 分钟后发生一次,在 运行ning 1 分钟后,代码执行停止,以便数据可以输入数据库,然后在 5 秒或更短的时间内再次开始。
提前致谢:)
您必须使用 MySQLdb 才能在 python 和 Twisted adbapi 中使用 MySQL 来执行异步连接。
MySQLdb:
sudo apt-get install python-dev
sudo apt-get install libmysqlclient-dev
pip3 install mysql-python
扭曲的 adbapi:
pip3 install twisted
Spider.py
def parse(self, response):
yield {
'item_id' : ...
'item_name': ...
...
}
在pipelines.py
中添加一个MySQLStorePipeline
并在settings.py
中声明:
pipelines.py
from twisted.enterprise import adbapi #pip3 install twisted
from scrapy import log
from scrapy.conf import settings
import MySQLdb.cursors
class MySQLStorePipeline(object):
#A pipeline to store the item in a MySQL database.
#This implementation uses Twisted's asynchronous database API.
def __init__(self):
dbargs = settings.get('DB_CONN')
self.dbpool = adbapi.ConnectionPool(
"MySQLdb",
cursorclass=MySQLdb.cursors.DictCursor,
**dbargs
)
def process_item(self, item, spider):
# run db query in thread pool
query = self.dbpool.runInteraction(self._conditional_insert, item)
query.addErrback(self.handle_error)
return item
def _conditional_insert(self, tx, item):
# create record if doesn't exist.
# all this block run on it's own thread
item_id = item['item_id']
db_table = 'your_table_name'
try:
tx.execute("SELECT 1 FROM " + db_table + " WHERE item_id = %s", (item_id, ))
except:
print("## Query Failed:" + str(tx._last_executed))
result = tx.fetchone()
if result:
log.msg("Item already stored in db: %s" % item, level=log.DEBUG)
else:
try:
tx.execute(\
"INSERT INTO " + db_table + " (item_id, item_name) "
"values (%s, %s)",
(item_id, item['item_name'])
)
log.msg("Item stored in db: %s" % item, level=log.DEBUG)
except:
print("## Query Failed:" + str(tx._last_executed))
def handle_error(self, e):
log.err(e)
Settings.py
ITEM_PIPELINES = {
'your_project.pipelines.your_projectPipeline': 300,
'your_project.pipelines.MySQLStorePipeline': 600,
} #note:
DB_CONN = {
'db': 'your_db',
'user': 'your_username',
'passwd': 'your_password',
'host': 'your_host',
'charset': 'utf8',
'use_unicode': True,
}
笔记:
用您的 SQL 凭据替换所有 your_*****。
上面的代码假设你的SQLtable只有2列:'item_id'、'item_name',当然你可以在INSERT INTO查询中修改.
如果您有任何问题,请发表评论。