tensorflow federated 梯度更新的定制聚合算法
Customized aggregation algorithm for gradient updates in tensorflow federated
我一直在努力实现这个paper。基本上我想做的是总结每个客户的损失,并将其与前一个时期进行比较。然后对于模型的每个构成层,比较服务器和客户端模型的权重之间的 KL 散度,以获得特定于层的参数更新,然后进行 softmax 并决定是否需要自适应更新或正常的 FedAvg 方法。
算法如下——
FedMed
我尝试使用代码 here to build a custom federated avg process. I got the basic understanding that there are some tf.computations and some tff.computations which are involved. I get that I need to make changes in the orchestration logic in the run_one_round function and basically manipulate the client outputs to do adaptive averaging instead of the vanilla federated averaging. The client_update tf.computation 函数基本上 returns 我需要的所有值,即 weights_delta(可用于基于客户端的模型权重),model_output(可用于计算损失)。
但我不确定我应该在哪里进行更改。
@tff.federated_computation(federated_server_state_type,
federated_dataset_type)
def run_one_round(server_state, federated_dataset):
server_message = tff.federated_map(server_message_fn, server_state)
server_message_at_client = tff.federated_broadcast(server_message)
client_outputs = tff.federated_map(
client_update_fn, (federated_dataset, server_message_at_client))
weight_denom = client_outputs.client_weight
# todo
# instead of using tff.federated_mean I wish to do a adaptive aggregation based on the client_outputs.weights_delta and server_state model
round_model_delta = tff.federated_mean(
client_outputs.weights_delta, weight=weight_denom)
#client_outputs.weights_delta has all the client model weights.
#client_outputs.client_weight has the number of examples per client.
#client_outputs.model_output has the output of the model per client example.
我想利用 server_state 对象的服务器模型权重。
我想计算服务器模型权重与每层每个客户端模型之间的 KL 散度。然后使用相对权重来聚合客户端权重,而不是香草联合平均。
而不是使用 tff.federated_mean 我希望使用一种不同的策略,基本上是一种基于上述算法的自适应策略。
所以我需要一些关于如何实施它的建议。
基本上我想做的是:
1)将客户损失的所有值相加。
2)计算所有clients with server的每layerbasis的KL divergence,然后决定是使用adaptive optimization还是FedAvg。
还有一种方法可以将此值作为 python 值进行操作,这将有助于调试目的(我尝试使用 tf.print 但这也没有帮助)。谢谢!
最简单的选项:计算客户端均值的权重
如果我正确阅读了上面的算法,我们只需要计算一些即时的平均权重。 tff.federated_mean
接受一个可选的 CLIENTS-placed weight
参数,所以这里最简单的选择可能是计算所需的客户端权重并将它们传递给平均值。
这看起来像(假设下面使用的变量的适当定义,我们将对此进行评论):
@tff.federated_computation(...)
def round_function(...):
...
# We assume there is a tff.Computation training_fn that performs training,
# and we're calling it here on the correct arguments
trained_clients = tff.federated_map(training_fn, clients_placed_arguments)
# Next we assume there is a variable in-scope server_model,
# representing the 'current global model'.
global_model_at_clients = tff.federated_broadcast(server_model)
# Here we assume a function compute_kl_divergence, which takes
# two structures of tensors and computes the KL divergence
# (as a scalar) between them. The two arguments here are clients-placed,
# so the result will be as well.
kl_div_at_clients = tff.federated_map(compute_kl_divergence,
(global_model_at_clients, trained_clients))
# Perhaps we wish to not use raw KL divergence as the weight, but rather
# some function thereof; if so, we map a postprocessing function to
# the computed divergences. The result will still be clients-placed.
mean_weight = tff.federated_map(postprocess_divergence, kl_div_at_clients)
# Now we simply use the computed weights in the mean.
return tff.federated_mean(trained_clients, weight=mean_weight)
更灵活的工具:tff.federated_reduce
TFF 通常鼓励算法开发人员实现他们所能实现的任何东西 'in the aggregation',因此公开了一些高度可定制的原语,例如 tff.federated_reduce
,它允许您 运行 任意 TensorFlow“在流”在客户端和服务器之间。如果上述对所需算法的阅读不正确并且需要更多涉及的东西,或者您希望灵活地尝试完全不同的聚合概念(TFF 鼓励并旨在支持),这可能是您的选择。
在 TFF 的启发式打字语言中,tff.federated_reduce
具有签名:
<{T}@CLIENTS, U, (<U, T> -> U)> -> U@SERVER
意思是,federated_reduce
取一个放在客户端的 T
类型的值,一个 U
类型的归约代数中的一个 'zero',以及一个接受一个U
和一个 T
并产生一个 U
,并在客户端和服务器之间应用此函数 'in the stream',产生一个放置在服务器上的 U
。函数 (<U, T> -> U)
将应用于部分累加值 U
和流中的 'next' 元素 T
(但请注意,TFF 不保证这些值的顺序) ,返回另一个部分累加的值 U
。 'zero' 应该代表 'partially accumulated' 在您的应用程序中的空集上的任何含义;这将是减少的起点。
这个问题的应用
组件
您的缩减函数需要访问两部分数据:全局模型状态和给定客户端的训练结果。这很好地映射到 T
类型。在这个应用程序中,我们将有类似的东西:
T = <server_model=server_model_type, trained_model=trained_model_type>
这两种类型很可能相同,但不一定相同。
您的归约函数将接受部分聚合、您的服务器模型和您的客户端训练模型,返回一个新的部分聚合。在这里,我们将开始假设算法的读数与上述相同,即具有特定权重的加权平均值。通常,计算平均值的最简单方法是保留两个累加器,一个用于分子,一个用于分母。这会影响下面zero
和归约函数的选择
您的 zero
应该包含一个张量结构,其值为 0 映射到您的模型的权重——这将是分子。如果你有像 tff.federated_sum
这样的聚合(因为 TFF 知道零应该是什么),这将为你生成,但对于这种情况,你必须自己动手处理这样的张量。 tf.nest.map_structure
and tf.zeros_like
.
应该不会太难
对于分母,我们假设我们只需要一个标量。 TFF 和 TF 比这灵活得多——如果需要,您可以保留每层或每参数的分母——但为了简单起见,我们假设我们最后只想除以一个浮点数。
因此我们的类型 U
将类似于:
U = <numerator=server_model_type, denominator=tf.float32>
终于到了我们的归约函数。它或多或少是上面相同部分的不同组成;我们将在这里对它们做出稍微严格的假设(特别是,所有本地函数都是 tff.tf_computations
——一个技术假设,可以说是 TFF 上的一个错误)。我们的缩减功能将沿着这条线(假设适当的类型别名):
@tff.tf_computation(U, T)
def reduction(partial_accumulate, next_element):
kl_div = compute_kl_divergence(
next_element.server_model, next_element.trained_model)
weight = postprocess_divergence(kl_div)
new_numerator = partial_accumulate.numerator + weight * next_element.trained_model
new_denominator = partial_accumulate.denominator + weight
return collections.OrderedDict(
numerator=new_numerator, denominator=new_denominator)
将它们放在一起
一轮的基本轮廓将与上述类似;但是我们已经进行了更多的计算 'in the stream',因此客户端的计算量会减少。我们在这里假设相同的变量定义。
@tff.federated_computation(...)
def round_function(...):
...
trained_clients = tff.federated_map(training_fn, clients_placed_arguments)
global_model_at_clients = tff.federated_broadcast(server_model)
# This zip I believe is not necessary, but it helps my mental model.
reduction_arg = tff.federated_zip(
collections.OrderedDict(server_model=global_model_at_clients,
trained_model=trained_clients))
# We assume a zero as specified above
return tff.federated_reduce(reduction_arg,
zero,
reduction)
我一直在努力实现这个paper。基本上我想做的是总结每个客户的损失,并将其与前一个时期进行比较。然后对于模型的每个构成层,比较服务器和客户端模型的权重之间的 KL 散度,以获得特定于层的参数更新,然后进行 softmax 并决定是否需要自适应更新或正常的 FedAvg 方法。
算法如下—— FedMed
我尝试使用代码 here to build a custom federated avg process. I got the basic understanding that there are some tf.computations and some tff.computations which are involved. I get that I need to make changes in the orchestration logic in the run_one_round function and basically manipulate the client outputs to do adaptive averaging instead of the vanilla federated averaging. The client_update tf.computation 函数基本上 returns 我需要的所有值,即 weights_delta(可用于基于客户端的模型权重),model_output(可用于计算损失)。
但我不确定我应该在哪里进行更改。
@tff.federated_computation(federated_server_state_type,
federated_dataset_type)
def run_one_round(server_state, federated_dataset):
server_message = tff.federated_map(server_message_fn, server_state)
server_message_at_client = tff.federated_broadcast(server_message)
client_outputs = tff.federated_map(
client_update_fn, (federated_dataset, server_message_at_client))
weight_denom = client_outputs.client_weight
# todo
# instead of using tff.federated_mean I wish to do a adaptive aggregation based on the client_outputs.weights_delta and server_state model
round_model_delta = tff.federated_mean(
client_outputs.weights_delta, weight=weight_denom)
#client_outputs.weights_delta has all the client model weights.
#client_outputs.client_weight has the number of examples per client.
#client_outputs.model_output has the output of the model per client example.
我想利用 server_state 对象的服务器模型权重。
我想计算服务器模型权重与每层每个客户端模型之间的 KL 散度。然后使用相对权重来聚合客户端权重,而不是香草联合平均。
而不是使用 tff.federated_mean 我希望使用一种不同的策略,基本上是一种基于上述算法的自适应策略。
所以我需要一些关于如何实施它的建议。
基本上我想做的是:
1)将客户损失的所有值相加。
2)计算所有clients with server的每layerbasis的KL divergence,然后决定是使用adaptive optimization还是FedAvg。
还有一种方法可以将此值作为 python 值进行操作,这将有助于调试目的(我尝试使用 tf.print 但这也没有帮助)。谢谢!
最简单的选项:计算客户端均值的权重
如果我正确阅读了上面的算法,我们只需要计算一些即时的平均权重。 tff.federated_mean
接受一个可选的 CLIENTS-placed weight
参数,所以这里最简单的选择可能是计算所需的客户端权重并将它们传递给平均值。
这看起来像(假设下面使用的变量的适当定义,我们将对此进行评论):
@tff.federated_computation(...)
def round_function(...):
...
# We assume there is a tff.Computation training_fn that performs training,
# and we're calling it here on the correct arguments
trained_clients = tff.federated_map(training_fn, clients_placed_arguments)
# Next we assume there is a variable in-scope server_model,
# representing the 'current global model'.
global_model_at_clients = tff.federated_broadcast(server_model)
# Here we assume a function compute_kl_divergence, which takes
# two structures of tensors and computes the KL divergence
# (as a scalar) between them. The two arguments here are clients-placed,
# so the result will be as well.
kl_div_at_clients = tff.federated_map(compute_kl_divergence,
(global_model_at_clients, trained_clients))
# Perhaps we wish to not use raw KL divergence as the weight, but rather
# some function thereof; if so, we map a postprocessing function to
# the computed divergences. The result will still be clients-placed.
mean_weight = tff.federated_map(postprocess_divergence, kl_div_at_clients)
# Now we simply use the computed weights in the mean.
return tff.federated_mean(trained_clients, weight=mean_weight)
更灵活的工具:tff.federated_reduce
TFF 通常鼓励算法开发人员实现他们所能实现的任何东西 'in the aggregation',因此公开了一些高度可定制的原语,例如 tff.federated_reduce
,它允许您 运行 任意 TensorFlow“在流”在客户端和服务器之间。如果上述对所需算法的阅读不正确并且需要更多涉及的东西,或者您希望灵活地尝试完全不同的聚合概念(TFF 鼓励并旨在支持),这可能是您的选择。
在 TFF 的启发式打字语言中,tff.federated_reduce
具有签名:
<{T}@CLIENTS, U, (<U, T> -> U)> -> U@SERVER
意思是,federated_reduce
取一个放在客户端的 T
类型的值,一个 U
类型的归约代数中的一个 'zero',以及一个接受一个U
和一个 T
并产生一个 U
,并在客户端和服务器之间应用此函数 'in the stream',产生一个放置在服务器上的 U
。函数 (<U, T> -> U)
将应用于部分累加值 U
和流中的 'next' 元素 T
(但请注意,TFF 不保证这些值的顺序) ,返回另一个部分累加的值 U
。 'zero' 应该代表 'partially accumulated' 在您的应用程序中的空集上的任何含义;这将是减少的起点。
这个问题的应用
组件
您的缩减函数需要访问两部分数据:全局模型状态和给定客户端的训练结果。这很好地映射到 T
类型。在这个应用程序中,我们将有类似的东西:
T = <server_model=server_model_type, trained_model=trained_model_type>
这两种类型很可能相同,但不一定相同。
您的归约函数将接受部分聚合、您的服务器模型和您的客户端训练模型,返回一个新的部分聚合。在这里,我们将开始假设算法的读数与上述相同,即具有特定权重的加权平均值。通常,计算平均值的最简单方法是保留两个累加器,一个用于分子,一个用于分母。这会影响下面zero
和归约函数的选择
您的 zero
应该包含一个张量结构,其值为 0 映射到您的模型的权重——这将是分子。如果你有像 tff.federated_sum
这样的聚合(因为 TFF 知道零应该是什么),这将为你生成,但对于这种情况,你必须自己动手处理这样的张量。 tf.nest.map_structure
and tf.zeros_like
.
对于分母,我们假设我们只需要一个标量。 TFF 和 TF 比这灵活得多——如果需要,您可以保留每层或每参数的分母——但为了简单起见,我们假设我们最后只想除以一个浮点数。
因此我们的类型 U
将类似于:
U = <numerator=server_model_type, denominator=tf.float32>
终于到了我们的归约函数。它或多或少是上面相同部分的不同组成;我们将在这里对它们做出稍微严格的假设(特别是,所有本地函数都是 tff.tf_computations
——一个技术假设,可以说是 TFF 上的一个错误)。我们的缩减功能将沿着这条线(假设适当的类型别名):
@tff.tf_computation(U, T)
def reduction(partial_accumulate, next_element):
kl_div = compute_kl_divergence(
next_element.server_model, next_element.trained_model)
weight = postprocess_divergence(kl_div)
new_numerator = partial_accumulate.numerator + weight * next_element.trained_model
new_denominator = partial_accumulate.denominator + weight
return collections.OrderedDict(
numerator=new_numerator, denominator=new_denominator)
将它们放在一起
一轮的基本轮廓将与上述类似;但是我们已经进行了更多的计算 'in the stream',因此客户端的计算量会减少。我们在这里假设相同的变量定义。
@tff.federated_computation(...)
def round_function(...):
...
trained_clients = tff.federated_map(training_fn, clients_placed_arguments)
global_model_at_clients = tff.federated_broadcast(server_model)
# This zip I believe is not necessary, but it helps my mental model.
reduction_arg = tff.federated_zip(
collections.OrderedDict(server_model=global_model_at_clients,
trained_model=trained_clients))
# We assume a zero as specified above
return tff.federated_reduce(reduction_arg,
zero,
reduction)