Tensorflow RNN: how to infer a sequence without duplicates?
I am using a seq2seq RNN to generate an output sequence of labels for a given seed label. During the inference step, I would like to generate sequences that contain only unique labels (i.e. skip labels that have already been added to the output sequence). To do this, I created a sampler object that tries to remember the labels that were already added to the output and to reduce their logit values to -np.inf.

Here is the sampler code:
class InferenceSampler(object):
    def __init__(self, out_weights, out_biases):
        self._out_weights = tf.transpose(out_weights)
        self._out_biases = out_biases

        self._n_tracks = out_weights.shape[0]
        self.ids_mask = tf.zeros([self._n_tracks], name="playlist_mask")

    def __call__(self, decoder_outputs):
        _logits = tf.matmul(decoder_outputs, self._out_weights)
        _logits = tf.nn.bias_add(_logits, self._out_biases)

        # apply mask
        _logits = _logits + self.ids_mask

        _sample_ids = tf.cast(tf.argmax(_logits, axis=-1), tf.int32)

        # update mask
        step_ids_mask = tf.sparse_to_dense(_sample_ids, [self._n_tracks], -np.inf)
        self.ids_mask = self.ids_mask + step_ids_mask

        return _sample_ids
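To make the intended masking behaviour concrete, here is a toy sketch in plain numpy (hypothetical logits, not part of the graph): once an id wins the argmax, its logit is pushed to -np.inf so it can never win again.

import numpy as np

logits = np.array([2.0, 5.0, 1.0])
mask = np.zeros(3)

first = int(np.argmax(logits + mask))   # -> 1 (highest logit)
mask[first] = -np.inf                   # exclude id 1 from now on
second = int(np.argmax(logits + mask))  # -> 0, id 1 cannot be sampled again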
The code of the inference graph looks like this:
self._max_playlist_len = tf.placeholder(tf.int32, ())
self._start_tokens = tf.placeholder(tf.int32, [None])

sample_fn = InferenceSampler(out_weights, out_biases)
with tf.name_scope("inf_decoder"):
    def _end_fn(sample_ids):
        return tf.equal(sample_ids, PAD_ITEM_ID)

    def _next_inputs_fn(sample_ids):
        return tf.nn.embedding_lookup(
            track_embs,
            sample_ids
        )

    _start_inputs = tf.nn.embedding_lookup(
        track_embs,
        self._start_tokens
    )

    helper = tf.contrib.seq2seq.InferenceHelper(
        sample_fn=sample_fn,
        sample_shape=[],
        sample_dtype=tf.int32,
        start_inputs=_start_inputs,
        end_fn=_end_fn,
        next_inputs_fn=_next_inputs_fn
    )

    decoder = tf.contrib.seq2seq.BasicDecoder(
        rnn_cell,
        helper,
        rnn_cell.zero_state(tf.shape(self._start_tokens)[0], tf.float32),
        output_layer=projection_layer
    )

    outputs, _, _ = tf.contrib.seq2seq.dynamic_decode(
        decoder,
        maximum_iterations=self._max_playlist_len
    )

self.playlists = outputs.sample_id
Unfortunately, the results still contain duplicate labels. Moreover, when I try to access sample_fn.ids_mask, I get an error message: ValueError: Operation 'inf_decoder/decoder/while/BasicDecoderStep/add_1' has been marked as not fetchable.

What am I doing wrong? And how legitimate is it to create a sample_fn like this?
To overcome the problem, I updated the inference so that at each RNN step it emits an embedding vector instead of an item_id. After the inference is finished, I convert the embeddings into item_ids.

First of all, this solution minimizes the number of operations. Secondly, since I use LSTM/GRU cells, they minimize the probability of observing two absolutely identical outputs at different steps of the RNN inference.
The new code looks like this:
with tf.name_scope("inf_decoder"):
    def _sample_fn(decoder_outputs):
        return decoder_outputs

    def _end_fn(sample_ids):
        # infinite
        return tf.tile([False], [n_seeds])

    _start_inputs = tf.nn.embedding_lookup(
        track_embs,
        self._seed_items
    )

    helper = tf.contrib.seq2seq.InferenceHelper(
        sample_fn=_sample_fn,
        sample_shape=[self.emb_size],
        sample_dtype=tf.float32,
        start_inputs=_start_inputs,
        end_fn=_end_fn,
    )

    decoder = tf.contrib.seq2seq.BasicDecoder(
        rnn_cell,
        helper,
        rnn_cell.zero_state(n_seeds, tf.float32),
        output_layer=projection_layer
    )

    outputs, _, _ = tf.contrib.seq2seq.dynamic_decode(
        decoder,
        maximum_iterations=self._max_playlist_len
    )

flat_rnn_output = tf.reshape(outputs.rnn_output, [-1, self.emb_size])
flat_logits = tf.matmul(flat_rnn_output, out_weights, transpose_b=True)
flat_logits = tf.nn.bias_add(flat_logits, out_biases)

item_ids = tf.cast(tf.argmax(flat_logits, axis=-1), tf.int32)
playlists = tf.reshape(item_ids, [n_seeds, -1])

self.playlists = playlists
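Note that this workaround only makes duplicates unlikely, it does not forbid them. If hard uniqueness matters, the decoded playlists can still be deduplicated on the Python side; dedupe below is a hypothetical post-processing helper, not part of the graph:

def dedupe(playlist):
    # keep the first occurrence of every item, preserving order
    seen, out = set(), []
    for item in playlist:
        if item not in seen:
            seen.add(item)
            out.append(item)
    return out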
So, after some investigation, I found the answers to all the questions raised in this thread. The main question was: why is self.ids_mask in InferenceSampler not updated? The reason lies in the internals of dynamic_decode. According to this answer from the Tensorflow issue tracker:
... only tensors defined inside the loop will be evaluated
every loop iteration. All tensors defined outside a loop will be
evaluated exactly once.
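The same behaviour is easy to reproduce with a bare tf.while_loop. In the minimal sketch below (assuming TF 1.x graph mode; counter plays the role of self.ids_mask), the closed-over tensor is rebuilt inside the body but never threaded through loop_vars, so the "update" does not survive between iterations, and fetching the op created inside the loop raises the same "not fetchable" error:

import tensorflow as tf

counter = tf.zeros([])  # created outside the loop: evaluated exactly once

def body(i):
    global counter
    # Builds a new add op inside the loop body; `counter` is not a loop
    # variable, so the update is not carried to the next iteration.
    counter = counter + 1.0
    return i + 1

i = tf.while_loop(lambda i: i < 5, body, [tf.constant(0)])

# State that must change every iteration has to go through loop_vars instead:
# i, acc = tf.while_loop(lambda i, a: i < 5,
#                        lambda i, a: (i + 1, a + 1.0),
#                        [tf.constant(0), tf.zeros([])])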
In my case, self.ids_mask is specified outside the loop, which means that I had to rewrite dynamic_decode to get what I want. The code below is a slightly modified version of the original one, and it does almost the same thing.

Let's start with the new dynamic_decode, which should create and update a mask that collects the already predicted sample_ids. I removed the code that I did not modify; just follow the initial_mask and mask variables.

The new dynamic_decode:
def dynamic_decode(decoder,
                   output_time_major=False,
                   impute_finished=False,
                   maximum_iterations=None,
                   parallel_iterations=32,
                   swap_memory=False,
                   scope=None):
    ...
    initial_finished, initial_inputs, initial_mask, initial_state = decoder.initialize()
    ...
    def body(time, outputs_ta, state, inputs, finished, sequence_lengths, mask):
        """Internal while_loop body.

        Args:
          time: scalar int32 tensor.
          outputs_ta: structure of TensorArray.
          state: (structure of) state tensors and TensorArrays.
          inputs: (structure of) input tensors.
          finished: bool tensor (keeping track of what's finished).
          sequence_lengths: int32 tensor (keeping track of time of finish).
          mask: SparseTensor to remove already predicted items

        Returns:
          `(time + 1, outputs_ta, next_state, next_inputs, next_finished,
            next_sequence_lengths, next_mask)`.
        """
        (next_outputs, decoder_state, next_inputs, next_mask,
         decoder_finished) = decoder.step(time, inputs, state, mask)
        ...
        nest.assert_same_structure(state, decoder_state)
        nest.assert_same_structure(outputs_ta, next_outputs)
        nest.assert_same_structure(inputs, next_inputs)
        nest.assert_same_structure(mask, next_mask)
        ...
        return (time + 1, outputs_ta, next_state, next_inputs, next_finished,
                next_sequence_lengths, next_mask)

    res = control_flow_ops.while_loop(
        condition,
        body,
        loop_vars=[
            initial_time, initial_outputs_ta, initial_state, initial_inputs,
            initial_finished, initial_sequence_lengths, initial_mask,
        ],
        parallel_iterations=parallel_iterations,
        swap_memory=swap_memory)
    ...
    return final_outputs, final_state, final_sequence_lengths
The next step is to pass the mask to the Decoder and the Helper. Here are the updated versions of BasicDecoder and InferenceHelper:

MaskedDecoder:
class MaskedDecoder(BasicDecoder):
    def step(self, time, inputs, state, mask, name=None):
        with ops.name_scope(name, "MaskedDecoderStep", (time, inputs, state, mask)):
            cell_outputs, cell_state = self._cell(inputs, state)
            if self._output_layer is not None:
                cell_outputs = self._output_layer(cell_outputs)
            sample_ids = self._helper.sample(
                time=time,
                outputs=cell_outputs,
                state=cell_state,
                mask=mask)
            (finished, next_inputs, next_state, next_mask) = self._helper.next_inputs(
                time=time,
                outputs=cell_outputs,
                state=cell_state,
                mask=mask,
                sample_ids=sample_ids)
        outputs = BasicDecoderOutput(cell_outputs, sample_ids)
        return (outputs, next_state, next_inputs, next_mask, finished)
MaskedInferenceHelper:
class MaskedInferenceHelper(Helper):
    """A helper to use during inference with a custom sampling function."""

    def __init__(self, norm_track_embs, features, start_sample_ids):
        self._norm_track_embs = norm_track_embs

        self._batch_size = tf.shape(start_sample_ids)[0]
        self._n_tracks = tf.shape(norm_track_embs)[0]
        self._start_sample_ids = start_sample_ids

        self._sample_shape = tf.TensorShape([])
        self._sample_dtype = tf.int32
        self._features = features

    def _get_sparse_mask(self, sample_ids):
        _mask_shape = tf.convert_to_tensor([
            tf.cast(self._batch_size, dtype=tf.int64),
            tf.cast(self._n_tracks, dtype=tf.int64)
        ])
        _st_rows = tf.range(0, self._batch_size)
        _st_cols = sample_ids
        _st_indices = tf.cast(tf.stack([_st_rows, _st_cols], axis=1), dtype=tf.int64)
        _st_values = tf.fill([self._batch_size], np.inf)
        return tf.SparseTensor(_st_indices, _st_values, _mask_shape)

    ...

    def initialize(self, name=None):
        finished = tf.tile([False], [self._batch_size])
        start_embs = tf.nn.embedding_lookup(self._norm_track_embs, self._start_sample_ids)
        start_inputs = tf.concat([start_embs, self._features], axis=1)
        mask = self._get_sparse_mask(self._start_sample_ids)
        return finished, start_inputs, mask

    def sample(self, time, outputs, state, mask, name=None):
        del time, state  # unused by sample
        outputs = tf.nn.l2_normalize(outputs, axis=-1)
        cos_sims = tf.matmul(outputs, self._norm_track_embs, transpose_b=True)
        cos_sims = cos_sims - tf.sparse_tensor_to_dense(mask)
        sample_ids = tf.cast(tf.argmax(cos_sims, axis=-1), tf.int32)
        return sample_ids

    def next_inputs(self, time, outputs, state, sample_ids, mask, name=None):
        del time, outputs  # unused by next_inputs
        finished = tf.tile([False], [self._batch_size])
        next_embs = tf.nn.embedding_lookup(self._norm_track_embs, sample_ids)
        next_inputs = tf.concat([next_embs, self._features], axis=1)
        next_mask = tf.sparse_add(mask, self._get_sparse_mask(sample_ids))
        return finished, next_inputs, state, next_mask
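For completeness, this is roughly how the pieces fit together (a hypothetical sketch: track_embs, rnn_cell, seed_features, self._seed_items and self._max_playlist_len are taken from the earlier snippets or assumed here, and the cell output size is assumed to match the embedding size):

norm_track_embs = tf.nn.l2_normalize(track_embs, axis=-1)

helper = MaskedInferenceHelper(
    norm_track_embs=norm_track_embs,
    features=seed_features,
    start_sample_ids=self._seed_items,
)
decoder = MaskedDecoder(
    rnn_cell,
    helper,
    rnn_cell.zero_state(tf.shape(self._seed_items)[0], tf.float32),
)
# the rewritten dynamic_decode threads the mask through the while_loop
outputs, _, _ = dynamic_decode(
    decoder,
    maximum_iterations=self._max_playlist_len,
)
self.playlists = outputs.sample_id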
So now I can run inference without repeating already predicted items.