笔者最近在测试星火大模型的时候,他们是使用websocket 来建立对话,而且星火大模型开放的测试代码,质量上不咋地(20231030记录),还需要对websocket有一定的了解,才适合自己微调。

安装:

pip install websocket

pip install websocket-client

文章目录

1 常见的websocket获取数据的方法1.1 第一种使用create_connection链接1.2 第二种:WebSocketApp + run_forever的方式

2 针对`run_forever`内容保存2.1 通过定义global变量来保存内容2.2 通过`CallbackToIterator()`来返回

1 常见的websocket获取数据的方法

参考【python: websocket获取实时数据的几种常见链接方式】常见的两种。

1.1 第一种使用create_connection链接

需要pip install websocket-client (此方法不建议使用,链接不稳定,容易断,并且连接很耗时)

import time

from websocket import create_connection

url = 'wss://i.cg.net/wi/ws'

while True: # 一直链接,直到连接上就退出循环

time.sleep(2)

try:

ws = create_connection(url)

print(ws)

break

except Exception as e:

print('连接异常:', e)

continue

while True: # 连接上,退出第一个循环之后,此循环用于一直获取数据

ws.send('{"event":"subscribe", "channel":"btc_usdt.ticker"}')

response = ws.recv()

print(response)

1.2 第二种:WebSocketApp + run_forever的方式

import websocket

def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据

print(message)

def on_error(ws, error): # 程序报错时,就会触发on_error事件

print(error)

def on_close(ws):

print("Connection closed ……")

def on_open(ws): # 连接到服务器之后就会触发on_open事件,这里用于send数据

req = '{"event":"subscribe", "channel":"btc_usdt.deep"}'

print(req)

ws.send(req)

if __name__ == "__main__":

websocket.enableTrace(True)

ws = websocket.WebSocketApp("wss://i.cg.net/wi/ws",

on_message=on_message,

on_error=on_error,

on_close=on_close)

ws.on_open = on_open

ws.run_forever(ping_timeout=30)

第二种方式里面,run_forever其实是流式返回内容,大概可以看,流式输出的样例:

{"code":0,"sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":0}

### error: 'content'

{"code":0,"fileRefer":"{\"43816997a7a44a299d0bfb7c360c5838\":[2,0,1]}","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":99}

### error: 'content'

{"code":0,"content":"橘","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}

橘{"code":0,"content":"子。","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}

子。{"code":0,"content":"","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":2}

### closed ###

那么run_forever流式输出,正常的内容如何保存呢,进入下一章

2 针对run_forever内容保存

2.1 通过定义global变量来保存内容

参考【将Websocket数据保存到Pandas】 来看一下,文中的案例:

import json

import pandas as pd

import websocket

df = pd.DataFrame(columns=['foreignNotional', 'grossValue', 'homeNotional', 'price', 'side',

'size', 'symbol', 'tickDirection', 'timestamp', 'trdMatchID'])

def on_message(ws, message):

msg = json.loads(message)

print(msg)

global df

# `ignore_index=True` has to be provided, otherwise you'll get

# "Can only append a Series if ignore_index=True or if the Series has a name" errors

df = df.append(msg, ignore_index=True)

def on_error(ws, error):

print(error)

def on_close(ws):

print("### closed ###")

def on_open(ws):

return

if __name__ == "__main__":

ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",

on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)

ws.run_forever()

其中global df是在定义全局变量df,可以在函数中把流式数据拿出来,还是很不错的

2.2 通过CallbackToIterator()来返回

在开源项目中ChuanhuChatGPT,看到了使用的方式spark.py,个人还没有尝试,只是贴在这里。

贴一下这个函数:

class CallbackToIterator:

def __init__(self):

self.queue = deque()

self.cond = Condition()

self.finished = False

def callback(self, result):

with self.cond:

self.queue.append(result)

self.cond.notify() # Wake up the generator.

def __iter__(self):

return self

def __next__(self):

with self.cond:

# Wait for a value to be added to the queue.

while not self.queue and not self.finished:

self.cond.wait()

if not self.queue:

raise StopIteration()

return self.queue.popleft()

def finish(self):

with self.cond:

self.finished = True

self.cond.notify() # Wake up the generator if it's waiting.

# 主函数截取

def get_answer_stream_iter(self):

wsParam = Ws_Param(self.appid, self.api_key, self.api_secret, self.spark_url)

websocket.enableTrace(False)

wsUrl = wsParam.create_url()

ws = websocket.WebSocketApp(

wsUrl,

on_message=self.on_message,

on_error=self.on_error,

on_close=self.on_close,

on_open=self.on_open,

)

ws.appid = self.appid

ws.domain = self.domain

# Initialize the CallbackToIterator

ws.iterator = CallbackToIterator()

# Start the WebSocket connection in a separate thread

thread.start_new_thread(

ws.run_forever, (), {"sslopt": {"cert_reqs": ssl.CERT_NONE}}

)

# Iterate over the CallbackToIterator instance

answer = ""

total_tokens = 0

for message in ws.iterator:

data = json.loads(message)

code = data["header"]["code"]

if code != 0:

ws.close()

raise Exception(f"请求错误: {code}, {data}")

else:

choices = data["payload"]["choices"]

status = choices["status"]

content = choices["text"][0]["content"]

if "usage" in data["payload"]:

total_tokens = data["payload"]["usage"]["text"]["total_tokens"]

answer += content

if status == 2:

ws.iterator.finish() # Finish the iterator when the status is 2

ws.close()

yield answer, total_tokens

截取了部分代码,这里先是定义ws.iterator = CallbackToIterator()然后通过迭代从for message in ws.iterator:拿出数据,看上去也是可行的

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: