ksqlDB stream join table with aggregation error

I get an error when I execute the SQL below. Does anyone know how to solve it? Thanks a lot.

Error message: class io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression cannot be cast to class io.confluent.ksql.execution.expression.tree.ComparisonExpression (io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression and io.confluent.ksql.execution.expression.tree.ComparisonExpression are in unnamed module of loader 'app')

select
    a.mo_order_id,
    a.mo_lot_no,
    b.tenant_id,
    a.line_id,
    substring((TIMESTAMPTOSTRING(a.creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),1,10) as produce_date,
    substring(substring((TIMESTAMPTOSTRING(a.creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),12,12),1,2) as produce_hour,
    (
    case 
    when a.result_code =1  then 1 
    when a.result_code =3  then 1 
    when a.result_code =8  then 1 
    when a.result_code =2 then 2 
    else 0 
    end) cause_type,
    SUM((
    case 
    when ((a.result_code =1) and is_reset = false) then sn_count
    when ((a.result_code =3) and is_reset = false) then sn_count 
    when ((a.result_code =8) and is_reset = false) then sn_count 
    when ((a.result_code =1) and is_reset = true)  then -1*sn_count 
    when (a.result_code = 2) then sn_count else 0 
    end)) stats
from
    STREAM_ORI_SACMES_INSPECTION a
inner join KSQL_TABLE_GP_MO_ORDER b on
    (a.mo_order_id = b.id and (a.result_code =1 or a.result_code =3 or a.result_code =8))
group by 
    a.mo_order_id,
    a.mo_lot_no,
    b.tenant_id,
    a.line_id,
    substring((TIMESTAMPTOSTRING(creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),1,10),
    substring(substring((TIMESTAMPTOSTRING(creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),12,12),1,2),
    (
    case 
    when a.result_code =1  then 1 
    when a.result_code =3  then 1 
    when a.result_code =8  then 1 
    when a.result_code =2 then 2 
    else 0 
    end)
    emit changes;

Hmm... that looks like a bug!

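The issue is that your join criteria is invalid: (a.mo_order_id = b.id and (a.result_code =1 or a.result_code =3 or a.result_code =8))

The cast error suggests that ksqlDB expects the ON clause to be a single comparison, not a logical AND/OR expression. It looks like your join criteria should be just a.mo_order_id = b.id, and the other part should be moved to a WHERE clause: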

select
    a.mo_order_id,
    a.mo_lot_no,
    b.tenant_id,
    a.line_id,
    substring((TIMESTAMPTOSTRING(a.creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),1,10) as produce_date,
    substring(substring((TIMESTAMPTOSTRING(a.creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),12,12),1,2) as produce_hour,
    (
    case 
    when a.result_code =1  then 1 
    when a.result_code =3  then 1 
    when a.result_code =8  then 1 
    when a.result_code =2 then 2 
    else 0 
    end) cause_type,
    SUM((
    case 
    when ((a.result_code =1) and is_reset = false) then sn_count
    when ((a.result_code =3) and is_reset = false) then sn_count 
    when ((a.result_code =8) and is_reset = false) then sn_count 
    when ((a.result_code =1) and is_reset = true)  then -1*sn_count 
    when (a.result_code = 2) then sn_count else 0 
    end)) stats
from
    STREAM_ORI_SACMES_INSPECTION a
inner join KSQL_TABLE_GP_MO_ORDER b on
    a.mo_order_id = b.id
where
    a.result_code =1 or a.result_code =3 or a.result_code =8
group by 
    a.mo_order_id,
    a.mo_lot_no,
    b.tenant_id,
    a.line_id,
    substring((TIMESTAMPTOSTRING(creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),1,10),
    substring(substring((TIMESTAMPTOSTRING(creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')),12,12),1,2),
    (
    case 
    when a.result_code =1  then 1 
    when a.result_code =3  then 1 
    when a.result_code =8  then 1 
    when a.result_code =2 then 2 
    else 0 
    end)
    emit changes;
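
Stripped down to its essentials, the same pattern can be sketched as follows. The names orders_stream, customers_table, customer_id, and status are hypothetical and are not taken from the question; the sketch also assumes that customers_table uses id as its primary key, which a stream-table join requires.

```sql
-- Hypothetical names: orders_stream is a stream, customers_table is a table keyed by id.
SELECT
    o.customer_id,
    COUNT(*) AS order_count
FROM orders_stream o
-- Keep the ON clause to a single equality between the join keys.
INNER JOIN customers_table c ON o.customer_id = c.id
-- Row filters that would otherwise be ANDed into the ON clause go here instead.
WHERE o.status = 1 OR o.status = 3
GROUP BY o.customer_id
EMIT CHANGES;
```

For an inner join, filtering in WHERE rather than in ON should give the same rows, so moving the predicate does not change the result of the query.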