在 TensorFlow 中计算克罗内克积的最有效方法是什么?
What is the most efficient way to compute a Kronecker Product in TensorFlow?
我想在 TensorFlow 中实现这篇关于 Kronecker 循环单元(Kronecker Recurrent Units)的论文(this paper)。
这涉及克罗内克积的计算。 TensorFlow 没有针对 Kronecker Products 的操作。我正在寻找一种有效且可靠的方法来计算它。
这是否存在,或者我是否需要手动定义 TensorFlow 操作?
这是我为此使用的实用程序。有关用法示例,请参见 kronecker_test
def fix_shape(tf_shape):
    """Convert a shape into a plain tuple of Python ints.

    Args:
        tf_shape: Iterable of dimensions (e.g. ``tensor.shape``). Every
            dimension must be convertible with ``int()``.

    Returns:
        A tuple holding ``int(dim)`` for each dimension, in order.
    """
    return tuple(int(dim) for dim in tf_shape)
def concat_blocks(blocks, validate_dims=True):
    """Concatenate a 2-D grid of matrix blocks into one matrix (aka ArrayFlatten).

    Args:
        blocks: Sequence of rows; each row is a sequence of 2-D tensors.
        validate_dims: If true, read ``shape[0]`` / ``shape[1]`` of every
            block and assert that all block rows have the same total width
            and all block columns have the same total height.

    Returns:
        The tensor obtained by concatenating each row of blocks along axis 1
        and then stacking the resulting rows along axis 0.

    Raises:
        AssertionError: If ``validate_dims`` is true and the totals disagree.
    """
    if validate_dims:
        # Total width of each block row (sum over the blocks in that row).
        col_dims = np.array([[int(b.shape[1]) for b in row] for row in blocks])
        col_sums = col_dims.sum(1)
        assert (col_sums[0] == col_sums).all(), (
            "block rows have different total widths: %s" % (col_sums,))
        # Total height of each block column (sum over the rows).
        row_dims = np.array([[int(b.shape[0]) for b in row] for row in blocks])
        row_sums = row_dims.sum(0)
        assert (row_sums[0] == row_sums).all(), (
            "block columns have different total heights: %s" % (row_sums,))
    block_rows = [tf.concat(row, axis=1) for row in blocks]
    return tf.concat(block_rows, axis=0)
def chunks(l, n):
    """Yield successive n-sized chunks from l.

    Args:
        l: Sequence supporting ``len()`` and slicing.
        n: Chunk length; must be at least 1.

    Yields:
        Consecutive slices ``l[i:i + n]``. The final slice is shorter when
        ``len(l)`` is not a multiple of ``n``.

    Raises:
        ValueError: If ``n`` is smaller than 1. Because this is a generator,
            the error surfaces on the first iteration, not at call time.
    """
    # A negative step would make range() empty and silently drop all data.
    if n < 1:
        raise ValueError("chunk size n must be >= 1, got %r" % (n,))
    for start in range(0, len(l), n):
        yield l[start:start + n]
from tensorflow.python.framework import ops
# Keep a reference to the hook as it was at import time so that
# enable_shape_inference() can put it back.
original_shape_func = ops.set_shapes_for_outputs


def disable_shape_inference():
    """Replace ``ops.set_shapes_for_outputs`` with an identity function.

    This rebinds a module attribute of ``tensorflow.python.framework.ops``,
    so the change is global until ``enable_shape_inference()`` is called.
    NOTE(review): this relies on a non-public TensorFlow attribute; confirm
    it exists in the TensorFlow version in use.
    """
    ops.set_shapes_for_outputs = lambda _: _


def enable_shape_inference():
    """Restore ``ops.set_shapes_for_outputs`` to the value saved at import."""
    ops.set_shapes_for_outputs = original_shape_func
