Source code for asyncssh.agent

# Copyright (c) 2016 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""SSH agent client"""

import asyncio
import os

import asyncssh

from .misc import ChannelOpenError, load_default_keypairs
from .packet import Byte, String, UInt32, PacketDecodeError, SSHPacket
from .public_key import SSHKeyPair


# pylint: disable=bad-whitespace

# Client request message numbers
SSH_AGENTC_REQUEST_IDENTITIES            = 11
SSH_AGENTC_SIGN_REQUEST                  = 13
SSH_AGENTC_ADD_IDENTITY                  = 17
SSH_AGENTC_REMOVE_IDENTITY               = 18
SSH_AGENTC_REMOVE_ALL_IDENTITIES         = 19
SSH_AGENTC_ADD_SMARTCARD_KEY             = 20
SSH_AGENTC_REMOVE_SMARTCARD_KEY          = 21
SSH_AGENTC_LOCK                          = 22
SSH_AGENTC_UNLOCK                        = 23
SSH_AGENTC_ADD_ID_CONSTRAINED            = 25
SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED = 26
SSH_AGENTC_EXTENSION                     = 27

# Agent response message numbers
SSH_AGENT_FAILURE                        = 5
SSH_AGENT_SUCCESS                        = 6
SSH_AGENT_IDENTITIES_ANSWER              = 12
SSH_AGENT_SIGN_RESPONSE                  = 14
SSH_AGENT_EXTENSION_FAILURE              = 28

# SSH agent constraint numbers
SSH_AGENT_CONSTRAIN_LIFETIME             = 1
SSH_AGENT_CONSTRAIN_CONFIRM              = 2
SSH_AGENT_CONSTRAIN_EXTENSION            = 3

# SSH agent signature flags
SSH_AGENT_RSA_SHA2_256                   = 2
SSH_AGENT_RSA_SHA2_512                   = 4

# pylint: enable=bad-whitespace


[docs]class SSHAgentKeyPair(SSHKeyPair): """Surrogate for a key managed by the SSH agent""" _key_type = 'agent' def __init__(self, agent, algorithm, public_data, comment): super().__init__(algorithm, comment) self._agent = agent self.public_data = public_data self._cert = algorithm.endswith(b'-cert-v01@openssh.com') self._flags = 0 if self._cert: self.sig_algorithm = algorithm[:-21] else: self.sig_algorithm = algorithm if self.sig_algorithm == b'ssh-rsa': self.sig_algorithms = (b'rsa-sha2-256', b'rsa-sha2-512', b'ssh-rsa') else: self.sig_algorithms = (self.sig_algorithm,) if self._cert: self.host_key_algorithms = (algorithm,) else: self.host_key_algorithms = self.sig_algorithms def set_sig_algorithm(self, sig_algorithm): """Set the signature algorithm to use when signing data""" self.sig_algorithm = sig_algorithm if not self._cert: self.algorithm = sig_algorithm if sig_algorithm == b'rsa-sha2-256': self._flags |= SSH_AGENT_RSA_SHA2_256 elif sig_algorithm == b'rsa-sha2-512': self._flags |= SSH_AGENT_RSA_SHA2_512 @asyncio.coroutine
[docs] def sign(self, data): """Sign a block of data with this private key""" return (yield from self._agent.sign(self.public_data, data, self._flags))
@asyncio.coroutine
[docs] def remove(self): """Remove this key pair from the agent""" yield from self._agent.remove_keys([self])
[docs]class SSHAgentClient: """SSH agent client""" def __init__(self, loop, agent_path): self._loop = loop self._agent_path = agent_path self._reader = None self._writer = None self._lock = asyncio.Lock() def _cleanup(self): """Clean up this SSH agent client""" if self._writer: self._writer.close() self._reader = None self._writer = None @staticmethod def encode_constraints(lifetime, confirm): """Encode key constraints""" result = b'' if lifetime: result += Byte(SSH_AGENT_CONSTRAIN_LIFETIME) + UInt32(lifetime) if confirm: result += Byte(SSH_AGENT_CONSTRAIN_CONFIRM) return result @asyncio.coroutine def connect(self): """Connect to the SSH agent""" if isinstance(self._agent_path, str): # pylint doesn't think open_unix_connection exists # pylint: disable=no-member self._reader, self._writer = \ yield from asyncio.open_unix_connection(self._agent_path, loop=self._loop) else: self._reader, self._writer = \ yield from self._agent_path.open_agent_connection() @asyncio.coroutine def _make_request(self, msgtype, *args): """Send an SSH agent request""" with (yield from self._lock): try: if not self._writer: yield from self.connect() payload = Byte(msgtype) + b''.join(args) self._writer.write(UInt32(len(payload)) + payload) resplen = yield from self._reader.readexactly(4) resplen = int.from_bytes(resplen, 'big') resp = yield from self._reader.readexactly(resplen) resp = SSHPacket(resp) resptype = resp.get_byte() return resptype, resp except (OSError, EOFError, PacketDecodeError) as exc: self._cleanup() raise ValueError(str(exc)) from None @asyncio.coroutine
[docs] def get_keys(self): """Request the available client keys This method is a coroutine which returns a list of client keys available in the ssh-agent. :returns: A list of :class:`SSHKeyPair` objects """ resptype, resp = \ yield from self._make_request(SSH_AGENTC_REQUEST_IDENTITIES) if resptype == SSH_AGENT_IDENTITIES_ANSWER: result = [] num_keys = resp.get_uint32() for _ in range(num_keys): key_blob = resp.get_string() comment = resp.get_string() packet = SSHPacket(key_blob) algorithm = packet.get_string() result.append(SSHAgentKeyPair(self, algorithm, key_blob, comment)) resp.check_end() return result else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine def sign(self, key_blob, data, flags=0): """Sign a block of data with the requested key""" resptype, resp = \ yield from self._make_request(SSH_AGENTC_SIGN_REQUEST, String(key_blob), String(data), UInt32(flags)) if resptype == SSH_AGENT_SIGN_RESPONSE: sig = resp.get_string() resp.check_end() return sig elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to sign with requested key') else: raise ValueError('Unknown SSH agent response: %d' % resptype) @asyncio.coroutine
[docs] def add_keys(self, keylist=(), passphrase=None, lifetime=None, confirm=False): """Add keys to the agent This method adds a list of local private keys and optional matching certificates to the agent. :param keylist: (optional) The list of keys to add. If not specified, an attempt will be made to load keys from the files :file:`.ssh/id_ed25519`, :file:`.ssh/id_ecdsa`, :file:`.ssh/id_rsa` and :file:`.ssh/id_dsa` in the user's home directory with optional matching certificates loaded from the files :file:`.ssh/id_ed25519-cert.pub`, :file:`.ssh/id_ecdsa-cert.pub`, :file:`.ssh/id_rsa-cert.pub`, and :file:`.ssh/id_dsa-cert.pub`. :param str passphrase: (optional) The passphrase to use to decrypt the keys. :param lifetime: (optional) The time in seconds after which the keys should be automatically deleted, or ``None`` to store these keys indefinitely (the default). :param bool confirm: (optional) Whether or not to require confirmation for each private key operation which uses these keys, defaulting to ``False``. :type keylist: *see* :ref:`SpecifyingPrivateKeys` :type lifetime: `int` or ``None`` :raises: :exc:`ValueError` if the keys cannot be added """ if keylist: keypairs = asyncssh.load_keypairs(keylist, passphrase) else: keypairs = load_default_keypairs(passphrase) constraints = self.encode_constraints(lifetime, confirm) msgtype = SSH_AGENTC_ADD_ID_CONSTRAINED if constraints else \ SSH_AGENTC_ADD_IDENTITY for keypair in keypairs: comment = keypair.get_comment() resptype, resp = \ yield from self._make_request(msgtype, keypair.get_agent_private_key(), String(comment or ''), constraints) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to add key') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def add_smartcard_keys(self, provider, pin=None, lifetime=None, confirm=False): """Store keys associated with a smart card in the agent :param str provider: The name of the smart card provider :param pin: (optional) The PIN to use to unlock the smart card :param lifetime: (optional) The time in seconds after which the keys should be automatically deleted, or ``None`` to store these keys indefinitely (the default). :param bool confirm: (optional) Whether or not to require confirmation for each private key operation which uses these keys, defaulting to ``False``. :type pin: `str` or ``None`` :type lifetime: `int` or ``None`` :raises: :exc:`ValueError` if the keys cannot be added """ constraints = self.encode_constraints(lifetime, confirm) msgtype = SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED \ if constraints else SSH_AGENTC_ADD_SMARTCARD_KEY resptype, resp = \ yield from self._make_request(msgtype, String(provider), String(pin or ''), constraints) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to add keys') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def remove_keys(self, keylist): """Remove a key stored in the agent :param keylist: The list of keys to remove. :type keylist: list of :class:`SSHKeyPair` :raises: :exc:`ValueError` if any keys are not found """ for keypair in keylist: resptype, resp = \ yield from self._make_request(SSH_AGENTC_REMOVE_IDENTITY, String(keypair.public_data)) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Key not found') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def remove_smartcard_keys(self, provider, pin=None): """Remove keys associated with a smart card stored in the agent :param str provider: The name of the smart card provider :param pin: (optional) The PIN to use to unlock the smart card :type pin: `str` or ``None`` :raises: :exc:`ValueError` if the keys are not found """ resptype, resp = \ yield from self._make_request(SSH_AGENTC_REMOVE_SMARTCARD_KEY, String(provider), String(pin or '')) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Keys not found') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def remove_all(self): """Remove all keys stored in the agent :raises: :exc:`ValueError` if the keys can't be removed """ resptype, resp = \ yield from self._make_request(SSH_AGENTC_REMOVE_ALL_IDENTITIES) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to remove all keys') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def lock(self, passphrase): """Lock the agent using the specified passphrase :param str passphrase: The passphrase required to later unlock the agent :raises: :exc:`ValueError` if the agent can't be locked """ resptype, resp = yield from self._make_request(SSH_AGENTC_LOCK, String(passphrase)) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to lock SSH agent') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def unlock(self, passphrase): """Unlock the agent using the specified passphrase :param str passphrase: The passphrase to use to unlock the agent :raises: :exc:`ValueError` if the agent can't be unlocked """ resptype, resp = yield from self._make_request(SSH_AGENTC_UNLOCK, String(passphrase)) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to unlock SSH agent') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
[docs] def query_extensions(self): """Return a list of extensions supported by the agent :returns: A list of strings of supported extension names """ resptype, resp = yield from self._make_request(SSH_AGENTC_EXTENSION, String('query')) if resptype == SSH_AGENT_SUCCESS: result = [] while resp: exttype = resp.get_string() try: exttype = exttype.decode('utf-8') except UnicodeDecodeError: raise ValueError('Invalid extension type name') result.append(exttype) return result elif resptype == SSH_AGENT_FAILURE: return [] else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] def close(self): """Close the SSH agent connection This method closes the connection to the ssh-agent. Any attempts to use this :class:`SSHAgentClient` or the key pairs it previously returned will result in an error. """ self._cleanup()
@asyncio.coroutine
[docs]def connect_agent(agent_path=None, *, loop=None): """Make a connection to the SSH agent This function attempts to connect to an ssh-agent process listening on a UNIX domain socket at ``agent_path``. If not provided, it will attempt to get the path from the ``SSH_AUTH_SOCK`` environment variable. If the connection is successful, an ``SSHAgentClient`` object is returned that has methods on it you can use to query the ssh-agent. If no path is specified and the environment variable is not set or the connection to the agent fails, this function returns ``None``. :param agent_path: (optional) The path to use to contact the ssh-agent process, or the :class:`SSHServerConnection` to forward the agent request over. :param loop: (optional) The event loop to use when creating the connection. If not specified, the default event loop is used. :type agent_path: str or :class:`SSHServerConnection` :returns: An :class:`SSHAgentClient` or ``None`` """ if not loop: loop = asyncio.get_event_loop() if not agent_path: agent_path = os.environ.get('SSH_AUTH_SOCK', None) if not agent_path: return None agent = SSHAgentClient(loop, agent_path) try: yield from agent.connect() return agent except (OSError, ChannelOpenError): return None