在另一个内部创建新的数据流
Create new data stream inside another
我有两种数据类型。
type1 and type2
我有一个type1
的数据流。
DataStream<type1> stream1 =...
在 stream1
中,我想创建 type2
的对象,并且我想同时收集 type1
和 type2
的对象。
一个数据流是否可以有一种输入类型和两种输出类型?或者是否可以在 stream1
中创建一个新的数据流 (DataStream<type2> stream2
)?
或者有没有其他方法可以收集从一种类型评估的两种不同类型的数据?
您需要先创建一个包装器类型,然后再拆分并select您的流。对于包装器,只有一个成员是 not-null;
class TypeWrapper {
// keeping this short for brevity
public TypeA firstType;
public TypeB secondType;
}
拆分和 select:
DataStream<TypeWrapper> stream1 = ...
DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.firstType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeA>() {
@Override
public TypeA map(TypeWrapper value) throws Exception {
return value.firstType;
}
});
DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.secondType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeB>() {
@Override
public TypeB map(TypeWrapper value) throws Exception {
return value.secondType;
}
});
因为 filter()
和 map()
将被链接到 stream1
两者都在同一个线程上执行并且操作很便宜。
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
case class Type1(){}
case class Type2(){}
object MultipleOutputJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Stream of Type1
val stream1 = env.addSource((sc: SourceFunction.SourceContext[Type1]) => {
while(true){
Thread.sleep(1000)
sc.collect(Type1())
}
})
// Mapping from Type1 to Type2
val stream2 = stream1.map(t1 => Type2())
// Collect both the original and the derived data
stream1.print
stream2.print
env.execute()
}
}
我有两种数据类型。
type1 and type2
我有一个type1
的数据流。
DataStream<type1> stream1 =...
在 stream1
中,我想创建 type2
的对象,并且我想同时收集 type1
和 type2
的对象。
一个数据流是否可以有一种输入类型和两种输出类型?或者是否可以在 stream1
中创建一个新的数据流 (DataStream<type2> stream2
)?
或者有没有其他方法可以收集从一种类型评估的两种不同类型的数据?
您需要先创建一个包装器类型,然后再拆分并select您的流。对于包装器,只有一个成员是 not-null;
class TypeWrapper {
// keeping this short for brevity
public TypeA firstType;
public TypeB secondType;
}
拆分和 select:
DataStream<TypeWrapper> stream1 = ...
DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.firstType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeA>() {
@Override
public TypeA map(TypeWrapper value) throws Exception {
return value.firstType;
}
});
DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.secondType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeB>() {
@Override
public TypeB map(TypeWrapper value) throws Exception {
return value.secondType;
}
});
因为 filter()
和 map()
将被链接到 stream1
两者都在同一个线程上执行并且操作很便宜。
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
case class Type1(){}
case class Type2(){}
object MultipleOutputJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Stream of Type1
val stream1 = env.addSource((sc: SourceFunction.SourceContext[Type1]) => {
while(true){
Thread.sleep(1000)
sc.collect(Type1())
}
})
// Mapping from Type1 to Type2
val stream2 = stream1.map(t1 => Type2())
// Collect both the original and the derived data
stream1.print
stream2.print
env.execute()
}
}