ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors
ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors
我正在按照 TFF 教程构建我的 FL 模型
我的数据包含在不同的 CSV 文件中,这些文件被视为不同的客户端。
按照此 tutorial,构建 Keras 模型函数如下
@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
    """Build the CSV-backed dataset for the client file at `dataset_path`.

    Column defaults are taken from `record_defaults`, which must be defined
    elsewhere; `header=True` marks the first line of the file as a header.
    """
    csv_dataset = tf.data.experimental.CsvDataset(
        dataset_path,
        record_defaults=record_defaults,
        header=True,
    )
    return csv_dataset
@tf.function
def add_parsing(dataset):
    """Map every CSV row of `dataset` to an ordered `{'y': ..., 'x': ...}` record."""

    def parse_dataset(*columns):
        # The last column is the label. Columns 1..-2 are the features, so
        # column 0 is dropped. `features` stays a tuple of per-column values;
        # nothing is stacked into a single tensor here.
        label = columns[-1]
        features = columns[1:-1]
        return OrderedDict([('y', label), ('x', features)])

    return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
# One dataset per client: the path mapping and the per-file dataset builder
# are handed to FilePerUserClientData.
source = tff.simulation.datasets.FilePerUserClientData(
dataset_paths, create_tf_dataset_for_client_fn)
client_ids = sorted(source.client_ids)
# Make sure the client ids are tensor strings when splitting data.
# NOTE: this overwrites the private `_client_ids` attribute of `source`.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids]
# Attach the row-parsing step (add_parsing) to every client's dataset.
source = source.preprocess(add_parsing)
# Split the clients into train/test sets; the second argument (1) is
# presumably the number of test clients — confirm against the TFF API.
train, test = source.train_test_client_split(source, 1)
train_client_ids = train.client_ids
# Dataset of the first training client; model_fn reads its element_spec.
train_data = train.create_tf_dataset_for_client(train_client_ids[0])
def create_keras_model():
    """Return a single-input Sequential classifier: Dense(10) then Softmax.

    NOTE: the input width of 32 was chosen arbitrarily by the author.
    """
    initializer = tf.keras.initializers.GlorotNormal(seed=0)
    layers = [
        tf.keras.layers.Input(shape=(32,)),
        tf.keras.layers.Dense(10, kernel_initializer=initializer),
        tf.keras.layers.Softmax(),
    ]
    return tf.keras.models.Sequential(layers)
def model_fn():
    """Wrap a freshly built Keras model as a TFF learning model.

    The input spec is read from `train_data.element_spec`.
    """
    return tff.learning.from_keras_model(
        create_keras_model(),
        input_spec=train_data.element_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
然后我按照教程的说明,运行了其他 @tff.tf_computation 函数,例如 def server_init()、def initialize_fn()、def client_update() 和 def server_update()。但是当我运行 def client_update_fn() 时,出现了以下错误:
ValueError: in user code:
File "<ipython-input-14-cada45ffae0f>", line 12, in client_update *
for batch in dataset:
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 455, in forward_pass *
return self._forward_pass(batch_input, training=training)
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 408, in _forward_pass *
predictions = self.predict_on_batch(inputs, training)
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 398, in predict_on_batch *
return self._keras_model(x, training=training)
File "/usr/local/lib/python3.7/dist-packages/keras/engine/base_layer_v1.py", line 740, in __call__ **
self.name)
File "/usr/local/lib/python3.7/dist-packages/keras/engine/input_spec.py", line 200, in assert_input_compatibility
raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'
ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors. Inputs received: [<tf.Tensor 'x:0' shape=() dtype=int32>, <tf.Tensor 'x_1:0' shape=() dtype=int32>, <tf.Tensor 'x_2:0' shape=() dtype=int32>, <tf.Tensor 'x_3:0' shape=() dtype=float32>, <tf.Tensor 'x_4:0' shape=() dtype=float32>, <tf.Tensor 'x_5:0' shape=() dtype=float32>, <tf.Tensor 'x_6:0' shape=() dtype=float32>, <tf.Tensor 'x_7:0' shape=() dtype=float32>, <tf.Tensor 'x_8:0' shape=() dtype=float32>, <tf.Tensor 'x_9:0' shape=() dtype=int32>]
备注:
- 每个 CSV 文件有 10 列作为特征(输入)和一列作为标签(输出)。
- shape=(32,) 是我随意填写的,我其实并不清楚每一列数据的形状应该是什么。
所以,问题是,如何将数据提供给keras模型并克服这个错误
提前致谢
有几点需要说明:您的数据有十个独立的特征,这意味着您的模型实际上需要 10 个独立的输入。不过,您也可以把这些特征堆叠成一个张量,然后使用形状为 (10,) 的单个输入。下面是一个可运行的示例,但请注意它使用的是虚拟数据,因此在实际场景中可能没有太大意义。
创建虚拟数据:
import tensorflow as tf
import tensorflow_federated as tff
import pandas as pd
from collections import OrderedDict
import nest_asyncio
nest_asyncio.apply()
# Dummy data: 12 columns of `samples` random values each
# (4 integer columns, 6 float columns, then 2 more integer columns).
samples = 5
data = [
    [tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()
     for _ in range(4)]
    + [tf.random.normal((samples,)).numpy().tolist() for _ in range(6)]
    + [tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()
       for _ in range(2)]
]
# `data` holds a single row whose cells are lists; explode() expands it
# into one row per sample.
df = pd.DataFrame(data)
df = df.explode(list(df.columns))
# Both client files receive the same contents.
df.to_csv('client1.csv', index=False)
df.to_csv('client2.csv', index=False)
加载并处理数据集:
import tensorflow as tf
# Default value for each of the 12 CSV columns: 4 int, 6 float, 2 int.
# Presumably the Python type of each default also selects the dtype
# CsvDataset parses that column as — confirm against the TF docs.
record_defaults = [0] * 4 + [0.0] * 6 + [0] * 2
@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
    """Build the CSV-backed dataset for the client file at `dataset_path`.

    Column defaults are taken from the module-level `record_defaults`;
    `header=True` marks the first line of the file as a header.
    """
    csv_dataset = tf.data.experimental.CsvDataset(
        dataset_path,
        record_defaults=record_defaults,
        header=True,
    )
    return csv_dataset
