在 Ray 中使用 Prophet 或 Auto ARIMA
Using Prophet or Auto ARIMA with Ray
关于 Ray 的一些事情我找不到明确的答案。 Ray 是一个用于数据处理和训练的分布式框架。为了使其以分布式方式工作,必须使用 Modin 或 Ray 支持的其他分布式数据分析工具,以便数据可以在整个集群上流动,但是如果我想使用像 Facebook 的 Prophet 或 ARIMA 这样的模型,它需要 pandas 数据帧作为输入,该怎么办?当我使用 pandas 数据帧作为模型函数的参数时,它会只在单个节点上工作,还是有可能的解决方法让它在集群上工作?
Ray 能够使用 pandas 数据帧作为输入来训练模型!
目前,ARIMA 需要一些变通方法,因为它通常在幕后使用 statsmodels 库。为了确保模型正确序列化,需要一个额外的 pickle 步骤。 Ray 可能会在未来消除对 pickle 变通方法的需求。
查看 pickle 解决方法的解释:https://alkaline-ml.com/pmdarima/1.0.0/serialization.html
这里是 python 3.8 和 ray 1.8 的代码摘录。请注意,train_model() 和 inference_model() 函数的输入是 pandas 数据帧。额外的 pickle 步骤嵌入在这些函数中。 https://github.com/christy/AnyscaleDemos/blob/main/forecasting_demos/nyctaxi_arima_simple.ipynb
import ray
import pandas as pd
import pmdarima as pm
from pmdarima.model_selection import train_test_split
# read 8 months of clean, aggregated monthly taxi data
# NOTE: the "?raw=true" suffix makes GitHub serve the raw parquet binary
# instead of the HTML page; pd.read_parquet can read directly from a URL.
filename = "https://github.com/christy/MachineLearningTools/blob/master/data/clean_taxi_monthly.parquet?raw=true"
g_month = pd.read_parquet(filename)
# Define a train_model function, default train on 6 months, inference 2
# Define a train_model function, default train on 6 months, inference 2
def train_model(theDF: pd.DataFrame, item_col: str
                , item_value: str, target_col: str
                , train_size: int = 6) -> list:
    """Fit an auto-ARIMA model on a single item's slice of ``theDF``.

    Parameters:
        theDF: full dataframe holding every item's rows.
        item_col: name of the column identifying the item (e.g. 'pulocationid').
        item_value: which item value to select and train on.
        target_col: name of the column to forecast.
        train_size: number of leading rows used for training; the rest
            become the test set (default 6 of the 8 months).

    Returns:
        ``[train_df, test_df, pickled_model_bytes]``.
    """
    # BUG FIX: pickle was never imported at module level; import it locally
    # so the function is also self-contained when shipped to a Ray worker.
    import pickle

    # split data into train/test (chronological split on this item's rows)
    train, test = train_test_split(
        theDF.loc[(theDF[item_col] == item_value), :], train_size=train_size)

    # Exogenous regressors: every column except the target and the item id.
    exog_mask = (train.columns != target_col) & (train.columns != item_col)

    # train and fit auto.arima model
    model = pm.auto_arima(y=train[target_col], X=train.loc[:, exog_mask])

    # here is the extra pickle step to handle statsmodels objects; see
    # https://alkaline-ml.com/pmdarima/1.0.0/serialization.html
    return [train, test, pickle.dumps(model)]
# Define inference_model function
# Define inference_model function
def inference_model(model_pickle: bytes, test: pd.DataFrame
                    , timestamp_col: str, item_col: str, target_col: str) -> pd.DataFrame:
    """Forecast the test horizon with a previously pickled ARIMA model.

    Parameters:
        model_pickle: bytes produced by ``pickle.dumps`` in ``train_model``.
        timestamp_col: kept for interface compatibility; currently unused.
        item_col / target_col: columns excluded from the exogenous matrix,
            mirroring the training-time feature selection.

    Returns:
        DataFrame of predictions indexed like ``test``.
    """
    # BUG FIX: pickle was never imported at module level.
    import pickle

    # unpickle the model
    model = pickle.loads(model_pickle)

    exog_mask = (test.columns != target_col) & (test.columns != item_col)

    # BUG FIX: index= belongs to the pd.DataFrame constructor, not to
    # model.predict() — pmdarima's predict() has no 'index' kwarg and the
    # original call would raise TypeError.
    forecast = pd.DataFrame(
        model.predict(n_periods=test.shape[0], X=test.loc[:, exog_mask]),
        index=test.index)
    return forecast
