SQL: Find rows with matching content (independent of column order)

I have two tables in a Vertica database. The first table has N+1 columns, the second has N columns. I want to find rows that have the same content, but not necessarily in the same order, and link the additional column of the first table to the matching row of the second. Here is an example with N=3:

Table1:

+-------+-------+-------+-------+
| Item1 | Item2 | Item3 | Value |
+-------+-------+-------+-------+
| A     | B     | C     |     3 |
| C     | D     | E     |     2 |
+-------+-------+-------+-------+

Table2:

+-------+-------+-------+
| Item1 | Item2 | Item3 |
+-------+-------+-------+
| C     | F     | E     |
| C     | A     | B     |
+-------+-------+-------+

As you can see, in terms of row content the first row of Table1 equals the second row of Table2; apart from the Value column, only the order differs. So my question is: is there a way to link the two tables so that the value is copied over, giving the final table below?

TableF:

+-------+-------+-------+-------+
| Item1 | Item2 | Item3 | Value |
+-------+-------+-------+-------+
| C     | A     | B     |     3 |
+-------+-------+-------+-------+

A possible solution would be to sort both tables alphanumerically within each row (Table1_sorted, Table2_sorted) and then join them like this:

SELECT T2.Item1, T2.Item2, T2.Item3, T1.Value 
FROM Table1_sorted T1, Table2_sorted T2 
WHERE T1.Item1=T2.Item1 AND T1.Item2 = T2.Item2 AND T1.Item3 = T2.Item3

However, sorting each row alphanumerically is rather complex (see the sketch below). Another approach would be a pile of AND/OR combinations, which is not ideal either. I am wondering whether there is a simpler solution.
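
For N=3 the per-row sort can be expressed with LEAST/GREATEST plus a CASE for the middle element; a sketch of what I mean (the view name is made up), which already shows why this gets unwieldy for larger N:

CREATE VIEW Table1_sorted AS
SELECT LEAST(Item1, Item2, Item3) AS Item1,
       CASE WHEN (Item2 <= Item1 AND Item1 <= Item3)
              OR (Item3 <= Item1 AND Item1 <= Item2) THEN Item1
            WHEN (Item1 <= Item2 AND Item2 <= Item3)
              OR (Item3 <= Item2 AND Item2 <= Item1) THEN Item2
            ELSE Item3
       END AS Item2,
       GREATEST(Item1, Item2, Item3) AS Item3,
       Value
FROM Table1;
-- Table2_sorted would be built the same way, without the Value column.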

Thanks!

You can create a UDx scalar function that checks equality at the row level. You can either sort the tuples or build sets from them. I will show an example in Python, but it is strongly recommended to write UDFs in Java or C++. (Full demo on YouTube.)

dbadmin=> select * from t1;
 item1 | item2 | item3 | value
-------+-------+-------+-------
 A     | B     | C     |     3
 C     | D     | E     |     2
(2 rows)

dbadmin=> select * from t2;
 item1 | item2 | item3
-------+-------+-------
 C     | A     | B
 C     | F     | E
(2 rows)

dbadmin=> select t1.* from t1, t2 where perm(t1.item1, t1.item2, t1.item3, t2.*);
 item1 | item2 | item3 | value
-------+-------+-------+-------
 A     | B     | C     |     3
(1 row)

If the order of the tuple matters (i.e., you want the result in Table2's column order):

dbadmin=> select t2.*, t1.value from t1, t2
dbadmin-> where perm(t1.item1, t1.item2, t1.item3, t2.*);
 item1 | item2 | item3 | value
-------+-------+-------+-------
 C     | A     | B     |     3
(1 row)

Example in Python (for Vertica 8.x).

Version 1:

def processBlock(self, server_interface, arg_reader, res_writer):
    while True:
        cols = arg_reader.getNumCols()
        if cols % 2 != 0:
            raise ValueError("num of columns must be even")
        # Split the arguments in half: first tuple vs. second tuple.
        s1, s2 = set(), set()
        for i in range(cols):
            if i < cols // 2:
                s1.add(arg_reader.getString(i))
            else:
                s2.add(arg_reader.getString(i))
        # The rows match (as sets) if both halves contain the same items.
        res_writer.setBool(s1 == s2)
        res_writer.next()
        if not arg_reader.next():
            break

Version 2:

def processBlock(self, server_interface, arg_reader, res_writer):
    while True:
        cols = arg_reader.getNumCols()
        if cols % 2 != 0:
            raise ValueError("num of cols must be even")
        # Pool all arguments into a single set: if the second tuple is a
        # permutation of the first, the second half adds no new items.
        s = set()
        for i in range(cols):
            s.add(arg_reader.getString(i))
        res_writer.setBool(len(s) == cols // 2)
        res_writer.next()
        if not arg_reader.next():
            break
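
Note that both versions compare sets, so repeated items within a tuple are collapsed, and the two versions can disagree on such rows. For example, using the perm function from the session above:

SELECT perm('A', 'A', 'B', 'A', 'B', 'B');

Version 1 returns true here (both halves reduce to the set {A, B}), while Version 2 returns false (the pooled set has 2 distinct items, not 3). Neither is a true multiset comparison, which is fine as long as the items within a row are distinct.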

Full code:

import vertica_sdk

class perm(vertica_sdk.ScalarFunction):

    def __init__(self):
        pass

    def setup(self, server_interface, col_types):
        pass

    def processBlock(self, server_interface, arg_reader, res_writer):
        #server_interface.log("log msg")
        while True:
            # Example of error checking best practices.
            cols = arg_reader.getNumCols()
            if cols % 2 != 0:
                raise ValueError("num of cols must be even")
            s = set()
            for i in range(cols):
                s.add(arg_reader.getString(i))
            res_writer.setBool(len(s) == cols // 2)
            res_writer.next()
            if not arg_reader.next():
                break

    def destroy(self, server_interface, col_types):
        pass

class perm_factory(vertica_sdk.ScalarFunctionFactory):

    def createScalarFunction(self, srv):
        return perm()

    def getPrototype(self, srv_interface, arg_types, return_type):
        # Accept any number/type of input columns; return a boolean.
        arg_types.addAny()
        return_type.addBool()

    def getReturnType(self, srv_interface, arg_types, return_type):
        return_type.addBool()
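
To call this from SQL, load the file as a library and register the factory; a sketch, assuming the code above is saved as /home/dbadmin/perm.py (path and library name are placeholders):

CREATE LIBRARY pylib AS '/home/dbadmin/perm.py' LANGUAGE 'Python';
CREATE FUNCTION perm AS LANGUAGE 'Python' NAME 'perm_factory' LIBRARY pylib;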

And here is a Java version:

package com.iav.udsf.misc;

import com.vertica.sdk.*;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashSet;
import java.util.Set;

/**
 *
 * @author gdpassar
 */
public class RowCompareFactory extends ScalarFunctionFactory {

    // Access to Vertica services (logging)
    protected ServerInterface si;

    /**
     * Method called by Vertica in order to obtain the UDF interface (input and output parameters).
     * This UDF accepts any number of input columns (the two tuples to compare, concatenated)
     * and returns a single boolean, true if one tuple is a permutation of the other.
     * @param srvInterface Vertica server interface
     * @param argTypes Object for adding input parameters
     * @param returnType Object for adding output parameters
     */
    @Override
    public void getPrototype(ServerInterface srvInterface,
                             ColumnTypes argTypes,
                             ColumnTypes returnType)
    {
        argTypes.addAny();

        returnType.addBool();
    }

    /**
     * Method called by Vertica in order to obtain the return value type.
     * @param srvInterface Vertica server interface
     * @param argTypes Not used
     * @param returnType Not used
     */
    @Override
    public void getReturnType(ServerInterface srvInterface, SizedColumnTypes argTypes, SizedColumnTypes returnType) {
        returnType.addBool();
    }

    /**
     * UDF class containing the row-comparison logic.
     */
    public class udf extends ScalarFunction {

        @Override
        public void processBlock(ServerInterface srvInterface,
                                 BlockReader argReader,
                                 BlockWriter resWriter)
                    throws UdfException, DestroyInvocation {
            si = srvInterface;

            do {
                // Pool all column values into one set; if the second half of
                // the arguments is a permutation of the first, the set size
                // equals half the column count (same idea as Python version 2).
                Set<Object> itemSet = new HashSet<>();
                final int numCols = argReader.getNumCols();
                if ((numCols % 2) != 0) {
                    throw new UdfException(0, "Number of columns must be even");
                }
                try {
                    for (int i = 0; i < numCols; i++) {
                        if (argReader.getTypeMetaData().getColumnType(i).isVarchar()) {
                            itemSet.add(argReader.getString(i));
                        } else if (argReader.getTypeMetaData().getColumnType(i).isInt()) {
                            itemSet.add(argReader.getLong(i));
                        } else if (argReader.getTypeMetaData().getColumnType(i).isDate()) {
                            itemSet.add(argReader.getDate(i));
                        } else if (argReader.getTypeMetaData().getColumnType(i).isFloat()) {
                            itemSet.add(argReader.getDouble(i));
                        } else if (argReader.getTypeMetaData().getColumnType(i).isTimestamp()) {
                            itemSet.add(argReader.getTimestamp(i));
                        } else if (argReader.getTypeMetaData().getColumnType(i).isBool()) {
                            itemSet.add(argReader.getBoolean(i));
                        } else {
                            throw new UdfException(0, "Data type not supported");
                        }
                    }

                    resWriter.setBoolean(itemSet.size() == (numCols / 2));

                } catch (Exception ex) {
                    // Log the stack trace through the Vertica server interface
                    // and return false rather than failing the whole query.
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw);
                    ex.printStackTrace(pw);
                    si.log(sw.toString());
                    resWriter.setBoolean(false);
                }
                resWriter.next();
            } while (argReader.next());
        }
    }

    /**
     * Method called by Vertica in order to instantiate a new UDF when the query runs.
     * @param srvInterface Vertica server interface
     * @return Instance of a scalar function
     */
    @Override
    public ScalarFunction createScalarFunction(ServerInterface srvInterface)
    {
        return new udf();
    }
}
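
To deploy the Java version, compile it against the Vertica SDK, package it into a jar, and register it; a sketch, with the jar path and library name assumed:

CREATE LIBRARY rowcomparelib AS '/home/dbadmin/RowCompare.jar' LANGUAGE 'JAVA';
CREATE FUNCTION perm AS LANGUAGE 'JAVA' NAME 'com.iav.udsf.misc.RowCompareFactory' LIBRARY rowcomparelib;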