Build custom join logic in Cascading ensuring MAP_SIDE only
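I have 3 Cascading pipes (one to be joined against the other two), described as follows:
- LHSPipe - (larger in size)
- RHSPipes - (smaller; may fit in memory)
The pseudocode is as follows; this example involves two joins.
IF F1DecidingFactor = YES THEN
    JOIN LHSPipe with RHS Lookup#1 BY (LHSPipe.F1Input = RHS Lookup#1.Join#F1) and set the lookup result (SET LHSPipe.F1Output = Result#F1)
ELSE
    SET LHSPipe.F1Output = N/A
The same logic applies to the F2 computation.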
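The pseudocode above can be read as a conditional hash lookup. Below is a minimal plain-Java sketch of that rule, independent of Cascading. The names `ConditionalLookup` and `f1Output` are hypothetical, and the lookup side is assumed to fit in an in-memory `Map`.

```java
import java.util.Map;

// Hypothetical illustration of the IF-ELSE lookup rule; this is not Cascading code.
final class ConditionalLookup {
    static final String NOT_APPLICABLE = "N/A";

    // Returns the looked-up value when the deciding factor is "YES",
    // otherwise the fixed default "N/A". A key missing from the lookup
    // yields null, which mirrors an unmatched row in a left join.
    static String f1Output(String decidingFactor, String f1Input, Map<String, String> lookup) {
        if ("YES".equalsIgnoreCase(decidingFactor)) {
            return lookup.get(f1Input);
        }
        return NOT_APPLICABLE;
    }
}
```

The F2 rule would have the same shape with its own lookup map.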
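Expected output,
This scenario forces me to use a custom join operation, because the IF-ELSE decides whether to join or not.
Given the scenario above, I would like to go for a MAP-SIDE join (keeping the RHSPipe in memory on the MAP task node). I was thinking of the possible solutions below, each with its pros and cons. I need your suggestions on these.
Option #1:
CoGroup - we can build the custom join logic using CoGroup with BufferJoiner followed by a custom join (operation), but this does not ensure a MAP-SIDE join.
Option #2:
HashJoin - it ensures a MAP-SIDE join, but as far as I know a custom join cannot be built with it.
Please correct my understanding and share your opinions on how to handle this requirement.
Thanks in advance.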
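The best way to solve this problem (that I can think of) is to modify the smaller dataset. You can add a new field (F1DecidingFactor) to the smaller dataset. The value of F1Result should then be set like this:
Pseudocode
if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"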
Result table
|F1#DecidingFactor|F1#Join|F1#Result|
|              Yes|      0|     True|
|              Yes|      1|    False|
|               No|      0|      N/A|
|               No|      1|      N/A|
You can do the above through Cascading as well.
After this, you can do your map-side join.
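As a rough illustration of the idea above, here is a plain-Java sketch (no Cascading involved) that expands a small lookup so every join key gets a "Yes" row carrying the real value and a "No" row carrying "N/A". A single composite-key lookup then replaces the IF-ELSE. The names `ExpandedLookup`, `expand` and `key` are hypothetical, and the separator character is assumed not to occur in the data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that pre-computes the "modified smaller dataset".
final class ExpandedLookup {
    // Composite key: deciding factor plus join key, separated by a control
    // character that is assumed not to appear in either value.
    static String key(String decidingFactor, String joinKey) {
        return decidingFactor + '\u0001' + joinKey;
    }

    // For each (joinKey -> value) entry, emit two rows:
    //   ("Yes", joinKey) -> value   and   ("No", joinKey) -> "N/A".
    static Map<String, String> expand(Map<String, String> lookup) {
        Map<String, String> expanded = new HashMap<>();
        for (Map.Entry<String, String> e : lookup.entrySet()) {
            expanded.put(key("Yes", e.getKey()), e.getValue());
            expanded.put(key("No", e.getKey()), "N/A");
        }
        return expanded;
    }
}
```

One caveat with this approach: a "No" row whose join key is absent from the small dataset still finds no match, so a left join would leave its result null rather than "N/A". It also doubles the size of the in-memory side.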
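If modifying the smaller dataset is not possible, then I have 2 options to solve the problem.
Option 1
Add a new field to your small pipes that is equivalent to your deciding factor (i.e. F1DecidingFactor_RHS = Yes). Then include it in your join criteria. Once the join is done, you will have values only for those rows where this condition matches. Otherwise the value will be null/blank. Sample code: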
Main class
import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {
    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        // Composite join keys: (deciding factor, lookup key) on each side.
        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);
        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        // Note: the small pipes below name their value fields F1Result/F2Result.
        // This selector assumes they are exposed as F1Output/F2Output (for example
        // via a rename that is not shown here); adjust one side so the names agree.
        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large Pipe fields :
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields :
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field to small pipe. Expected Fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small Pipe 2 Fields :
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field to small pipe. Expected Fields:
        // F2Join F2Result F2DecidingFactor_RHS
        // (the inserted field must be F2DecidingFactor_RHS, since the second join uses it)
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Joining first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Joining second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        // Replace the unmatched (null) outputs with the default value.
        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        // Drop the helper fields that were only needed for the join.
        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);

        // result Pipe should have expected result
    }
}
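To make the effect of the composite join key in the sample above easier to see, here is a small plain-Java simulation of a left hash join on (deciding factor, lookup key). It is only a sketch of the semantics and does not use Cascading. `CompositeKeyLeftJoin` and the row layout `{decidingFactor, input}` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the join semantics; not Cascading code.
final class CompositeKeyLeftJoin {
    // small: join key -> result. Every small-side row is tagged with the
    // constant "Yes", so only large-side rows whose deciding factor is
    // "Yes" can ever match.
    static List<String[]> join(List<String[]> largeRows, Map<String, String> small) {
        Map<List<String>, String> index = new HashMap<>();
        for (Map.Entry<String, String> e : small.entrySet()) {
            index.put(Arrays.asList("Yes", e.getKey()), e.getValue());
        }
        List<String[]> out = new ArrayList<>();
        for (String[] row : largeRows) { // row = {decidingFactor, input}
            // Left join: keep every large-side row; result is null when unmatched.
            String result = index.get(Arrays.asList(row[0], row[1]));
            out.add(new String[] {row[0], row[1], result});
        }
        return out;
    }
}
```

Rows with a deciding factor of "No" come out with a null result, which is why a follow-up step is still needed if "N/A" is wanted instead of null.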
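Option 2
If you want a default value instead of null/blank, then I would suggest doing the HashJoin first with one of the default joiners, followed by a function that updates the tuples with the appropriate values. Something like: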
Main class
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {
    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        // Note: the small pipes below name their value fields F1Result/F2Result.
        // This selector assumes they are exposed as F1Output/F2Output (for example
        // via a rename that is not shown here); adjust one side so the names agree.
        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large Pipe fields :
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields :
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small Pipe 2 Fields :
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe.
        // Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Joining second small pipe.
        // Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        // Overwrite the looked-up values with the default where the deciding factor says so.
        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        // result Pipe should have expected result
    }
}
Update function
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    // Deciding-factor value that means "do not use the looked-up result".
    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected Fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        // Declare the same fields as the arguments so Fields.REPLACE can be used.
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        // Work on a copy; the incoming arguments entry must not be modified.
        TupleEntry result = new TupleEntry(arguments);

        // Constant-first comparison so a null deciding factor does not throw.
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F1DecidingFactor"))) {
            result.setString("F1Output", DEFAULT_VALUE);
        }

        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F2DecidingFactor"))) {
            result.setString("F2Output", DEFAULT_VALUE);
        }

        call.getOutputCollector().add(result);
    }
}
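The two steps of Option 2 (unconditional left lookup, then overwrite) can be checked without a cluster. The following is a plain-Java sketch of one row passing through both steps, under the assumption that a row is modelled as a `Map` from field name to value. `JoinThenDefault` and `process` are hypothetical names and nothing here is Cascading API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-row model of "join first, then apply the default".
final class JoinThenDefault {
    static Map<String, String> process(Map<String, String> row, Map<String, String> lookup) {
        Map<String, String> out = new HashMap<>(row);

        // Step 1: unconditional left lookup; null when the key is not found.
        out.put("F1Output", lookup.get(row.get("F1Input")));

        // Step 2: overwrite with the default when the deciding factor is "No".
        if ("No".equalsIgnoreCase(out.get("F1DecidingFactor"))) {
            out.put("F1Output", "N/A");
        }
        return out;
    }
}
```

The lookup still runs for "No" rows and its result is simply discarded, which is the price of keeping the join itself a plain `LeftJoin`.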
This should solve your problem. Let me know if this helps.