Sorting byte arrays in numeric order

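I have a persistent map that uses byte[] as keys. It is accessed through an iterator that takes an arbitrary comparator. Since the byte arrays represent signed numeric values (integers or decimals), I'd like to sort the byte[] keys numerically, so that the number after 1 is 2, not 11.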

The arrays are the UTF-8 encodings of the string representations of the numbers.

I have been looking for reliable, thread-safe code to do this, something like the Guava comparator SignedBytes.lexicographicalComparator, but for numeric ordering.

Does something like this already exist somewhere?

If not, is there a fast and reliable way to do it?

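To make the use case a bit clearer: I am using the Java port of LevelDB. LevelDB stores keys and values as byte[]. It supports range queries on keys, but the default comparator is lexicographic, so range queries give poor results when the keys represent numbers.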
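To illustrate the problem described above, here is a minimal sketch that sorts a few UTF-8 encoded numbers byte by byte. The class and method names are made up for the example, and it uses Arrays.compare(byte[], byte[]) from Java 9+ as a stand-in for a lexicographic comparator; it is not LevelDB's own comparator.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of LevelDB or Guava.
class LexicographicOrderDemo {

    /** Sorts the UTF-8 encodings of the given strings byte by byte, then decodes them again. */
    static List<String> sortByBytes(String... values) {
        List<byte[]> keys = new ArrayList<>();
        for (String value : values) {
            keys.add(value.getBytes(StandardCharsets.UTF_8));
        }
        // Arrays.compare(byte[], byte[]) compares lexicographically (Java 9+).
        keys.sort((a, b) -> Arrays.compare(a, b));

        List<String> sorted = new ArrayList<>();
        for (byte[] key : keys) {
            sorted.add(new String(key, StandardCharsets.UTF_8));
        }
        return sorted;
    }

    public static void main(String[] args) {
        // prints [1, 11, 2]: "11" sorts before "2" because '1' < '2'
        System.out.println(sortByBytes("2", "11", "1"));
    }
}
```

A numeric comparator has to look at the sign and the number of digits before it compares individual bytes, which is what the answers below attempt.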

I would rather not convert back to a String and then to an Int (or Double), because the compare method will be called billions of times.

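It sounds to me like you just want a sorted map. Java has several (ConcurrentSkipListMap and TreeMap are the two big ones). The question is a little vague, particularly on thread safety (do you have concurrent writes, concurrent reads and writes, updates during iteration, and so on?). If you only need to write to the map concurrently, I wouldn't waste my time on something like ConcurrentSkipListMap or a wrapper like java.util.Collections.synchronizedMap(java.util.Map); I would just put a synchronized block in the method that does the writing.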
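A minimal sketch of that idea, assuming a TreeMap guarded by a synchronized block. The class name and the NON_NEGATIVE_INTEGERS stand-in comparator are hypothetical and only there to make the example self-contained; note that the sketch also takes the lock when reading, since iterating a TreeMap while another thread writes to it is not safe.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical wrapper: a TreeMap with a custom key order, guarded by one lock.
class SynchronizedSortedStore {

    // Stand-in ordering, valid only for non-negative integers written without
    // leading zeros: fewer digits means smaller, then compare digit by digit.
    static final Comparator<byte[]> NON_NEGATIVE_INTEGERS = (a, b) -> {
        if (a.length != b.length) {
            return a.length - b.length;
        }
        for (int i = 0; i < a.length; i++) {
            if (a[i] != b[i]) {
                return a[i] - b[i];
            }
        }
        return 0;
    };

    private final TreeMap<byte[], String> map;

    SynchronizedSortedStore(Comparator<byte[]> keyOrder) {
        this.map = new TreeMap<>(keyOrder);
    }

    void put(byte[] key, String value) {
        synchronized (map) {
            map.put(key, value);
        }
    }

    /** Copies the values out under the same lock so iteration never races with a write. */
    List<String> valuesInKeyOrder() {
        synchronized (map) {
            return new ArrayList<>(map.values());
        }
    }
}
```

The comparator itself holds no mutable state, so sharing one instance between threads should be fine; only the map needs protecting.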

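If you write a lot and only occasionally need an iterator, try a regular HashMap; the method that returns the iterator can then simply create a new SortedMap with the comparator and push the data into it. That is expensive, though, so do it only if you really read infrequently, don't have that much data, and can afford to wait on reads. Even then, it isn't a terribly expensive operation.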
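Here is a rough sketch of the HashMap-plus-snapshot approach. One caveat that is my own addition: byte[] uses identity-based equals and hashCode, so the sketch wraps each key in a ByteBuffer, whose equals and hashCode are content-based. The class name is hypothetical, and the comparator is passed in by the caller.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical store: cheap unsorted writes, sorted copy built only on demand.
class WriteHeavyStore {

    // ByteBuffer keys compare by content, unlike raw byte[] keys.
    private final Map<ByteBuffer, String> unsorted = new HashMap<>();
    private final Comparator<byte[]> keyOrder;

    WriteHeavyStore(Comparator<byte[]> keyOrder) {
        this.keyOrder = keyOrder;
    }

    synchronized void put(byte[] key, String value) {
        // Clone so later changes to the caller's array cannot corrupt the map.
        unsorted.put(ByteBuffer.wrap(key.clone()), value);
    }

    /** Builds a sorted copy of the current contents; O(n log n) per call. */
    synchronized SortedMap<byte[], String> sortedSnapshot() {
        SortedMap<byte[], String> sorted = new TreeMap<>(keyOrder);
        for (Map.Entry<ByteBuffer, String> entry : unsorted.entrySet()) {
            // array() returns the backing array that was wrapped in put().
            sorted.put(entry.getKey().array(), entry.getValue());
        }
        return sorted;
    }
}
```

Each call to sortedSnapshot() re-sorts everything, so this only pays off when reads are rare compared with writes.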

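It seems to me that what you really want is a Comparator that does this? The Comparator decides whether 11 or 2 comes first. Since the key is a byte array, you would end up with something like the following (note that I haven't tested this, and I'm assuming you're using big-endian):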

import java.util.Comparator;

class NumberComparator implements Comparator<byte[]> {
    /*
     * (non-Javadoc)
     * 
     * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
     */
    @Override
    public int compare(byte[] o1, byte[] o2) {
        if (o1 == null) {
            if (o2 == null)
                return 0;
            return -1;
        }

        if (o2 == null)
            return 1;

        // I'm going to cheat and assume that if the lengths aren't the same, you didn't pad... but really, the
        // lengths should always be the same because you shouldn't allow pushing doubles and ints
        if (o1.length != o2.length) {
            return o1.length - o2.length;
        }
        if (o1.length == 0)
            return 0;

        // For the sake of things, I'm assuming you've taken care of endianness and that we're using big-endian
        // We're an int (note that you can make the size of an int a constant)
        if (o1.length == Integer.SIZE >> 3) {
            int o1Integer = 0;
            int o2Integer = 0;

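            // Big-endian: the first byte is the most significant, so shift the
            // accumulated value left before appending each byte
            for (int i = 0; i < o1.length; i++) {
                o1Integer = (o1Integer << 8) | (o1[i] & 0xFF);
                o2Integer = (o2Integer << 8) | (o2[i] & 0xFF);
            }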

            return Integer.compare(o1Integer, o2Integer);
        }
        // We're a double (note that you can make the size of a double a constant)
        else if (o1.length == Double.SIZE >> 3) {
            long o1Bits = 0L;
            long o2Bits = 0L;

            // Big-endian: shift the accumulated bits left, then append the next byte
            for (int i = 0; i < o1.length; i++) {
                o1Bits = (o1Bits << 8) | (o1[i] & 0xFFL);
                o2Bits = (o2Bits << 8) | (o2[i] & 0xFFL);
            }

            return Double.compare(Double.longBitsToDouble(o1Bits), Double.longBitsToDouble(o2Bits));
        }

        // Who knows what we are...but again, we're assuming big-endian (and two's complement)
        final boolean o1Neg = (o1[0] & 0x80) != 0;
        final boolean o2Neg = (o2[0] & 0x80) != 0;
        // o1 is negative and o2 is positive
        if (o1Neg && !o2Neg)
            return -1;
        // o1 is positive and o2 is negative
        if (!o1Neg && o2Neg)
            return 1;

        // Same sign and same length: under the two's complement assumption, an
        // unsigned byte-by-byte comparison orders positives and negatives alike
        for (int pos = 0; pos < o1.length; pos++) {
            int comp = (o1[pos] & 0xFF) - (o2[pos] & 0xFF);
            if (comp != 0)
                return comp;
        }

        // Everything was the same!  We are equal :-)
        return 0;
    }
}
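For the fixed-width integer case, the manual shifting can probably be delegated to java.nio.ByteBuffer, which reads big-endian by default. This is only a sketch under the same assumption as above (every key is exactly four bytes, big-endian); the class name and the encode helper are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;

// Hypothetical comparator for keys that are exactly 4-byte big-endian ints.
class FixedWidthIntComparator implements Comparator<byte[]> {

    @Override
    public int compare(byte[] a, byte[] b) {
        // ByteBuffer.wrap does not copy the array, and its default byte order
        // is big-endian. getInt() throws BufferUnderflowException if fewer
        // than 4 bytes are available.
        int left = ByteBuffer.wrap(a).getInt();
        int right = ByteBuffer.wrap(b).getInt();
        return Integer.compare(left, right);
    }

    /** Encodes an int as 4 big-endian bytes, matching what compare() expects. */
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }
}
```

Storing numbers in a fixed-width binary form like this sidesteps string parsing entirely, though it is a different key format from the UTF-8 strings the question actually uses.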

Edit

OK, after a bit of clarification, your comparator should look more like this:

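import java.util.Comparator;
import java.util.regex.Pattern;

class NumberStringComparator implements Comparator<byte[]> {
    // Strips an optional sign and any leading zeros
    private static final Pattern leftZeroPadding = Pattern.compile("^[-+]?0*");
    private static final Pattern rightZeroPadding = Pattern.compile("0+$");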

    /*
     * (non-Javadoc)
     * 
     * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
     */
    @Override
    public int compare(byte[] o1, byte[] o2) {
        if (o1 == null) {
            if (o2 == null)
                return 0;
            return -1;
        }

        if (o2 == null)
            return 1;

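        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException
        // thrown by the String(byte[], String) constructor
        String o1String = new String(o1, java.nio.charset.StandardCharsets.UTF_8).trim();
        String o2String = new String(o2, java.nio.charset.StandardCharsets.UTF_8).trim();

        // startsWith is safe on empty input, unlike charAt(0)
        final boolean o1Neg = o1String.startsWith("-");
        final boolean o2Neg = o2String.startsWith("-");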
        // o1 is negative and o2 is positive
        if (o1Neg && !o2Neg)
            return -1;
        // o1 is positive and o2 is negative
        if (!o1Neg && o2Neg)
            return 1;

        String o1WithoutZeroPadding = leftZeroPadding.matcher(o1String).replaceAll("");
        String o2WithoutZeroPadding = leftZeroPadding.matcher(o2String).replaceAll("");
        // We're the same thing
        if (o1WithoutZeroPadding.equals(o2WithoutZeroPadding))
            return 0;

        int o1Dec = o1WithoutZeroPadding.indexOf('.');
        int o2Dec = o2WithoutZeroPadding.indexOf('.');

        final int o1LeftLength;
        final int o2LeftLength;
        final String o1Left;
        final String o2Left;
        final String o1Right;
        final String o2Right;

        if (o1Dec == -1) {
            o1LeftLength = o1WithoutZeroPadding.length();
            o1Left = o1WithoutZeroPadding;
            o1Right = "";
        } else {
            o1LeftLength = o1Dec;
            if (o1LeftLength == 0)
                o1Left = "";
            else
                o1Left = o1WithoutZeroPadding.substring(0, o1Dec);
            // Nothing follows the decimal point
            if (o1Dec + 1 == o1WithoutZeroPadding.length()) {
                o1Right = "";
            } else {
                o1Right = rightZeroPadding.matcher(o1WithoutZeroPadding.substring(o1Dec + 1)).replaceAll("");
            }
        }
        if (o2Dec == -1) {
            o2LeftLength = o2WithoutZeroPadding.length();
            o2Left = o2WithoutZeroPadding;
            o2Right = "";
        } else {
            o2LeftLength = o2Dec;
            if (o2LeftLength == 0)
                o2Left = "";
            else
                o2Left = o2WithoutZeroPadding.substring(0, o2Dec);
            // Nothing follows the decimal point
            if (o2Dec + 1 == o2WithoutZeroPadding.length()) {
                o2Right = "";
            } else {
                o2Right = rightZeroPadding.matcher(o2WithoutZeroPadding.substring(o2Dec + 1)).replaceAll("");
            }
        }

        // If o1 is shorter than o2 (left of the decimal)...
        if (o1LeftLength < o2LeftLength) {
            // and we're negative numbers...
            if (o1Neg)
                // Then o1 is closer to 0, so it is the greater
                return 1;
            // Otherwise both are positive and o1 is the smaller
            return -1;
        }

        // If o2 is shorter than o1 (left of the decimal)...
        if (o1LeftLength > o2LeftLength) {
            // and we're negative numbers...
            if (o2Neg)
                // Then o2 is closer to 0, so o1 is the smaller
                return -1;
            // Otherwise both are positive and o1 is the greater
            return 1;
        }

        // Left of the decimal is the same length...
        // March through the left
        char o1Char;
        char o2Char;
        for (int pos = 0; pos < o1LeftLength; pos++) {
            o1Char = o1Left.charAt(pos);
            o2Char = o2Left.charAt(pos);
            if (o1Char != o2Char) {
                // A lower digit in o1 means a smaller magnitude; flip the result when both are negative
                return o1Neg ? o2Char - o1Char : o1Char - o2Char;
            }
        }

        // Okay... everything was the same to the left... check the right
        int rightLength = Math.min(o1Right.length(), o2Right.length());
        for (int pos = 0; pos < rightLength; pos++) {
            o1Char = o1Right.charAt(pos);
            o2Char = o2Right.charAt(pos);
            if (o1Char != o2Char) {
                int multiplier = 1;
                if (o1Neg)
                    multiplier = -1;
                // Lower digits in o1Char make this negative, higher make it positive...
                return (o1Char - o2Char) * multiplier;
            }
        }

        // Everything was the same... now it comes down to this... if o1's right side is bigger, it is the bigger of
        // the two numbers
        int multiplier = 1;
        if (o1Neg)
            multiplier = -1;
        return (o1Right.length() - o2Right.length()) * multiplier;
    }
}
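Since a hand-written comparator like this is easy to get subtly wrong, it may help to check it against a slow but simple reference. The sketch below parses each key with java.math.BigDecimal; it does exactly the String conversion the question wants to avoid, so it is meant as a test oracle rather than something to run billions of times. The class name is hypothetical.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;

// Hypothetical reference comparator: slow, but hard to get wrong.
class BigDecimalReferenceComparator implements Comparator<byte[]> {

    @Override
    public int compare(byte[] a, byte[] b) {
        // BigDecimal.compareTo ignores scale, so "1.50" and "1.5" compare as equal.
        return parse(a).compareTo(parse(b));
    }

    private static BigDecimal parse(byte[] utf8) {
        // Throws NumberFormatException if the key is not a valid decimal number.
        return new BigDecimal(new String(utf8, StandardCharsets.UTF_8).trim());
    }
}
```

Feeding random pairs of numeric strings to both comparators and checking that the signs of the results agree is one way to gain confidence in the fast version.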

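I believe this is a correct solution, even though it isn't particularly optimized. The most obvious optimization would be to avoid the array copies.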

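import java.util.Arrays;
import java.util.Comparator;

import com.google.common.annotations.VisibleForTesting; // from Guava

/**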
 *  Compares byte arrays numerically. The byte arrays must contain the  
 *  UTF-8 encodings of the string representations of numeric values. 
 *  The values can be either integers or decimals.
 *  To get the numbers in the right format, you could create them like this:
 *  "466".getBytes(StandardCharsets.UTF_8);
 */
class NumericByteArrayComparator implements Comparator<byte[]> {

    static final byte ZERO = (byte) '0';
    static final byte POINT = (byte) '.';
    static final byte DASH = (byte) '-';


    @Override
    public int compare(byte[] leftArray, byte[] rightArray) {
        Numeric left = new Numeric(leftArray);
        Numeric right = new Numeric(rightArray);

        if (notSameSign(left, right)) {
            if (left.isNegative()) {
                return -1;
            }
            return 1;
        }
        // If we haven't returned we know they are the same sign
        if (!left.isDecimal() && !right.isDecimal()) {
            // the values are both whole numbers
            return compareIntegers(left, right);
        }
        // The whole numbers have been dealt with
        // Now we have left two numbers with the same sign. At least one is a decimal
        // Repeat the comparison above with the whole number portion

        if (left.decimalOffset != right.decimalOffset) {
            if (left.isNegative()) { // the values are both negative
                // the shorter is the greater
                return right.decimalOffset - left.decimalOffset;
            } // the values are both positive. The longer is the greater
            return left.decimalOffset - right.decimalOffset;
        }
        // the remaining whole number portions have the same length, compare byte-by-byte
        byte[] leftWhole = Arrays.copyOf(left.bytes, left.decimalOffset);
        byte[] rightWhole = Arrays.copyOf(right.bytes, right.decimalOffset);

        int result;
        if (left.isPositive()) {
            result = compareToSameLength(leftWhole, rightWhole);
        } else {
            result = compareToSameLength(rightWhole, leftWhole);
        }
        if (result != 0) {
            return result;
        }

        // Now we have nothing left to compare but the decimal portions
        if (left.isPositive()) {
            result = compareTo(left.decimalPortion(), right.decimalPortion());
        } else {
            result = compareTo(right.decimalPortion(), left.decimalPortion());
        }
        return result;
    }

    /**
     * Compares the given values, assumed to represent whole numbers
     */
    private int compareIntegers(Numeric left, Numeric right) {
        if (left.length() != right.length()) {
            if (left.isNegative()) { // the values are both negative
                // the shorter is the greater when numbers are negative
                return right.length() - left.length();
            } else {
                // the shorter is the lesser when numbers are positive
                return left.length() - right.length();
            }
        } else {
            // they are the same length & sign and both whole numbers. Compare byte by byte
            int result;
            if (left.isPositive()) {
                result = compareToSameLength(left.bytes, right.bytes);
            } else {
                result = compareToSameLength(right.bytes, left.bytes);
            }
            return result;
        }
    }

    /**
     * Compares byte by byte
     */
    private int compareTo(byte[] left, byte[] right) {
        int minLength = Math.min(left.length, right.length);
        for (int i = 0; i < minLength; i++) {
            int result = NumericByteArrayComparator.compare(left[i], right[i]);
            if (result != 0) {
                return result;
            }
        }
        return left.length - right.length;
    }

    /**
     * Compares byte-by-byte when both of the given arrays are the same length
     */
    private int compareToSameLength(byte[] left, byte[] right) {
        for (int i = 0; i < left.length; i++) {
            int result = NumericByteArrayComparator.compare(left[i], right[i]);
            if (result != 0) {
                return result;
            }
        }
        return 0;
    }

    /**
     * Compares the two specified {@code byte} values. The sign of the value
     * returned is the same as that of {@code ((Byte) a).compareTo(b)}.
     *
     * @param a the first {@code byte} to compare
     * @param b the second {@code byte} to compare
     * @return a negative value if {@code a} is less than {@code b}; a positive
     * value if {@code a} is greater than {@code b}; or zero if they are equal
     */
    public static int compare(byte a, byte b) {
        return a - b; // safe due to restricted range
    }

    /**
     * Returns true if the arguments have different signs (positive or negative)
     */
    private boolean notSameSign(Numeric p1, Numeric p2) {
        return p1.negative != p2.negative;
    }

    static class Numeric {
        final boolean negative;
        int decimalOffset;
        byte[] bytes;

        Numeric(byte[] n) {

            if (n.length == 0) {
                negative = true;
                decimalOffset = 0;
                bytes = n;
            } else {
                negative = (DASH == n[0]);
                bytes = trimSignAndLeadingZeros(n);
                decimalOffset = indexOf(POINT, bytes);
                if (decimalOffset == -1) {
                    decimalOffset = length();
                } else {  // trim trailing zeros only if this is a decimal number
                    bytes = trimTrailingZeros(bytes);
                }
            }
        }

        boolean isDecimal() {
            return decimalOffset < length();
        }

        int length() {
            return bytes.length;
        }

        /**
         * Returns true if this alphanumeric looks like a negative number
         */
        boolean isNegative() {
            return negative;
        }

        /**
         * Returns true if this alphanumeric looks like a positive number
         */
        boolean isPositive() {
            return !negative;
        }

        /** 
         * Returns the index of the given byte in the given byte array
         */
        int indexOf(byte ch, byte[] bytes) {
            final int max = bytes.length;
            int fromIndex = 0;

            for (int i = fromIndex; i < max; i++) {
                if (bytes[i] == ch) {
                    return i;
                }
            }
            return -1;
        }

        @VisibleForTesting
        byte[] trimSignAndLeadingZeros(byte[] array) {
            int len = array.length;
            int st = 0;

            while ((st < len) && (array[st] == ZERO)
                    || (st < len) && (array[st] == DASH)) {
                st++;
            }
            return ((st > 0) ? substring(st, len, array) : array);
        }

        @VisibleForTesting
        byte[] trimTrailingZeros(byte[] array) {
            int len = array.length;
            int st = len - 1;

            while ((st > 0) && (array[st] == ZERO)) {
                st--;
            }
            return ((st < len - 1) ? substring(0, st + 1, array) : array);
        }

        @VisibleForTesting
        byte[] substring(int start, int end, byte[] original) {
            int length = end - start;
            byte[] part = new byte[length];
            System.arraycopy(original, start, part, 0, length);
            return part;
        }

        @VisibleForTesting
        byte[] decimalPortion() {
            byte[] part = new byte[length() - decimalOffset];
            System.arraycopy(this.bytes, decimalOffset, part, 0, length() - decimalOffset);
            return part;
        }
    }
}
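On the array-copy optimization mentioned above: one possible direction is to work with index ranges into the original arrays instead of building trimmed copies. The sketch below is hypothetical (the class and method names are not part of the answer) and only shows the building blocks, not a full comparator. On Java 9+, Arrays.compare(byte[], int, int, byte[], int, int) offers a ranged comparison as well.

```java
// Hypothetical helpers for comparing parts of numeric keys in place.
class InPlaceNumericRanges {

    /** Returns the index of the first byte after an optional sign and any leading zeros. */
    static int skipSignAndLeadingZeros(byte[] n) {
        int i = 0;
        if (i < n.length && (n[i] == '-' || n[i] == '+')) {
            i++;
        }
        while (i < n.length && n[i] == '0') {
            i++;
        }
        return i;
    }

    /** Returns the index of the decimal point at or after from, or n.length if there is none. */
    static int decimalPointIndex(byte[] n, int from) {
        for (int i = from; i < n.length; i++) {
            if (n[i] == '.') {
                return i;
            }
        }
        return n.length;
    }

    /** Compares a[aFrom, aTo) with b[bFrom, bTo) byte by byte, without copying. */
    static int compareRange(byte[] a, int aFrom, int aTo, byte[] b, int bFrom, int bTo) {
        int aLength = aTo - aFrom;
        int bLength = bTo - bFrom;
        int shared = Math.min(aLength, bLength);
        for (int i = 0; i < shared; i++) {
            int diff = a[aFrom + i] - b[bFrom + i];
            if (diff != 0) {
                return diff;
            }
        }
        // All shared bytes match: the shorter range sorts first.
        return aLength - bLength;
    }
}
```

For example, the whole-number parts of "0012.5" and "12.75" would be the ranges [2, 4) and [0, 2), which compareRange reports as equal without allocating anything.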