网络聊天程序
学习笔记
笔记比较杂,学习的时候哪不懂学哪,没有很好的规划,随便记一些
socket的几个基本方法
1.socket.socket()是 Python 的 socket 模块中的一个函数,用于创建一个新的套接字对象。套接字(socket)是支持 TCP/IP 和 UDP/IP 协议的网络通信的端点。在基于网络的编程中,套接字用于在不同的计算机之间或同一台计算机上的不同进程之间发送和接收数据。 socket.socket() 函数可以接受几个参数,但最常用的是以下两个:
family:套接字家族可以是 AF_INET、AF_INET6、AF_UNIX 等。其中 AF_INET 是用于网络协议的 IPv4 地址空间,而 AF_INET6 对应 IPv6。AF_UNIX 用于同一台机器上的进程间通信。type:套接字类型通常是 SOCK_STREAM 或 SOCK_DGRAM。SOCK_STREAM 表示套接字是基于 TCP 的流式套接字,它提供顺序、可靠、双向、基于连接的字节流。SOCK_DGRAM 表示套接字是基于 UDP 的数据报套接字,它是无连接的,发送和接收数据包。
2.bind() 函数是 Python socket 模块中的一个重要函数,用于将套接字绑定到特定的地址和端口号。在网络编程中,服务器端需要在其网络接口上指定一个端口号来监听客户端的连接请求,而 bind() 函数就是用来完成这个任务的。
3.listen 函数在 Python 的 socket 模块中用于初始化 TCP 服务器端的套接字以接受连接。当套接字设为监听模式后,它就可以接收客户端的连接请求。 以下是 listen 函数的基本用法:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.bind(server_address)
# 开始监听传入连接的尝试
sock.listen(backlog)
sock 是一个通过 socket.socket() 创建的套接字对象。server_address 是服务器的主机名和端口号。backlog 参数指定了队列中最多可以挂起的连接数。队列满时,服务器将拒绝新的连接。backlog 参数通常是可选的;如果不提供,会使用默认值,这个默认值依赖于操作系统。一个较大的 backlog 值可以增加等待接受的传入连接数,但是确切的行为和最大可能值取决于操作系统的实现。 当 listen 被调用之后,服务器端套接字 sock 就可以接受新的连接了。在 Python 中,接受新的连接是通过 accept 方法完成的,它会阻塞服务器进程直到客户端连接到服务器上:
# 循环以接受客户端的连接
while True:
# 接受一个新的连接
connection, client_address = sock.accept()
try:
# 在这里处理连接
finally:
# 清理连接
connection.close()
每次 accept 调用成功后,它返回一个新的套接字对象 connection 和客户端的地址 client_address。新的套接字 connection 用于与刚刚连接的客户端通信,而服务器端的监听套接字 sock 继续监听其他的连接请求。
listen 只在服务器端的 TCP 套接字编程中使用,因为它设置套接字以接受(即监听)传入的连接尝试。UDP 套接字,由于是无连接的,没有 listen 和 accept 方法,而是直接使用 recvfrom 方法来监听和接收数据。
5.connect():在客户端,可以使用 connect() 方法连接到服务器的地址和端口。这样客户端就能够与服务器建立连接,开始进行通信。
6.send() 和 recv():在已经建立连接的套接字上分别用于向对端发送数据和从对端接收数据。这两个方法常用于在聊天程序中进行实际的消息发送和接收操作。
实现多客户多功能
使用 Python 的 threading 模块来创建一个新的线程,目的是为了并发地处理客户端的连接。这通常发生在服务器已经接受客户端连接之后,服务器会为每个客户端启动一个新的线程以便并发处理多个连接。
简易服务端代码:
import socket
import threading
def client_handler(client_socket):
# 持续监听来自客户的消息
while True:
try:
# 尝试读取客户端发送的消息
message = client_socket.recv(1024)
if message:
# 解码并打印消息
print("[Client]:", message.decode('utf-8'))
# 添加代码向客户端发送响应(根据需求添加内容即可)
else:
# 没有消息意味着客户端可能已断开连接
break
except:
# 捕获异常,处理客户端断开连接的情况
break
# 断开客户端连接
client_socket.close()
def main():
# 设置服务器地址和监听端口
server_address = ('localhost', 12345)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定地址到套接字
server_socket.bind(server_address)
# 开始监听连接
server_socket.listen(5)
print("Server is listening on port 12345...")
# 持续接受客户端连接
while True:
# 接受一个新的连接
client_socket, addr = server_socket.accept()
print(f"Accepted connection from {addr}")
# 为每个客户端创建一个新的线程来处理通信
client_thread = threading.Thread(target=client_handler, args=(client_socket,))
client_thread.start()
# 仅当直接运行此文件时才会调用main函数
if __name__ == "__main__":
main()
简易客户端代码:
import socket
import threading
def receive_message(client_socket):
while True:
try:
message = client_socket.recv(1024).decode('utf-8')
print(message)
except:
# 出现异常说明连接已断开
break
def send_message(client_socket):
while True:
message = input()
client_socket.send(message.encode('utf-8'))
# 设置服务器地址和端口
server_address = ('localhost', 12345)
# 创建客户端套接字
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接到服务器
client_socket.connect(server_address)
# 创建接收消息的线程
receive_thread = threading.Thread(target=receive_message, args=(client_socket,))
receive_thread.start()
# 创建发送消息的线程
send_thread = threading.Thread(target=send_message, args=(client_socket,))
send_thread.start()
下面是流程图:
功能强化:
服务器端消息广播:
修改 client_handler 函数以广播消息到所有连接的客户端。
def broadcast_message(sender_socket, message):
print("broadcast")
sender_username = [username for username, socket in client_usernames_to_sockets.items() if socket == sender_socket][0]
for client in client_sockets:
# if client != sender_socket:
try:
client.send(f"{sender_username}: {message}".encode('utf-8'))
except Exception as e:
print(f"Error broadcasting message: {e}")
client.close()
client_sockets.remove(client)
用户认证:
服务器需要一个用户列表来验证用户名和密码。
while True:
client_socket, addr = server_socket.accept()
try:
str = client_socket.recv(1024).decode('utf-8')
username,password = str.split(":")
# print(f"reserve from {addr}, Username: {username}")
if users.get(username) == password:
print(f"Accepted connection from {addr}, Username: {username}")
client_sockets.append(client_socket)
client_usernames_to_sockets[username] = client_socket
client_thread = threading.Thread(target=client_handler, args=(client_socket, username))
client_thread.daemon = True
client_thread.start()
client_socket.send("Login successful.".encode('utf-8'))
broadcast_message(client_socket, f"{username} has joined the chat.")
else:
client_socket.send("Username or Password incorrect.".encode('utf-8'))
client_socket.close()
except Exception as e:
print(f"Error accepting a connection: {e}")
退出方式:
客户端发送特定命令(如/quit)来关闭连接,并通知服务器。服务器接受信息时特殊判断即可。
用户列表:
服务器维护一个当前连接的用户列表。
# 用户认证信息
users = {
"jeazim": "1",
"zh1ng": "2",
"zhuzhu":"3"
}
私聊:
允许用户发送消息给特定的用户。与退出类似,服务器接收信息时分类处理,但和广播一样需要新增一个私聊方法:
def private_message(sender_socket, target_username, private_msg):
print("private")
if target_username in client_usernames_to_sockets:
target_socket = client_usernames_to_sockets[target_username]
try:
sender_username = [username for username, socket in client_usernames_to_sockets.items() if socket == sender_socket][0]
message_to_send = f"[Private] {sender_username}: {private_msg}"
target_socket.send(message_to_send.encode('utf-8'))
message_self = f"[Private] to {sender_username}: {private_msg}"
sender_username.send(message_self)
except Exception as e:
print(f"Error sending private message: {e}")
target_socket.close()
client_sockets.remove(target_socket)
del client_usernames_to_sockets[target_username]
else:
sender_socket.send(f"User {target_username} not found or not connected.".encode('utf-8'))
图形界面:
这需要使用图形界面库tkinter来实现,这个懂的不多,随便用用
最终代码:
服务端:
import socket
import threading
# 用户认证信息
users = {
"jeazim": "1",
"zh1ng": "2",
"zhuzhu":"3"
}
# 当前连接的用户列表
client_sockets = []
client_usernames_to_sockets = {}
# 发送私聊消息
def private_message(sender_socket, target_username, private_msg):
print("private")
if target_username in client_usernames_to_sockets:
target_socket = client_usernames_to_sockets[target_username]
try:
sender_username = [username for username, socket in client_usernames_to_sockets.items() if socket == sender_socket][0]
message_to_send = f"[Private] {sender_username}: {private_msg}"
target_socket.send(message_to_send.encode('utf-8'))
message_self = f"[Private] to {target_username}: {private_msg}"
sender_socket.send(message_self.encode('utf-8'))
except Exception as e:
print(f"Error sending private message: {e}")
target_socket.close()
client_sockets.remove(target_socket)
del client_usernames_to_sockets[target_username]
else:
sender_socket.send(f"User {target_username} not found or not connected.".encode('utf-8'))
# 广播消息
def broadcast_message(sender_socket, message):
print("broadcast")
sender_username = [username for username, socket in client_usernames_to_sockets.items() if socket == sender_socket][0]
for client in client_sockets:
# if client != sender_socket:
try:
client.send(f"{sender_username}: {message}".encode('utf-8'))
except Exception as e:
print(f"Error broadcasting message: {e}")
client.close()
client_sockets.remove(client)
# 处理客户端连接
def client_handler(client_socket, username):
while True:
try:
message = client_socket.recv(1024).decode('utf-8')
if message:
print(message)
if message.startswith("/quit"):
raise Exception("Client requested disconnection")
elif message.startswith("/pm"):
parts = message.split(" ", 2)
if len(parts) > 2:
target_username, private_msg = parts[1], parts[2]
private_message(client_socket, target_username, private_msg)
else:
client_socket.send("Private message format is incorrect. Use /pm
else:
broadcast_message(client_socket, message)
else:
raise Exception("Empty message received")
except Exception as e:
print(f"Client {username} disconnected: {e}")
break
client_socket.close()
client_sockets.remove(client_socket)
del client_usernames_to_sockets[username]
broadcast_message(client_socket, f"{username} has left the chat.")
# 主函数
def main():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 12345))
server_socket.listen(5)
print("Server is listening on port 12345...")
while True:
client_socket, addr = server_socket.accept()
try:
str = client_socket.recv(1024).decode('utf-8')
username,password = str.split(":")
# print(f"reserve from {addr}, Username: {username}")
if users.get(username) == password:
print(f"Accepted connection from {addr}, Username: {username}")
client_sockets.append(client_socket)
client_usernames_to_sockets[username] = client_socket
client_thread = threading.Thread(target=client_handler, args=(client_socket, username))
client_thread.daemon = True
client_thread.start()
client_socket.send("Login successful.".encode('utf-8'))
broadcast_message(client_socket, f"{username} has joined the chat.")
else:
client_socket.send("Username or Password incorrect.".encode('utf-8'))
client_socket.close()
except Exception as e:
print(f"Error accepting a connection: {e}")
if __name__ == "__main__":
main()
客户端
import socket # 导入socket库以进行网络通信
import threading # 导入threading库以支持多线程
import tkinter as tk # 导入tkinter库以构建GUI界面
from tkinter import messagebox, scrolledtext # 导入messagebox和scrolledtext来显示信息和滚动文本
# 定义客户端类,用于创建聊天客户端对象
class ChatClient:
# 初始化方法,设置服务器地址和端口
def __init__(self, host, port):
self.host = host # 服务器的IP地址
self.port = port # 服务器的端口号
self.sock = None # 将用于通信的socket初始化为None
self.username = None # 用户名初始化为None,稍后用户输入
# 设置GUI界面
self.root = tk.Tk() # 创建Tkinter根窗口对象
self.root.title("Chat Client") # 设置窗口标题
# 创建主框架,用于容纳其他控件
self.main_frame = tk.Frame(self.root)
self.main_frame.pack(padx=10, pady=10) # 放置主框架并设置边距
# 创建滚动文本框用于显示聊天内容
self.chat_area = scrolledtext.ScrolledText(self.main_frame)
self.chat_area.pack() # 放置滚动文本框
self.chat_area.config(state=tk.DISABLED) # 初始设置为不可编辑,只用于显示消息
# 创建一个Entry控件用于用户输入消息
self.msg_entry = tk.Entry(self.main_frame, width=50)
self.msg_entry.pack(side=tk.LEFT, padx=(0, 10)) # 放置消息输入框
self.msg_entry.bind("
# 创建一个按钮用于发送消息
self.send_button = tk.Button(self.main_frame, text="Send", command=self.send)
self.send_button.pack(side=tk.RIGHT) # 放置发送按钮
# 设置窗口关闭时的回调函数
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
# 主运行程序,显示登录窗口并开始GUI循环
def run(self):
self.login_window() # 显示登录窗口
self.root.mainloop() # 开始Tkinter事件循环
# 创建并显示登录窗口让用户输入用户名和密码
def login_window(self):
self.login_win = tk.Toplevel() # 创建一个新的顶级窗口
self.login_win.title("Login") # 设置登录窗口标题
# 创建标签和输入框让用户输入用户名
tk.Label(self.login_win, text="Username:").pack(side=tk.TOP, fill=tk.X, padx=10)
self.username_entry = tk.Entry(self.login_win) # 输入框用于输入用户名
self.username_entry.pack(side=tk.TOP, fill=tk.X, padx=10)
# 创建标签和输入框让用户输入密码
tk.Label(self.login_win, text="Password:").pack(side=tk.TOP, fill=tk.X, padx=10)
self.password_entry = tk.Entry(self.login_win, show='*') # 输入框用于输入密码,隐藏显示
self.password_entry.pack(side=tk.TOP, fill=tk.X, padx=10)
# 创建一个登录按钮,点击后会触发登录方法
self.login_button = tk.Button(self.login_win, text="Login", command=self.login)
self.login_button.pack(side=tk.TOP, pady=5)
# 登录方法,用于处理用户登录
def login(self):
self.username = self.username_entry.get() # 获取输入的用户名
password = self.password_entry.get() # 获取输入的密码
# 尝试连接到服务器并发送登录信息
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建socket对象
try:
self.sock.connect((self.host, self.port)) # 连接到服务器
# 错误方法:
# self.sock.send(self.username.encode('utf-8')) # 发送用户名
# self.sock.send(password.encode('utf-8')) # 发送密码
# 更正:
self.sock.send((self.username + ":"+password).encode('utf-8')) # 发送用户名密码,加入:用于中断
# 接收服务器的登录响应信息
response = self.sock.recv(1024).decode('utf-8')
if response == 'Login successful.': # 如果登录成功
self.login_win.destroy() # 关闭登录窗口
self.root.title(self.username) # 设置窗口标题
# 创建并启动一个新线程用于接收消息
receive_thread = threading.Thread(target=self.receive)
receive_thread.daemon = True # 设置为守护线程
receive_thread.start()
else:
# 如果登录失败,显示错误信息并关闭socket
messagebox.showerror("Login failed", response)
self.sock.close()
self.sock = None
except Exception as e:
# 如果连接失败,显示错误信息
messagebox.showerror("Connection failed", str(e))
self.sock = None
# 接收消息的方法,运行在独立线程中
def receive(self):
while self.sock: # 当socket存在时
try:
msg = self.sock.recv(1024).decode('utf-8') # 接收消息
self.display_message(msg) # 在聊天区域显示消息
except OSError: # 如果发生错误,则退出循环
break
# 发送消息的方法
def send(self, event=None):
msg = self.msg_entry.get() # 获取输入框中的消息
self.msg_entry.delete(0, tk.END) # 清空输入框
if self.sock: # 如果socket存在
try:
self.sock.send(msg.encode('utf-8')) # 发送消息
if msg == "/quit": # 如果消息是退出命令
self.sock.close() # 关闭socket
self.sock = None
self.root.quit() # 退出GUI程序
except Exception as e:
self.display_message(f"Error: {str(e)}") # 显示错误信息
# 显示消息的方法
def display_message(self, msg):
self.chat_area.config(state=tk.NORMAL) # 设置聊天区域为可编辑状态
self.chat_area.insert(tk.END, msg + '\n') # 添加消息到聊天区域
self.chat_area.yview(tk.END) # 自动滚动到最新消息
self.chat_area.config(state=tk.DISABLED) # 设置聊天区域为不可编辑
# 窗口关闭时的处理方法
def on_close(self):
if self.sock:
# 发送退出命令到服务器
self.sock.send("/quit".encode('utf-8'))
self.sock.close() # 关闭socket
self.root.destroy() # 销毁根窗口
if __name__ == "__main__":
client = ChatClient('localhost', 12345) # 使用本地地址和端口12345
client.run() # 运行客户端
实验结果
运行服务端
运行客户端
登录界面:
服务端存有用户账号密码,登录时进行匹配,正确即可成功登录进入主页,否则提示失败
主页
多用户登录
历史信息作为隐私不显示,所以三人登录中后面来的zh1ng只显示一条信息
群聊(广播信息)
发送给群内所有人。zh1ng发送你们好,群内全部收到
私聊
只发送给个人。jeazim给zhuzhu私聊,只有zhuzhu看到,zh1ng看不到
退出
发送/quit即可离线
over
推荐链接
发表评论