# Start a local (laptop-sized) Ray runtime for testing purposes.
import ray

NUM_CPU = 2
# ignore_reinit_error lets this script/cell be re-run without raising
# if a Ray runtime is already up.
ray.init(ignore_reinit_error=True, num_cpus=NUM_CPU)
###########
# run your training as distributed jobs by using ray remote function calls
###########
# Convert the regular python functions to ray remote functions.
# train_model returns a 3-element list, so num_returns=3 yields one
# ObjectRef per element (train, test, pickled model).
train_model_remote = ray.remote(train_model).options(num_returns=3)
inference_model_remote = ray.remote(inference_model)

# Train one model per unique pickup location.
item_list = list(g_month['pulocationid'].unique())
model = []
train = []
test = []
# IDIOM FIX: the enumerate index was unused — iterate values directly.
for v in item_list:
    # Asynchronous ray remote eval: returns immediately with 3 ObjectRefs.
    temp_train, temp_test, temp_model = \
        train_model_remote.remote(g_month
                                  , item_col='pulocationid', item_value=v
                                  , target_col='trip_quantity'
                                  , train_size=6)
    train.append(temp_train)
    test.append(temp_test)
    model.append(temp_model)

# Inference on every held-out test dataset.
# IDIOM FIX: the second loop only used the index to pair model[p] with
# test[p] — zip the two lists and build the result with a comprehension.
result = [
    inference_model_remote.remote(m, t
                                  , timestamp_col='pickup_monthly'
                                  , item_col='pulocationid'
                                  , target_col='trip_quantity')
    for m, t in zip(model, test)
]

# ray.get() blocks until all requested ObjectRefs are available.
forecast = ray.get(result)
关于 Ray 的一些事情我找不到明确的答案。 Ray 是一个用于数据处理和训练的分布式框架。为了使其以分布式方式工作,必须使用 Modin 或 Ray 支持的其他分布式数据分析工具,以便数据可以在整个集群上流动,但是如果我想使用像 Facebook 的 Prophet 或 ARIMA 这样的模型,它需要 pandas 数据帧作为输入,该怎么办?当我使用 pandas 数据帧作为模型函数的参数时,它会只在单个节点上工作,还是有可能的解决方法让它在集群上工作?
Ray 能够使用 pandas 数据帧作为输入来训练模型!
目前,ARIMA 需要一些变通方法,因为它通常在幕后使用 statsmodels 库。为了确保模型正确序列化,需要一个额外的 pickle 步骤。 Ray 可能会在未来消除对 pickle 变通方法的需求。
查看 pickle 解决方法的解释:https://alkaline-ml.com/pmdarima/1.0.0/serialization.html
这里是 python 3.8 和 ray 1.8 的代码摘录。请注意,train_model() 和 inference_model() 函数的输入是 pandas 数据帧。额外的 pickle 步骤嵌入在这些函数中。 https://github.com/christy/AnyscaleDemos/blob/main/forecasting_demos/nyctaxi_arima_simple.ipynb
import ray
import pandas as pd
import pmdarima as pm
from pmdarima.model_selection import train_test_split
# read 8 months of clean, aggregated monthly taxi data
# NOTE: the "?raw=true" suffix makes GitHub serve the raw parquet binary
# instead of the HTML page; pd.read_parquet can read directly from a URL.
filename = "https://github.com/christy/MachineLearningTools/blob/master/data/clean_taxi_monthly.parquet?raw=true"
g_month = pd.read_parquet(filename)
# Define a train_model function, default train on 6 months, inference 2
# Define a train_model function, default train on 6 months, inference 2
def train_model(theDF: pd.DataFrame, item_col: str
                , item_value: str, target_col: str
                , train_size: int = 6) -> list:
    """Fit an auto-ARIMA model on a single item's slice of ``theDF``.

    Parameters:
        theDF: full dataframe holding every item's rows.
        item_col: name of the column identifying the item (e.g. 'pulocationid').
        item_value: which item value to select and train on.
        target_col: name of the column to forecast.
        train_size: number of leading rows used for training; the rest
            become the test set (default 6 of the 8 months).

    Returns:
        ``[train_df, test_df, pickled_model_bytes]``.
    """
    # BUG FIX: pickle was never imported at module level; import it locally
    # so the function is also self-contained when shipped to a Ray worker.
    import pickle

    # split data into train/test (chronological split on this item's rows)
    train, test = train_test_split(
        theDF.loc[(theDF[item_col] == item_value), :], train_size=train_size)

    # Exogenous regressors: every column except the target and the item id.
    exog_mask = (train.columns != target_col) & (train.columns != item_col)

    # train and fit auto.arima model
    model = pm.auto_arima(y=train[target_col], X=train.loc[:, exog_mask])

    # here is the extra pickle step to handle statsmodels objects; see
    # https://alkaline-ml.com/pmdarima/1.0.0/serialization.html
    return [train, test, pickle.dumps(model)]