@tf.function
def add_parsing(dataset):
    """Map every CSV row of `dataset` to an ordered `{'y': ..., 'x': ...}` record."""

    def parse_dataset(*columns):
        # The last column is the label. Columns 1..-2 are the features, so
        # column 0 is dropped. `features` stays a tuple of per-column values;
        # nothing is stacked into a single tensor here.
        label = columns[-1]
        features = columns[1:-1]
        return OrderedDict([('y', label), ('x', features)])

    return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
# The CSV files are written relative to the working directory
# (`df.to_csv('client1.csv', ...)`), so they are read back from the same
# relative locations rather than from a hard-coded `/content/` prefix that
# only matches when the working directory happens to be `/content`.
dataset_paths = {'client1': 'client1.csv', 'client2': 'client2.csv'}
# One dataset per client, built from its CSV file.
source = tff.simulation.datasets.FilePerUserClientData(
    dataset_paths, create_tf_dataset_for_client_fn)
client_ids = sorted(source.client_ids)
# Make sure the client ids are tensor strings when splitting data.
# NOTE: this overwrites the private `_client_ids` attribute of `source`.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids]
# Attach the row-parsing step (add_parsing) to every client's dataset.
source = source.preprocess(add_parsing)
# Split the clients into train/test sets; the second argument (1) is
# presumably the number of test clients — confirm against the TFF API.
train, test = source.train_test_client_split(source, 1)
train_client_ids = train.client_ids
def reshape_data(d):
    """Replace the per-column features in `d['x']` with one stacked float32 tensor.

    `d` is updated in place and also returned.
    """
    features_as_float = [tf.cast(column, dtype=tf.float32) for column in d['x']]
    d['x'] = tf.stack(features_as_float)
    return d
# One dataset per training client: features stacked by reshape_data,
# then grouped into batches of size 1.
train_data = [
    train.create_tf_dataset_for_client(client_id).map(reshape_data).batch(1)
    for client_id in train_client_ids
]
创建并运行模型:
def create_keras_model():
    """Return a Sequential classifier taking one 10-wide input.

    Layers: Dense(75) -> Dense(50) -> Softmax, with both Dense layers
    sharing one seeded GlorotNormal initializer.
    """
    initializer = tf.keras.initializers.GlorotNormal(seed=0)
    layers = [
        tf.keras.layers.Input(shape=(10,)),
        tf.keras.layers.Dense(75, kernel_initializer=initializer),
        tf.keras.layers.Dense(50, kernel_initializer=initializer),
        tf.keras.layers.Softmax(),
    ]
    return tf.keras.models.Sequential(layers)
def model_fn():
    """Wrap a freshly built Keras model as a TFF learning model.

    The input spec is read from the first client dataset in `train_data`.
    """
    return tff.learning.from_keras_model(
        create_keras_model(),
        input_spec=train_data[0].element_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
def initialize_fn():
    """Return the trainable variables of a newly constructed model.

    NOTE: this name is rebound further down by a
    `@tff.federated_computation` definition of `initialize_fn`.
    """
    return model_fn().trainable_variables
@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
    """Performs training (using the server model weights) on the client's dataset."""
    # Start from the server state: assign each server weight to the
    # matching trainable variable of the local model.
    client_weights = model.trainable_variables
    tf.nest.map_structure(
        lambda local_var, server_value: local_var.assign(server_value),
        client_weights, server_weights)

    # One optimizer step per batch of the client's data.
    for batch in dataset:
        with tf.GradientTape() as tape:
            outputs = model.forward_pass(batch)
        gradients = tape.gradient(outputs.loss, client_weights)
        client_optimizer.apply_gradients(zip(gradients, client_weights))

    return client_weights
@tf.function
def server_update(model, mean_client_weights):
    """Updates the server model weights as the average of the client model weights."""
    server_variables = model.trainable_variables
    # Overwrite each server variable with the matching averaged client value.
    tf.nest.map_structure(
        lambda variable, averaged: variable.assign(averaged),
        server_variables, mean_client_weights)
    return server_variables
# Federated type: one float32 value placed on each client.
federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)


@tff.federated_computation(federated_float_on_clients)
def get_average_temperature(client_temperatures):
    """Return the federated mean of the per-client temperature values."""
    mean_temperature = tff.federated_mean(client_temperatures)
    return mean_temperature


# Small demo: inspect the type signature, then run on three client values.
str(get_average_temperature.type_signature)
get_average_temperature([68.5, 70.3, 69.8])
@tff.tf_computation
def server_init():
    """Build a new model and return its trainable variables."""
    return model_fn().trainable_variables
@tff.federated_computation
def initialize_fn():
    """Return the result of `server_init()` as a federated value at the server."""
    initial_weights = server_init()
    return tff.federated_value(initial_weights, tff.SERVER)
# Model instance built here so that its input spec can be read.
whimsy_model = model_fn()
# TFF type of one client's dataset: a sequence of elements matching the
# model's input spec.
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
# TFF type of the model weights, taken from the result type of server_init.
model_weights_type = server_init.type_signature.result
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
    """Run `client_update` with a fresh model and an SGD optimizer (lr=0.01)."""
    local_model = model_fn()
    sgd = tf.keras.optimizers.SGD(learning_rate=0.01)
    return client_update(local_model, tf_dataset, server_weights, sgd)
@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
    """Run `server_update` on a fresh model with the averaged client weights."""
    return server_update(model_fn(), mean_client_weights)
# Placed types used as the parameter types of next_fn:
# the model weights placed at the server ...
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
# ... and a dataset placed at the clients.
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
    """Run one round: broadcast, client update, mean, server update."""
    # Send the current server weights to the clients.
    broadcast_weights = tff.federated_broadcast(server_weights)
    # Apply client_update_fn to each (dataset, weights) pair.
    trained_weights = tff.federated_map(
        client_update_fn, (federated_dataset, broadcast_weights))
    # Average the client results and pass the mean to server_update_fn.
    averaged_weights = tff.federated_mean(trained_weights)
    return tff.federated_map(server_update_fn, averaged_weights)
# Bundle the initialize/next computations into an iterative process.
federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn,
)

# Run 15 rounds, feeding the server state from each round into the next.
server_state = federated_algorithm.initialize()
# The loop variable is `round_num` rather than `round`, so the builtin
# `round()` is not shadowed at module level.
for round_num in range(15):
    server_state = federated_algorithm.next(server_state, train_data)
关于模型中的这一行:tf.keras.layers.Dense(50, kernel_initializer=initializer),我使用了 50 个输出节点,因为我创建的虚拟标签取值范围是 0 到 49。使用 SparseCategoricalCrossentropy 损失函数时必须这样做(输出节点数要覆盖所有标签取值)。
我正在按照 TFF 教程构建我的 FL 模型 我的数据包含在不同的 CSV 文件中,这些文件被视为不同的客户端。 按照此 tutorial,构建 Keras 模型函数如下
@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
    """Build the CSV-backed dataset for the client file at `dataset_path`.

    Column defaults are taken from `record_defaults`, which must be defined
    elsewhere; `header=True` marks the first line of the file as a header.
    """
    csv_dataset = tf.data.experimental.CsvDataset(
        dataset_path,
        record_defaults=record_defaults,
        header=True,
    )
    return csv_dataset
@tf.function
def add_parsing(dataset):
    """Map every CSV row of `dataset` to an ordered `{'y': ..., 'x': ...}` record."""

    def parse_dataset(*columns):
        # The last column is the label. Columns 1..-2 are the features, so
        # column 0 is dropped. `features` stays a tuple of per-column values;
        # nothing is stacked into a single tensor here.
        label = columns[-1]
        features = columns[1:-1]
        return OrderedDict([('y', label), ('x', features)])

    return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
# One dataset per client: the path mapping and the per-file dataset builder
# are handed to FilePerUserClientData.
source = tff.simulation.datasets.FilePerUserClientData(
dataset_paths, create_tf_dataset_for_client_fn)
client_ids = sorted(source.client_ids)
# Make sure the client ids are tensor strings when splitting data.
# NOTE: this overwrites the private `_client_ids` attribute of `source`.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids]
# Attach the row-parsing step (add_parsing) to every client's dataset.
source = source.preprocess(add_parsing)
# Split the clients into train/test sets; the second argument (1) is
# presumably the number of test clients — confirm against the TFF API.
train, test = source.train_test_client_split(source, 1)
train_client_ids = train.client_ids
# Dataset of the first training client; model_fn reads its element_spec.
train_data = train.create_tf_dataset_for_client(train_client_ids[0])
def create_keras_model():
    """Return a single-input Sequential classifier: Dense(10) then Softmax.

    NOTE: the input width of 32 was chosen arbitrarily by the author.
    """
    initializer = tf.keras.initializers.GlorotNormal(seed=0)
    layers = [
        tf.keras.layers.Input(shape=(32,)),
        tf.keras.layers.Dense(10, kernel_initializer=initializer),
        tf.keras.layers.Softmax(),
    ]
    return tf.keras.models.Sequential(layers)
def model_fn():
    """Wrap a freshly built Keras model as a TFF learning model.

    The input spec is read from `train_data.element_spec`.
    """
    return tff.learning.from_keras_model(
        create_keras_model(),
        input_spec=train_data.element_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
然后我按照教程的说明,运行了其他 @tff.tf_computation 函数,例如 def server_init()、def initialize_fn()、def client_update() 和 def server_update()。但是当我运行 def client_update_fn() 时,出现了以下错误:
ValueError: in user code:
File "<ipython-input-14-cada45ffae0f>", line 12, in client_update *
for batch in dataset:
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 455, in forward_pass *
return self._forward_pass(batch_input, training=training)
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 408, in _forward_pass *
predictions = self.predict_on_batch(inputs, training)
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 398, in predict_on_batch *
return self._keras_model(x, training=training)
File "/usr/local/lib/python3.7/dist-packages/keras/engine/base_layer_v1.py", line 740, in __call__ **
self.name)
File "/usr/local/lib/python3.7/dist-packages/keras/engine/input_spec.py", line 200, in assert_input_compatibility
raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'
ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors. Inputs received: [<tf.Tensor 'x:0' shape=() dtype=int32>, <tf.Tensor 'x_1:0' shape=() dtype=int32>, <tf.Tensor 'x_2:0' shape=() dtype=int32>, <tf.Tensor 'x_3:0' shape=() dtype=float32>, <tf.Tensor 'x_4:0' shape=() dtype=float32>, <tf.Tensor 'x_5:0' shape=() dtype=float32>, <tf.Tensor 'x_6:0' shape=() dtype=float32>, <tf.Tensor 'x_7:0' shape=() dtype=float32>, <tf.Tensor 'x_8:0' shape=() dtype=float32>, <tf.Tensor 'x_9:0' shape=() dtype=int32>]
备注:
- 每个 CSV 文件有 10 列作为特征(输入)和一列作为标签(输出)。
- shape=(32,) 是我随意填写的,我其实并不清楚每一列数据的形状应该是什么。
所以,问题是,如何将数据提供给keras模型并克服这个错误
提前致谢
有几点需要说明:您的数据有十个独立的特征,这意味着您的模型实际上需要 10 个独立的输入。不过,您也可以把这些特征堆叠成一个张量,然后使用形状为 (10,) 的单个输入。下面是一个可运行的示例,但请注意它使用的是虚拟数据,因此在实际场景中可能没有太大意义。
创建虚拟数据:
import tensorflow as tf
import tensorflow_federated as tff
import pandas as pd
from collections import OrderedDict
import nest_asyncio
nest_asyncio.apply()
# Dummy data: 12 columns of `samples` random values each
# (4 integer columns, 6 float columns, then 2 more integer columns).
samples = 5
data = [
    [tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()
     for _ in range(4)]
    + [tf.random.normal((samples,)).numpy().tolist() for _ in range(6)]
    + [tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()
       for _ in range(2)]
]
# `data` holds a single row whose cells are lists; explode() expands it
# into one row per sample.
df = pd.DataFrame(data)
df = df.explode(list(df.columns))
# Both client files receive the same contents.
df.to_csv('client1.csv', index=False)
df.to_csv('client2.csv', index=False)
加载并处理数据集:
import tensorflow as tf
# Default value for each of the 12 CSV columns: 4 int, 6 float, 2 int.
# Presumably the Python type of each default also selects the dtype
# CsvDataset parses that column as — confirm against the TF docs.
record_defaults = [0] * 4 + [0.0] * 6 + [0] * 2
@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
    """Build the CSV-backed dataset for the client file at `dataset_path`.

    Column defaults are taken from the module-level `record_defaults`;
    `header=True` marks the first line of the file as a header.
    """
    csv_dataset = tf.data.experimental.CsvDataset(
        dataset_path,
        record_defaults=record_defaults,
        header=True,
    )
    return csv_dataset
@tf.function
def add_parsing(dataset):
    """Map every CSV row of `dataset` to an ordered `{'y': ..., 'x': ...}` record."""

    def parse_dataset(*columns):
        # The last column is the label. Columns 1..-2 are the features, so
        # column 0 is dropped. `features` stays a tuple of per-column values;
        # nothing is stacked into a single tensor here.
        label = columns[-1]
        features = columns[1:-1]
        return OrderedDict([('y', label), ('x', features)])

    return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
# The CSV files are written relative to the working directory
# (`df.to_csv('client1.csv', ...)`), so they are read back from the same
# relative locations rather than from a hard-coded `/content/` prefix that
# only matches when the working directory happens to be `/content`.
dataset_paths = {'client1': 'client1.csv', 'client2': 'client2.csv'}
# One dataset per client, built from its CSV file.
source = tff.simulation.datasets.FilePerUserClientData(
    dataset_paths, create_tf_dataset_for_client_fn)
client_ids = sorted(source.client_ids)
# Make sure the client ids are tensor strings when splitting data.
# NOTE: this overwrites the private `_client_ids` attribute of `source`.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids]
# Attach the row-parsing step (add_parsing) to every client's dataset.
source = source.preprocess(add_parsing)
# Split the clients into train/test sets; the second argument (1) is
# presumably the number of test clients — confirm against the TFF API.
train, test = source.train_test_client_split(source, 1)
train_client_ids = train.client_ids
def reshape_data(d):
    """Replace the per-column features in `d['x']` with one stacked float32 tensor.

    `d` is updated in place and also returned.
    """
    features_as_float = [tf.cast(column, dtype=tf.float32) for column in d['x']]
    d['x'] = tf.stack(features_as_float)
    return d
# One dataset per training client: features stacked by reshape_data,
# then grouped into batches of size 1.
train_data = [
    train.create_tf_dataset_for_client(client_id).map(reshape_data).batch(1)
    for client_id in train_client_ids
]
创建并运行模型:
def create_keras_model():
    """Return a Sequential classifier taking one 10-wide input.

    Layers: Dense(75) -> Dense(50) -> Softmax, with both Dense layers
    sharing one seeded GlorotNormal initializer.
    """
    initializer = tf.keras.initializers.GlorotNormal(seed=0)
    layers = [
        tf.keras.layers.Input(shape=(10,)),
        tf.keras.layers.Dense(75, kernel_initializer=initializer),
        tf.keras.layers.Dense(50, kernel_initializer=initializer),
        tf.keras.layers.Softmax(),
    ]
    return tf.keras.models.Sequential(layers)
