试图将数据从 python 中的一个脚本发送到另一个脚本
trying to send data from one script in python to another script
我正在尝试编写一个脚本,以便在无法看到 IP 地址时通过电报发送一条消息,让我知道哪台计算机处于离线状态
我已经能够让电报端工作,但我无法从测试 ip 地址的主脚本传递数据,目前我在那里有测试数据,但我想要它用计算机名称发送错误
main.py
import socket
import ssl
from datetime import datetime
import pickle
import subprocess
import platform
class Server:
def __init__(self, name, port, connection, priority):
self.name = name
self.port = port
self.connection = connection.lower()
self.priority = priority.lower()
self.history = []
self.alert = False
def check_connection(self):
msg = ""
success = False
now = datetime.now().strftime("%d-%m-%Y %H:%M")
try:
if self.connection == "plain":
socket.create_connection((self.name, self.port), timeout=10)
msg = f"{self.name} is up. On port {self.port} with {self.connection}"
success = True
self.alert = False
elif self.connection == "ssl":
ssl.wrap_socket(socket.create_connection((self.name, self.port), timeout=10))
msg = f"{self.name} is up. On port {self.port} with {self.connection}"
success = True
self.alert = False
else:
if self.ping():
msg = f"{self.name} is up. On port {self.port} with {self.connection}"
success = True
self.alert = False
except socket.timeout:
msg = f"server: {self.name} timeout. On port {self.port}"
except (ConnectionRefusedError, ConnectionResetError) as e:
msg = f"server: {self.name} {e}"
except Exception as e:
msg = f"No Clue??: {e}"
if success == False and self.alert == False:
# Send Alert
self.alert = True
import tg_start
tg_start.send_message("Happy days")
self.create_history(msg, success, now)
def create_history(self, msg, success, now):
history_max = 100
self.history.append((msg, success, now))
while len(self.history) > history_max:
self.history.pop(0)
def ping(self):
try:
output = subprocess.check_output("ping -{} 1 {}".format('n' if platform.system().lower(
) == "windows" else 'c', self.name), shell=True, universal_newlines=True)
if 'unreachable' in output:
return False
else:
return True
except Exception:
return False
if __name__ == "__main__":
try:
servers = pickle.load(open("servers.pickle", "rb"))
except:
servers = [
# Server("ifmc-repserver", 80, "plain", "high"),
# Server("ifmc-repserver", 80, "plain", "high"),
# Server("ifmc-repserver", 465, "ssl", "high"),
# Server("ifmc-repserver", 80, "ping", "high"),
Server("ifmc-repserver", 80, "ping", "high")
]
for server in servers:
server.check_connection()
print(len(server.history))
print(server.history[-1])
pickle.dump(servers, open("servers.pickle", "wb"))
和tg_start.py
import requests
message = "global"
alert = ""
def send_message(text):
global alert
global message
print ("this is text messsage" + " " + text)
#text = "Superman"
alert = text
print("Sending ALERT ...")
token = "token"
chat_id = "chat_id"
print("test message" + " " + alert)
url_req = "https://api.telegram.org/bot" + token + "/sendMessage" + "?chat_id=" + chat_id + "&text=" + alert
print(url_req)
#results = requests.get(url_req)
results = requests.post(url_req) # this request is a post, not a get
print(results.json())
# text = "my name" + text
send_message(alert)
您的代码略有改动,sendMessage 需要一个 POST 请求,而不是 GET 请求。
def send_message(text):
global alert
global message
print ("this is text messsage" + " " + text)
alert = text
print("Sending ALERT ...")
token = "token"
chat_id = "chat_id"
print("test message" + " " + alert)
url_req = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={alert}"
print(url_req)
results = requests.post(url_req) # this request is a post, not a get
print(results.json())
# text = "my name" + text
我正在尝试编写一个脚本,以便在无法看到 IP 地址时通过电报发送一条消息,让我知道哪台计算机处于离线状态
我已经能够让电报端工作,但我无法从测试 ip 地址的主脚本传递数据,目前我在那里有测试数据,但我想要它用计算机名称发送错误
main.py
import socket
import ssl
from datetime import datetime
import pickle
import subprocess
import platform
class Server:
def __init__(self, name, port, connection, priority):
self.name = name
self.port = port
self.connection = connection.lower()
self.priority = priority.lower()
self.history = []
self.alert = False
def check_connection(self):
msg = ""
success = False
now = datetime.now().strftime("%d-%m-%Y %H:%M")
try:
if self.connection == "plain":
socket.create_connection((self.name, self.port), timeout=10)
msg = f"{self.name} is up. On port {self.port} with {self.connection}"
success = True
self.alert = False
elif self.connection == "ssl":
ssl.wrap_socket(socket.create_connection((self.name, self.port), timeout=10))
msg = f"{self.name} is up. On port {self.port} with {self.connection}"
success = True
self.alert = False
else:
if self.ping():
msg = f"{self.name} is up. On port {self.port} with {self.connection}"
success = True
self.alert = False
except socket.timeout:
msg = f"server: {self.name} timeout. On port {self.port}"
except (ConnectionRefusedError, ConnectionResetError) as e:
msg = f"server: {self.name} {e}"
except Exception as e:
msg = f"No Clue??: {e}"
if success == False and self.alert == False:
# Send Alert
self.alert = True
import tg_start
tg_start.send_message("Happy days")
self.create_history(msg, success, now)
def create_history(self, msg, success, now):
history_max = 100
self.history.append((msg, success, now))
while len(self.history) > history_max:
self.history.pop(0)
def ping(self):
try:
output = subprocess.check_output("ping -{} 1 {}".format('n' if platform.system().lower(
) == "windows" else 'c', self.name), shell=True, universal_newlines=True)
if 'unreachable' in output:
return False
else:
return True
except Exception:
return False
if __name__ == "__main__":
try:
servers = pickle.load(open("servers.pickle", "rb"))
except:
servers = [
# Server("ifmc-repserver", 80, "plain", "high"),
# Server("ifmc-repserver", 80, "plain", "high"),
# Server("ifmc-repserver", 465, "ssl", "high"),
# Server("ifmc-repserver", 80, "ping", "high"),
Server("ifmc-repserver", 80, "ping", "high")
]
for server in servers:
server.check_connection()
print(len(server.history))
print(server.history[-1])
pickle.dump(servers, open("servers.pickle", "wb"))
和tg_start.py
import requests
message = "global"
alert = ""
def send_message(text):
global alert
global message
print ("this is text messsage" + " " + text)
#text = "Superman"
alert = text
print("Sending ALERT ...")
token = "token"
chat_id = "chat_id"
print("test message" + " " + alert)
url_req = "https://api.telegram.org/bot" + token + "/sendMessage" + "?chat_id=" + chat_id + "&text=" + alert
print(url_req)
#results = requests.get(url_req)
results = requests.post(url_req) # this request is a post, not a get
print(results.json())
# text = "my name" + text
send_message(alert)
您的代码略有改动,sendMessage 需要一个 POST 请求,而不是 GET 请求。
def send_message(text):
global alert
global message
print ("this is text messsage" + " " + text)
alert = text
print("Sending ALERT ...")
token = "token"
chat_id = "chat_id"
print("test message" + " " + alert)
url_req = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={alert}"
print(url_req)
results = requests.post(url_req) # this request is a post, not a get
print(results.json())
# text = "my name" + text