在 Ray 中使用 Prophet 或 Auto ARIMA

Using Prophet or Auto ARIMA with Ray

关于 Ray 的一些事情我找不到明确的答案。 Ray 是一个用于数据处理和训练的分布式框架。为了使其以分布式方式工作,必须使用 Modin 或 Ray 支持的其他分布式数据分析工具,以便数据可以在整个集群上流动,但是如果我想使用像 Facebook 的 Prophet 或 ARIMA 这样的模型,它需要pandas 数据帧作为输入?当我使用 pandas 数据帧作为模型函数的参数时,它会只在单个节点上工作还是有可能的解决方法让它在集群上工作?

Ray 能够使用 pandas 个数据帧作为输入来训练模型!

目前,ARIMA 需要一些变通方法,因为它通常在幕后使用 statsmodels 库。为了确保模型正确序列化,需要一个额外的 pickle 步骤。 Ray 可能会在未来消除对 pickle 变通方法的需求。

查看 pickle 解决方法的解释:https://alkaline-ml.com/pmdarima/1.0.0/serialization.html

这里是 python 3.8 和 ray 1.8 的代码摘录。请注意,train_model() 和 inference_model() 函数的输入是 pandas 数据帧。额外的 pickle 步骤嵌入在这些函数中。 https://github.com/christy/AnyscaleDemos/blob/main/forecasting_demos/nyctaxi_arima_simple.ipynb

import ray
import pandas as pd
import pmdarima as pm
from pmdarima.model_selection import train_test_split

# read 8 months of clean, aggregated monthly taxi data
filename = "https://github.com/christy/MachineLearningTools/blob/master/data/clean_taxi_monthly.parquet?raw=true"
g_month = pd.read_parquet(filename) 

# Define a train_model function, default train on 6 months, inference 2
def train_model(theDF:pd.DataFrame, item_col:str
                , item_value:str, target_col:str
                , train_size:int=6) -> list:

    # split data into train/test
    train, test = train_test_split(theDF.loc[(theDF[item_col]==item_value), :], train_size=train_size)
    
    # train and fit auto.arima model
    model = pm.auto_arima(y=train[target_col]
                          ,X=train.loc[:, (train.columns!=target_col) 
                                          & (train.columns!=item_col)]
                         )
    # here is the extra pickle step to handle statsmodel objects
    return [train, test, pickle.dumps(model)]


# Define inference_model function
def inference_model(model_pickle:bytes, test:pd.DataFrame
                    , timestamp_col:str, item_col:str, target_col:str) -> pd.DataFrame:

    # unpickle the model
    model = pickle.loads(model_pickle)
    
    # inference on test data
    forecast = pd.DataFrame(model.predict(n_periods=test.shape[0]
                         , X=test.loc[:, (test.columns!=target_col) & (test.columns!=item_col)]
                         , index=test.index))
    
    return forecast


# start-up ray on your laptop for testing purposes
import ray
NUM_CPU = 2
ray.init(
    ignore_reinit_error=True
    , num_cpus = NUM_CPU
)

###########
# run your training as distributed jobs by using ray remote function calls
###########
    
# Convert your regular python functions to ray remote functions
train_model_remote = ray.remote(train_model).options(num_returns=3)  
inference_model_remote = ray.remote(inference_model)
    
# Train every model
item_list = list(g_month['pulocationid'].unique())
model = []
train = []
test = []

for p,v in enumerate(item_list):
    # ray remote eval
    temp_train, temp_test, temp_model = \
        train_model_remote.remote(g_month
                                  , item_col='pulocationid', item_value=v
                                  , target_col='trip_quantity'
                                  , train_size=6)
    train.append(temp_train)
    test.append(temp_test)
    model.append(temp_model)

# Inference every test dataset
result=[]
for p,v in enumerate(item_list):
    # ray remote eval
    result.append(inference_model_remote.remote(model[p], test[p]
                                                , timestamp_col='pickup_monthly'
                                                , item_col='pulocationid'
                                                , target_col='trip_quantity'))

# ray.get() means block until all objectIDs requested are available
forecast = ray.get(result)