提交spark streaming receiver时,不带"failing through"如何指定host?
When submitting a spark streaming recevier, how to specify host without "failing through"?
我想在我提前知道 ip 和主机名的主机上创建一个服务器套接字来监听(它在 yarn 节点列表中显示该主机名)。但是我似乎无法让它在不让它事先失败任意次数的情况下在该主机上收听。
有一个 Flume receiver 具有我正在寻找的那种特定于主机的功能。
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
我的接收者代码:
class TCPServerReceiver(hostname: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
}
private def receive() {
/* This is where the job fails until it happens to start on the correct host */
val server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
var userInput: String = null
while (true) {
try {
val s = server.accept()
val in = new BufferedReader(new InputStreamReader(s.getInputStream()))
userInput = in.readLine()
while (!isStopped && userInput != null) {
store(userInput)
userInput = in.readLine()
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + port, e)
case t: Throwable =>
restart("Error receiving data", t)
}
}
}
}
然后在 运行ning 时对其进行测试:
echo 'this is a test' | nc <hostname> <port>
当我 运行 作为本地客户端时,这一切都有效,但是当它被提交到 yarn 集群时,日志显示它试图 运行 在不同主机上的其他容器中以及所有这些失败,因为主机名与容器的主机名不匹配:
java.net.BindException: Cannot assign requested address
最终(几分钟后),一旦接收器尝试在正确的主机上启动,它就会创建套接字,所以上面的代码确实 工作,但它需要大量的 "boot time",我担心添加更多节点会导致它花费更长的时间!
有没有办法确保此接收器在第一次尝试时在正确的主机上启动?
自定义 TCPServerReceiver
实现还应实现:
def preferredLocation: Option[String]
Override this to specify a preferred location (hostname).
在这种情况下,类似于:
def preferredLocation = Some(hostname)
我想在我提前知道 ip 和主机名的主机上创建一个服务器套接字来监听(它在 yarn 节点列表中显示该主机名)。但是我似乎无法让它在不让它事先失败任意次数的情况下在该主机上收听。
有一个 Flume receiver 具有我正在寻找的那种特定于主机的功能。
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
我的接收者代码:
class TCPServerReceiver(hostname: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
}
private def receive() {
/* This is where the job fails until it happens to start on the correct host */
val server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
var userInput: String = null
while (true) {
try {
val s = server.accept()
val in = new BufferedReader(new InputStreamReader(s.getInputStream()))
userInput = in.readLine()
while (!isStopped && userInput != null) {
store(userInput)
userInput = in.readLine()
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + port, e)
case t: Throwable =>
restart("Error receiving data", t)
}
}
}
}
然后在 运行ning 时对其进行测试:
echo 'this is a test' | nc <hostname> <port>
当我 运行 作为本地客户端时,这一切都有效,但是当它被提交到 yarn 集群时,日志显示它试图 运行 在不同主机上的其他容器中以及所有这些失败,因为主机名与容器的主机名不匹配:
java.net.BindException: Cannot assign requested address
最终(几分钟后),一旦接收器尝试在正确的主机上启动,它就会创建套接字,所以上面的代码确实 工作,但它需要大量的 "boot time",我担心添加更多节点会导致它花费更长的时间!
有没有办法确保此接收器在第一次尝试时在正确的主机上启动?
自定义 TCPServerReceiver
实现还应实现:
def preferredLocation: Option[String]
Override this to specify a preferred location (hostname).
在这种情况下,类似于:
def preferredLocation = Some(hostname)