# Define inference_model function
# Define inference_model function
def inference_model(model_pickle: bytes, test: pd.DataFrame
                    , timestamp_col: str, item_col: str, target_col: str) -> pd.DataFrame:
    """Forecast the test horizon with a previously pickled ARIMA model.

    Parameters:
        model_pickle: bytes produced by ``pickle.dumps`` in ``train_model``.
        timestamp_col: kept for interface compatibility; currently unused.
        item_col / target_col: columns excluded from the exogenous matrix,
            mirroring the training-time feature selection.

    Returns:
        DataFrame of predictions indexed like ``test``.
    """
    # BUG FIX: pickle was never imported at module level.
    import pickle

    # unpickle the model
    model = pickle.loads(model_pickle)

    exog_mask = (test.columns != target_col) & (test.columns != item_col)

    # BUG FIX: index= belongs to the pd.DataFrame constructor, not to
    # model.predict() — pmdarima's predict() has no 'index' kwarg and the
    # original call would raise TypeError.
    forecast = pd.DataFrame(
        model.predict(n_periods=test.shape[0], X=test.loc[:, exog_mask]),
        index=test.index)
    return forecast
# Start a local (laptop-sized) Ray runtime for testing purposes.
import ray

NUM_CPU = 2
# ignore_reinit_error lets this script/cell be re-run without raising
# if a Ray runtime is already up.
ray.init(ignore_reinit_error=True, num_cpus=NUM_CPU)
###########
# run your training as distributed jobs by using ray remote function calls
###########
# Convert the regular python functions to ray remote functions.
# train_model returns a 3-element list, so num_returns=3 yields one
# ObjectRef per element (train, test, pickled model).
train_model_remote = ray.remote(train_model).options(num_returns=3)
inference_model_remote = ray.remote(inference_model)

# Train one model per unique pickup location.
item_list = list(g_month['pulocationid'].unique())
model = []
train = []
test = []
# IDIOM FIX: the enumerate index was unused — iterate values directly.
for v in item_list:
    # Asynchronous ray remote eval: returns immediately with 3 ObjectRefs.
    temp_train, temp_test, temp_model = \
        train_model_remote.remote(g_month
                                  , item_col='pulocationid', item_value=v
                                  , target_col='trip_quantity'
                                  , train_size=6)
    train.append(temp_train)
    test.append(temp_test)
    model.append(temp_model)

# Inference on every held-out test dataset.
# IDIOM FIX: the second loop only used the index to pair model[p] with
# test[p] — zip the two lists and build the result with a comprehension.
result = [
    inference_model_remote.remote(m, t
                                  , timestamp_col='pickup_monthly'
                                  , item_col='pulocationid'
                                  , target_col='trip_quantity')
    for m, t in zip(model, test)
]

# ray.get() blocks until all requested ObjectRefs are available.
forecast = ray.get(result)