def kronecker(A, B, do_shape_inference=True):
    """Kronecker product of A, B.

    Args:
        A: 2-D tensor whose static shape is fully known.
        B: 2-D tensor whose static shape is fully known.
        do_shape_inference: If False, shape inference is switched off while
            the per-element slices are built and concatenated (and block
            dimension validation is skipped). The original author reported
            that this makes a 10x10 kron go from 2.4 sec to 0.9 sec.

    Returns:
        Tensor of static shape ``(Arows * Brows, Acols * Bcols)``.
    """
    Arows, Acols = fix_shape(A.shape)
    Brows, Bcols = fix_shape(B.shape)
    Crows, Ccols = Arows * Brows, Acols * Bcols

    # One scaled copy of B per element of A, in row-major order of A:
    # shape [Arows * Acols, Brows, Bcols].
    temp = tf.reshape(A, [-1, 1, 1]) * tf.expand_dims(B, 0)
    Bshape = tf.constant((Brows, Bcols))

    if not do_shape_inference:
        disable_shape_inference()
    try:
        # There is one slice per element of A, i.e. Arows * Acols of them
        # (not Arows * Brows, which is only equal when Acols == Brows).
        # Each slice is [1, Brows, Bcols] => [Brows, Bcols].
        slices = [tf.reshape(s, Bshape)
                  for s in tf.split(temp, Arows * Acols)]
        grid = list(chunks(slices, Acols))
        assert len(grid) == Arows
        result = concat_blocks(grid, validate_dims=do_shape_inference)
    finally:
        # Always undo the global patch, even if graph construction failed.
        if not do_shape_inference:
            enable_shape_inference()

    # With inference disabled the static shape is missing; set it explicitly.
    result.set_shape((Crows, Ccols))
    return result
def kronecker_test():
    """Check kronecker() on a 2x2 example.

    The result is compared against a hand-written expected matrix and
    against ``np.kron``. ``check_equal`` is not defined in this snippet and
    must be provided by the surrounding module.
    """
    A0 = [[1, 2], [3, 4]]
    B0 = [[6, 7], [8, 9]]
    A = tf.constant(A0)
    B = tf.constant(B0)
    C = kronecker(A, B)
    # The context manager closes the session even if run() raises.
    with tf.Session() as sess:
        C0 = sess.run(C)
    Ct = [[6, 7, 12, 14], [8, 9, 16, 18], [18, 21, 24, 28], [24, 27, 32, 36]]
    Cnp = np.kron(A0, B0)
    check_equal(C0, Ct)
    check_equal(C0, Cnp)
如果您阅读 conv2d_transpose 的数学定义,并对照克罗内克积(Kronecker product)的计算方式,就会发现:只要为 conv2d_transpose 选取合适的步幅(即第二个矩阵的宽度和高度),它所做的运算与克罗内克积完全相同。
此外,conv2d_transpose 本身就支持批处理,因此可以直接用它批量计算克罗内克积。
下面的示例计算了维基百科中所给矩阵的克罗内克积。
import tensorflow as tf
# Example operands: two square matrices given as nested lists.
a = [[1, 2], [3, 4]]
b = [[0, 5], [6, 7]]
# i: side length of a; k: side length of b (the kernel); s: stride, set
# equal to the kernel size.
i, k, s = len(a), len(b), len(b)
# Output side length requested from the transposed convolution.
o = s * (i - 1) + k
# a becomes a single-image, single-channel input; b becomes a k x k kernel
# with one input and one output channel.
a_tf = tf.reshape(tf.constant(a, dtype=tf.float32), [1, i, i, 1])
b_tf = tf.reshape(tf.constant(b, dtype=tf.float32), [k, k, 1, 1])
res = tf.squeeze(
    tf.nn.conv2d_transpose(a_tf, b_tf, (1, o, o, 1), [1, s, s, 1], "VALID"))
with tf.Session() as sess:
    # print(...) with a single argument works on both Python 2 and 3.
    print(sess.run(res))
