python 中的多线程套接字编程

Multithreaded socket programming in python

我正在使用 python 进行 client/server 套接字编程。我可以毫无问题地设置它,但是有了线程,我迷路了。我不知道如何完成那部分。我也查看了 python 文档,但找不到我期望的内容。

我想要实现的是 - 服务器正在接受来自客户端的连接并监控目录以创建任何文件。我正在尝试 运行 它们并行,但它没有用。

#!/usr/bin/python           # This is server.py file

import socket               # Import socket module
import sys,os
import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.challenger", "*.challenger"]

    def process(self, event):
        """
        event.event_type
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True
        event.src_path
            /home/abcd/Maildir/new/
        """
        # the file will be processed there
        print event.src_path, event.event_type  # print now only for degug

    def on_modified(self, event):
        self.process(event)
        flag = '1'
        print flag

    def on_created(self, event):
        self.process(event)

    def on_any_event(self,event):
        if event.event_type == 'created':
            send()


s = socket.socket()         # Create a socket object
host = socket.gethostname() # Get local machine name
port = 12346                # Reserve a port for your service.
s.bind((host, port))        # Bind to the port
flag = '0'
s.listen(5)                 # Now wait for client connection.
while True:
    c, addr = s.accept()     # Establish connection with client.
    print 'Got connection from', addr

#   c.send('Thank you for connecting')
#   c.send(flag)        

    if __name__ == '__main__':

       args = sys.argv[1:]
       observer = Observer()
       observer.schedule(MyHandler(), path='/home/abcd/Maildir/new')
       observer.start()

    try:
        while True:
          time.sleep(1)

    except KeyboardInterrupt:
         observer.stop()

observer.join()


c.send(flag)
c.close()                # Close the connection

我发布的代码没有线程,因为有线程它根本不起作用。

