背景
需要创建一个ws客户端mock弹幕一直给直播间发送消息。于是使用了async with websockets.connect(uri) as websocket 创建的ws客户端发送消息。但是发现发送一会儿后,客户端就自动关闭了,曝出:“websockets.ConnectionClosedError, WebSocket连接关闭: no close frame received or sent”
即:websocket.connect(url) as websocket 返回的异步上下文管理器对象。发收到发送一会儿消息后,客户端自动化关闭链接,导致报错。为什么?
原理
ws是基于tcp协议的信令通道。使用流程:
1、ws握手:
客户端主动创建ws连接,例如通过url: ws://127.0.0.1:8888创建ws连接。
2、发送数据:
发送信令数据包,Server如果长时间没有收到数据,会主动断开ws连接。这应该就是背景中自动关闭连接的原因。所以如果不说固定频率时间的发送信息,可以发送心跳包活包。
验证发现每隔2s就发送信息,连接就会一直连接。
使用式例代码如下“式例代码”。
实现websocket客户端
方式1:用websocket.WebSocketApp创建。通过定义回调函数来处理不同的事件,如接受消息、发生错误、连接关闭等。
ws = websocket.WebSocketApp("ws_barrage_mock://example.com",
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_open=on_open)
在如下示例send_message()中进行死循环,在新开一个进程中进行发送消息。
方式2: 使用异步ws对象,同时进行发送心跳(目前代码里没有实现)
async with websockets.connect("ws_bk://127.0.0.1:8888") as websocket:
式例代码
ws_server.py
import asyncio
import websockets
# 保存已连接的客户端列表
connected_clients = set()
async def handle_websocket(websocket, path):
# 将新的客户端添加到已连接客户端列表
remote_address = websocket.remote_address
connected_clients.add(websocket)
print(f"新的客户端已连接:{remote_address}")
try:
# 接收客户端消息并发送回复
async for message in websocket:
print(f"收到客户端{remote_address}消息:{message}")
# 广播发送消息给所有客户端
await broadcast(message)
except websockets.ConnectionClosedError:
print(f"客户端{remote_address}连接关闭")
except Exception as e:
print(f"处理客户端时出现错误:{e}")
finally:
# 从已连接客户端列表中移除客户端
connected_clients.remove(websocket)
async def broadcast(message):
# 遍历所有已连接的客户端,并发送消息
for client in connected_clients:
await client.send(message)
async def main():
# 创建WebSocket服务器并注册处理程序
server = await websockets.serve(handle_websocket, "127.0.0.1", 8888)
# 运行服务器
await server.wait_closed()
# 运行主函数
asyncio.run(main())
web_client_webapp.py
import websocket
def on_message(ws, message):
print(f"Received message: {message}")
def on_error(ws, error):
print(f"Error: {error}")
def on_close(ws):
print("Connection closed")
def on_open(ws):
print("Connection opened")
ws.send("Hello, server!")
websocket.enableTrace(True) # 启用调试日志
ws = websocket.WebSocketApp("ws_barrage_mock://example.com",
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_open=on_open)
ws.run_forever()
web_client_async.py
import asyncio
import websockets
async def receive_msg(websocket):
while True:
reply = await websocket.recv()
print(f"收到服务器回复:{reply}")
async def send_message(websocket):
while True:
# message = ''
# while '1' not in message \
# and '2' not in message \
# and '3' not in message \
# and '4' not in message \
# and '5' not in message:
# message = input("请输入要发送的消息,输入\n"
# "(1)弹幕消息\n"
# "(2)点赞消息\n"
# "(3)进直播间\n"
# "(4)关注消息\n"
# "(5)礼物消息:\n")
# print(f'-->输入成功:{message}\n')
message = '1'
await websocket.send(pkg_msg(message))
print('发送消息:', pkg_msg(message))
await asyncio.sleep(2)
async def asyncio_tasks(url):
try:
async with websockets.connect(url) as websocket:
# 启动发送消息的任务
send_task = asyncio.create_task(send_message(websocket))
# 启动接收消息的任务
receive_task = asyncio.create_task(receive_msg(websocket))
# 等待两个任务完成
await asyncio.gather(send_task, receive_task)
except websockets.ConnectionClosedError as e:
print(f"WebSocket连接关闭: {e}")
# 示例使用
asyncio.run(asyncio_tasks("ws://127.0.0.1:8888"))
推荐链接
发表评论