connectors.tunnel_connector

Contains a connector base class that communicates through a spawned subprocess. Any command that can run a tunnel dispatcher on the remote host can be used.

class tunnel_connector.TunnelConnector

A connector that handles requests via an externally supplied subprocess running a tunnel dispatcher. Any subclass must override command().

def TunnelConnector.command()

def TunnelConnector.command(self) -> list[str]:

Returns the command that should be executed to open a tunnel dispatcher to the destination.
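As an illustration of the override requirement, here is a minimal sketch of a subclass that supplies its own command. It is not taken from the library: the `TunnelConnector` class below is a stand-in so the example runs on its own, and `SshTunnelConnector`, its `host` parameter, and the `tunnel-dispatcher` executable name are hypothetical.

```python
from __future__ import annotations


class TunnelConnector:
    """Stand-in for the documented base class (assumption, not the real one)."""

    def command(self) -> list[str]:
        # The documentation states that subclasses must override this method.
        raise NotImplementedError("Subclasses must override command()")


class SshTunnelConnector(TunnelConnector):
    """Hypothetical subclass that reaches the destination over ssh."""

    def __init__(self, host: str) -> None:
        self.host = host

    def command(self) -> list[str]:
        # Return an argv list, not a shell string, so no shell quoting is needed.
        # "tunnel-dispatcher" is a placeholder for whatever starts the
        # dispatcher on the remote host.
        return ["ssh", self.host, "tunnel-dispatcher"]


connector = SshTunnelConnector("example.org")
argv = connector.command()
print(argv)
```

Returning a list of arguments matches the documented `list[str]` return type and lets the caller pass the result to a subprocess without involving a shell.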

def TunnelConnector.open()

def TunnelConnector.open(self) -> None:

def TunnelConnector.close()

def TunnelConnector.close(self) -> None:

def TunnelConnector.run()

def TunnelConnector.run(self, command: list[str], 
                        input: Optional[bytes] = None, 
                        capture_output: bool = True, check: bool = True, 
                        user: Optional[str] = None, 
                        group: Optional[str] = None, 
                        umask: Optional[str] = None, 
                        cwd: Optional[str] = None
                        ) -> CompletedRemoteCommand:

def TunnelConnector.stat()

def TunnelConnector.stat(self, path: str, follow_links: bool = False, 
                         sha512sum: bool = False) -> Optional[StatResult]:

def TunnelConnector.resolve_user()

def TunnelConnector.resolve_user(self, user: Optional[str]) -> str:

def TunnelConnector.resolve_group()

def TunnelConnector.resolve_group(self, group: Optional[str]) -> str:

def TunnelConnector.query_user()

def TunnelConnector.query_user(self, user: str, 
                               query_password_hash: bool = False
                               ) -> UserEntry:

def TunnelConnector.query_group()

def TunnelConnector.query_group(self, group: str) -> GroupEntry:

def TunnelConnector.getenv()

def TunnelConnector.getenv(self, key: str) -> Optional[str]:

def TunnelConnector.upload()

def TunnelConnector.upload(self, file: str, content: bytes, 
                           mode: Optional[str] = None, 
                           owner: Optional[str] = None, 
                           group: Optional[str] = None) -> None:

def TunnelConnector.download()

def TunnelConnector.download(self, file: str) -> bytes:
