除了使用系统 ping 实用程序或发送 ICMP 数据包来创建网络状态监视器之外,还有其他选择吗?
Are there any alternatives to using system ping utility or sending ICMP packets for creating a network condition monitor?
我一直在编写和维护一个个人使用的实用程序,它每隔 1 秒 ping 8.8.8.8
并显示一个指示网络状况的 pip 已有一段时间了,而且由于我的网络似乎具有固有的~1.7% 的数据包丢失,我使它 运行 两个 PING.EXE 进程并行,超时为 950 毫秒,以便区分随机丢失的数据包和短暂的中断(这些也会发生)
现在,问题是:我对 ping 没有足够的控制,而且我 运行 遇到了 PING.EXE 的 1 秒延迟不准确的问题足够了,随着流程的漂移,我随机得到 heavily degenerate 个案例,例如与 107.24s 相关的 60 个点,而不是预期的 60s。
我可能可以通过每秒启动一个新的 ping 进程来解决这个问题,但这是一个非常不优雅的解决方案,而且开销不可忽略(至少在 Windows 上),而且我会喜欢避免需要手动发送 ICMP 数据包,因为那样我需要 运行 我的实用程序具有管理权限。
对于开放式 "is there anything" 问题表示歉意,但在一无所知的情况下,我不知道该问什么。
编辑:我已经尝试将超时从 999 毫秒降低到 900 毫秒甚至 750 毫秒,但是 PING.EXE 要么不遵守它,要么这样做不正确,因为这种情况一直在发生,尽管问题是多 更小 - 在最大网络负载下 +15s。可能足够接受,但是...
如果您需要在 Python 中精确复制 ping
,您需要管理员权限才能将 ICMP 数据包发送到主机。
您可以改为依赖 TCP 数据包并点击 http 端口。这不需要管理员权限。它也相当准确,但它更准确地描述了网络浏览能力。
简化this TCP ping代码:
import sys
import socket
import time
import signal
from timeit import default_timer as timer
host = "google.com"
port = 80
# Default to 10000 connections max
maxCount = 10000
count = 0
# Pass/Fail counters
passed = 0
failed = 0
def getResults():
""" Summarize Results """
lRate = 0
if failed != 0:
lRate = failed / (count) * 100
lRate = "%.2f" % lRate
print("\nTCP Ping Results: Connections (Total/Pass/Fail): [{:}/{:}/{:}] (Failed: {:}%)".format((count), passed, failed, str(lRate)))
def signal_handler(signal, frame):
""" Catch Ctrl-C and Exit """
getResults()
sys.exit(0)
# Register SIGINT Handler
signal.signal(signal.SIGINT, signal_handler)
# Loop while less than max count or until Ctrl-C caught
while count < maxCount:
# Increment Counter
count += 1
success = False
# New Socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 1sec Timeout
s.settimeout(1)
# Start a timer
s_start = timer()
# Try to Connect
try:
s.connect((host, int(port)))
s.shutdown(socket.SHUT_RD)
success = True
# Connection Timed Out
except socket.timeout:
print("Connection timed out!")
failed += 1
except OSError as e:
print("OS Error:", e)
failed += 1
# Stop Timer
s_stop = timer()
s_runtime = "%.2f" % (1000 * (s_stop - s_start))
if success:
print("Connected to %s[%s]: tcp_seq=%s time=%s ms" % (host, port, (count-1), s_runtime))
passed += 1
# Sleep for 1sec
if count < maxCount:
time.sleep(1)
# Output Results if maxCount reached
getResults()
一旦你可以创建一个 'ping' 你就可以将每次的结果/ping 结果封装到一个对象流中来显示你想要的。
aping支持这个。
以下示例每秒向 8.8.8.8 发送 3 个间隔 50 毫秒的 ping 请求,超时为 800 毫秒。
aping -n 3 -i 50 -w 800 -sleep 1 8.8.8.8
每次扫描 (-sleep 1) 的时间根据先前扫描的执行时间进行调整,以确保恒定的一秒节奏,无论响应时间/所有三个数据包均未响应的情况如何。
Windows不需要管理员。
我一直在编写和维护一个个人使用的实用程序,它每隔 1 秒 ping 8.8.8.8
并显示一个指示网络状况的 pip 已有一段时间了,而且由于我的网络似乎具有固有的~1.7% 的数据包丢失,我使它 运行 两个 PING.EXE 进程并行,超时为 950 毫秒,以便区分随机丢失的数据包和短暂的中断(这些也会发生)
现在,问题是:我对 ping 没有足够的控制,而且我 运行 遇到了 PING.EXE 的 1 秒延迟不准确的问题足够了,随着流程的漂移,我随机得到 heavily degenerate 个案例,例如与 107.24s 相关的 60 个点,而不是预期的 60s。
我可能可以通过每秒启动一个新的 ping 进程来解决这个问题,但这是一个非常不优雅的解决方案,而且开销不可忽略(至少在 Windows 上),而且我会喜欢避免需要手动发送 ICMP 数据包,因为那样我需要 运行 我的实用程序具有管理权限。
对于开放式 "is there anything" 问题表示歉意,但在一无所知的情况下,我不知道该问什么。
编辑:我已经尝试将超时从 999 毫秒降低到 900 毫秒甚至 750 毫秒,但是 PING.EXE 要么不遵守它,要么这样做不正确,因为这种情况一直在发生,尽管问题是多 更小 - 在最大网络负载下 +15s。可能足够接受,但是...
如果您需要在 Python 中精确复制 ping
,您需要管理员权限才能将 ICMP 数据包发送到主机。
您可以改为依赖 TCP 数据包并点击 http 端口。这不需要管理员权限。它也相当准确,但它更准确地描述了网络浏览能力。
简化this TCP ping代码:
import sys
import socket
import time
import signal
from timeit import default_timer as timer
host = "google.com"
port = 80
# Default to 10000 connections max
maxCount = 10000
count = 0
# Pass/Fail counters
passed = 0
failed = 0
def getResults():
""" Summarize Results """
lRate = 0
if failed != 0:
lRate = failed / (count) * 100
lRate = "%.2f" % lRate
print("\nTCP Ping Results: Connections (Total/Pass/Fail): [{:}/{:}/{:}] (Failed: {:}%)".format((count), passed, failed, str(lRate)))
def signal_handler(signal, frame):
""" Catch Ctrl-C and Exit """
getResults()
sys.exit(0)
# Register SIGINT Handler
signal.signal(signal.SIGINT, signal_handler)
# Loop while less than max count or until Ctrl-C caught
while count < maxCount:
# Increment Counter
count += 1
success = False
# New Socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 1sec Timeout
s.settimeout(1)
# Start a timer
s_start = timer()
# Try to Connect
try:
s.connect((host, int(port)))
s.shutdown(socket.SHUT_RD)
success = True
# Connection Timed Out
except socket.timeout:
print("Connection timed out!")
failed += 1
except OSError as e:
print("OS Error:", e)
failed += 1
# Stop Timer
s_stop = timer()
s_runtime = "%.2f" % (1000 * (s_stop - s_start))
if success:
print("Connected to %s[%s]: tcp_seq=%s time=%s ms" % (host, port, (count-1), s_runtime))
passed += 1
# Sleep for 1sec
if count < maxCount:
time.sleep(1)
# Output Results if maxCount reached
getResults()
一旦你可以创建一个 'ping' 你就可以将每次的结果/ping 结果封装到一个对象流中来显示你想要的。
aping支持这个。
以下示例每秒向 8.8.8.8 发送 3 个间隔 50 毫秒的 ping 请求,超时为 800 毫秒。
aping -n 3 -i 50 -w 800 -sleep 1 8.8.8.8
每次扫描 (-sleep 1) 的时间根据先前扫描的执行时间进行调整,以确保恒定的一秒节奏,无论响应时间/所有三个数据包均未响应的情况如何。
Windows不需要管理员。