def model_fn():
    """Wrap a freshly built Keras model as a TFF learning model.

    The input spec is read from the first client dataset in `train_data`.
    """
    return tff.learning.from_keras_model(
        create_keras_model(),
        input_spec=train_data[0].element_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
def initialize_fn():
    """Return the trainable variables of a newly constructed model.

    NOTE: this name is rebound further down by a
    `@tff.federated_computation` definition of `initialize_fn`.
    """
    return model_fn().trainable_variables
@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
    """Performs training (using the server model weights) on the client's dataset."""
    # Start from the server state: assign each server weight to the
    # matching trainable variable of the local model.
    client_weights = model.trainable_variables
    tf.nest.map_structure(
        lambda local_var, server_value: local_var.assign(server_value),
        client_weights, server_weights)

    # One optimizer step per batch of the client's data.
    for batch in dataset:
        with tf.GradientTape() as tape:
            outputs = model.forward_pass(batch)
        gradients = tape.gradient(outputs.loss, client_weights)
        client_optimizer.apply_gradients(zip(gradients, client_weights))

    return client_weights
@tf.function
def server_update(model, mean_client_weights):
    """Updates the server model weights as the average of the client model weights."""
    server_variables = model.trainable_variables
    # Overwrite each server variable with the matching averaged client value.
    tf.nest.map_structure(
        lambda variable, averaged: variable.assign(averaged),
        server_variables, mean_client_weights)
    return server_variables
# Federated type: one float32 value placed on each client.
federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)


@tff.federated_computation(federated_float_on_clients)
def get_average_temperature(client_temperatures):
    """Return the federated mean of the per-client temperature values."""
    mean_temperature = tff.federated_mean(client_temperatures)
    return mean_temperature


# Small demo: inspect the type signature, then run on three client values.
str(get_average_temperature.type_signature)
get_average_temperature([68.5, 70.3, 69.8])
@tff.tf_computation
def server_init():
    """Build a new model and return its trainable variables."""
    return model_fn().trainable_variables
@tff.federated_computation
def initialize_fn():
    """Return the result of `server_init()` as a federated value at the server."""
    initial_weights = server_init()
    return tff.federated_value(initial_weights, tff.SERVER)
# Model instance built here so that its input spec can be read.
whimsy_model = model_fn()
# TFF type of one client's dataset: a sequence of elements matching the
# model's input spec.
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
# TFF type of the model weights, taken from the result type of server_init.
model_weights_type = server_init.type_signature.result
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
    """Run `client_update` with a fresh model and an SGD optimizer (lr=0.01)."""
    local_model = model_fn()
    sgd = tf.keras.optimizers.SGD(learning_rate=0.01)
    return client_update(local_model, tf_dataset, server_weights, sgd)
@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
    """Run `server_update` on a fresh model with the averaged client weights."""
    return server_update(model_fn(), mean_client_weights)
# Placed types used as the parameter types of next_fn:
# the model weights placed at the server ...
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
# ... and a dataset placed at the clients.
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
    """Run one round: broadcast, client update, mean, server update."""
    # Send the current server weights to the clients.
    broadcast_weights = tff.federated_broadcast(server_weights)
    # Apply client_update_fn to each (dataset, weights) pair.
    trained_weights = tff.federated_map(
        client_update_fn, (federated_dataset, broadcast_weights))
    # Average the client results and pass the mean to server_update_fn.
    averaged_weights = tff.federated_mean(trained_weights)
    return tff.federated_map(server_update_fn, averaged_weights)
# Bundle the initialize/next computations into an iterative process.
federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn,
)

# Run 15 rounds, feeding the server state from each round into the next.
server_state = federated_algorithm.initialize()
# The loop variable is `round_num` rather than `round`, so the builtin
# `round()` is not shadowed at module level.
for round_num in range(15):
    server_state = federated_algorithm.next(server_state, train_data)
关于模型中的这一行:tf.keras.layers.Dense(50, kernel_initializer=initializer),我使用了 50 个输出节点,因为我创建的虚拟标签取值范围是 0 到 49。使用 SparseCategoricalCrossentropy 损失函数时必须这样做(输出节点数要覆盖所有标签取值)。