请注意,在非方阵的情况下,您需要在以下行中计算更多维度:
i, k, s = len(a), len(b), len(b)
o = s * (i - 1) + k
并正确使用它们作为你的 strides/outputs 参数。
尝试以下解决方案,看看它是否适合您:
def tf_kron(a, b):
    """Return the Kronecker product of two 2-D tensors.

    Args:
        a: 2-D tensor with a fully known static shape.
        b: 2-D tensor with a fully known static shape.

    Returns:
        Tensor of shape ``[a_rows * b_rows, a_cols * b_cols]``.
    """
    # as_list() yields plain ints, so this does not depend on the
    # Dimension ``.value`` attribute.
    a_rows, a_cols = a.shape.as_list()
    b_rows, b_cols = b.shape.as_list()
    # [a_rows, 1, a_cols, 1] * [1, b_rows, 1, b_cols] broadcasts to
    # [a_rows, b_rows, a_cols, b_cols]; merging adjacent axis pairs gives
    # the Kronecker layout.
    outer = (tf.reshape(a, [a_rows, 1, a_cols, 1])
             * tf.reshape(b, [1, b_rows, 1, b_cols]))
    return tf.reshape(outer, [a_rows * b_rows, a_cols * b_cols])
TensorFlow 1.7+ 在 tf.contrib.kfac.utils 中提供了函数 kronecker_product:
# Operands: a 3x3 identity matrix and a 2x2 float matrix.
a = tf.eye(3)
b = tf.constant([[1., 2.], [3., 4.]])
# NOTE(review): tf.contrib is presumably only available in TensorFlow 1.x —
# confirm against the installed version.
kron = tf.contrib.kfac.utils.kronecker_product(a, b)
# Evaluate the tensor in a fresh session; in an interactive prompt the
# returned array is echoed.
tf.Session().run(kron)
输出:
array([[1., 2., 0., 0., 0., 0.],
[3., 4., 0., 0., 0., 0.],
[0., 0., 1., 2., 0., 0.],
[0., 0., 3., 4., 0., 0.],
[0., 0., 0., 0., 1., 2.],
[0., 0., 0., 0., 3., 4.]], dtype=float32)
这样的事情怎么样:
def kron(x, y):
    """Computes the Kronecker product of two matrices.

    Inputs of rank below 2 are reshaped to rank 2 by prepending size-1
    dimensions. Any leading (batch) dimensions are combined through the
    broadcasting of the elementwise multiply.

    Args:
        x: A matrix (or batch thereof) of size m x n.
        y: A matrix (or batch thereof) of size p x q.

    Returns:
        z: Kronecker product of matrices x and y of size mp x nq
    """
    with tf.name_scope('kron'):
        x = tf.convert_to_tensor(x, dtype_hint=tf.float32)
        y = tf.convert_to_tensor(y, dtype_hint=x.dtype)

        def _maybe_expand(t):
            # Left-pad the dynamic shape with 1s so it has at least 2 entries.
            shape = tf.pad(
                tf.shape(t),
                paddings=[[tf.maximum(2 - tf.rank(t), 0), 0]],
                constant_values=1)
            t = tf.reshape(t, shape)
            # Split the shape vector into (batch dims, rows, cols); rows and
            # cols stay length-1 tensors so they can be concatenated later.
            _, rows, cols = tf.split(shape, num_or_size_splits=[-1, 1, 1])
            return t, rows, cols

        x, mx, nx = _maybe_expand(x)
        y, my, ny = _maybe_expand(y)
        # x: [..., m, 1, n, 1] and y: [..., 1, p, 1, q] broadcast to
        # [..., m, p, n, q].
        x = x[..., :, tf.newaxis, :, tf.newaxis]
        y = y[..., tf.newaxis, :, tf.newaxis, :]
        z = x * y
        # Keep the broadcast batch dims and merge (m, p) and (n, q).
        bz = tf.shape(z)[:-4]
        z = tf.reshape(z, tf.concat([bz, mx * my, nx * ny], axis=0))
    return z
