python中使用websocket进行链接与数据存储

笔者最近在测试星火大模型的时候,他们是使用websocket 来建立对话,而且星火大模型开放的测试代码,质量上不咋地(20231030记录),还需要对websocket有一定的了解,才适合自己微调。

安装:

代码语言:javascript
复制
pip install websocket
pip install websocket-client

1 常见的websocket获取数据的方法

参考【python: websocket获取实时数据的几种常见链接方式】常见的两种。

1.1 第一种使用create_connection链接

需要pip install websocket-client (此方法不建议使用,链接不稳定,容易断,并且连接很耗时)

代码语言:javascript
复制
import time
from websocket import create_connection

url = 'wss://i.cg.net/wi/ws'
while True: # 一直链接,直到连接上就退出循环
time.sleep(2)
try:
ws = create_connection(url)
print(ws)
break
except Exception as e:
print('连接异常:', e)
continue
while True: # 连接上,退出第一个循环之后,此循环用于一直获取数据
ws.send('{"event":"subscribe", "channel":"btc_usdt.ticker"}')
response = ws.recv()
print(response)

1.2 第二种:WebSocketApp + run_forever的方式

代码语言:javascript
复制
import websocket

def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据
print(message)

def on_error(ws, error): # 程序报错时,就会触发on_error事件
print(error)

def on_close(ws):
print("Connection closed ……")

def on_open(ws): # 连接到服务器之后就会触发on_open事件,这里用于send数据
req = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
print(req)
ws.send(req)

if name == "main":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://i.cg.net/wi/ws",
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever(ping_timeout=30)

第二种方式里面,run_forever其实是流式返回内容,大概可以看,流式输出的样例:

代码语言:javascript
复制
{"code":0,"sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":0}

error: 'content'

{"code":0,"fileRefer":"{"43816997a7a44a299d0bfb7c360c5838":[2,0,1]}","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":99}

error: 'content'

{"code":0,"content":"橘","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}

橘{"code":0,"content":"子。","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}

子。{"code":0,"content":"","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":2}

closed ###

那么run_forever流式输出,正常的内容如何保存呢,进入下一章


2 针对run_forever内容保存

2.1 通过定义global变量来保存内容

参考【将Websocket数据保存到Pandas】

来看一下,文中的案例:

代码语言:javascript
复制
import json

import pandas as pd
import websocket

df = pd.DataFrame(columns=['foreignNotional', 'grossValue', 'homeNotional', 'price', 'side',
'size', 'symbol', 'tickDirection', 'timestamp', 'trdMatchID'])

def on_message(ws, message):
msg = json.loads(message)
print(msg)
global df
# ignore_index=True has to be provided, otherwise you'll get
# "Can only append a Series if ignore_index=True or if the Series has a name" errors
df = df.append(msg, ignore_index=True)

def on_error(ws, error):
print(error)

def on_close(ws):
print("### closed ###")

def on_open(ws):
return

if name == "main":
ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",
on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)
ws.run_forever()

其中global df是在定义全局变量df,可以在函数中把流式数据拿出来,还是很不错的

2.2 通过CallbackToIterator()来返回

在开源项目中ChuanhuChatGPT,看到了使用的方式spark.py,个人还没有尝试,只是贴在这里。

贴一下这个函数:

代码语言:javascript
复制
class CallbackToIterator:
def __init__(self):
self.queue = deque()
self.cond = Condition()
self.finished = False

def callback(self, result):
    with self.cond:
        self.queue.append(result)
        self.cond.notify()  # Wake up the generator.

def __iter__(self):
    return self

def __next__(self):
    with self.cond:
        # Wait for a value to be added to the queue.
        while not self.queue and not self.finished:
            self.cond.wait()
        if not self.queue:
            raise StopIteration()
        return self.queue.popleft()

def finish(self):
    with self.cond:
        self.finished = True
        self.cond.notify()  # Wake up the generator if it's waiting.

主函数截取

def get_answer_stream_iter(self):
wsParam = Ws_Param(self.appid, self.api_key, self.api_secret, self.spark_url)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(
wsUrl,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open,
)
ws.appid = self.appid
ws.domain = self.domain

# Initialize the CallbackToIterator
ws.iterator = CallbackToIterator()

# Start the WebSocket connection in a separate thread
thread.start_new_thread(
    ws.run_forever, (), {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
)

# Iterate over the CallbackToIterator instance
answer = ""
total_tokens = 0
for message in ws.iterator:
    data = json.loads(message)
    code = data["header"]["code"]
    if code != 0:
        ws.close()
        raise Exception(f"请求错误: {code}, {data}")
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        if "usage" in data["payload"]:
            total_tokens = data["payload"]["usage"]["text"]["total_tokens"]
        answer += content
        if status == 2:
            ws.iterator.finish()  # Finish the iterator when the status is 2
            ws.close()
        yield answer, total_tokens</code></pre></div></div><p>截取了部分代码,这里先是定义<code>ws.iterator = CallbackToIterator()</code>然后通过迭代从<code>for message in ws.iterator:</code>拿出数据,看上去也是可行的</p>