背景

需要创建一个ws客户端mock弹幕一直给直播间发送消息。于是使用了async with websockets.connect(uri) as websocket 创建的ws客户端发送消息。但是发现发送一会儿后,客户端就自动关闭了,曝出:“websockets.ConnectionClosedError, WebSocket连接关闭: no close frame received or sent”

即:websocket.connect(url) as websocket 返回的异步上下文管理器对象。发收到发送一会儿消息后,客户端自动化关闭链接,导致报错。为什么?

原理

ws是基于tcp协议的信令通道。使用流程:

1、ws握手:

客户端主动创建ws连接,例如通过url: ws://127.0.0.1:8888创建ws连接。

2、发送数据:

发送信令数据包,Server如果长时间没有收到数据,会主动断开ws连接。这应该就是背景中自动关闭连接的原因。所以如果不说固定频率时间的发送信息,可以发送心跳包活包。

验证发现每隔2s就发送信息,连接就会一直连接。

使用式例代码如下“式例代码”。

实现websocket客户端

方式1:用websocket.WebSocketApp创建。通过定义回调函数来处理不同的事件,如接受消息、发生错误、连接关闭等。

ws = websocket.WebSocketApp("ws_barrage_mock://example.com",

on_message=on_message,

on_error=on_error,

on_close=on_close,

on_open=on_open)

在如下示例send_message()中进行死循环,在新开一个进程中进行发送消息。

方式2: 使用异步ws对象,同时进行发送心跳(目前代码里没有实现)

async with websockets.connect("ws_bk://127.0.0.1:8888") as websocket:

式例代码

ws_server.py

import asyncio

import websockets

# 保存已连接的客户端列表

connected_clients = set()

async def handle_websocket(websocket, path):

# 将新的客户端添加到已连接客户端列表

remote_address = websocket.remote_address

connected_clients.add(websocket)

print(f"新的客户端已连接:{remote_address}")

try:

# 接收客户端消息并发送回复

async for message in websocket:

print(f"收到客户端{remote_address}消息:{message}")

# 广播发送消息给所有客户端

await broadcast(message)

except websockets.ConnectionClosedError:

print(f"客户端{remote_address}连接关闭")

except Exception as e:

print(f"处理客户端时出现错误:{e}")

finally:

# 从已连接客户端列表中移除客户端

connected_clients.remove(websocket)

async def broadcast(message):

# 遍历所有已连接的客户端,并发送消息

for client in connected_clients:

await client.send(message)

async def main():

# 创建WebSocket服务器并注册处理程序

server = await websockets.serve(handle_websocket, "127.0.0.1", 8888)

# 运行服务器

await server.wait_closed()

# 运行主函数

asyncio.run(main())

web_client_webapp.py

import websocket

def on_message(ws, message):

print(f"Received message: {message}")

def on_error(ws, error):

print(f"Error: {error}")

def on_close(ws):

print("Connection closed")

def on_open(ws):

print("Connection opened")

ws.send("Hello, server!")

websocket.enableTrace(True) # 启用调试日志

ws = websocket.WebSocketApp("ws_barrage_mock://example.com",

on_message=on_message,

on_error=on_error,

on_close=on_close,

on_open=on_open)

ws.run_forever()

web_client_async.py

import asyncio

import websockets

async def receive_msg(websocket):

while True:

reply = await websocket.recv()

print(f"收到服务器回复:{reply}")

async def send_message(websocket):

while True:

# message = ''

# while '1' not in message \

# and '2' not in message \

# and '3' not in message \

# and '4' not in message \

# and '5' not in message:

# message = input("请输入要发送的消息,输入\n"

# "(1)弹幕消息\n"

# "(2)点赞消息\n"

# "(3)进直播间\n"

# "(4)关注消息\n"

# "(5)礼物消息:\n")

# print(f'-->输入成功:{message}\n')

message = '1'

await websocket.send(pkg_msg(message))

print('发送消息:', pkg_msg(message))

await asyncio.sleep(2)

async def asyncio_tasks(url):

try:

async with websockets.connect(url) as websocket:

# 启动发送消息的任务

send_task = asyncio.create_task(send_message(websocket))

# 启动接收消息的任务

receive_task = asyncio.create_task(receive_msg(websocket))

# 等待两个任务完成

await asyncio.gather(send_task, receive_task)

except websockets.ConnectionClosedError as e:

print(f"WebSocket连接关闭: {e}")

# 示例使用

asyncio.run(asyncio_tasks("ws://127.0.0.1:8888"))

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: