在 __main__ 中调用 While 循环函数?
Calling While loop Function in __main__?
这里我有一个程序,它轮询队列中的事件,如果找到它,它会执行一个 REST 命令 API。此外,如果发现事件,它会打印我需要用作 stopLoss
的当前价格。这段代码完全按照我的意愿运行,但是,当我尝试在 __main__
中调用函数 rates()
时,程序就停止了 运行.
删除引用 stopLoss = rates()
,程序在没有 stopLoss
的情况下运行良好,但我需要速率 -.001
作为我的 stopLoss
。
代码如下:
import Queue
import threading
import time
import json
import oandapy
from execution import Execution
from settings import STREAM_DOMAIN, API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID
from strategy import TestRandomStrategy
from streaming import StreamingForexPrices
#polls API for Current Price
def stop():
while True:
oanda = oandapy.API(environment="practice", access_token="xxxxxx")
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
s = asking_price - .001
return s
#Checks for events and executes order
def trade(events, strategy, execution):
while True:
try:
event = events.get(False)
except Queue.Empty:
pass
else:
if event is not None:
if event.type == 'TICK':
strategy.calculate_signals(event)
elif event.type == 'ORDER':
print
execution.execute_order(event)
def rates(events):
while True:
try:
event = events.get(False)
except Queue.Empty:
pass
else:
if event.type == 'TICK':
r = stop()
print r
if __name__ == "__main__":
heartbeat = 0 # Half a second between polling
events = Queue.Queue()
# Trade 1 unit of EUR/USD
instrument = "EUR_USD"
units = 1
stopLoss = rates() #Problem area!!!!!!>>>>>>>>>>>>>>//////////////////////////
prices = StreamingForexPrices(
STREAM_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID,
instrument, events
)
execution = Execution(API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID)
strategy = TestRandomStrategy(instrument, units, events, stopLoss)
#Threads
trade_thread = threading.Thread(target=trade, args=(events, strategy, execution))
price_thread = threading.Thread(target=prices.stream_to_queue, args=[])
stop_thread = threading.Thread(target=rates, args=(events,))
# Start both threads
trade_thread.start()
price_thread.start()
stop_thread.start()
好的,到目前为止还没有答案,所以我会试试。
您的主要问题似乎是,您不知道如何在线程之间交换数据。
首先是价格问题
循环在这里:
while True:
oanda = oandapy.API(environment="practice", access_token="xxxxxx")
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
s = asking_price - .001
return s
没有作用,因为return s
会自动跳出。
所以你需要的是一个存储s
的共享变量。您可以使用 threading.Lock
来保护对其的访问。最简单的方法是继承 Thread 并使 s
成为这样的实例属性(我将其命名为 price
):
class PricePoller(threading.Thread):
def __init__(self, interval):
super(PricePoller, self).__init__()
# private attribute, will be accessed as property via
# threadsafe getter and setter
self._price = None
# lock guarding access to _price
self._dataLock = threading.Lock()
# polling interval
self.interval = interval
# set this thread as deamon, so it will be killed when
# the main thread dies
self.deamon = True
# create an event that allows us to exit the mainloop
# and terminate the thread safely
self._stopEvent = threading.Event()
def getPrice(self):
# use _dataLock to get threadsafe access to self._price
with self._dataLock:
return self._price
def setPrice(self, price)
# use _dataLock to get threadsafe access to self._price
with self._dataLock:
self._price = price
price = property(getPrice, setPrice, None)
def run(self):
while not self.stopEvent.isSet():
oanda = oandapy.API(environment="practice", access_token="xxxxxx")
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
self.price = asking_price - .001
time.sleep(self.interval) # don't spam the server
def stop(self):
self._stopEvent.set()
然后可以开始:
poller = PricePoller(heartbeat)
poller.start()
而且poller.price
哪里都可以拿到价格!如果愿意,您甚至可以将轮询器传递给其他线程。
但!如果您尝试在 poller.start()
之后立即获取价格,您肯定会得到 None
。为什么这个? poller.start()
不会阻塞,因此当您的主线程继续运行并尝试获得第一个价格时,您的轮询器甚至还没有完成启动!
如何解决这个问题?引入另一个 threading.Event
并使用它的功能 wait
让主线程等待直到轮询线程设置它。我把实现留给你。
我只是猜测这就是您想要的...只看您的代码,您根本不必将 stop
函数放在线程中 ,您只需将 stopLess = rates()
替换为 stopLess = stop()
,因为您不会在任何地方更新价格轮询的结果!但我认为你想在某个时候这样做,否则将它放入一个线程中是没有意义的。
现在开始排队和你的'event stream'。
这个片段:
try:
event = events.get(False)
except Queue.Empty:
pass
也可以是:
event = events.get()
无论如何你在此期间什么都不做,最好让 Queue
处理等待事件。
然后,据我所知,你有两个线程调用 Queue.get
,但是这个函数会在检索元素后从队列中删除它!这意味着谁先获得事件,谁就使用它,另一个线程将永远看不到它。但是使用上述轮询器解决方案,我认为您可以摆脱 stop_thread
,这也解决了该问题。
现在是关于线程的一般说明。
一个线程有它自己的 'chain' 调用,这些调用从它的 run
方法(或者如果你不子类化的话你提供的方法 target
)开始。
这意味着由 run
调用的任何函数都由该线程执行,并且还由该线程依次调用的所有函数(依此类推)。然而,两个线程完全有可能同时执行同一个函数!如果不使用同步手段(例如事件、锁或屏障),则无法知道哪个线程在某个时间执行了哪部分代码。
如果被调用函数中使用的所有变量都是本地的或者在调用函数中是本地的,这没有问题:
def onlyLocal(x, n):
if n == 0:
return x
return onlyLocal(x*2, n-1)
或独家阅读:
def onlyRead(myarray):
t = time.time()
return t - sum(myarray)
但是一旦您同时从多个线程读取和写入一个变量,您就需要保护对这些变量的访问,否则如果您传递的对象被多个线程识别(例如 self
):
def setPrice(self, price):
self._price = price
或者如果您的函数使用来自多个线程访问的外部范围的变量:
def variableFromOutside(y):
global x
x += y
return y
您永远无法确定没有线程(2) 更改您(1) 刚刚读取的变量,同时您正在处理它并且在您使用当时无效的值更新它之前。
global x ; Thread1 ; Thread2 ;
2 ; y = x ; z = x ;
2 ; y **= 3 ; x = z+1 ;
3 ; x = y-4 ; return ;
4 ; return ; ... ;
这就是为什么您必须使用锁来保护对这些变量的访问。带锁 (l
):
global x ; Thread1 ; Thread2 ;
2 ; l.acqcuire() ; l.acquire() ;
2 ; y = x ; | ;
2 ; y **= 3 ; | ;
2 ; x = y-4 ; | ;
4 ; l.release() ; v ;
4 ; return ; z = x ;
4 ; ... ; x = z+1 ;
5 ; ... ; l.release() ;
这里Thread1先于Thread2获得锁。 Thread2 因此必须等到 Thread1 在调用 acquire
returns.
之前再次释放锁
当您使用 with lock:
时,acquire
和 release
会自动调用。
另请注意,在这个玩具示例中,Thread2 可能会先于 Thread1 获取锁,但至少它们仍然不会相互干扰。
这是对一个大主题的简要介绍,阅读一些关于线程并行化的内容并尝试一下。没有比实践更好的学习方法了。
我在浏览器中编写了这段代码,因此未测试!如果有人发现问题,请在评论中告诉我或随时直接更改。
这里我有一个程序,它轮询队列中的事件,如果找到它,它会执行一个 REST 命令 API。此外,如果发现事件,它会打印我需要用作 stopLoss
的当前价格。这段代码完全按照我的意愿运行,但是,当我尝试在 __main__
中调用函数 rates()
时,程序就停止了 运行.
删除引用 stopLoss = rates()
,程序在没有 stopLoss
的情况下运行良好,但我需要速率 -.001
作为我的 stopLoss
。
代码如下:
import Queue
import threading
import time
import json
import oandapy
from execution import Execution
from settings import STREAM_DOMAIN, API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID
from strategy import TestRandomStrategy
from streaming import StreamingForexPrices
#polls API for Current Price
def stop():
while True:
oanda = oandapy.API(environment="practice", access_token="xxxxxx")
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
s = asking_price - .001
return s
#Checks for events and executes order
def trade(events, strategy, execution):
while True:
try:
event = events.get(False)
except Queue.Empty:
pass
else:
if event is not None:
if event.type == 'TICK':
strategy.calculate_signals(event)
elif event.type == 'ORDER':
print
execution.execute_order(event)
def rates(events):
while True:
try:
event = events.get(False)
except Queue.Empty:
pass
else:
if event.type == 'TICK':
r = stop()
print r
if __name__ == "__main__":
heartbeat = 0 # Half a second between polling
events = Queue.Queue()
# Trade 1 unit of EUR/USD
instrument = "EUR_USD"
units = 1
stopLoss = rates() #Problem area!!!!!!>>>>>>>>>>>>>>//////////////////////////
prices = StreamingForexPrices(
STREAM_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID,
instrument, events
)
execution = Execution(API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID)
strategy = TestRandomStrategy(instrument, units, events, stopLoss)
#Threads
trade_thread = threading.Thread(target=trade, args=(events, strategy, execution))
price_thread = threading.Thread(target=prices.stream_to_queue, args=[])
stop_thread = threading.Thread(target=rates, args=(events,))
# Start both threads
trade_thread.start()
price_thread.start()
stop_thread.start()
好的,到目前为止还没有答案,所以我会试试。
您的主要问题似乎是,您不知道如何在线程之间交换数据。
首先是价格问题
循环在这里:
while True:
oanda = oandapy.API(environment="practice", access_token="xxxxxx")
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
s = asking_price - .001
return s
没有作用,因为return s
会自动跳出。
所以你需要的是一个存储s
的共享变量。您可以使用 threading.Lock
来保护对其的访问。最简单的方法是继承 Thread 并使 s
成为这样的实例属性(我将其命名为 price
):
class PricePoller(threading.Thread):
def __init__(self, interval):
super(PricePoller, self).__init__()
# private attribute, will be accessed as property via
# threadsafe getter and setter
self._price = None
# lock guarding access to _price
self._dataLock = threading.Lock()
# polling interval
self.interval = interval
# set this thread as deamon, so it will be killed when
# the main thread dies
self.deamon = True
# create an event that allows us to exit the mainloop
# and terminate the thread safely
self._stopEvent = threading.Event()
def getPrice(self):
# use _dataLock to get threadsafe access to self._price
with self._dataLock:
return self._price
def setPrice(self, price)
# use _dataLock to get threadsafe access to self._price
with self._dataLock:
self._price = price
price = property(getPrice, setPrice, None)
def run(self):
while not self.stopEvent.isSet():
oanda = oandapy.API(environment="practice", access_token="xxxxxx")
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
self.price = asking_price - .001
time.sleep(self.interval) # don't spam the server
def stop(self):
self._stopEvent.set()
然后可以开始:
poller = PricePoller(heartbeat)
poller.start()
而且poller.price
哪里都可以拿到价格!如果愿意,您甚至可以将轮询器传递给其他线程。
但!如果您尝试在 poller.start()
之后立即获取价格,您肯定会得到 None
。为什么这个? poller.start()
不会阻塞,因此当您的主线程继续运行并尝试获得第一个价格时,您的轮询器甚至还没有完成启动!
如何解决这个问题?引入另一个 threading.Event
并使用它的功能 wait
让主线程等待直到轮询线程设置它。我把实现留给你。
我只是猜测这就是您想要的...只看您的代码,您根本不必将 stop
函数放在线程中 ,您只需将 stopLess = rates()
替换为 stopLess = stop()
,因为您不会在任何地方更新价格轮询的结果!但我认为你想在某个时候这样做,否则将它放入一个线程中是没有意义的。
现在开始排队和你的'event stream'。
这个片段:
try:
event = events.get(False)
except Queue.Empty:
pass
也可以是:
event = events.get()
无论如何你在此期间什么都不做,最好让 Queue
处理等待事件。
然后,据我所知,你有两个线程调用 Queue.get
,但是这个函数会在检索元素后从队列中删除它!这意味着谁先获得事件,谁就使用它,另一个线程将永远看不到它。但是使用上述轮询器解决方案,我认为您可以摆脱 stop_thread
,这也解决了该问题。
现在是关于线程的一般说明。
一个线程有它自己的 'chain' 调用,这些调用从它的 run
方法(或者如果你不子类化的话你提供的方法 target
)开始。
这意味着由 run
调用的任何函数都由该线程执行,并且还由该线程依次调用的所有函数(依此类推)。然而,两个线程完全有可能同时执行同一个函数!如果不使用同步手段(例如事件、锁或屏障),则无法知道哪个线程在某个时间执行了哪部分代码。
如果被调用函数中使用的所有变量都是本地的或者在调用函数中是本地的,这没有问题:
def onlyLocal(x, n):
if n == 0:
return x
return onlyLocal(x*2, n-1)
或独家阅读:
def onlyRead(myarray):
t = time.time()
return t - sum(myarray)
但是一旦您同时从多个线程读取和写入一个变量,您就需要保护对这些变量的访问,否则如果您传递的对象被多个线程识别(例如 self
):
def setPrice(self, price):
self._price = price
或者如果您的函数使用来自多个线程访问的外部范围的变量:
def variableFromOutside(y):
global x
x += y
return y
您永远无法确定没有线程(2) 更改您(1) 刚刚读取的变量,同时您正在处理它并且在您使用当时无效的值更新它之前。
global x ; Thread1 ; Thread2 ;
2 ; y = x ; z = x ;
2 ; y **= 3 ; x = z+1 ;
3 ; x = y-4 ; return ;
4 ; return ; ... ;
这就是为什么您必须使用锁来保护对这些变量的访问。带锁 (l
):
global x ; Thread1 ; Thread2 ;
2 ; l.acqcuire() ; l.acquire() ;
2 ; y = x ; | ;
2 ; y **= 3 ; | ;
2 ; x = y-4 ; | ;
4 ; l.release() ; v ;
4 ; return ; z = x ;
4 ; ... ; x = z+1 ;
5 ; ... ; l.release() ;
这里Thread1先于Thread2获得锁。 Thread2 因此必须等到 Thread1 在调用 acquire
returns.
之前再次释放锁
当您使用 with lock:
时,acquire
和 release
会自动调用。
另请注意,在这个玩具示例中,Thread2 可能会先于 Thread1 获取锁,但至少它们仍然不会相互干扰。
这是对一个大主题的简要介绍,阅读一些关于线程并行化的内容并尝试一下。没有比实践更好的学习方法了。
我在浏览器中编写了这段代码,因此未测试!如果有人发现问题,请在评论中告诉我或随时直接更改。