这个解决方案:
- 支持批处理
- 支持广播
- 可在 XLA 下运行
- 清楚地展示了 NumPy 广播与克罗内克积之间的关系。
我想在 TensorFlow 中实现这篇关于 Kronecker 循环单元(Kronecker Recurrent Units)的论文(this paper)。
这涉及克罗内克积的计算。 TensorFlow 没有针对 Kronecker Products 的操作。我正在寻找一种有效且可靠的方法来计算它。
这是否存在,或者我是否需要手动定义 TensorFlow 操作?
这是我为此使用的实用程序。有关用法示例,请参见 kronecker_test
def fix_shape(tf_shape):
    """Convert a shape into a plain tuple of Python ints.

    Args:
        tf_shape: Iterable of dimensions (e.g. ``tensor.shape``). Every
            dimension must be convertible with ``int()``.

    Returns:
        A tuple holding ``int(dim)`` for each dimension, in order.
    """
    return tuple(int(dim) for dim in tf_shape)
def concat_blocks(blocks, validate_dims=True):
    """Concatenate a 2-D grid of matrix blocks into one matrix (aka ArrayFlatten).

    Args:
        blocks: Sequence of rows; each row is a sequence of 2-D tensors.
        validate_dims: If true, read ``shape[0]`` / ``shape[1]`` of every
            block and assert that all block rows have the same total width
            and all block columns have the same total height.

    Returns:
        The tensor obtained by concatenating each row of blocks along axis 1
        and then stacking the resulting rows along axis 0.

    Raises:
        AssertionError: If ``validate_dims`` is true and the totals disagree.
    """
    if validate_dims:
        # Total width of each block row (sum over the blocks in that row).
        col_dims = np.array([[int(b.shape[1]) for b in row] for row in blocks])
        col_sums = col_dims.sum(1)
        assert (col_sums[0] == col_sums).all(), (
            "block rows have different total widths: %s" % (col_sums,))
        # Total height of each block column (sum over the rows).
        row_dims = np.array([[int(b.shape[0]) for b in row] for row in blocks])
        row_sums = row_dims.sum(0)
        assert (row_sums[0] == row_sums).all(), (
            "block columns have different total heights: %s" % (row_sums,))
    block_rows = [tf.concat(row, axis=1) for row in blocks]
    return tf.concat(block_rows, axis=0)
def chunks(l, n):
    """Yield successive n-sized chunks from l.

    Args:
        l: Sequence supporting ``len()`` and slicing.
        n: Chunk length; must be at least 1.

    Yields:
        Consecutive slices ``l[i:i + n]``. The final slice is shorter when
        ``len(l)`` is not a multiple of ``n``.

    Raises:
        ValueError: If ``n`` is smaller than 1. Because this is a generator,
            the error surfaces on the first iteration, not at call time.
    """
    # A negative step would make range() empty and silently drop all data.
    if n < 1:
        raise ValueError("chunk size n must be >= 1, got %r" % (n,))
    for start in range(0, len(l), n):
        yield l[start:start + n]
from tensorflow.python.framework import ops
# Keep a reference to the hook as it was at import time so that
# enable_shape_inference() can put it back.
original_shape_func = ops.set_shapes_for_outputs


def disable_shape_inference():
    """Replace ``ops.set_shapes_for_outputs`` with an identity function.

    This rebinds a module attribute of ``tensorflow.python.framework.ops``,
    so the change is global until ``enable_shape_inference()`` is called.
    NOTE(review): this relies on a non-public TensorFlow attribute; confirm
    it exists in the TensorFlow version in use.
    """
    ops.set_shapes_for_outputs = lambda _: _


def enable_shape_inference():
    """Restore ``ops.set_shapes_for_outputs`` to the value saved at import."""
    ops.set_shapes_for_outputs = original_shape_func
