Pentaho 转换树结构数据
Pentaho to convert tree structure data
我有来自 CSV 的数据流。它是一个扁平结构的数据库。
例如:
a,b,c,d
a,b,c,e
a,b,f
这基本上转化为:
Node id,Nodename,parent id,level
100, a , 0 , 1
200, b , 100 , 2
300, c , 200 , 3
400, d , 300 , 4
500, e , 300 , 4
600, f , 200 , 3
这可以使用 Pentaho 完成吗?我已经完成了转型步骤。但我觉得没有什么可以用于此目的。如果有任何我可能遗漏的步骤,请告诉我。
您的 CSV 文件包含图形或树定义。输出格式丰富(node_id
需要生成,parent_id
需要解析,level
需要设置)。在 Pentaho Data Integration 中处理此类 CSV 文件时,您会遇到一些问题:
数据加载和处理:
- 行的长度不同(有时 4 个节点,有时 3 个节点)。
- 加载整行。然后将行拆分为节点并为每个记录流项目处理一个节点。
- 您可以在节点拆分的同一步骤中计算输出值。
解决步骤:
- CSV 文件输入: 从 CSV 加载数据。 设置:没有 header 行;分隔符 = ';';一个名为
rowData
的输出列
- 修改Java脚本值:拆分
rowData
为nodes
并计算输出值:nodeId, nodeName, parentId, nodeLevel
【见代码下面]
- 行排序:行排序
nodeName
。 [a,b,c,d,a,b,c,e,a,b,f >> a,a,a,b,b,c,c,d,e,f]
- 唯一行: 删除重复行
nodeName
。 [a,a,a,b,b,c,c,d,e,f >> a,b,c,d,e,f]
- 文本文件输出:写出结果。
已修改 Java 脚本值 代码:
function writeRow(nodeId, nodeName, parentId, nodeLevel){
newRow = createRowCopy(getOutputRowMeta().size());
var rowIndex = getInputRowMeta().size();
newRow[rowIndex++] = nodeId;
newRow[rowIndex++] = nodeName;
newRow[rowIndex++] = parentId;
newRow[rowIndex++] = nodeLevel;
putRow(newRow);
}
var nodeIdsMap = {
a: "100",
b: "200",
c: "300",
d: "400",
e: "500",
f: "600",
g: "700",
h: "800",
}
// rowData from record stream (CSV input step)
var nodes = rowData.split(",");
for (i = 0; i < nodes.length; i++){
var nodeId = nodeIdsMap[nodes[i]];
var parentNodeId = (i == 0) ? "0" : nodeIdsMap[nodes[i-1]];
var level = i + 1;
writeRow(nodeId, nodes[i], parentNodeId, level);
}
trans_Status = SKIP_TRANSFORMATION;
已修改 Java 脚本值 字段设置:
- 字段名;类型;替换值'字段名'或'Rename to'
- nodeId;细绳; N
- 节点名;细绳; N
- parent_id;细绳; N
- 节点级别;细绳; N
我有来自 CSV 的数据流。它是一个扁平结构的数据库。
例如:
a,b,c,d
a,b,c,e
a,b,f
这基本上转化为:
Node id,Nodename,parent id,level
100, a , 0 , 1
200, b , 100 , 2
300, c , 200 , 3
400, d , 300 , 4
500, e , 300 , 4
600, f , 200 , 3
这可以使用 Pentaho 完成吗?我已经完成了转型步骤。但我觉得没有什么可以用于此目的。如果有任何我可能遗漏的步骤,请告诉我。
您的 CSV 文件包含图形或树定义。输出格式丰富(node_id
需要生成,parent_id
需要解析,level
需要设置)。在 Pentaho Data Integration 中处理此类 CSV 文件时,您会遇到一些问题:
数据加载和处理:
- 行的长度不同(有时 4 个节点,有时 3 个节点)。
- 加载整行。然后将行拆分为节点并为每个记录流项目处理一个节点。
- 您可以在节点拆分的同一步骤中计算输出值。
解决步骤:
- CSV 文件输入: 从 CSV 加载数据。 设置:没有 header 行;分隔符 = ';';一个名为
rowData
的输出列
- 修改Java脚本值:拆分
rowData
为nodes
并计算输出值:nodeId, nodeName, parentId, nodeLevel
【见代码下面] - 行排序:行排序
nodeName
。[a,b,c,d,a,b,c,e,a,b,f >> a,a,a,b,b,c,c,d,e,f]
- 唯一行: 删除重复行
nodeName
。[a,a,a,b,b,c,c,d,e,f >> a,b,c,d,e,f]
- 文本文件输出:写出结果。
已修改 Java 脚本值 代码:
function writeRow(nodeId, nodeName, parentId, nodeLevel){
newRow = createRowCopy(getOutputRowMeta().size());
var rowIndex = getInputRowMeta().size();
newRow[rowIndex++] = nodeId;
newRow[rowIndex++] = nodeName;
newRow[rowIndex++] = parentId;
newRow[rowIndex++] = nodeLevel;
putRow(newRow);
}
var nodeIdsMap = {
a: "100",
b: "200",
c: "300",
d: "400",
e: "500",
f: "600",
g: "700",
h: "800",
}
// rowData from record stream (CSV input step)
var nodes = rowData.split(",");
for (i = 0; i < nodes.length; i++){
var nodeId = nodeIdsMap[nodes[i]];
var parentNodeId = (i == 0) ? "0" : nodeIdsMap[nodes[i-1]];
var level = i + 1;
writeRow(nodeId, nodes[i], parentNodeId, level);
}
trans_Status = SKIP_TRANSFORMATION;
已修改 Java 脚本值 字段设置:
- 字段名;类型;替换值'字段名'或'Rename to'
- nodeId;细绳; N
- 节点名;细绳; N
- parent_id;细绳; N
- 节点级别;细绳; N