Python websocket
@router.websocket('/chat/{flow_id}')
接口代码,并了解其工作流程、涉及的组件以及如何基于此实现你的新 WebSocket 接口。以下内容将分为几个部分进行讲解:
- 接口整体概述
- 代码逐行解析
- 关键组件和依赖关系
- 如何基于此实现新功能
- 示例:创建一个新的 WebSocket 接口
1. 接口整体概述
@router.websocket('/chat/{flow_id}')
是一个 WebSocket 端点,用于处理实时聊天会话。它的主要职责包括:
- 认证和授权:通过 JWT 令牌验证用户身份。
- 连接管理:建立和管理 WebSocket 连接。
- 消息处理:接收和发送消息,处理聊天逻辑。
- 异常处理:处理连接断开、认证失败和其他异常情况。
2. 代码逐行解析
以下是你提供的 WebSocket 端点代码:
@router.websocket('/chat/{flow_id}')
async def chat(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):"""Websocket endpoint for chat."""try:if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = telse:Authorize.jwt_required(auth_from='websocket', websocket=websocket)login_user = await get_login_user(Authorize)user_id = login_user.user_idif chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.dataelse:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))if not chat_id:# 调试时,每次都初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)with logger.contextualize(trace_id=chat_id):logger.info('websocket_verify_ok begin=handle_websocket')await chat_manager.handle_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)except WebSocketException as exc:logger.error(f'Websocket exrror: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))except Exception as exc:logger.exception(f'Error in chat websocket: {str(exc)}')messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
2.1. 函数签名和参数
@router.websocket('/chat/{flow_id}')
async def chat(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):
flow_id: str
:URL路径参数,用于标识聊天流程或技能。websocket: WebSocket
:WebSocket 连接对象。t: Optional[str]
:查询参数,可选,用于传递 JWT 令牌。chat_id: Optional[str]
:查询参数,可选,用于标识具体的聊天会话。version_id: Optional[int]
:查询参数,可选,用于标识流程的版本。Authorize: AuthJWT = Depends()
:依赖注入,用于处理 JWT 认证。
2.2. 认证和授权
if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = t
else:Authorize.jwt_required(auth_from='websocket', websocket=websocket)
login_user = await get_login_user(Authorize)
user_id = login_user.user_id
-
条件判断
:
- 如果提供了
t
参数,使用它作为 JWT 令牌进行认证。 - 否则,尝试从 WebSocket 连接中提取 JWT 令牌进行认证。
- 如果提供了
-
获取用户信息
:
get_login_user(Authorize)
函数用于解析 JWT 令牌并获取登录用户的信息。- 获取
user_id
以供后续使用。
2.3. 流程或技能验证
if chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.data
else:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))
-
有
chat_id
:
- 从数据库获取
Flow
对象(流程或技能)。 - 如果
Flow
不存在,关闭连接并返回错误信息。 - 如果
Flow
状态不是2
(假设2
表示“已上线”),关闭连接并返回错误信息。 - 获取
graph_data
(流程图数据)用于后续处理。
- 从数据库获取
-
无
chat_id
:
- 根据
flow_id
和version_id
生成flow_data_key
。 - 从 Redis 缓存中检查该
flow_data_key
是否存在且状态为SUCCESS
。 - 如果条件不满足,关闭连接并返回错误信息。
- 获取
graph_data
(流程图数据)用于后续处理。
- 根据
2.4. 初始化聊天会话
if not chat_id:# 调试时,每次都初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)
-
无
chat_id
:
- 调用
chat_manager.set_cache
方法,初始化或重置聊天缓存。这在调试时可能会频繁初始化聊天对象。
- 调用
2.5. 处理聊天 WebSocket 会话
with logger.contextualize(trace_id=chat_id):logger.info('websocket_verify_ok begin=handle_websocket')await chat_manager.handle_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)
-
上下文日志记录:使用
trace_id=chat_id
来上下文化日志,便于追踪特定聊天会话的日志。 -
调用
chat_manager.handle_websocket
:
- 传递
flow_id
、chat_id
、websocket
、user_id
和graph_data
。 handle_websocket
方法负责处理整个 WebSocket 会话,包括接收消息、发送响应、处理业务逻辑等。
- 传递
2.6. 异常处理
except WebSocketException as exc:logger.error(f'Websocket exrror: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
except Exception as exc:logger.exception(f'Error in chat websocket: {str(exc)}')messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
-
WebSocketException
:
- 记录错误日志。
- 以
WS_1011_INTERNAL_ERROR
状态码关闭连接。
-
其他异常
:
- 记录异常日志。
- 如果异常与认证失败相关,使用
WS_1008_POLICY_VIOLATION
状态码关闭连接并返回“Unauthorized”。 - 否则,使用
WS_1011_INTERNAL_ERROR
状态码关闭连接并返回错误信息。
3. 关键组件和依赖关系
为了全面理解这个 WebSocket 接口,我们需要了解以下关键组件和它们的职责:
3.1. AuthJWT
和 get_login_user
-
AuthJWT
:
- 负责处理 JWT 认证,包括生成、验证和解析令牌。
-
get_login_user
:
- 解析
AuthJWT
提供的令牌,获取登录用户的信息。 - 返回一个
UserPayload
对象,包含用户的user_id
、user_name
等信息。
- 解析
3.2. session_getter
-
职责
:
- 提供数据库会话上下文管理,确保数据库操作的事务性和资源管理。
-
用法
:
- 使用
with session_getter() as session
来获取数据库会话,并在with
块结束时自动关闭会话。
- 使用
3.3. ChatManager
-
职责
:
- 管理 WebSocket 连接。
- 处理聊天消息的接收与发送。
- 维护聊天历史。
- 处理具体的聊天逻辑,如调用 LangChain 对象、生成响应等。
-
主要方法
:
handle_websocket
:处理 WebSocket 会话,包括接收消息、处理消息、发送响应等。- 其他辅助方法,如
connect
、send_message
、close_connection
等,用于管理连接和消息传输。
3.4. Redis 缓存
-
职责
:
- 存储流程图数据 (
graph_data
) 和构建状态 (status
)。 - 用于验证流程是否已成功构建,以及存储和检索相关数据。
- 存储流程图数据 (
3.5. 数据库模型和 DAO 类
-
Flow
:
- 表示一个流程或技能的数据库模型。
-
ChatMessage
和
ChatMessageDao
:
ChatMessage
表示聊天消息的数据库模型。ChatMessageDao
提供对ChatMessage
的数据库操作方法,如查询、插入、更新等。
4. 如何基于此实现新 WebSocket 功能
假设你的需求是实现一个新的 WebSocket 接口,例如 /chat/{flow_id}/new_feature
,你可以按照以下步骤进行:
4.1. 定位文件和结构
基于现有项目结构,新的 WebSocket 接口应添加到相同的路由文件中,即 src/backend/bisheng/api/v1/chat.py
。如果有多个 WebSocket 接口,考虑将它们逻辑上分离到不同的模块中,例如 chat_new_feature.py
,然后在路由文件中引入。
4.2. 编写新的 WebSocket 端点
在 chat.py
文件中新增一个 WebSocket 端点:
@router.websocket('/chat/{flow_id}/new_feature')
async def chat_new_feature(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):"""Websocket endpoint for new feature."""try:if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = telse:Authorize.jwt_required(auth_from='websocket', websocket=websocket)login_user = await get_login_user(Authorize)user_id = login_user.user_id# 验证流程或技能if chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.dataelse:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))if not chat_id:# 初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)with logger.contextualize(trace_id=chat_id):logger.info('websocket_verify_ok begin=handle_new_feature_websocket')# 调用 ChatManager 的新方法来处理新的功能await chat_manager.handle_new_feature_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)except WebSocketException as exc:logger.error(f'Websocket error: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))except Exception as exc:logger.exception(f'Error in new feature websocket: {str(exc)}')message = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=message)
4.3. 实现 ChatManager
的新方法
在 src/backend/bisheng/chat/manager.py
文件中,添加一个新的方法 handle_new_feature_websocket
,用于处理新功能的 WebSocket 会话。
# src/backend/bisheng/chat/manager.pyclass ChatManager:# 现有的 __init__ 和其他方法...async def handle_new_feature_websocket(self, flow_id: str, chat_id: str, websocket: WebSocket, user_id: int, gragh_data: dict):"""处理新功能的 WebSocket 会话。"""client_key = uuid.uuid4().hexchat_client = ChatClient(request=None, # 如果有 Request 对象,传递进去client_key=client_key,client_id=flow_id,chat_id=chat_id,user_id=user_id,login_user=UserPayload(user_id=user_id, user_name='username', role='user'), # 根据实际情况调整work_type=WorkType.NEW_FEATURE, # 假设有新的工作类型websocket=websocket,graph_data=gragh_data)await self.accept_client(client_key, chat_client, websocket)logger.debug(f'New feature websocket accepted: client_key={client_key}, flow_id={flow_id}, chat_id={chat_id}')try:while True:try:json_payload_receive = await asyncio.wait_for(websocket.receive_json(), timeout=2.0)except asyncio.TimeoutError:continuetry:payload = json.loads(json_payload_receive) if json_payload_receive else {}except TypeError:payload = json_payload_receive# 处理新功能的消息await chat_client.handle_new_feature_message(payload)except WebSocketDisconnect as e:logger.info(f'New feature websocket disconnect: {str(e)}')except IgnoreException:# 客户端内部自行关闭连接passexcept Exception as e:logger.exception(f'Error in new feature websocket: {str(e)}')await self.close_client(client_key, code=status.WS_1011_INTERNAL_ERROR, reason='未知错误')finally:await self.close_client(client_key, code=status.WS_1000_NORMAL_CLOSURE, reason='Client disconnected')self.clear_client(client_key)
解释:
- 创建
ChatClient
实例:- 新的
ChatClient
实例用于处理特定的聊天会话。 - 你可能需要扩展
ChatClient
类,添加处理新功能消息的方法,如handle_new_feature_message
。
- 新的
- 接收和处理消息:
- 持续接收来自客户端的 JSON 消息。
- 调用
handle_new_feature_message
方法处理消息。
- 异常处理:
- 处理连接断开、忽略的异常和其他未知错误,确保连接正确关闭。
4.4. 扩展 ChatClient
类
根据新功能的需求,扩展 ChatClient
类,添加处理新功能消息的方法。
# src/backend/bisheng/chat/client.pyclass ChatClient:# 现有的 __init__ 和其他方法...async def handle_new_feature_message(self, payload: dict):"""处理新功能的消息。"""# 根据 payload 的内容,执行相应的逻辑# 例如,执行某个特定的操作、调用服务、返回结果等# 示例:假设新功能是执行一个计算任务if 'action' in payload:action = payload['action']if action == 'compute':data = payload.get('data')result = self.perform_computation(data)response = {'result': result}await self.websocket.send_json(response)else:response = {'error': '未知的操作'}await self.websocket.send_json(response)def perform_computation(self, data):"""执行某个计算任务的示例方法。"""# 示例计算:计算数据的平方try:number = float(data)return number ** 2except (ValueError, TypeError):return 'Invalid input for computation'
解释:
handle_new_feature_message
方法:- 解析接收到的
payload
,根据内容执行相应的操作。 - 示例中,如果
action
为compute
,则执行一个计算任务并返回结果。
- 解析接收到的
perform_computation
方法:- 一个示例计算方法,接收数据并返回其平方。
4.5. 更新 ChatClient
类
确保 ChatClient
类中包含处理新功能消息的方法,如上面的 handle_new_feature_message
。如果需要处理更复杂的逻辑,可以进一步扩展。
5. 示例:创建一个新的 WebSocket 接口
假设你需要实现一个新的 WebSocket 接口 /chat/{flow_id}/translate
,用于实时翻译用户发送的消息。以下是具体的实现步骤:
5.1. 新增 WebSocket 端点
在 src/backend/bisheng/api/v1/chat.py
中新增 WebSocket 端点:
@router.websocket('/chat/{flow_id}/translate')
async def chat_translate(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):"""Websocket endpoint for translate feature."""try:# 认证和授权if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = telse:Authorize.jwt_required(auth_from='websocket', websocket=websocket)login_user = await get_login_user(Authorize)user_id = login_user.user_id# 验证流程或技能if chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.dataelse:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))if not chat_id:# 初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)with logger.contextualize(trace_id=chat_id):logger.info('websocket_translate_verify_ok begin=handle_translate_websocket')# 调用 ChatManager 的新方法来处理翻译功能await chat_manager.handle_translate_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)except WebSocketException as exc:logger.error(f'Websocket error: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))except Exception as exc:logger.exception(f'Error in translate websocket: {str(exc)}')message = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=message)
5.2. 实现 ChatManager
的新方法
在 src/backend/bisheng/chat/manager.py
中,添加 handle_translate_websocket
方法:
# src/backend/bisheng/chat/manager.pyclass ChatManager:# 现有的 __init__ 和其他方法...async def handle_translate_websocket(self, flow_id: str, chat_id: str, websocket: WebSocket, user_id: int, gragh_data: dict):"""处理翻译功能的 WebSocket 会话。"""client_key = uuid.uuid4().hexchat_client = ChatClient(request=None, # 如果有 Request 对象,传递进去client_key=client_key,client_id=flow_id,chat_id=chat_id,user_id=user_id,login_user=UserPayload(user_id=user_id, user_name='username', role='user'), # 根据实际情况调整work_type=WorkType.TRANSLATE, # 假设有翻译工作类型websocket=websocket,graph_data=gragh_data)await self.accept_client(client_key, chat_client, websocket)logger.debug(f'Translate websocket accepted: client_key={client_key}, flow_id={flow_id}, chat_id={chat_id}')try:while True:try:json_payload_receive = await asyncio.wait_for(websocket.receive_json(), timeout=2.0)except asyncio.TimeoutError:continuetry:payload = json.loads(json_payload_receive) if json_payload_receive else {}except TypeError:payload = json_payload_receive# 处理翻译功能的消息await chat_client.handle_translate_message(payload)except WebSocketDisconnect as e:logger.info(f'Translate websocket disconnect: {str(e)}')except IgnoreException:# 客户端内部自行关闭连接passexcept Exception as e:logger.exception(f'Error in translate websocket: {str(e)}')await self.close_client(client_key, code=status.WS_1011_INTERNAL_ERROR, reason='未知错误')finally:await self.close_client(client_key, code=status.WS_1000_NORMAL_CLOSURE, reason='Client disconnected')self.clear_client(client_key)
5.3. 扩展 ChatClient
类
在 src/backend/bisheng/chat/client.py
中,添加处理翻译消息的方法:
# src/backend/bisheng/chat/client.pyclass ChatClient:# 现有的 __init__ 和其他方法...async def handle_translate_message(self, payload: dict):"""处理翻译功能的消息。"""if 'text' in payload:text_to_translate = payload['text']target_language = payload.get('target_language', 'en') # 默认翻译为英文translated_text = self.translate_text(text_to_translate, target_language)response = {'translated_text': translated_text}await self.websocket.send_json(response)else:response = {'error': 'No text provided for translation'}await self.websocket.send_json(response)def translate_text(self, text: str, target_language: str) -> str:"""执行文本翻译的示例方法。实际实现中可以调用第三方翻译服务,如 Google Translate API。"""# 示例:简单的模拟翻译translations = {'en': lambda x: f'Translated to English: {x}','zh': lambda x: f'翻译为中文: {x}',# 添加更多语言的翻译逻辑}translate_func = translations.get(target_language, lambda x: 'Unsupported language')return translate_func(text)
解释:
-
handle_translate_message
方法:
- 检查
payload
中是否包含text
字段。 - 获取
target_language
(目标语言),默认翻译为英文。 - 调用
translate_text
方法进行翻译。 - 将翻译结果通过 WebSocket 发送给客户端。
- 检查
-
translate_text
方法:
- 示例中提供了一个简单的翻译逻辑,实际应用中应调用第三方翻译 API(如 Google Translate、DeepL 等)。
5.4. 测试新的 WebSocket 接口
在本地环境中启动服务器,并使用 WebSocket 客户端工具(如 websocat、Postman 或浏览器的开发者工具)进行测试。
示例测试流程:
-
建立连接:
websocat "ws://127.0.0.1:8000/api/v1/chat/<flow_id>/translate?t=<token>&chat_id=<chat_id>"
-
发送翻译请求:
{"text": "Hello, how are you?","target_language": "zh" }
-
接收翻译结果:
{"translated_text": "翻译为中文: Hello, how are you?" }
6. 关键步骤总结
基于现有的 WebSocket 接口实现新功能时,遵循以下步骤:
- 新增 WebSocket 端点:
- 在相应的路由文件中(如
chat.py
)新增一个 WebSocket 路由。 - 定义必要的参数和认证逻辑。
- 在相应的路由文件中(如
- 扩展
ChatManager
类:- 添加一个新的方法来处理特定功能的 WebSocket 会话(如
handle_translate_websocket
)。 - 在该方法中,创建和管理
ChatClient
实例,并处理消息接收和发送。
- 添加一个新的方法来处理特定功能的 WebSocket 会话(如
- 扩展
ChatClient
类:- 添加一个新的方法来处理特定功能的消息(如
handle_translate_message
)。 - 实现功能逻辑,如调用翻译服务,并通过 WebSocket 发送响应。
- 添加一个新的方法来处理特定功能的消息(如
- 测试和验证:
- 编写和运行单元测试,确保新功能正常工作。
- 使用 WebSocket 客户端工具进行手动测试,验证消息的发送和接收。
- 文档更新:
- 更新 API 文档,说明新的 WebSocket 接口的使用方法和参数。
7. 文件路径和结构概览
以下是相关文件的路径和主要职责:
- WebSocket 路由:
src/backend/bisheng/api/v1/chat.py
:定义所有聊天相关的 API 端点,包括 WebSocket 端点。
- 聊天管理器:
src/backend/bisheng/chat/manager.py
:管理 WebSocket 连接、聊天历史和消息处理。
- 聊天客户端:
src/backend/bisheng/chat/client.py
:处理具体的聊天会话,包括消息的接收和发送。
- 认证服务:
src/backend/bisheng/api/services/user_service.py
:处理用户认证和授权。
- 数据库模型和 DAO:
src/backend/bisheng/database/models/
:包含数据库模型和数据访问对象,用于操作数据库中的数据。
- 缓存管理:
src/backend/bisheng/cache/
:管理缓存数据,如 Redis 缓存。
8. 最佳实践建议
- 模块化设计:将不同功能的逻辑分离到不同的模块或类中,保持代码的清晰和可维护性。
- 错误处理:确保 WebSocket 端点具备全面的错误处理机制,能够优雅地处理各种异常情况,避免资源泄漏。
- 安全性:确保所有 WebSocket 连接都经过认证和授权,防止未授权的访问。
- 性能优化:
- 使用异步编程模型(如
asyncio
)处理高并发的 WebSocket 连接。 - 合理管理连接池和任务队列,避免阻塞主事件循环。
- 使用异步编程模型(如
- 测试覆盖:编写充分的单元测试和集成测试,确保新功能的稳定性和可靠性。
- 日志记录:在关键步骤和异常处记录详细的日志,便于调试和监控。
总结
通过详细解析现有的 WebSocket 接口,你可以了解其工作流程和关键组件,并基于此实现新的 WebSocket 功能。关键在于:
- 理解现有的架构和逻辑:了解如何管理 WebSocket 连接、处理消息和维护聊天历史。
- 遵循一致的编码风格和结构:确保新功能与现有代码保持一致,提高代码的可读性和可维护性。
- 注重安全性和性能:确保所有 WebSocket 连接都经过认证,合理管理资源,优化性能。
- 充分测试和验证:通过自动化测试和手动测试,确保新功能的稳定性和正确性。
相关文章:
Python websocket
router.websocket(/chat/{flow_id}) 接口代码,并了解其工作流程、涉及的组件以及如何基于此实现你的新 WebSocket 接口。以下内容将分为几个部分进行讲解: 接口整体概述代码逐行解析关键组件和依赖关系如何基于此实现新功能示例:创建一个新的…...
正则表达式
正则表达式: 正则表达式区别于通配符,正则表达式是用来匹配文本的内容,命令的输出结果也属于文本内容。也可以使用正则表达式。 通配符用来匹配文件名和目录名。 grep用来过滤文本内容,以匹配要查询的结果。 linux的文本三剑客…...
机器学习——生成对抗网络(GANs):原理、进展与应用前景分析
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一. 生成对抗网络的基本原理二. 使用步骤2.1 对抗性训练2.2 损失函数 三. GAN的变种和进展四. 生成对抗网络的应用五. 持续挑战与未来发展方向六. 小结 前言 生…...
HTTPS 加密
HTTPS 加密技术 1. HTTPS 概述 HTTPS(HyperText Transfer Protocol Secure)是 HTTP 协议的安全版本,利用 SSL/TLS 协议对通信进行加密,确保数据的机密性、完整性和身份认证。HTTPS 在保护敏感数据的传输(如登录凭证、…...
Golang 构建学习
Golang 构建学习 如何搭建Golang开发环境 1. 下载GOlang包 https://golang.google.cn/dl/ 在地址上下载Golang 2. 配置包环境 修改全局环境变量,GOPROXY,GOPATH,GOROOT GOPROXYhttps://goproxy.cn,direct GOROOT"" // go二进…...
OpenCV相机标定与3D重建(7)鱼眼镜头立体校正的函数stereoRectify()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::fisheye::stereoRectify 是 OpenCV 中用于鱼眼镜头立体校正的函数。该函数计算两个相机之间的校正变换,使得从两个相机拍摄的图像…...
JVM_垃圾收集器详解
1、 前言 JVM就是Java虚拟机,说白了就是为了屏蔽底层操作系统的不一致而设计出来的一个虚拟机,让用户更加专注上层,而不用在乎下层的一个产品。这就是JVM的跨平台,一次编译,到处运行。 而JVM中的核心功能其实就是自动…...
数据结构4——栈和队列
目录 1.栈 1.1.栈的概念及结构 1.2栈的实现 2.队列 2.1队列的概念及结构 2.2队列的实现 1.栈 1.1.栈的概念及结构 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一段称为栈顶,另一端称为…...
【AIGC】大模型面试高频考点-数据清洗篇
【AIGC】大模型面试高频考点-数据清洗篇 (一)常用文本清洗方法1.去除无用的符号2.去除表情符号3.文本只保留汉字4.中文繁体、简体转换5.删除 HTML 标签和特殊字符6.标记化7.小写8.停用词删除9.词干提取和词形还原10.处理缺失数据11.删除重复文本12.处理嘈…...
Java基于SSM框架的跑腿平台小程序【附源码、文档】
博主介绍:✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇dz…...
数据库连接池
在Java的多线程中,有线程池负责线程管理,类似线程池,在数据库中也有数据库连接池,负责数据库连接的管理。数据库连接池是一个容器。负责分配、管理数据库连接(Connection)。它允许应用程序重复使用一个现有…...
本地部署 WireGuard 无需公网 IP 实现异地组网
WireGuard 是一个高性能、极简且易于配置的开源虚拟组网协议。使用路由侠内网穿透使其相互通讯。 第一步,服务端(假设为公司电脑)和客户端(假设为公司外的电脑)安装部署 WireGuard 1,点此下载(…...
Educator头歌:离散数学 - 图论
第1关:图的概念 任务描述 本关任务:学习图的基本概念,完成相关练习。 相关知识 为了完成本关任务,你需要掌握:图的概念。 图的概念 1.一个图G是一个有序三元组G<V,R,ϕ>,其中V是非空顶点集合&am…...
axios的认识与基本使用
axios简介 Axios 是一个基于 promise 网络请求库,作用于node.js 和浏览器中。 它是 isomorphic 的(即同一套代码可以运行在浏览器和node.js中)。在服务端它使用原生 node.js http 模块, 而在客户端 (浏览端) 则使用 XMLHttpRequests。 主要特点 从浏览器创建 XML…...
springboot358智慧社区居家养老健康管理系统(论文+源码)_kaic
毕 业 设 计(论 文) 智慧社区居家养老健康管理系统设计与实现 摘 要 传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此&…...
java-a+b 开启java语法学习
代码 (ab) import java.util.Scanner; //导入 java.util包中的Scanner 类,允许读取键盘输入数据public class Main { // 创建一个公共类 Mainpublic static void main(String[] args) {//程序入口点,main方法Scanner scanner new Scanner(…...
SpringAi整合免费大模型(NVIDIA)
接上回,发布了springAI整合本地大模型之后,我们来看看怎么去利用别人已经训练好的大模型。 如果对整合本地大模型感兴趣的,请阅读: SpringAI集成本地AI大模型ollama(调用篇)非常简单!…...
Flutter中的Future和Stream
在 Flutter 中,Future 和 Stream 都是用于处理异步操作的类,它们都基于 Dart 的异步编程模型,但是它们的使用场景和工作方式有所不同。以下是它们的区别以及各自适用的场景。 目录 一、Future1、基本使用2、异常处理1. catchError2. onError…...
Python将Excel文件转换为JSON文件
工作过程中,需要从 Excel 文件中读取数据,然后交给 Python 程序处理数据,中间需要把 Excel 文件读取出来转为 json 格式,再进行下一步数据处理。 这里我们使用pandas库,这是一个强大的数据分析工具,能够方便地读取和处理各种数据格式。需要注意的是还需要引入openpyxl库,…...
MySQL中EXPLAIN的介绍、作用、字段含义
MySQL中EXPLAIN的介绍、作用、字段含义 在MySQL中,EXPLAIN 是一个非常有用的命令,它可以帮助开发者和DBA理解查询执行计划,从而优化查询性能。EXPLAIN 可以模拟优化器执行SQL查询语句,而不真正执行这条语句,从而帮助用…...
Socket编程:UDP网络编程项目
目录 一、回显服务器 二、翻译器 三、聊天室 一、回显服务器 项目介绍:使用UDPIPv4协议进行Linux网络编程,实现回显服务器和客户端 功能介绍:客户端发送数据,经过服务端再返回到客户端,输出数据 源代码࿱…...
uniapp echarts tooltip formation 不识别html
需求: echarts 的tooltip 的域名太长,导致超出屏幕 想要让他换行 思路一: 用formation自定义样式实现换行 但是: uniapp 生成微信小程序, echart种的tooltip 的formation 识别不了html ,自定义样式没办…...
从0开始linux(39)——线程(2)线程控制
欢迎来到博主的专栏:从0开始linux 博主ID:代码小豪 文章目录 线程创建线程标识符线程参数多线程竞争资源 回收线程detach 线程退出pthread_cancel 线程创建 线程创建的函数为pthread_create。该函数是包含在posix线程库当中,posix线程是C语言…...
对载入的3dtiles进行旋转、平移和缩放变换。
使用 params: {tx: 129.75845, //模型中心X轴坐标(经度,单位:十进制度)//小左ty: 46.6839, //模型中心Y轴坐标(纬度,单位:十进制度)//小下tz: 28, //模型中心Z轴坐标(高…...
YOLO模型训练后的best.pt和last.pt区别
在选择YOLO模型训练后的权重文件best.pt和last.pt时,主要取决于具体的应用场景:12 best.pt:这个文件保存的是在训练过程中表现最好的模型权重。通常用于推理和部署阶段,因为它包含了在验证集上表现最好的模型权重&#x…...
Qt 项目中同时使用 CMAKE_AUTOUIC 和 UiTools 的注意事项
在 Qt 项目开发中,.ui 文件是界面设计的重要组成部分。开发者可以通过两种主要方式使用 .ui 文件: 编译期处理:通过 Qt 的 uic 工具将 .ui 文件转化为 C 代码(ui_xxx.h),静态绑定到项目中。运行时动态加载…...
不玩PS抠图了,改玩Python抠图
网上找了两个苏轼的印章图片: 把这两个印章抠出来的话,对于不少PS高手来说是相当容易,但是要去掉其中的水印,可能要用仿制图章慢慢描绘,图章的边缘也要慢慢勾画或者用通道抠图之类来处理,而且印章的红色也不…...
ubuntu 22.04 mini 安装,在配置网络时重启后配置文件被重置原因与解决方法
在 /etc/netplan/50-cloud-init.yaml 配置文件中有一段注释中有说明 rootlocalhost:/etc/netplan# cat 50-cloud-init.yaml # This file is generated from information provided by the datasource. Changes # to it will not persist across an instance reboot. To disab…...
【Go底层】time包Ticker定时器原理
目录 1、背景2、go版本3、源码解释【1】Ticker结构【2】NewTicker函数解释 4、代码示例5、总结 1、背景 说到定时器我们一般想到的库是cron,但是对于一些简单的定时任务场景,标准库time包下提供的定时器就足够我们使用,本篇文章我们就来研究…...
mac下Gpt Chrome升级成GptBrowser书签和保存的密码恢复
cd /Users/自己的用户名/Library/Application\ Support/ 目录下有 GPT\ Chrome/ Google/ GptBrowser/ GPT\ Chrome 为原来的chrome浏览器的文件存储目录. GptBrowser 为升级后chrome浏览器存储目录 书签所在的文件 Bookmarks 登录账号Login 相关的文件 拷贝到GptBrow…...
[Redis#6] list | 命令 | 应用 | 消息队列 | 微博 Timeline
目录 List 列表 特点 2. 命令 头插和尾插 下标 range 查询 头删和尾删 LINSERT LLEN LREM LTRIM LSET 阻塞命令 BLPOP BRPOP 操作 总结 3. 内部编码 ziplist(压缩列表) linkedlist(链表) ✔️quicklist(快速链…...
服务器数据恢复—raid6阵列硬盘被误重组为raid5阵列的数据恢复案例
服务器存储数据恢复环境: 存储中有一组由12块硬盘组建的RAID6阵列,上层linux操作系统EXT3文件系统,该存储划分3个LUN。 服务器存储故障&分析: 存储中RAID6阵列不可用。为了抢救数据,运维人员使用原始RAID中的部分…...
Xcode15(iOS17.4)打包的项目在 iOS12 系统上启动崩溃
0x00 启动崩溃 崩溃日志,只有 2 行,看不出啥来。 0x01 默认配置 由于我开发时,使用的 Xcode 14.1,打包在另外一台电脑 Xcode 15.3 Xcode 14.1 Build Settings -> Asset Catalog Compliter - Options Xcode 15.3 Build S…...
Netty的心跳机制怎么实现的?
大家好,我是锋哥。今天分享关于【Netty的心跳机制怎么实现的?】面试题。希望对大家有帮助; Netty的心跳机制怎么实现的? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Netty 的心跳机制用于维持客户端和服务器之间的…...
常用函数的使用错题汇总
目录 new/delete malloc/free1. 语言和类型2. 内存分配3. 内存释放4. 安全性和类型安全5. 其他特性总结 线程停止文件流 new/delete malloc/free malloc/free 和 new/delete 是 C/C 中用于动态内存管理的两种方式,它们有一些重要的区别。以下是这两种方式的比较&…...
使用 `aircrack-ng`扫描、获取握手包
使用 aircrack-ng 工具集来扫描 5GHz WiFi 网络的过程与扫描 2.4GHz 网络类似,但需要注意一些特定的配置和命令。以下是一个详细的步骤指南,帮助你在 5GHz 频段上扫描 WiFi 网络并捕获握手包。 ### 前提条件 1. **操作系统**:通常在 Linux 系…...
css—轮播图实现
一、背景 最近和朋友在一起讨论的时候,我们提出了这样的一个提问,难道轮播图的效果只能通过js来实现吗?经过我们的一系列的争论,发现了这是可以通过纯css来实现这一效果的,CSS轮播图也是一种常见的网页展示方式&#x…...
Ardusub源码剖析(1)——AP_Arming_Sub
代码 AP_Arming_Sub.h #pragma once#include <AP_Arming/AP_Arming.h>class AP_Arming_Sub : public AP_Arming { public:AP_Arming_Sub() : AP_Arming() { }/* Do not allow copies */CLASS_NO_COPY(AP_Arming_Sub);bool rc_calibration_checks(bool display_failure)…...
ESP32-S3模组上跑通ES8388(10)
接前一篇文章:ESP32-S3模组上跑通ES8388(9) 二、利用ESP-ADF操作ES8388 2. 详细解析 上一回解析了es8388_init函数中的第3段代码(也是实际与ES8388寄存器打交道的第1段代码),本回继续往下解析。为了便于理…...
AI/ML 基础知识与常用术语全解析
目录 一.引言 二.AI/ML 基础知识 1.人工智能(Artificial Intelligence,AI) (1).定义 (2).发展历程 (3).应用领域 2.机器学习(Machine Learning,ML) (1).定义 (2).学习方式 ①.监督学习 ②.无监督…...
【C#设计模式(15)——命令模式(Command Pattern)】
前言 命令模式的关键通过将请求封装成一个对象,使命令的发送者和接收者解耦。这种方式能更方便地添加新的命令,如执行命令的排队、延迟、撤销和重做等操作。 代码 #region 基础的命令模式 //命令(抽象类) public abstract class …...
Could not resolve com.android.tools.build:gradle:7.4.2.
Android Studio编译项目报错如下,始终无法下载解析7.4.2的gradle classpath A problem occurred configuring root project aistudyclient_questionlib. > Could not resolve all files for configuration :classpath.> Could not resolve com.android.tools…...
uniapp在App端定义全局弹窗,当打开关闭弹窗会触发onShow、onHide生命周期怎么解决?
在uniapp(App端)中实现自定义弹框,可以通过创建一个透明页面来实现。点击进入当前页面时,页面背景会变透明,用户可以根据自己的需求进行自定义,最终效果类似于弹框。 遇到问题:当打开弹窗(进入弹窗页面)就会触发当前页…...
2024 ccpc 辽宁省赛 E(构造 思维?)L(二分+一点点数论知识?)
E 题意: 可以注意到: 我的两种方格都四个方格的大小。 所以 如果存在一种摆放方式 那么 4|nm。 再考虑一种特殊的情况 22 ,此时虽然我的积是4 但是无法摆放的。 1>对于 4 | n,或者 4 | m.我直接摆放第二种方格就可以了。 如果我n 是4 的…...
IIC 随机写+多次写 可以控制写几次
verilog module icc_tx#(parameter SIZE 2 , //用来控制写多少次 比如地址是0000 一个地址只能存放8bit数据 超出指针就会到下一个地址0001parameter CLK_DIV 50_000_000 ,parameter SPEED 100_000 ,parameter LED 50 )( input wire c…...
基于SpringBoot+Vue的汽车票网上预订系统-无偿分享 (附源码+LW+调试)
目录 1. 项目技术 2. 功能菜单 3. 部分功能截图 4. 研究背景 5. 研究目的 6. 可行性分析 6.1 技术可行性 6.2 经济可行性 6.3 操作可行性 7. 系统设计 7.1 概述 7.2 系统流程和逻辑 7.3 系统结构 8. 数据库设计 8.1 数据库ER图 (1)公告信…...
net9 abp vnext 多语言通过数据库动态管理
通过数据库加载实现动态管理,用户可以自己修改界面显示的文本,满足国际化需求 如图所示,前端使用tdesign vnext 新建表TSYS_Localization与TSYS_LocalizationDetail 国旗图标下载网址flag-icons: Free Country Flags in SVG 在Shared下创建下图3个文件 …...
pip安装github上的开源软件包
1、若本机中安装的有git,可使用githttps方式安装 # 以安装pyfolio软件包为例,安装指令如下 pip install githttps://github.com/quantopian/pyfolio.git 2、若本机中没有安装git,可以直接使用软件包的zip地址进行安装 # 以安装pyfolio软件包为例,安装…...
【LeetCode刷题之路】120:三角形最小路径和的两种解法(动态规划优化)
LeetCode刷题记录 🌐 我的博客主页:iiiiiankor🎯 如果你觉得我的内容对你有帮助,不妨点个赞👍、留个评论✍,或者收藏⭐,让我们一起进步!📝 专栏系列:LeetCode…...
架构04-透明多级分流系统
零、文章目录 架构04-透明多级分流系统 1、透明多级分流系统 (1)概述 **定义:**透明多级分流系统是指在用户请求从客户端发出到最终查询或修改数据库信息的过程中,通过多个技术部件对流量进行合理分配,以提高系统的…...