我如何 运行 脚本一分钟,停止执行,然后再次启动?

How do I run a script for a minute, stop its execution, and then start it again?

我目前正在编写一个脚本,该脚本将 运行 持续特定的时间(比如说一分钟),然后必须停止执行大约 5 秒钟,然后再开始执行再次(这个循环不断重复),没有失败。该脚本位于 Python 3.x 和 运行 环境中的 Ubuntu 中。因此创建一个 Service/Daemon 也可以(尽管脚本的执行必须停止几秒钟)。

它基本上是在捕获 Live 时使用 Scapy 模块进行数据包嗅探,然后在将数据插入数据库之前对捕获的数据包进行一些分析。当我通过按 Ctrl+C 停止执行脚本时,它停止,然后将数据插入数据库而不是并行。虽然如果这个过程并行进行并且脚本永远不必停止执行会更好,但在那之前我需要一个相同的解决方法。

我的做法:

import scapy

def main():
    capture = LiveCapture(interface = "<some interface>", filter="<some filter>")
    count = 0
    for pkt in capture:
        #DO SOMETHING

        insert_in_DB()   #--------This happens only when I stop the execution.
        if count == 100:
            count = 0
            #back to main()

所以,你 大致了解我的代码要做什么,对吗?但我希望每 1 分钟后发生一次,在 运行ning 1 分钟后,代码执行停止,以便数据可以输入数据库,然后在 5 秒或更短的时间内再次开始。

提前致谢:)

您必须使用 MySQLdb 才能在 python 和 Twisted adbapi 中使用 MySQL 来执行异步连接。

MySQLdb:

sudo apt-get install python-dev
sudo apt-get install libmysqlclient-dev
pip3 install mysql-python

扭曲的 adbapi:

pip3 install twisted

Spider.py

def parse(self, response):
    yield {
        'item_id' : ...
        'item_name': ...
        ...
    }

pipelines.py中添加一个MySQLStorePipeline并在settings.py中声明:

pipelines.py

from twisted.enterprise import adbapi       #pip3 install twisted
from scrapy import log
from scrapy.conf import settings
import MySQLdb.cursors

class MySQLStorePipeline(object):
    #A pipeline to store the item in a MySQL database.
    #This implementation uses Twisted's asynchronous database API.


    def __init__(self):
        dbargs = settings.get('DB_CONN')
        self.dbpool = adbapi.ConnectionPool(
            "MySQLdb",
            cursorclass=MySQLdb.cursors.DictCursor,
            **dbargs
        )

    def process_item(self, item, spider):
        # run db query in thread pool
        query = self.dbpool.runInteraction(self._conditional_insert, item)
        query.addErrback(self.handle_error)
        return item

    def _conditional_insert(self, tx, item):
        # create record if doesn't exist. 
        # all this block run on it's own thread
        item_id = item['item_id']
        db_table = 'your_table_name'

        try:
            tx.execute("SELECT 1 FROM " + db_table + " WHERE item_id = %s", (item_id, ))
        except:
            print("## Query Failed:" + str(tx._last_executed))

        result = tx.fetchone()

        if result:
            log.msg("Item already stored in db: %s" % item, level=log.DEBUG)
        else:
            try:
                tx.execute(\
                    "INSERT INTO " + db_table + " (item_id, item_name) "
                    "values (%s, %s)",
                    (item_id, item['item_name'])
                )
                log.msg("Item stored in db: %s" % item, level=log.DEBUG)
            except:
                print("## Query Failed:" + str(tx._last_executed))

    def handle_error(self, e):
        log.err(e)

Settings.py

ITEM_PIPELINES = {
    'your_project.pipelines.your_projectPipeline': 300,
    'your_project.pipelines.MySQLStorePipeline': 600,
} #note: 

DB_CONN = {    
    'db': 'your_db',
    'user': 'your_username',
    'passwd': 'your_password',
    'host': 'your_host',
    'charset': 'utf8',
    'use_unicode': True,
} 

笔记:

用您的 SQL 凭据替换所有 your_*****。

上面的代码假设你的SQLtable只有2列:'item_id'、'item_name',当然你可以在INSERT INTO查询中修改.

如果您有任何问题,请发表评论。