TStream 中的元组到底是什么?
what exactly is tuple in TStream?
这是取自 Apache Edgent Documentation
的代码
我无法理解元组到底是什么。代码如下:
public static void main(String[] args) throws Exception {
TempSensor sensor = new TempSensor();
DirectProvider dp = new DirectProvider();
Topology topology = dp.newTopology();
TStream<Double> tempReadings = topology.poll(sensor, 1, TimeUnit.MILLISECONDS);
TStream<Double> simpleFiltered = tempReadings.filter(tuple ->
!optimalTempRangeRef.get().contains(tuple));
simpleFiltered.sink(tuple -> System.out.println("Temperature is out of range! "
+ "It is " + tuple + "\u00b0F!"));
tempReadings.print();
dp.submit(topology);
}
我得到的错误是元组无法解析为变量。我得到的错误到底是什么?谢谢。
TStream<T>
interface 旨在模拟数据流,通常是传感器读数。本例中的 T
是用于存储单个读数的类型,但 'reading' 实际上可以表示多个数字(例如温度、湿度和风速)组合成一个复合类型,通常称为到这里作为值的 'tuple'。
但是,查看您的示例的上下文,我们正在处理一系列简单的温度读数,因此此处 T
对应于 Double
类型的单个数字。所以选择 'tuple' 作为变量名有点令人困惑(从数学上讲,它是一个 1 元组 ,但在这种情况下它只意味着 'a number')。
在您的代码中,filter()
方法接受一个 predicate,这里是
tuple -> !optimalTempRangeRef.get().contains(tuple)
和optimalTempRangeRef.get()
returns一个Range(Double)
,所以谓词是说"is our temperature value outside our optimal range?"
来自 Range
的文档:
contains() is used to check for containment: e.g.
Ranges.closed(2,4).contains(2); // returns true
Ranges.open(2,4).contains(2); // returns false
Ranges.atLeast(2).contains(2); // returns true
Ranges.greaterThan(2).contains(2); // returns false
Ranges.atMost(2).contains(2); // returns true
Ranges.lessThan(2).contains(2); // returns false
编辑:
看起来您的 IDE 在使用 Java 8 lambda 语法时遇到问题,因此您可以使用匿名内部 类 重写代码,如下所示:
import org.apache.edgent.function.Predicate;
import org.apache.edgent.function.Consumer;
public static void main( String[] args ) throws Exception
{
TempSensor sensor = new TempSensor();
DirectProvider dp = new DirectProvider();
Topology topology = dp.newTopology();
TStream<Double> tempReadings = topology.poll( sensor, 1, TimeUnit.MILLISECONDS );
TStream<Double> filteredStream = tempReadings.filter( new Predicate<Double>()
{
public boolean test( Double reading )
{
return !optimalTempRangeRef.get().contains( reading );
}
} );
filteredStream.sink( new Consumer<Double>()
{
public void accept( Double reading )
{
System.out.println( "Temperature is out of range! "
+ "It is " + reading + "\u00b0F!" )
}
} );
tempReadings.print();
dp.submit( topology );
}
这是取自 Apache Edgent Documentation
的代码我无法理解元组到底是什么。代码如下:
public static void main(String[] args) throws Exception {
TempSensor sensor = new TempSensor();
DirectProvider dp = new DirectProvider();
Topology topology = dp.newTopology();
TStream<Double> tempReadings = topology.poll(sensor, 1, TimeUnit.MILLISECONDS);
TStream<Double> simpleFiltered = tempReadings.filter(tuple ->
!optimalTempRangeRef.get().contains(tuple));
simpleFiltered.sink(tuple -> System.out.println("Temperature is out of range! "
+ "It is " + tuple + "\u00b0F!"));
tempReadings.print();
dp.submit(topology);
}
我得到的错误是元组无法解析为变量。我得到的错误到底是什么?谢谢。
TStream<T>
interface 旨在模拟数据流,通常是传感器读数。本例中的 T
是用于存储单个读数的类型,但 'reading' 实际上可以表示多个数字(例如温度、湿度和风速)组合成一个复合类型,通常称为到这里作为值的 'tuple'。
但是,查看您的示例的上下文,我们正在处理一系列简单的温度读数,因此此处 T
对应于 Double
类型的单个数字。所以选择 'tuple' 作为变量名有点令人困惑(从数学上讲,它是一个 1 元组 ,但在这种情况下它只意味着 'a number')。
在您的代码中,filter()
方法接受一个 predicate,这里是
tuple -> !optimalTempRangeRef.get().contains(tuple)
和optimalTempRangeRef.get()
returns一个Range(Double)
,所以谓词是说"is our temperature value outside our optimal range?"
来自 Range
的文档:
contains() is used to check for containment: e.g.
Ranges.closed(2,4).contains(2); // returns true
Ranges.open(2,4).contains(2); // returns false
Ranges.atLeast(2).contains(2); // returns true
Ranges.greaterThan(2).contains(2); // returns false
Ranges.atMost(2).contains(2); // returns true
Ranges.lessThan(2).contains(2); // returns false
编辑:
看起来您的 IDE 在使用 Java 8 lambda 语法时遇到问题,因此您可以使用匿名内部 类 重写代码,如下所示:
import org.apache.edgent.function.Predicate;
import org.apache.edgent.function.Consumer;
public static void main( String[] args ) throws Exception
{
TempSensor sensor = new TempSensor();
DirectProvider dp = new DirectProvider();
Topology topology = dp.newTopology();
TStream<Double> tempReadings = topology.poll( sensor, 1, TimeUnit.MILLISECONDS );
TStream<Double> filteredStream = tempReadings.filter( new Predicate<Double>()
{
public boolean test( Double reading )
{
return !optimalTempRangeRef.get().contains( reading );
}
} );
filteredStream.sink( new Consumer<Double>()
{
public void accept( Double reading )
{
System.out.println( "Temperature is out of range! "
+ "It is " + reading + "\u00b0F!" )
}
} );
tempReadings.print();
dp.submit( topology );
}