def kronecker(A, B, do_shape_inference=True):
    """Kronecker product of A, B.

    Args:
        A: 2-D tensor whose static shape is fully known.
        B: 2-D tensor whose static shape is fully known.
        do_shape_inference: If False, shape inference is switched off while
            the per-element slices are built and concatenated (and block
            dimension validation is skipped). The original author reported
            that this makes a 10x10 kron go from 2.4 sec to 0.9 sec.

    Returns:
        Tensor of static shape ``(Arows * Brows, Acols * Bcols)``.
    """
    Arows, Acols = fix_shape(A.shape)
    Brows, Bcols = fix_shape(B.shape)
    Crows, Ccols = Arows * Brows, Acols * Bcols

    # One scaled copy of B per element of A, in row-major order of A:
    # shape [Arows * Acols, Brows, Bcols].
    temp = tf.reshape(A, [-1, 1, 1]) * tf.expand_dims(B, 0)
    Bshape = tf.constant((Brows, Bcols))

    if not do_shape_inference:
        disable_shape_inference()
    try:
        # There is one slice per element of A, i.e. Arows * Acols of them
        # (not Arows * Brows, which is only equal when Acols == Brows).
        # Each slice is [1, Brows, Bcols] => [Brows, Bcols].
        slices = [tf.reshape(s, Bshape)
                  for s in tf.split(temp, Arows * Acols)]
        grid = list(chunks(slices, Acols))
        assert len(grid) == Arows
        result = concat_blocks(grid, validate_dims=do_shape_inference)
    finally:
        # Always undo the global patch, even if graph construction failed.
        if not do_shape_inference:
            enable_shape_inference()

    # With inference disabled the static shape is missing; set it explicitly.
    result.set_shape((Crows, Ccols))
    return result
def kronecker_test():
    """Check kronecker() on a 2x2 example.

    The result is compared against a hand-written expected matrix and
    against ``np.kron``. ``check_equal`` is not defined in this snippet and
    must be provided by the surrounding module.
    """
    A0 = [[1, 2], [3, 4]]
    B0 = [[6, 7], [8, 9]]
    A = tf.constant(A0)
    B = tf.constant(B0)
    C = kronecker(A, B)
    # The context manager closes the session even if run() raises.
    with tf.Session() as sess:
        C0 = sess.run(C)
    Ct = [[6, 7, 12, 14], [8, 9, 16, 18], [18, 21, 24, 28], [24, 27, 32, 36]]
    Cnp = np.kron(A0, B0)
    check_equal(C0, Ct)
    check_equal(C0, Cnp)
如果您阅读 conv2d_transpose 的数学定义,并对照克罗内克积(Kronecker product)的计算方式,就会发现:只要为 conv2d_transpose 选取合适的步幅(即第二个矩阵的宽度和高度),它所做的运算与克罗内克积完全相同。
此外,conv2d_transpose 本身就支持批处理,因此可以直接用它批量计算克罗内克积。
下面的示例计算了维基百科中所给矩阵的克罗内克积。
import tensorflow as tf
# Example operands: two square matrices given as nested lists.
a = [[1, 2], [3, 4]]
b = [[0, 5], [6, 7]]
# i: side length of a; k: side length of b (the kernel); s: stride, set
# equal to the kernel size.
i, k, s = len(a), len(b), len(b)
# Output side length requested from the transposed convolution.
o = s * (i - 1) + k
# a becomes a single-image, single-channel input; b becomes a k x k kernel
# with one input and one output channel.
a_tf = tf.reshape(tf.constant(a, dtype=tf.float32), [1, i, i, 1])
b_tf = tf.reshape(tf.constant(b, dtype=tf.float32), [k, k, 1, 1])
res = tf.squeeze(
    tf.nn.conv2d_transpose(a_tf, b_tf, (1, o, o, 1), [1, s, s, 1], "VALID"))
with tf.Session() as sess:
    # print(...) with a single argument works on both Python 2 and 3.
    print(sess.run(res))