我正在使用 watchdog 来监控 /home/abcd/Maildir/new 来监控创建的任何新电子邮件文件,如果创建了它,我想向客户端发送确认(在本例中为标志),电子邮件 -邮件已收到。套接字连接和看门狗单独工作很好,但我不知道为什么他们不能一起工作? :(

如何将这些方法放在不同的线程中,以便它们 运行 并行?感谢您的帮助。

编辑:克雷格输入后的代码:

import threading
import socket               # Import socket module
import sys,os
import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

def send_message(conn, flag):
    conn.send(flag)


class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.challenger", "*.challenger"]

    def __init__(self, conn, *args, **kwargs):
        super(MyHandler, self).__init__(*args, **kwargs)
        self.conn_list = conn

    def process(self, event):
        """
        event.event_type
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True
        event.src_path
            /home/abcd/Maildir/new/
        """
        print "I reached here too"
        time.sleep(5)
        print event.src_path, event.event_type  # print now only for degug

    def on_created(self, event):
        flag = '1'
        threads = [threading.Thread(target=send_message, args=(conn, flag)) for conn in self.conn_list]
        for t in threads:
            t.start()
        print "on_created"
        self.process(event)
        for t in threads:
            t.join()


from collections import deque
conn_list = deque()

if __name__ == '__main__':
    s = socket.socket()         # Create a socket object
    host = socket.gethostname()  # Get local machine name
    port = 12346                # Reserve a port for your service.
    s.bind((host, port))        # Bind to the port
    flag = '0'
    s.listen(5)                 # Now wait for client connection.
    observer = Observer()
    observer.schedule(MyHandler(conn_list), path='/home/abcd/Maildir/new/')
    observer.start()
    print "Before True"
while True:
     try:
        c, addr = s.accept()     # Establish connection with client.
        print 'Got connection from', addr
        conn_list.append(c)
        time.sleep(1)
        print "I started Observer"
     except KeyboardInterrupt:
        observer.stop()
        while conn_list:
            conn_list.pop().close()
        print "Connections closed"
        break

observer.join()

编辑 2:

当我从客户端向服务器发送电子邮件时,我第一次得到正确的结果但是当我再次连接时,输出很奇怪("In message" 打印两次,标志值也打印两次并且我收到破管错误)

根据克雷格的输入修改Server.py:

import threading
import socket               # Import socket module
import sys,os
import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler


def send_message(conn, flag):
    print "In message"
    conn.send(flag)
    print "flag"+flag

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.challenger", "*.challenger"]

    def __init__(self, conn, *args, **kwargs):
        super(MyHandler, self).__init__(*args, **kwargs)
        self.conn_list = conn

    def process(self, event):
        time.sleep(5)
        print "In process"
        print event.src_path, event.event_type  # print now only for degug

    def on_created(self, event):
        flag = '1'
        print "before process event"
        self.process(event)
        print "after process, before thread target"
        threads = [threading.Thread(target=send_message, args=(conn, flag)) for$
        flag = '0'
        for t in threads:
            t.start()
#        print "Before process(Event)"
#        self.process(event)
        print "after process event"
        for t in threads:
            t.join()

s = socket.socket()         # Create a socket object
host = socket.gethostname()  # Get local machine name
port = 12345                # Reserve a port for your service.
s.bind((host, port))        # Bind to the port
flag = '0'
s.listen(5)                 # Now wait for client connection.
print "flag before: "+flag

from collections import deque
conn_list = deque()

if __name__ == '__main__':
    observer = Observer()
    observer.start()
    args = sys.argv[1:]
    observer.schedule(MyHandler(conn_list), path='/home/abcd/Maildir/new')
while True:
    try:
        c, addr = s.accept()     # Establish connection with client.
        print 'Got connection from', addr
        flag = '0'
        print flag
        conn_list.append(c)

    except KeyboardInterrupt:
        observer.stop()
        while conn_list:
            conn_list.pop().close()
        print "Connections closed"
        break

observer.join()

这是输出:

Got connection from ('72.123.27.223', 39844)
0
before process event
In process
/home/abcd/Maildir/new/1425403821.V801I2ac232cM275759.challenger created
after process, before thread target
In message
In message
flag1In message
 after process event

 flag1
Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/paras/server42.py", line 11, in send_message
    conn.send(flag)
error: [Errno 32] Broken pipe

这是我的 client.py - 我想要做的就是检查从客户端到服务器的电子邮件延迟并从服务器获取响应。

#!/usr/bin/python           # This is client.py file
import sys,smtplib
import socket               # Import socket module
import threading,time
import urllib
import random
import datetime
#class myThread(threading.Thread):
#    def __init__ (self,name):
#        threading.Thread.__init__(self)
#   self.name=name

#    def run(self):
#        connection()    
#        print "Starting"+self.name

def connection():
    s = socket.socket()         # Create a socket object
    host = socket.gethostbyname('server.edu') # Get local machine name
    port = 12345                # Reserve a port for your service.
    try:
        s.connect((host, port))
#        print s.recv(1024)
        t1 = datetime.datetime.now()
        sendmail()
    t2 = datetime.datetime.now()
        print str(t1),str(t2) 
        print "Time taken to send e-mail from client to server: "+str(t2-t1)
        print "came back"
        flag = s.recv(1024)
        print flag
        if (flag=='1'):
            t3=datetime.datetime.now()   
        print "Time taken to receive response from server: "+str(t3-t2)
        s.close                     # Close the socket when done
    except KeyboardInterrupt:
        s.close

def sendmail():
    fromaddr = 'xyz@gmail.com'

    toaddrs = 'email@server.edu'
    url = "http://www.google.com"
    seq = str(random.randint(1,9))
    msg = 'URL: '+ url  + '\n'+'SEQ:'+seq+'\n'
    print "In sendmail"
#print msg

# Credentials (if needed)
    username = 'xyz@gmail.com'
    password = 'somepassword'
#The actual mail send
    server = smtplib.SMTP('smtp.gmail.com:587')
#server = smtplib.SMTP('localhost')
    server.starttls()
    server.login(username,password)
    server.sendmail(fromaddr, toaddrs, msg)
    server.quit()
    print "email sent"
    return

if __name__ == "__main__":
    connection()

请帮帮我。谢谢。

这并不完全符合您的要求,因为服务器不发送数据,但我认为它实现了您想要执行的操作。您可以传递连接对象,以便您可以将标志从 MyHandler 发送给它。

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.challenger", "*.challenger"]

    def __init__(self, conn, *args, **kwargs):
        super(MyHandler, self).__init__(*args, **kwargs)
        self.conn_list = conn

    def process(self, event):
        time.sleep(5)
        print event.src_path, event.event_type  # print now only for degug

    def on_created(self, event):
        self.process(event)
        c = self.conn_list.pop()
        c.send('1')
        c.close()



s = socket.socket()         # Create a socket object
host = socket.gethostname()  # Get local machine name
port = 12346                # Reserve a port for your service.
s.bind((host, port))        # Bind to the port
flag = '0'
s.listen(5)                 # Now wait for client connection.


from collections import deque
conn_list = deque()

if __name__ == '__main__':
    observer = Observer()
    observer.start()
    args = sys.argv[1:]
    observer.schedule(MyHandler(conn_list), path='/home/abcd/Maildir/new')
while True:
    try:
        c, addr = s.accept()     # Establish connection with client.
        print 'Got connection from', addr
        conn_list.append(c)

    except KeyboardInterrupt:
        observer.stop()
        while conn_list:
            conn_list.pop().close()
        print "Connections closed"
        break

observer.join()