近期必须开发设计前面实际操作远程控制vm虚拟机的作用,通称WebShell。根据现阶段的技术栈是react django的客观事实,大家看到绝大多数的后面完成全是django 安全通道来完成WebSocket服务项目。

看了以后感觉不足有意思。翻阅django的官方网文本文档,发觉django自身并不兼容websocket,可是django3适用asgi协议书,能够自主完成websocket服务项目。

因而挑选guni corn uv icon asgi WebSocket django 3.2 paramiko来完成WebShell。

完成websocket服务项目。

根据应用django自身的钢管脚手架转化成的新项目将自动生成asgi.py和wsgi.py,大部分普遍的应用软件应用wsgi.py与Nginx协作布署在线客服。

此次关键用asgi.py完成websocket服务项目的构思,大概能够根据在网上快速寻找,主要是完成connect/send/receive/disconnect的处理方法。

这儿,怎样在沒有附加依靠的状况下向django应用软件加上webtcp协议(https://jaydenwindle.com/writing/django-webtcp协议-零依靠/)是一个非常好的事例,可是它太简易了...

思索

# asgi.py import os from django.core.asgi import get_asgi_application from websocket_app.websocket import websocket_application os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'websocket_app.settings') django_application = get_asgi_application() async def application(scope, receive, send): if scope['type'] == 'http': await django_application(scope, receive, send) elif scope['type'] == 'websocket': await websocket_application(scope, receive, send) else: raise NotImplementedError(f"Unknown scope type {scope['type']}") # websocket.py async def websocket_application(scope, receive, send): pass # websocket.py async def websocket_application(scope, receive, send): while True: event = await receive() if event['type'] == 'websocket.connect': await send({ 'type': 'websocket.accept' }) if event['type'] == 'websocket.disconnect': break if event['type'] == 'websocket.receive': if event['text'] == 'ping': await send({ 'type': 'websocket.send', 'text': 'pong!' })

完成

上边的编码给予了构思,为了更好地详细考虑,这儿能够参照一下。姜戈-3-1互联网电源插座(https://aliashkevich.com/websockets-in-django-3-1/)大部分能够器重。

我将关键完成一部分放到下边:

class WebSocket: def ._init._(self, scope, receive, send): self._scope = scope self._receive = receive self._send = send self._client_state = State.CONNECTING self._app_state = State.CONNECTING @property def headers(self): return Headers(self._scope) @property def scheme(self): return self._scope["scheme"] @property def path(self): return self._scope["path"] @property def query_params(self): return QueryParams(self._scope["query_string"].decode()) @property def query_string(self) -> str: return self._scope["query_string"] @property def scope(self): return self._scope async def accept(self, subprotocol: str = None): ""Accept connection. :param subprotocol: The subprotocol the server wishes to accept. :type subprotocol: str, optional "" if self._client_state == State.CONNECTING: await self.receive() await self.send({"type": SendEvent.ACCEPT, "subprotocol": subprotocol}) async def close(self, code: int = 1000): await self.send({"type": SendEvent.CLOSE, "code": code}) async def send(self, message: t.Mapping): if self._app_state == State.DISCONNECTED: raise RuntimeError("WebSocket is disconnected.") if self._app_state == State.CONNECTING: assert message["type"] in {SendEvent.ACCEPT, SendEvent.CLOSE}, ( 'Could not write event "%s" into socket in connecting state.' % message["type"] ) if message["type"] == SendEvent.CLOSE: self._app_state = State.DISCONNECTED else: self._app_state = State.CONNECTED elif self._app_state == State.CONNECTED: assert message["type"] in {SendEvent.SEND, SendEvent.CLOSE}, ( 'Connected socket can send "%s" and "%s" events, not "%s"' % (SendEvent.SEND, SendEvent.CLOSE, message["type"]) ) if message["type"] == SendEvent.CLOSE: self._app_state = State.DISCONNECTED await self._send(message) async def receive(self): if self._client_state == State.DISCONNECTED: raise RuntimeError("WebSocket is disconnected.") message = await self._receive() if self._client_state == State.CONNECTING: assert message["type"] == ReceiveEvent.CONNECT, ( 'WebSocket is in connecting state but received "%s" event' % message["type"] ) self._client_state = State.CONNECTED elif self._client_state == State.CONNECTED: assert message["type"] in {ReceiveEvent.RECEIVE, ReceiveEvent.DISCONNECT}, ( 'WebSocket is connected but received invalid event "%s".' % message["type"] ) if message["type"] == ReceiveEvent.DISCONNECT: self._client_state = State.DISCONNECTED return message

手术缝合妖怪

做为一个及格的编码搬运工人,为了更好地提升运送高效率,必须造一些车轮子来填一些坑。如何把上边的WebSocket类和paramiko融合起來,进而接纳前面到远程控制服务器的标识符,与此同时接纳回到?

import asyncio import traceback import paramiko from webshell.ssh import Base, RemoteSSH from webshell.connection import WebSocket class WebShell: ""梳理 WebSocket 和 paramiko.Channel,完成应用的数据信息相通"" def ._init._(self, ws_session: WebSocket, ssh_session: paramiko.SSHClient = None, chanel_session: paramiko.Channel = None ): self.ws_session = ws_session self.ssh_session = ssh_session self.chanel_session = chanel_session def init_ssh(self, host=None, port=22, user="admin", passwd="admin@123"): self.ssh_session, self.chanel_session = RemoteSSH(host, port, user, passwd).session() def set_ssh(self, ssh_session, chanel_session): self.ssh_session = ssh_session self.chanel_session = chanel_session async def ready(self): await self.ws_session.accept() async def welcome(self): # 展现Linux热烈欢迎相关内容 for i in range(2): if self.chanel_session.send_ready(): message = self.chanel_session.recv(2048).decode('utf-8') if not message: return await self.ws_session.send_text(message) async def web_to_ssh(self): # print('--------web_to_ssh------->') while True: # print('--------------->') if not self.chanel_session.active or not self.ws_session.status: return await asyncio.sleep(0.01) shell = await self.ws_session.receive_text() # print('-------shell-------->', shell) if self.chanel_session.active and self.chanel_session.send_ready(): self.chanel_session.send(bytes(shell, 'utf-8')) # print('--------------->', "end") async def ssh_to_web(self): # print('

评论(0条)

刀客源码 游客评论