服务器
from socket import *
import select
import sys
from threading import Thread
import os
import time
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', 8421))
s.listen(5)
rlist = [s]
wlist = []
xlist = []
user = {}
userlist = ['admin']
class XcqServer:
# 处理客户端退出请求
@classmethod
def quit(slef, r):
print(user[r][0] + "客户端退出")
for i in user:
if i is r:
pass
else:
i.send((user[r][0] + "已下线\n").encode())
userlist.remove[user[r][0]]
del user[r]
rlist.remove(r)
xlist.remove(r)
r.close()
# 处理用户获取成员列表
@classmethod
def lst(self, r):
lst = ','.join(userlist)
r.send(lst.encode())
# 处理客户端登录请求
@classmethod
def land(self, r, L):
name = L[1]
pwd = L[2]
with open('user', 'r') as fr:
while True:
a = fr.readline()
if not a:
r.send(")(**&^&%%*(".encode())
break
b = a[:(len(a) - 1)]
c = b.split(",")
if c[0] == name and c[1] == pwd:
r.send("*!&@*~((~^!^%".encode())
user[r] = L[1:4]
userlist.append(name)
for i in user:
if i is r:
pass
else:
i.send(("%s已上线\n" % name).encode())
return
# 处理客户端消息发送
@classmethod
def say(self, r):
for i in user:
if i is r:
pass
else:
i.send((user[r][0] + "说:" + data).encode())
# 处理用户注册请求
@classmethod
def sign(self, r, L):
name = L[1]
pwd = L[2]
np = name + "," + pwd
with open('user', 'r') as fr:
while True:
a = fr.readline()
b = a[:(len(a) - 1)]
c = b.split(",")
if not a:
break
if c[0] == L[1]:
r.send("^%#&^@*&^#%%@@@".encode())
return
with open('user', 'a') as f:
f.write(np)
f.write('\n')
user[r] = L[1:4]
print("%s已注册" % name)
r.send("**#(@*!))*#*(@!*&".encode())
return
# 管理员广播函数
def admin(s):
while True:
msg = input()
if not msg:
break
for i in user:
if i is s:
pass
else:
i.send(("管理员说:" + msg).encode())
while True:
try:
rs, ws, es = select.select(rlist, wlist, xlist)
for r in rs:
if r is s:
c, addr = r.accept()
print("连接到", addr)
rlist.append(c)
xlist.append(c)
t = Thread(target=admin, args=(s,))
t.start()
else:
data = r.recv(1024).decode()
namelst = data.split(" ")
if data == '@×&#还7@8#7':
XcqServer.quit(r)
elif namelst[0] == "*#*&!((#*!@@&&&":
XcqServer.land(r, namelst)
elif namelst[0] == "&@……%#sh!@#3@*&":
XcqServer.sign(r, namelst)
elif namelst[0] == "lstaaaaa":
XcqServer.lst(r)
else:
XcqServer.say(r)
for w in ws:
pass
for e in es:
if e is s:
s.close()
sys.exit(0)
else:
e.close()
rlist.remove(e)
xlist.remove(e)
except KeyboardInterrupt:
if user == []:
pass
else:
for i in user:
if i is s:
pass
i.send("!&$*#@%!*@^#$^!@&@5!&".encode())
print("服务器关闭")
s.close()
os._exit(0)
except Exception:
continue
from socket import *
import select
import