Build Custom Federated averaging process with ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors

Build Custom Federated averaging process with ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors

我正在尝试从 csv 加载数据集并对可用数据执行一些联合学习。

我设法从给定的 csv 文件加载联合数据集并加载训练数据和测试数据。

我现在的问题是如何重现一个工作示例来构建一个迭代过程,对这些数据执行自定义联合平均。

这是我的代码,但它不起作用:

import collections
import os

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from absl import app
from tensorflow.keras import layers

from src.main import Parameters

global input_spec


def main(args):
    working_dir = "D:/User/Documents/GitHub/TriaBaseMLBackup/input/fakehdfs/nms/ystr=2016/ymstr=1/ymdstr=26"
    client_id_colname = 'counter'
    SHUFFLE_BUFFER = 1000
    NUM_EPOCHS = 1

    for root, dirs, files in os.walk(working_dir):
        file_list = []

        for filename in files:
            if filename.endswith('.csv'):
                file_list.append(os.path.join(root, filename))
        df_list = []
        for file in file_list:
            df = pd.read_csv(file, delimiter="|", usecols=[1, 2, 6, 7], header=None, na_values=["NIL"],
                             na_filter=True, names=["time", "meas_info", "counter", "value"])
            # df_list.append(df[["value"]])

        if df_list:
            rawdata = pd.concat(df_list)

    client_ids = df.get(client_id_colname)
    train_client_ids = client_ids.sample(frac=0.5).tolist()

    # test_client_ids = [x for x in client_ids if x not in train_client_ids]
    example_dataset = train_data.create_tf_dataset_for_client(
    train_data.client_ids[0]
     )
    def create_tf_dataset_for_client_fn(client_id):
        # a function which takes a client_id and returns a
        # tf.data.Dataset for that client
        # target = df.pop('value')
        client_data = df[df['value'] == client_id]
        print(df.head())
        features = ['time', 'meas_info', 'value']
        LABEL_COLUMN = 'counter'
        dataset = tf.data.Dataset.from_tensor_slices(
            (collections.OrderedDict(df[features].to_dict('list')),
             df[LABEL_COLUMN].to_list())
        )
        global input_spec
        input_spec = dataset.element_spec
        dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
        return dataset

    train_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=train_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )

    # split client id into train and test clients
    loss_builder = tf.keras.losses.SparseCategoricalCrossentropy
    metrics_builder = lambda: [tf.keras.metrics.SparseCategoricalAccuracy()]

    def retrieve_model():
        initializer = tf.keras.initializers.GlorotNormal(seed=0)
        model = tf.keras.models.Sequential([
            tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True),
            tf.keras.layers.Dense(256, activation=tf.nn.relu),
            tf.keras.layers.Activation(tf.nn.softmax),
        ])

        return model

    print(input_spec)

    def tff_model_fn() -> tff.learning.Model:
        return tff.learning.from_keras_model(
            keras_model=retrieve_model(),
            input_spec=example_dataset.element_spec,
            loss=loss_builder(),
            metrics=metrics_builder())

    iterative_process = tff.learning.build_federated_averaging_process(
        tff_model_fn, Parameters.server_adam_optimizer_fn, Parameters.client_adam_optimizer_fn)
    server_state = iterative_process.initialize()

    for round_num in range(Parameters.FLAGS.total_rounds):
        sampled_clients = np.random.choice(
            train_data.client_ids,
            size=Parameters.FLAGS.train_clients_per_round,
            replace=False)
        sampled_train_data = [
            train_data.create_tf_dataset_for_client(client)
            for client in sampled_clients
        ]
        server_state, metrics = iterative_process.next(server_state, sampled_train_data)
        train_metrics = metrics['train']
        print(metrics)


if __name__ == '__main__':
    app.run(main)


def start():
    app.run(main)

这是input_spec输出

(OrderedDict([('time', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('meas_info', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('value', TensorSpec(shape=(), dtype=tf.int64, name=None))]), TensorSpec(shape=(), dtype=tf.float32, name=None))

这是我得到的错误

ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors. Inputs received: [<tf.Tensor 'batch_input:0' shape=() dtype=int32>, <tf.Tensor 'batch_input_1:0' shape=() dtype=int32>, <tf.Tensor 'batch_input_2:0' shape=() dtype=int64>]

谁能帮我解决这个问题?

作为错误信息: ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors. 表示,Keras 模型仅使用单个输入(列表中的第一层)定义:

model = tf.keras.models.Sequential([
  tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True),
  tf.keras.layers.Dense(256, activation=tf.nn.relu),
  tf.keras.layers.Activation(tf.nn.softmax),
])

尝试检查 model.input_spec 以查看模型期望将哪些对象作为输入提供。

>>> [InputSpec(shape=(None, None, 2), ndim=3)]

其中数据集定义了输入特征的 3 个张量 OrderedDict

features = ['time', 'meas_info', 'value']
LABEL_COLUMN = 'counter'
dataset = tf.data.Dataset.from_tensor_slices(
   (collections.OrderedDict(df[features].to_dict('list')),
   df[LABEL_COLUMN].to_list())
)

尝试检查 dataset.element_spec 的值以查看数据集将为模型提供哪些对象。

要使它们兼容,需要更改模型定义或数据集。我假设需要数据集中的三个特征,在这种情况下,我们想告诉 Keras 我们有 OrderedDict 中的三个特征。我们需要使用 Functional model API from Keras.

SEQUENCE_LENGTH = 5
input_dict =  {f: tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, 1), name=f) for f in features}
concatenated_inputs = tf.keras.layers.Concatenate()(input_dict.values())
lstm_output = tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True)(concatenated_inputs)
logits = tf.keras.layers.Dense(256, activation=tf.nn.relu)(lstm_output)
predictions = tf.keras.layers.Activation(tf.nn.softmax)(logits)
model = tf.keras.models.Model(inputs=input_dict, outputs=predictions

请注意,对于 LSTM 层,我需要提供额外的 SEQUENCE_LENGTH 变量和维度。 shape=(SEQUENCE_LENGTH, 1) 将需要进行修改以适应来自数据集的特征的形状。

要测试模型和数据集是否快速兼容(没有所有其他机器),请确保以下内容不会引发错误:

model(next(iter(dataset))[0])