请注意,在非方阵的情况下,您需要在以下行中计算更多维度:
i, k, s = len(a), len(b), len(b)
o = s * (i - 1) + k
并正确使用它们作为你的 strides/outputs 参数。
尝试以下解决方案,看看它是否适合您:
def tf_kron(a, b):
    """Return the Kronecker product of two 2-D tensors.

    Args:
        a: 2-D tensor with a fully known static shape.
        b: 2-D tensor with a fully known static shape.

    Returns:
        Tensor of shape ``[a_rows * b_rows, a_cols * b_cols]``.
    """
    # as_list() yields plain ints, so this does not depend on the
    # Dimension ``.value`` attribute.
    a_rows, a_cols = a.shape.as_list()
    b_rows, b_cols = b.shape.as_list()
    # [a_rows, 1, a_cols, 1] * [1, b_rows, 1, b_cols] broadcasts to
    # [a_rows, b_rows, a_cols, b_cols]; merging adjacent axis pairs gives
    # the Kronecker layout.
    outer = (tf.reshape(a, [a_rows, 1, a_cols, 1])
             * tf.reshape(b, [1, b_rows, 1, b_cols]))
    return tf.reshape(outer, [a_rows * b_rows, a_cols * b_cols])
TensorFlow 1.7+ 在 tf.contrib.kfac.utils 中提供了函数 kronecker_product:
# Operands: a 3x3 identity matrix and a 2x2 float matrix.
a = tf.eye(3)
b = tf.constant([[1., 2.], [3., 4.]])
# NOTE(review): tf.contrib is presumably only available in TensorFlow 1.x —
# confirm against the installed version.
kron = tf.contrib.kfac.utils.kronecker_product(a, b)
# Evaluate the tensor in a fresh session; in an interactive prompt the
# returned array is echoed.
tf.Session().run(kron)
输出:
array([[1., 2., 0., 0., 0., 0.],
[3., 4., 0., 0., 0., 0.],
[0., 0., 1., 2., 0., 0.],
[0., 0., 3., 4., 0., 0.],
[0., 0., 0., 0., 1., 2.],
[0., 0., 0., 0., 3., 4.]], dtype=float32)
这样的事情怎么样:
def kron(x, y):
    """Computes the Kronecker product of two matrices.

    Inputs of rank below 2 are reshaped to rank 2 by prepending size-1
    dimensions. Any leading (batch) dimensions are combined through the
    broadcasting of the elementwise multiply.

    Args:
        x: A matrix (or batch thereof) of size m x n.
        y: A matrix (or batch thereof) of size p x q.

    Returns:
        z: Kronecker product of matrices x and y of size mp x nq
    """
    with tf.name_scope('kron'):
        x = tf.convert_to_tensor(x, dtype_hint=tf.float32)
        y = tf.convert_to_tensor(y, dtype_hint=x.dtype)

        def _maybe_expand(t):
            # Left-pad the dynamic shape with 1s so it has at least 2 entries.
            shape = tf.pad(
                tf.shape(t),
                paddings=[[tf.maximum(2 - tf.rank(t), 0), 0]],
                constant_values=1)
            t = tf.reshape(t, shape)
            # Split the shape vector into (batch dims, rows, cols); rows and
            # cols stay length-1 tensors so they can be concatenated later.
            _, rows, cols = tf.split(shape, num_or_size_splits=[-1, 1, 1])
            return t, rows, cols

        x, mx, nx = _maybe_expand(x)
        y, my, ny = _maybe_expand(y)
        # x: [..., m, 1, n, 1] and y: [..., 1, p, 1, q] broadcast to
        # [..., m, p, n, q].
        x = x[..., :, tf.newaxis, :, tf.newaxis]
        y = y[..., tf.newaxis, :, tf.newaxis, :]
        z = x * y
        # Keep the broadcast batch dims and merge (m, p) and (n, q).
        bz = tf.shape(z)[:-4]
        z = tf.reshape(z, tf.concat([bz, mx * my, nx * ny], axis=0))
    return z
这个解决方案:
- 支持批处理
- 支持广播
- 可在 XLA 下运行
- 清楚地展示了 NumPy 广播与克罗内克积之间的关系。