统计 Postgresql 和 Talend 中多个 table 的当前周记录

Count the current week records from multiple table in Postgresql and Talend

我在 Postgresql 中有多个 table。比如说,A table 有 A 列、B 列、C 列,refresh_date,财政周。 B table 有 D 列,E 列,B 列,财政周,refresh_date。我想查找 当前周 的 table A 的记录总数和 table B 的 的列 E 的总数]本周。我正在使用 Talend 从 table A 和 table B 加载数据,而那些 tables 在 Postgresql 中。此外,如果当前周的 E 列的值等于零,那么它应该向我自己发送一封邮件。 我想为此创建一个通用代码,因为这是针对 table A 和 table B,将有多个 table 与此类似。如何在 TalendPostgresql 中做到这一点?

几个月前我已经创建了类似您需要的东西,它基本上执行随机查询并解析结果集并将其非规范化存储在数据库中table。请注意,我使用的企业 Talend 具有称为动态模式的巧妙功能: https://help.talend.com/pages/viewpage.action?pageId=190513179

那么我们应该从哪里开始呢? 我的典型查询如下所示:

select pk1 as rcr_grby_pk1, pk2 as rcr_grpby_pk2, 
       count(*) as cnt, sum(amount) as sum_amount
from mySchema.myTable
group by pk1, pk2

显然 Select 查询可以是任何内容,可以包含任意数量的列。我们执行它并将结果存储在 table 中,如下所示:

--------------------------------------------------------
|  schema  |  table  |  pk     |measure_name| value_n  |
--------------------------------------------------------
| mySchema | myTable | foo2015 | cnt        | 1234     |
--------------------------------------------------------
| mySchema | myTable | foo2015 | sum_amount | 987.65   |
--------------------------------------------------------
| mySchema | myTable | bar2014 | cnt        | 4321     |
--------------------------------------------------------
| mySchema | myTable | bar2014 | sum_amount | 567.89   |
--------------------------------------------------------

我们区分了 3 种基本类型:文本、数字、日期。

我想您知道如何编写和存储这些 SQL 查询,以及可以传递给 talend 并存储在目标 table 中的 ID,因此您可以看到是什么产生了那个结果。

所以处理片。 tFlowToIterate -> tJavaFlex -> tLogRow

我已将所有内容放入一个 joblet 中,因为我们正在使用它来协调不同数据库之间的数据。 (例如 Oracle 和 Postgres) 职位内容:

tJavaFlex 具有如下输出模式:

tJavaFlex 内容是这样的:

开始:

Dynamic record = ((Dynamic)globalMap.get("input.line"));

String group_by_columns = "";

for(int i = 0 ; i < record.getColumnCount() ; i++) {
    DynamicMetadata meta = record.getColumnMetadata(i);
    if(meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){
        group_by_columns  += "" + record.getColumnValue(i);
    }
}
for(int i = 0 ; i < record.getColumnCount() ; i++) {
    DynamicMetadata meta = record.getColumnMetadata(i);
    if(false == meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){

主要内容:

out.grp_id = context.grp_id;
out.job_id = context.job_id;
out.table_test_id = context.table_test_id;

out.group_by_columns = group_by_columns;
out.measure_column_name = meta.getDbName().toUpperCase();

out.result_n = /* Float */ null;
out.result_v = /* String */ null;
out.result_d = /* Date */ null;

if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_String") ) ){
    out.result_v = String.valueOf(record.getColumnValue(i));
} else if( (record.getColumnValue(i)!=null) && meta.getType().equals("id_Date") ) {
    System.out.println(String.valueOf(record.getColumnValue(i)));   
    out.result_d = (Date)record.getColumnValue(i);
} else if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_Integer") 
        || meta.getType().equals("id_Double") 
        || meta.getType().equals("id_Float")
        || meta.getType().equals("id_Long") )) {
    out.result_n = Float.valueOf(String.valueOf(record.getColumnValue(i)));
} else if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_BigDecimal") ) ) {
    out.result_n = new BigDecimal( String.valueOf(record.getColumnValue(i)) ).floatValue();
} else {
    //Should not happen
    System.out.println("\n Unhandled type: " + meta.getType() );    
}

结束:

    } // if
} //for

PS:我知道 Float 是存储数字的糟糕选择,但没有时间对其进行修改,它仍然给出了 acceptable 结果。