结束包含无限循环的线程
Ending a thread that contains an infinite loop
我正在尝试使用 python 在检测到运动后打开一些 RGB 灯。我的代码在下面,但我无法让线程结束并关闭 LEDS。所有发生的事情是我在键盘中断后卡住了。
基本上,当我 运行 代码可以检测运动并打开灯时,但是当我尝试以键盘中断异常结束程序时,要么程序挂起并且不会停止,要么它停止,但 LED 不会关闭并保持点亮。我查看了有关如何停止线程 运行ning 的各种页面,但其中 none 有帮助。下面是我看过的文章。我认为我的问题是线程 运行 照明模式循环没有停止,所以当我尝试关闭每个 LED 时,它会立即重新打开,但我不确定。
我试过任何方法都无法让线程停止。任何帮助将非常感激。
How to stop a looping thread in Python?
https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html
https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/
Stopping a python thread running an Infinite Loop
Threading infinite loop in python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 1 10:10:25 2020
@author: thomas
"""
import RPi.GPIO as GPIO
import time
import subprocess
import argparse
from neopixel import *
import datetime as dt
from threading import Thread
from threading import enumerate
from threading import Event as EV
import sys
global stop
GPIO.setmode(GPIO.BCM)
# LED strip configuration:
LED_COUNT = 288 # Number of LED pixels.
LED_PIN = 18 # GPIO pin connected to the pixels (must support PWM!).
LED_FREQ_HZ = 800000 # LED signal frequency in hertz (usually 800khz)
LED_DMA = 10 # DMA channel to use for generating signal (try 10)
LED_BRIGHTNESS = 255 # Set to 0 for darkest and 255 for brightest
LED_INVERT = False # True to invert the signal (when using NPN transistor level shift)
LED_CHANNEL = 0
LED_STRIP = ws.SK6812_STRIP_RGBW
#LED_STRIP = ws.SK6812W_STRIP
# Define functions which animate LEDs in various ways.
def wheel(pos):
"""Generate rainbow colors across 0-255 positions."""
if pos < 85:
return Color(pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return Color(255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return Color(0, pos * 3, 255 - pos * 3)
def colorWipe(strip, color, wait_ms=20):
"""Wipe color across display a pixel at a time."""
for i in range(strip.numPixels()):
strip.setPixelColor(i, color)
strip.show()
time.sleep(wait_ms/1000.0)
def rainbowCycle(strip, wait_ms=20, iterations=5):
"""Draw rainbow that uniformly distributes itself across all pixels."""
while not stop:
for j in range(256*iterations):
for i in range(strip.numPixels()):
strip.setPixelColor(i, wheel(((i * 256 // strip.numPixels()) + j) & 255))
strip.show()
time.sleep(wait_ms/1000.0)
class RainbowLights(Thread):
def __init__(self, name="RainbowLights"):
self._stopevent = EV()
self.sleeppreiod = 1.0
Thread.__init__(self, name=name)
def run(self):
while not self._stopevent.isSet():
rainbowCycle(strip)
self._stopevent.wait(self._sleepperiod)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--clear', action='store_true', help='clear the display on exit')
args = parser.parse_args()
# Create NeoPixel object with appropriate configuration.
strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL, LED_STRIP)
# Intialize the library (must be called once before other functions).
strip.begin()
print ('Press Ctrl-C to quit.')
if not args.clear:
print('Use "-c" argument to clear LEDs on exit')
GPIO.setup(22, GPIO.IN) #PIR
rta = "none"
time1 = "none"
stop = False
RainbowLights = RainbowLights()
try:
time.sleep(2) # to stabilize sensor
while True:
trigger = GPIO.input(22)
if trigger == 1:
print("Motion Detected...")
if rta == "running":
print("Lights Already On")
else:
RainbowLights.start()
rta = "running"
time1 = dt.datetime.now()
time.sleep(5) #to avoid multiple detection
elif trigger == 0:
print("No Motion...")
print("Lights turned on at " + str(time1))
time.sleep(0.1) #loop delay, should be less than detection delay
except KeyboardInterrupt:
RainbowLights._stopevent.set()
stop = True
time.sleep(5)
if args.clear:
colorWipe(strip, Color(0,0,0), 10)
GPIO.cleanup()
减少到最小的例子,这样它就可以 运行 没有硬件。为修复添加了注释。主要问题是 stop
仅在 256*iterations*wait_ms/1000.0
或 25.6 秒后才被检查,因此停止似乎没有反应。还有一些其他错误。全局 stop
足以结束线程,因此不需要 Event
。
import time
import argparse
from threading import Thread
import sys
global stop
def rainbowCycle(strip, wait_ms=20, iterations=5):
"""Draw rainbow that uniformly distributes itself across all pixels."""
while not stop:
for j in range(256*iterations):
if stop: break # add stop check here for responsiveness.
print(f'rainbowCycle')
time.sleep(wait_ms/1000.0)
print('stopping...')
class RainbowLights(Thread):
def __init__(self, name="RainbowLights"):
print('__init__')
Thread.__init__(self, name=name)
def run(self):
print('run')
rainbowCycle(strip)
print('stopped')
if __name__ == '__main__':
strip = None
print('Press Ctrl-C to quit.')
rta = "none"
stop = False
rainbow_lights = RainbowLights() # renamed variable so it doesn't replace the class.
try:
time.sleep(2) # to stabilize sensor
while True:
trigger = 1 # pretend some motion
if trigger == 1:
print("Motion Detected...")
if rta == "running":
print("Lights Already On")
else:
rainbow_lights.start()
rta = "running"
time.sleep(5) #to avoid multiple detection
elif trigger == 0:
print("No Motion...")
time.sleep(0.1) #loop delay, should be less than detection delay
except KeyboardInterrupt:
print('KeyboardInterrupt')
print('stop')
stop = True
Press Ctrl-C to quit.
__init__
Motion Detected...
run
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
rainbowCycle
KeyboardInterrupt
stop
stopping...
stopped
我正在尝试使用 python 在检测到运动后打开一些 RGB 灯。我的代码在下面,但我无法让线程结束并关闭 LEDS。所有发生的事情是我在键盘中断后卡住了。
基本上,当我 运行 代码可以检测运动并打开灯时,但是当我尝试以键盘中断异常结束程序时,要么程序挂起并且不会停止,要么它停止,但 LED 不会关闭并保持点亮。我查看了有关如何停止线程 运行ning 的各种页面,但其中 none 有帮助。下面是我看过的文章。我认为我的问题是线程 运行 照明模式循环没有停止,所以当我尝试关闭每个 LED 时,它会立即重新打开,但我不确定。
我试过任何方法都无法让线程停止。任何帮助将非常感激。
How to stop a looping thread in Python?
https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html
https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/
Stopping a python thread running an Infinite Loop
Threading infinite loop in python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 1 10:10:25 2020
@author: thomas
"""
import RPi.GPIO as GPIO
import time
import subprocess
import argparse
from neopixel import *
import datetime as dt
from threading import Thread
from threading import enumerate
from threading import Event as EV
import sys
global stop
GPIO.setmode(GPIO.BCM)
# LED strip configuration:
LED_COUNT = 288 # Number of LED pixels.
LED_PIN = 18 # GPIO pin connected to the pixels (must support PWM!).
LED_FREQ_HZ = 800000 # LED signal frequency in hertz (usually 800khz)
LED_DMA = 10 # DMA channel to use for generating signal (try 10)
LED_BRIGHTNESS = 255 # Set to 0 for darkest and 255 for brightest
LED_INVERT = False # True to invert the signal (when using NPN transistor level shift)
LED_CHANNEL = 0
LED_STRIP = ws.SK6812_STRIP_RGBW
#LED_STRIP = ws.SK6812W_STRIP
# Define functions which animate LEDs in various ways.
def wheel(pos):
"""Generate rainbow colors across 0-255 positions."""
if pos < 85:
return Color(pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return Color(255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return Color(0, pos * 3, 255 - pos * 3)
def colorWipe(strip, color, wait_ms=20):
"""Wipe color across display a pixel at a time."""
for i in range(strip.numPixels()):
strip.setPixelColor(i, color)
strip.show()
time.sleep(wait_ms/1000.0)
def rainbowCycle(strip, wait_ms=20, iterations=5):
"""Draw rainbow that uniformly distributes itself across all pixels."""
while not stop:
for j in range(256*iterations):
for i in range(strip.numPixels()):
strip.setPixelColor(i, wheel(((i * 256 // strip.numPixels()) + j) & 255))
strip.show()
time.sleep(wait_ms/1000.0)
class RainbowLights(Thread):
def __init__(self, name="RainbowLights"):
self._stopevent = EV()
self.sleeppreiod = 1.0
Thread.__init__(self, name=name)
def run(self):
while not self._stopevent.isSet():
rainbowCycle(strip)
self._stopevent.wait(self._sleepperiod)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--clear', action='store_true', help='clear the display on exit')
args = parser.parse_args()
# Create NeoPixel object with appropriate configuration.
strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL, LED_STRIP)
# Intialize the library (must be called once before other functions).
strip.begin()
print ('Press Ctrl-C to quit.')
if not args.clear:
print('Use "-c" argument to clear LEDs on exit')
GPIO.setup(22, GPIO.IN) #PIR
rta = "none"
time1 = "none"
stop = False
RainbowLights = RainbowLights()
try:
time.sleep(2) # to stabilize sensor
while True:
trigger = GPIO.input(22)
if trigger == 1:
print("Motion Detected...")
if rta == "running":
print("Lights Already On")
else:
RainbowLights.start()
rta = "running"
time1 = dt.datetime.now()
time.sleep(5) #to avoid multiple detection
elif trigger == 0:
print("No Motion...")
print("Lights turned on at " + str(time1))
time.sleep(0.1) #loop delay, should be less than detection delay
except KeyboardInterrupt:
RainbowLights._stopevent.set()
stop = True
time.sleep(5)
if args.clear:
colorWipe(strip, Color(0,0,0), 10)
GPIO.cleanup()
减少到最小的例子,这样它就可以 运行 没有硬件。为修复添加了注释。主要问题是 stop
仅在 256*iterations*wait_ms/1000.0
或 25.6 秒后才被检查,因此停止似乎没有反应。还有一些其他错误。全局 stop
足以结束线程,因此不需要 Event
。
import time
import argparse
from threading import Thread
import sys
global stop
def rainbowCycle(strip, wait_ms=20, iterations=5):
"""Draw rainbow that uniformly distributes itself across all pixels."""
while not stop:
for j in range(256*iterations):
if stop: break # add stop check here for responsiveness.
print(f'rainbowCycle')
time.sleep(wait_ms/1000.0)
print('stopping...')
class RainbowLights(Thread):
def __init__(self, name="RainbowLights"):
print('__init__')
Thread.__init__(self, name=name)
def run(self):
print('run')
rainbowCycle(strip)
print('stopped')
if __name__ == '__main__':
strip = None
print('Press Ctrl-C to quit.')
rta = "none"
stop = False
rainbow_lights = RainbowLights() # renamed variable so it doesn't replace the class.
try:
time.sleep(2) # to stabilize sensor
while True:
trigger = 1 # pretend some motion
if trigger == 1:
print("Motion Detected...")
if rta == "running":
print("Lights Already On")
else:
rainbow_lights.start()
rta = "running"
time.sleep(5) #to avoid multiple detection
elif trigger == 0:
print("No Motion...")
time.sleep(0.1) #loop delay, should be less than detection delay
except KeyboardInterrupt:
print('KeyboardInterrupt')
print('stop')
stop = True
Press Ctrl-C to quit. __init__ Motion Detected... run rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle rainbowCycle KeyboardInterrupt stop stopping... stopped