How can I apply custom encoders to multiple clients at once? How do I use custom encoders in run_one_round?

So my goal is basically to implement global top-k subsampling. Gradient sparsification itself is quite simple and I have already done it building on the stateful clients example, but now I would like to use encoders as you recommended here at page 28. In addition, I only want to average the non-zero gradients: say we have 10 clients but only 4 of them have a non-zero gradient at a given position in a communication round, then I want to divide the sum of those gradients by 4, not by 10. I hope to achieve this by summing the gradients in the numerator and summing zero/one masks in the denominator. I will also add randomness to the gradient selection, so I have to create these masks at the same time as the gradient selection. My current code is:

import tensorflow as tf

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te


@te.core.tf_style_adaptive_encoding_stage
class GradientSparsificationEncodingStage(te.core.AdaptiveEncodingStageInterface):
  """An example custom implementation of an `EncodingStageInterface`.
  Note: This is likely not what one would want to use in practice. Rather, this
  serves as an illustration of how a custom compression algorithm can be
  provided to `tff`.
  This encoding stage is expected to be run in an iterative manner, and
  alternatively zeroes out values corresponding to odd and even indices. Given
  the determinism of the non-zero indices selection, the encoded structure does
  not need to be represented as a sparse vector, but only the non-zero values
  are necessary. In the decode mehtod, the state (i.e., params derived from the
  state) is used to reconstruct the corresponding indices.
  Thus, this example encoding stage can realize representation saving of 2x.
  """

  ENCODED_VALUES_KEY = 'stateful_topk_values'
  INDICES_KEY = 'indices'
  SHAPES_KEY = 'shapes'
  ERROR_COMPENSATION_KEY = 'error_compensation'

  def encode(self, x, encode_params):
    shapes_list = [tf.shape(y) for y in x]
    flattened = tf.nest.map_structure(lambda y: tf.reshape(y, [-1]), x)
    gradients = tf.concat(flattened, axis=0)
    error_compensation = encode_params[self.ERROR_COMPENSATION_KEY]
    
    gradients_and_error_compensation = tf.math.add(gradients, error_compensation)

    percentage = tf.constant(0.1, dtype=tf.float32)
    k_float = tf.multiply(percentage, tf.cast(tf.size(gradients_and_error_compensation), tf.float32))
    k_int = tf.cast(tf.math.round(k_float), dtype=tf.int32)

    # Select the top-k entries by magnitude, but keep their signed values.
    _, indices = tf.math.top_k(
        tf.math.abs(gradients_and_error_compensation), k=k_int, sorted=False)
    values = tf.gather(gradients_and_error_compensation, indices)
    indices = tf.expand_dims(indices, 1)
    sparse_gradients_and_error_compensation = tf.scatter_nd(
        indices, values, tf.shape(gradients_and_error_compensation))

    new_error_compensation = tf.math.subtract(gradients_and_error_compensation, sparse_gradients_and_error_compensation)
    state_update_tensors = {self.ERROR_COMPENSATION_KEY: new_error_compensation}
    
    encoded_x = {self.ENCODED_VALUES_KEY: values,
                 self.INDICES_KEY: indices,
                 self.SHAPES_KEY: shapes_list}

    return encoded_x, state_update_tensors

  def decode(self,
             encoded_tensors,
             decode_params,
             num_summands=None,
             shape=None):
    del num_summands, decode_params, shape  # Unused.
    flat_shape = tf.math.reduce_sum([tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]])
    sizes_list = [tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]]
    scatter_tensor = tf.scatter_nd(
        indices=encoded_tensors[self.INDICES_KEY],
        updates=encoded_tensors[self.ENCODED_VALUES_KEY],
        shape=[flat_shape])
    # Mark every location that received a value; top-k entries can be negative,
    # so compare with not_equal rather than greater.
    nonzero_locations = tf.cast(
        tf.math.not_equal(scatter_tensor, 0), tf.float32)
    reshaped_tensor = [tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
            zip(tf.split(scatter_tensor, sizes_list), encoded_tensors[self.SHAPES_KEY])]
    reshaped_nonzero = [tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
            zip(tf.split(nonzero_locations, sizes_list), encoded_tensors[self.SHAPES_KEY])]
    return  reshaped_tensor, reshaped_nonzero


  def initial_state(self):
    return {self.ERROR_COMPENSATION_KEY: tf.constant(0, dtype=tf.float32)}

  def update_state(self, state, state_update_tensors):
    return {self.ERROR_COMPENSATION_KEY: state_update_tensors[self.ERROR_COMPENSATION_KEY]}

  def get_params(self, state):
    encode_params = {self.ERROR_COMPENSATION_KEY: state[self.ERROR_COMPENSATION_KEY]}
    decode_params = {}
    return encode_params, decode_params

  @property
  def name(self):
    return 'gradient_sparsification_encoding_stage'

  @property
  def compressible_tensors_keys(self):
    # No encoded tensors are marked for further compression by later stages.
    return []

  @property
  def commutes_with_sum(self):
    return False

  @property
  def decode_needs_input_shape(self):
    return False

  @property
  def state_update_aggregation_modes(self):
    return {}

I have already run some simple manual tests following the steps you listed here at page 45. It works, but I have some questions/problems.

  1. When I use a list of tensors of the same shape (e.g. two 2x25 tensors) as the input x to encode, it works fine, but when I try a list of tensors of different shapes (2x20 and 6x10) it fails with the error:

InvalidArgumentError: Shapes of all inputs must match: values[0].shape = [2,20] != values[1].shape = [6,10] [Op:Pack] name: packed

How can I fix this? As I said, I want to use global top-k, so I have to encode all of the trainable model weights at once. Taking the cnn model used here as an example, all of the tensors have different shapes.

  2. How do I do the averaging described at the beginning? For example, here you have done

mean_factory = tff.aggregators.MeanFactory(
    tff.aggregators.EncodedSumFactory(mean_encoder_fn),  # numerator
    tff.aggregators.EncodedSumFactory(mean_encoder_fn),  # denominator
)

Is there a way to repeat this, with one output of decode as the numerator and the other as the denominator? And how do I handle 0 divided by 0? TensorFlow has a divide_no_nan function; can I use it somehow, or do I need to add an eps everywhere?

  3. How is partitioning handled when using encoders? Does each client have its own encoder holding its own state? As you discussed here at page 6, client state is used in cross-silo settings, but what happens if the order of the clients changes?

  4. You have recommended using the stateful clients example. Could you explain this further? I mean, where exactly in run_one_round do the encoders go, and how are they used/combined with the client update and the aggregation?

  5. I have some additional information, e.g. the sparsity level, that I would like to pass to encode. What is the recommended way to do that?

Here are some answers that will hopefully help:

  1. If you want to treat the whole aggregated structure as a single tensor, use concat_factory as the outermost aggregator. This concatenates the entire structure into a rank-1 Tensor at the clients and then unpacks it back to the original structure at the end. Example usage: tff.aggregators.concat_factory(tff.aggregators.MeanFactory(...)) (see the first sketch after these answers).

Please note that the encoding stage objects are meant to work with a single tensor, so what you describe with tensors of the same shape probably only works by accident.

  2. There are two options.

    a. Modify the client training code so that the weights passed to the weighted aggregator are already what you want (the zero/one mask). In the stateful clients example you link, that would be here. You would then get what you want by default (by summing the numerators).

    b. Modify the UnweightedMeanFactory to do exactly the variant of averaging you describe and use that. A starting point would be modifying this; the second sketch after these answers shows the divide_no_nan step this amounts to.

  3. (and 4.) I think this is what you would need to implement. The existing client state is initialized in the example here; you would need to extend it to contain the aggregator state, and make sure those are sampled together with the clients, as done here. Then, to integrate the aggregators into the example, you would need to replace the hard-coded tff.federated_mean. An example of such an integration is in the implementation of tff.learning.build_federated_averaging_process, primarily here (the third sketch after these answers shows the wiring).
  5. I am not quite sure what the question is. Perhaps get the previous parts working first (that seems like a prerequisite to me), then clarify the question and ask in a new post?
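
To make answer 1 concrete, here is a minimal sketch of wrapping the encoded mean from question 2 in concat_factory. The names build_global_topk_aggregator and sparsify_encoder_fn are assumptions: the latter stands for a function you would write, analogous to mean_encoder_fn above, that maps a tf.TensorSpec to a te.core.GatherEncoder built from your encoding stage.

import tensorflow_federated as tff

def build_global_topk_aggregator(sparsify_encoder_fn):
  # Numerator and denominator sums are encoded with the same encoder, as in
  # the MeanFactory snippet from question 2.
  encoded_mean = tff.aggregators.MeanFactory(
      tff.aggregators.EncodedSumFactory(sparsify_encoder_fn),  # numerator
      tff.aggregators.EncodedSumFactory(sparsify_encoder_fn),  # denominator
  )
  # concat_factory concatenates the whole structure of model updates into a
  # single rank-1 tensor on the clients and unpacks the aggregate back into
  # the original structure, so the encoding stage only ever sees one tensor.
  return tff.aggregators.concat_factory(encoded_mean)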
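
For the 0/0 concern in question 2 and the averaging variant in option b, the server-side division can be done with tf.math.divide_no_nan, which returns 0 wherever no client contributed, so no eps is needed. A minimal sketch, assuming the aggregator has already produced the elementwise sum of the sparsified gradients and the elementwise sum of the zero/one masks:

import tensorflow as tf

def masked_average(summed_values, summed_masks):
  # Elementwise mean over only the clients that contributed a non-zero value;
  # positions where summed_masks is 0 come out as 0 instead of NaN.
  return tf.nest.map_structure(
      tf.math.divide_no_nan, summed_values, summed_masks)

# For example, with three positions to which 2, 0 and 3 clients contributed:
# masked_average(tf.constant([4.0, 0.0, 6.0]), tf.constant([2.0, 0.0, 3.0]))
# -> [2.0, 0.0, 2.0]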
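
For answers 3 and 4, the integration point is the hard-coded tff.federated_mean in run_one_round: it gets replaced by an aggregation process created from a factory, and that process carries its own state. A minimal sketch with illustrative names (the model delta type and the factory below are assumptions, not taken from the stateful clients example):

import tensorflow as tf
import tensorflow_federated as tff

# Type of a single client's model delta; two differently shaped layers here
# just to mirror the shapes from question 1.
model_delta_type = tff.to_type([
    tff.TensorType(tf.float32, [2, 20]),
    tff.TensorType(tf.float32, [6, 10]),
])

# Any aggregation factory could go here, e.g. the concat_factory-wrapped
# encoded mean from the first sketch.
factory = tff.aggregators.UnweightedMeanFactory()
aggregation_process = factory.create(model_delta_type)

# The process owns its own server-side state (for encoded aggregators this is
# where encoder state such as the error compensation lives), so it has to be
# added to the server state and threaded through the rounds, roughly:
#
#   agg_state = aggregation_process.initialize()                  # in server init
#   output = aggregation_process.next(agg_state, client_deltas)   # in run_one_round
#   round_model_delta = output.result                             # replaces federated_mean
#   new_agg_state = output.state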