How do I create an FL algorithm that uses the weights of a few clients?

Based on this link, I am trying to write a new FL algorithm. I train all clients and send every client's model parameters to the server, but during aggregation the server takes a weighted average over only 30% of the clients' model parameters. As the criterion for selecting that 30%, I want to take a weighted average of the weights_delta of the 30% of clients with the smallest loss_sum.
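To make the selection rule concrete, here is a tiny standalone sketch with made-up numbers (not my real model), just to show what I mean by "the 30% of clients with the smallest loss_sum":

import tensorflow as tf

# Toy loss_sum values for 10 clients; keep the 3 (30%) clients with the smallest loss.
loss_sums = tf.constant([3.2, 1.1, 5.0, 0.9, 2.4, 4.1, 0.7, 3.8, 2.0, 1.5])
_, keep_idx = tf.math.top_k(-loss_sums, k=3)  # negate so top_k picks the smallest losses
print(keep_idx.numpy())  # -> [6 3 1], i.e. the clients with loss_sum 0.7, 0.9 and 1.1

The weights_delta of only those clients would then be averaged, weighted by their client_weight (number of examples).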

The code below is my modified version of the code from that link.

@tf.function
def client_update(model, dataset, server_message, client_optimizer):
  # Initialize the local model with the weights broadcast by the server.
  model_weights = model.weights
  initial_weights = server_message.model_weights
  tff.utils.assign(model_weights, initial_weights)

  num_examples = tf.constant(0, dtype=tf.int32)
  loss_sum = tf.constant(0, dtype=tf.float32)

  # Local training loop over the client dataset.
  for batch in iter(dataset):
    with tf.GradientTape() as tape:
      outputs = model.forward_pass(batch)
    grads = tape.gradient(outputs.loss, model_weights.trainable)
    grads_and_vars = zip(grads, model_weights.trainable)
    client_optimizer.apply_gradients(grads_and_vars)
    batch_size = tf.shape(batch['x'])[0]
    num_examples += batch_size
    loss_sum += outputs.loss * tf.cast(batch_size, tf.float32)

  # Difference between the locally trained weights and the initial server weights.
  weights_delta = tf.nest.map_structure(lambda a, b: a - b,
                                        model_weights.trainable,
                                        initial_weights.trainable)
  client_weight = tf.cast(num_examples, tf.float32)

  client_loss = loss_sum  # added: total loss, to be used as the selection criterion

  return ClientOutput(weights_delta, client_weight, loss_sum / client_weight, client_loss)

The ClientOutput has the following attributes:

weights_delta = attr.ib()
client_weight = attr.ib()
model_output = attr.ib()
client_loss = attr.ib()
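For reference, the full container is declared roughly as follows, following the attrs pattern from the simple_fedavg example (the docstring is just my summary of each field):

@attr.s(eq=False, order=False, frozen=True)
class ClientOutput(object):
  """Structure of outputs returned from clients.

  -   `weights_delta`: the updates (deltas) to the model's trainable variables.
  -   `client_weight`: weight used when aggregating `weights_delta`
      (here, the number of examples).
  -   `model_output`: the average training loss on the client.
  -   `client_loss`: the raw `loss_sum`, added as the selection criterion.
  """
  weights_delta = attr.ib()
  client_weight = attr.ib()
  model_output = attr.ib()
  client_loss = attr.ib()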

After that, I collect the client outputs with collected_output = tff.federated_collect(client_outputs) and compute round_model_delta = tff.federated_map(selecting_fn, (collected_output, weight_denom)) here.

@tff.federated_computation(federated_server_state_type,
                           federated_dataset_type)
def run_one_round(server_state, federated_dataset):
  server_message = tff.federated_map(server_message_fn, server_state)
  server_message_at_client = tff.federated_broadcast(server_message)

  client_outputs = tff.federated_map(
      client_update_fn, (federated_dataset, server_message_at_client))

  weight_denom = client_outputs.client_weight

  collected_output = tff.federated_collect(client_outputs)  # added

  round_model_delta = tff.federated_map(selecting_fn, (collected_output, weight_denom))  # added

  server_state = tff.federated_map(server_update_fn, (server_state, round_model_delta))

  round_loss_metric = tff.federated_mean(client_outputs.model_output, weight=weight_denom)

  return server_state, round_loss_metric

In addition, I added the following code here to implement the selecting_fn:

@tff.tf_computation()  # added
def selecting_fn(collected_output, weight_denom):
    # TODO
    return round_model_delta

I am not sure whether writing the code as above is correct. I have tried various approaches, but I mostly run into this error:

TypeError: The value to be mapped must be a FederatedType or implicitly convertible to a FederatedType (got a <<model_weights=<trainable=<float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,round_num=int32>@SERVER,{<float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>}@CLIENTS>)

I would like to know how the sequence-typed collected_output can access each client's client_loss (= loss_sum) and sort by it, and also what approach to use to compute the weighted average when applying weight_denom.

One problem I can see is that in the call tff.federated_map(selecting_fn, (collected_output, weight_denom)), collected_output will be placed at tff.SERVER while weight_denom will be placed at tff.CLIENTS, which won't work. I think you want to first get everything to tff.SERVER.
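One way to do that, assuming you keep the rest of run_one_round as above, is to zip the per-client values into a single structure before collecting, so that the deltas, weights, and losses all arrive at the server together (a sketch, not a drop-in fix):

# Inside run_one_round: zip the per-client values into one CLIENTS-placed tuple,
# then collect that single value so everything selecting_fn needs is at SERVER.
client_values = tff.federated_zip(
    (client_outputs.weights_delta,
     client_outputs.client_weight,
     client_outputs.client_loss))
collected_values = tff.federated_collect(client_values)
round_model_delta = tff.federated_map(selecting_fn, collected_values)

selecting_fn would then take a single sequence argument whose elements are (weights_delta, client_weight, client_loss) tuples, along the lines of select_fn below.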

I'm not sure exactly what behavior you need, but here is example code you can start developing from. It starts with a value at each client (say, its ID), samples a random value, collects the (ID, value) pairs at the server, and then selects the pairs with the largest values, which seems similar to what you describe.

@tff.tf_computation()
def client_sample_fn():
  return tf.random.uniform((1,))

# Type annotation optional for tff.tf_computation. Added here for clarity.
idx_sample_type = tff.to_type(((tf.int32, (1,)), (tf.float32, (1,))))
@tff.tf_computation(tff.SequenceType(idx_sample_type))
def select_fn(idx_sample_dataset):  # Inside, this is a tf.data.Dataset.
  # Concatenate all pairs in the dataset.
  concat_fn = lambda a, b: tf.concat([a, b], axis=0)
  reduce_fn = lambda x, y: (concat_fn(x[0], y[0]), concat_fn(x[1], y[1]))
  reduce_zero = (tf.constant((), dtype=tf.int32, shape=(0,)),
                 tf.constant((), dtype=tf.float32, shape=(0,)))
  idx_tensor, sample_tensor = idx_sample_dataset.reduce(reduce_zero, reduce_fn)
  # Find 3 largest samples.
  top_3_val, top_3_idx = tf.math.top_k(sample_tensor, k=3)
  return tf.gather(idx_tensor, top_3_idx), top_3_val

@tff.federated_computation(tff.type_at_clients((tf.int32, (1,))))
def fed_fn(client_idx):
  client_sample = tff.federated_eval(client_sample_fn, tff.CLIENTS)
  # First zip, to have a dataset of pairs, rather than a pair of datasets.
  client_idx_sample_pair = tff.federated_zip((client_idx, client_sample))
  collected_idx_sample_pair = tff.federated_collect(client_idx_sample_pair)
  return tff.federated_map(select_fn, collected_idx_sample_pair)

client_idx = [(10,), (11,), (12,), (13,), (14,)]
fed_fn(client_idx)

With example output:

(array([11, 10, 14], dtype=int32), array([0.8220736 , 0.81413555, 0.6984291 ], dtype=float32))
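To connect this back to your goal, here is a rough adaptation of the same pattern for a selecting_fn over collected (weights_delta, client_weight, client_loss) triples. It is untested and assumes, purely for illustration, that each delta is a single flat float32 vector of size DELTA_DIM rather than your real nested structure:

DELTA_DIM = 4  # Hypothetical flat delta size; your real deltas are nested structures.

triple_type = tff.to_type((
    (tf.float32, (DELTA_DIM,)),  # weights_delta (flattened, for illustration)
    tf.float32,                  # client_weight
    tf.float32))                 # client_loss

@tff.tf_computation(tff.SequenceType(triple_type))
def selecting_fn(collected_triples):  # Inside, this is a tf.data.Dataset of triples.
  # Stack the per-client values into dense tensors, one row per client.
  def stack_fn(state, elem):
    delta, weight, loss = elem
    return (tf.concat([state[0], tf.expand_dims(delta, 0)], axis=0),
            tf.concat([state[1], tf.expand_dims(weight, 0)], axis=0),
            tf.concat([state[2], tf.expand_dims(loss, 0)], axis=0))
  zero_state = (tf.zeros((0, DELTA_DIM)), tf.zeros((0,)), tf.zeros((0,)))
  deltas, weights, losses = collected_triples.reduce(zero_state, stack_fn)

  # Keep the 30% of clients with the smallest loss_sum.
  num_clients = tf.shape(losses)[0]
  k = tf.maximum(1, tf.cast(0.3 * tf.cast(num_clients, tf.float32), tf.int32))
  _, keep_idx = tf.math.top_k(-losses, k=k)

  sel_deltas = tf.gather(deltas, keep_idx)
  sel_weights = tf.gather(weights, keep_idx)
  # Weighted average of the selected deltas, weighted by client_weight.
  return tf.reduce_sum(sel_deltas * sel_weights[:, tf.newaxis], axis=0) / tf.reduce_sum(sel_weights)

In run_one_round this would be applied to the collected, zipped client values (as in the earlier sketch), and the result is a SERVER-placed delta you can pass to server_update_fn.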