# Copyright (c) 2020 Jeff Irion and contributors
#
# This file is part of the aio-adb-shell package. It incorporates work
# covered by the following license notice:
#
#
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implement the :class:`AdbDevice` class, which can connect to a device and run ADB shell commands.
.. rubric:: Contents
* :class:`_AdbTransactionInfo`
* :class:`_FileSyncTransactionInfo`
* :meth:`_FileSyncTransactionInfo.can_add_to_send_buffer`
* :class:`AdbDevice`
* :meth:`AdbDevice._close`
* :meth:`AdbDevice._filesync_flush`
* :meth:`AdbDevice._filesync_read`
* :meth:`AdbDevice._filesync_read_buffered`
* :meth:`AdbDevice._filesync_read_until`
* :meth:`AdbDevice._filesync_send`
* :meth:`AdbDevice._handle_progress`
* :meth:`AdbDevice._okay`
* :meth:`AdbDevice._open`
* :meth:`AdbDevice._pull`
* :meth:`AdbDevice._push`
* :meth:`AdbDevice._read`
* :meth:`AdbDevice._read_until`
* :meth:`AdbDevice._read_until_close`
* :meth:`AdbDevice._send`
* :meth:`AdbDevice._service`
* :meth:`AdbDevice._streaming_command`
* :meth:`AdbDevice._streaming_service`
* :meth:`AdbDevice._write`
* :attr:`AdbDevice.available`
* :meth:`AdbDevice.close`
* :meth:`AdbDevice.connect`
* :meth:`AdbDevice.list`
* :meth:`AdbDevice.pull`
* :meth:`AdbDevice.push`
* :meth:`AdbDevice.root`
* :meth:`AdbDevice.shell`
* :meth:`AdbDevice.stat`
* :meth:`AdbDevice.streaming_shell`
* :class:`AdbDeviceTcp`
"""
from collections import namedtuple
from contextlib import contextmanager
import io
import logging
import os
import socket
import struct
import time
from . import constants
from . import exceptions
from .adb_message import AdbMessage, checksum, unpack
from .handle.base_handle import BaseHandle
from .handle.tcp_handle import TcpHandle
try:
FILE_TYPES = (file, io.IOBase)
except NameError: # pragma: no cover
FILE_TYPES = (io.IOBase,)
_LOGGER = logging.getLogger(__name__)
DeviceFile = namedtuple('DeviceFile', ['filename', 'mode', 'size', 'mtime'])
[docs]@contextmanager
def _open(name, mode='r'):
"""Handle opening and closing of files and IO streams.
Parameters
----------
name : str, io.IOBase
The name of the file *or* an IO stream
mode : str
The mode for opening the file
Yields
------
io.IOBase
The opened file *or* the IO stream
"""
try:
opened = open(name, mode) if isinstance(name, str) else None
if isinstance(name, str):
yield opened
else:
yield name
finally:
if isinstance(name, str):
opened.close()
else:
name.close()
[docs]class _AdbTransactionInfo(object): # pylint: disable=too-few-public-methods
"""A class for storing info and settings used during a single ADB "transaction."
Parameters
----------
local_id : int
The ID for the sender (i.e., the device running this code)
remote_id : int
The ID for the recipient
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a command in ``expected_cmds`` in :meth:`AdbDevice._read`
Attributes
----------
local_id : int
The ID for the sender (i.e., the device running this code)
remote_id : int
The ID for the recipient
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a command in ``expected_cmds`` in :meth:`AdbDevice._read`
"""
def __init__(self, local_id, remote_id, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S):
self.local_id = local_id
self.remote_id = remote_id
self.timeout_s = timeout_s
self.total_timeout_s = total_timeout_s
[docs]class _FileSyncTransactionInfo(object): # pylint: disable=too-few-public-methods
"""A class for storing info used during a single FileSync "transaction."
Parameters
----------
recv_message_format : bytes
The FileSync message format
Attributes
----------
recv_buffer : bytearray
A buffer for storing received data
recv_message_format : bytes
The FileSync message format
recv_message_size : int
The FileSync message size
send_buffer : bytearray
A buffer for storing data to be sent
send_idx : int
The index in ``recv_buffer`` that will be the start of the next data packet sent
"""
def __init__(self, recv_message_format):
self.send_buffer = bytearray(constants.MAX_ADB_DATA)
self.send_idx = 0
self.recv_buffer = bytearray()
self.recv_message_format = recv_message_format
self.recv_message_size = struct.calcsize(recv_message_format)
[docs] def can_add_to_send_buffer(self, data_len):
"""Determine whether ``data_len`` bytes of data can be added to the send buffer without exceeding :const:`constants.MAX_ADB_DATA`.
Parameters
----------
data_len : int
The length of the data to be potentially added to the send buffer (not including the length of its header)
Returns
-------
bool
Whether ``data_len`` bytes of data can be added to the send buffer without exceeding :const:`constants.MAX_ADB_DATA`
"""
added_len = self.recv_message_size + data_len
return self.send_idx + added_len < constants.MAX_ADB_DATA
[docs]class AdbDevice(object):
"""A class with methods for connecting to a device and executing ADB commands.
Parameters
----------
handle : BaseHandle
A user-provided handle for communicating with the device; must be an instance of a subclass of :class:`~adb_shell.handle.base_handle.BaseHandle`
banner : str, bytes, None
The hostname of the machine where the Python interpreter is currently running; if
it is not provided, it will be determined via ``socket.gethostname()``
Raises
------
adb_shell.exceptions.InvalidHandleError
The passed ``handle`` is not an instance of a subclass of :class:`~adb_shell.handle.base_handle.BaseHandle`
Attributes
----------
_available : bool
Whether an ADB connection to the device has been established
_banner : bytearray, bytes
The hostname of the machine where the Python interpreter is currently running
_handle : BaseHandle
The handle that is used to connect to the device; must be a subclass of :class:`~adb_shell.handle.base_handle.BaseHandle`
"""
def __init__(self, handle, banner=None):
if banner:
if not isinstance(banner, (bytes, bytearray)):
self._banner = bytearray(banner, 'utf-8')
else:
self._banner = banner
else:
try:
# TODO: make this async / don't do I/O
self._banner = bytearray(socket.gethostname(), 'utf-8')
except: # noqa pylint: disable=bare-except
self._banner = bytearray('unknown', 'utf-8')
if not isinstance(handle, BaseHandle):
raise exceptions.InvalidHandleError("`handle` must be an instance of a subclass of `BaseHandle`")
self._handle = handle
self._available = False
@property
def available(self):
"""Whether or not an ADB connection to the device has been established.
Returns
-------
bool
``self._available``
"""
return self._available
[docs] async def close(self):
"""Close the connection via the provided handle's ``close()`` method.
"""
self._available = False
await self._handle.close()
[docs] async def connect(self, rsa_keys=None, timeout_s=None, auth_timeout_s=constants.DEFAULT_AUTH_TIMEOUT_S, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S, auth_callback=None):
"""Establish an ADB connection to the device.
1. Use the handle to establish a connection
2. Send a ``b'CNXN'`` message
3. Unpack the ``cmd``, ``arg0``, ``arg1``, and ``banner`` fields from the response
4. If ``cmd`` is not ``b'AUTH'``, then authentication is not necesary and so we are done
5. If no ``rsa_keys`` are provided, raise an exception
6. Loop through our keys, signing the last ``banner`` that we received
1. If the last ``arg0`` was not :const:`adb_shell.constants.AUTH_TOKEN`, raise an exception
2. Sign the last ``banner`` and send it in an ``b'AUTH'`` message
3. Unpack the ``cmd``, ``arg0``, and ``banner`` fields from the response via :func:`adb_shell.adb_message.unpack`
4. If ``cmd`` is ``b'CNXN'``, return ``banner``
7. None of the keys worked, so send ``rsa_keys[0]``'s public key; if the response does not time out, we must have connected successfully
Parameters
----------
rsa_keys : list, None
A list of signers of type :class:`~adb_shell.auth.sign_cryptography.CryptographySigner`,
:class:`~adb_shell.auth.sign_pycryptodome.PycryptodomeAuthSigner`, or :class:`~adb_shell.auth.sign_pythonrsa.PythonRSASigner`
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
auth_timeout_s : float, None
The time in seconds to wait for a ``b'CNXN'`` authentication response
total_timeout_s : float
The total time in seconds to wait for expected commands in :meth:`AdbDevice._read`
auth_callback : function, None
Function callback invoked when the connection needs to be accepted on the device
Returns
-------
bool
Whether the connection was established (:attr:`AdbDevice.available`)
Raises
------
adb_shell.exceptions.DeviceAuthError
Device authentication required, no keys available
adb_shell.exceptions.InvalidResponseError
Invalid auth response from the device
"""
# 1. Use the handle to establish a connection
await self._handle.close()
await self._handle.connect(timeout_s)
# 2. Send a ``b'CNXN'`` message
msg = AdbMessage(constants.CNXN, constants.VERSION, constants.MAX_ADB_DATA, b'host::%s\0' % self._banner)
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
await self._send(msg, adb_info)
# 3. Unpack the ``cmd``, ``arg0``, ``arg1``, and ``banner`` fields from the response
cmd, arg0, arg1, banner = await self._read([constants.AUTH, constants.CNXN], adb_info)
# 4. If ``cmd`` is not ``b'AUTH'``, then authentication is not necesary and so we are done
if cmd != constants.AUTH:
self._available = True
return True # return banner
# 5. If no ``rsa_keys`` are provided, raise an exception
if not rsa_keys:
await self._handle.close()
raise exceptions.DeviceAuthError('Device authentication required, no keys available.')
# 6. Loop through our keys, signing the last ``banner`` that we received
for rsa_key in rsa_keys:
# 6.1. If the last ``arg0`` was not :const:`adb_shell.constants.AUTH_TOKEN`, raise an exception
if arg0 != constants.AUTH_TOKEN:
await self._handle.close()
raise exceptions.InvalidResponseError('Unknown AUTH response: %s %s %s' % (arg0, arg1, banner))
# 6.2. Sign the last ``banner`` and send it in an ``b'AUTH'`` message
signed_token = rsa_key.Sign(banner)
msg = AdbMessage(constants.AUTH, constants.AUTH_SIGNATURE, 0, signed_token)
await self._send(msg, adb_info)
# 6.3. Unpack the ``cmd``, ``arg0``, and ``banner`` fields from the response via :func:`adb_shell.adb_message.unpack`
cmd, arg0, _, banner = await self._read([constants.CNXN, constants.AUTH], adb_info)
# 6.4. If ``cmd`` is ``b'CNXN'``, return ``banner``
if cmd == constants.CNXN:
self._available = True
return True # return banner
# 7. None of the keys worked, so send ``rsa_keys[0]``'s public key; if the response does not time out, we must have connected successfully
pubkey = rsa_keys[0].GetPublicKey()
if not isinstance(pubkey, (bytes, bytearray)):
pubkey = bytearray(pubkey, 'utf-8')
if auth_callback is not None:
auth_callback(self)
msg = AdbMessage(constants.AUTH, constants.AUTH_RSAPUBLICKEY, 0, pubkey + b'\0')
await self._send(msg, adb_info)
adb_info.timeout_s = auth_timeout_s
cmd, arg0, _, banner = await self._read([constants.CNXN], adb_info)
self._available = True
return True # return banner
# ======================================================================= #
# #
# Services #
# #
# ======================================================================= #
[docs] async def _service(self, service, command, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S, decode=True):
"""Send an ADB command to the device.
Parameters
----------
service : bytes
The ADB service to talk to (e.g., ``b'shell'``)
command : bytes
The command that will be sent
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
decode : bool
Whether to decode the output to utf8 before returning
Returns
-------
bytes, str
The output of the ADB command as a string if ``decode`` is True, otherwise as bytes.
"""
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
if decode:
return b''.join([x async for x in self._streaming_command(service, command, adb_info)]).decode('utf8')
return b''.join([x async for x in self._streaming_command(service, command, adb_info)])
[docs] async def _streaming_service(self, service, command, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S, decode=True):
"""Send an ADB command to the device, yielding each line of output.
Parameters
----------
service : bytes
The ADB service to talk to (e.g., ``b'shell'``)
command : bytes
The command that will be sent
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
decode : bool
Whether to decode the output to utf8 before returning
Yields
-------
bytes, str
The line-by-line output of the ADB command as a string if ``decode`` is True, otherwise as bytes.
"""
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
stream = self._streaming_command(service, command, adb_info)
if decode:
async for line in (stream_line.decode('utf8') async for stream_line in stream):
yield line
else:
async for line in stream:
yield line
[docs] async def root(self, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S):
"""Gain root access.
The device must be rooted in order for this to work.
Parameters
----------
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
await self._service(b'root', b'', timeout_s, total_timeout_s, False)
[docs] async def shell(self, command, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S, decode=True):
"""Send an ADB shell command to the device.
Parameters
----------
command : str
The shell command that will be sent
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
decode : bool
Whether to decode the output to utf8 before returning
Returns
-------
bytes, str
The output of the ADB shell command as a string if ``decode`` is True, otherwise as bytes.
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
return await self._service(b'shell', command.encode('utf8'), timeout_s, total_timeout_s, decode)
[docs] async def streaming_shell(self, command, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S, decode=True):
"""Send an ADB shell command to the device, yielding each line of output.
Parameters
----------
command : str
The shell command that will be sent
timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseHandle.bulk_read() <adb_shell.handle.base_handle.BaseHandle.bulk_read>`
and :meth:`BaseHandle.bulk_write() <adb_shell.handle.base_handle.BaseHandle.bulk_write>`
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
decode : bool
Whether to decode the output to utf8 before returning
Yields
-------
bytes, str
The line-by-line output of the ADB shell command as a string if ``decode`` is True, otherwise as bytes.
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
async for line in self._streaming_service(b'shell', command.encode('utf8'), timeout_s, total_timeout_s, decode):
yield line
# ======================================================================= #
# #
# FileSync #
# #
# ======================================================================= #
[docs] async def list(self, device_path, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S):
"""Return a directory listing of the given path.
Parameters
----------
device_path : str
Directory to list.
timeout_s : float, None
Expected timeout for any part of the pull.
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
Returns
-------
files : list[DeviceFile]
Filename, mode, size, and mtime info for the files in the directory
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
filesync_info = _FileSyncTransactionInfo(constants.FILESYNC_LIST_FORMAT)
await self._open(b'sync:', adb_info)
await self._filesync_send(constants.LIST, adb_info, filesync_info, data=device_path)
files = []
async for cmd_id, header, filename in self._filesync_read_until([constants.DENT], [constants.DONE], adb_info, filesync_info):
if cmd_id == constants.DONE:
break
mode, size, mtime = header
files.append(DeviceFile(filename, mode, size, mtime))
await self._close(adb_info)
return files
[docs] async def pull(self, device_filename, dest_file=None, progress_callback=None, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S):
"""Pull a file from the device.
Parameters
----------
device_filename : str
Filename on the device to pull.
dest_file : str, file, io.IOBase, None
If set, a filename or writable file-like object.
progress_callback : function, None
Callback method that accepts filename, bytes_written and total_bytes, total_bytes will be -1 for file-like
objects
timeout_s : float, None
Expected timeout for any part of the pull.
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
Returns
-------
bytes, bool
The file data if ``dest_file`` is not set. Otherwise, ``True`` if the destination file exists
Raises
------
ValueError
If ``dest_file`` is of unknown type.
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
if not dest_file:
dest_file = io.BytesIO()
if not isinstance(dest_file, FILE_TYPES + (str,)):
raise ValueError("dest_file is of unknown type")
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
filesync_info = _FileSyncTransactionInfo(constants.FILESYNC_PULL_FORMAT)
with _open(dest_file, 'wb') as dest:
await self._open(b'sync:', adb_info)
await self._pull(device_filename, dest, progress_callback, adb_info, filesync_info)
await self._close(adb_info)
if isinstance(dest, io.BytesIO):
return dest.getvalue()
if hasattr(dest, 'name'):
return os.path.exists(dest.name)
# We don't know what the path is, so we just assume it exists.
return True
[docs] async def _pull(self, filename, dest, progress_callback, adb_info, filesync_info):
"""Pull a file from the device into the file-like ``dest_file``.
Parameters
----------
filename : str
The file to be pulled
dest : _io.BytesIO
File-like object for writing to
progress_callback : function, None
Callback method that accepts ``filename``, ``bytes_written``, and ``total_bytes``
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
filesync_info : _FileSyncTransactionInfo
Data and storage for this FileSync transaction
"""
if progress_callback:
total_bytes = await self.stat(filename)[1]
progress = self._handle_progress(lambda current: progress_callback(filename, current, total_bytes))
next(progress)
await self._filesync_send(constants.RECV, adb_info, filesync_info, data=filename)
async for cmd_id, _, data in self._filesync_read_until([constants.DATA], [constants.DONE], adb_info, filesync_info):
if cmd_id == constants.DONE:
break
dest.write(data)
if progress_callback:
progress.send(len(data))
[docs] async def push(self, source_file, device_filename, st_mode=constants.DEFAULT_PUSH_MODE, mtime=0, progress_callback=None, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S):
"""Push a file or directory to the device.
Parameters
----------
source_file : str
Either a filename, a directory or file-like object to push to the device.
device_filename : str
Destination on the device to write to.
st_mode : int
Stat mode for filename
mtime : int
Modification time to set on the file.
progress_callback : function, None
Callback method that accepts filename, bytes_written and total_bytes, total_bytes will be -1 for file-like
objects
timeout_s : float, None
Expected timeout for any part of the push.
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
if isinstance(source_file, str):
if os.path.isdir(source_file):
await self.shell("mkdir " + device_filename, timeout_s, total_timeout_s)
for f in os.listdir(source_file):
await self.push(os.path.join(source_file, f), device_filename + '/' + f, progress_callback=progress_callback)
return
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
filesync_info = _FileSyncTransactionInfo(constants.FILESYNC_PUSH_FORMAT)
with _open(source_file, 'rb') as source:
await self._open(b'sync:', adb_info)
await self._push(source, device_filename, st_mode, mtime, progress_callback, adb_info, filesync_info)
await self._close(adb_info)
[docs] async def _push(self, datafile, filename, st_mode, mtime, progress_callback, adb_info, filesync_info):
"""Push a file-like object to the device.
Parameters
----------
datafile : _io.BytesIO
File-like object for reading from
filename : str
Filename to push to
st_mode : int
Stat mode for filename
mtime : int
Modification time
progress_callback : function, None
Callback method that accepts ``filename``, ``bytes_written``, and ``total_bytes``
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
Raises
------
PushFailedError
Raised on push failure.
"""
fileinfo = ('{},{}'.format(filename, int(st_mode))).encode('utf-8')
await self._filesync_send(constants.SEND, adb_info, filesync_info, data=fileinfo)
if progress_callback:
total_bytes = os.fstat(datafile.fileno()).st_size if isinstance(datafile, FILE_TYPES) else -1
progress = self._handle_progress(lambda current: progress_callback(filename, current, total_bytes))
next(progress)
while True:
data = datafile.read(constants.MAX_PUSH_DATA)
if data:
await self._filesync_send(constants.DATA, adb_info, filesync_info, data=data)
if progress_callback:
progress.send(len(data))
else:
break
if mtime == 0:
mtime = int(time.time())
# DONE doesn't send data, but it hides the last bit of data in the size field.
await self._filesync_send(constants.DONE, adb_info, filesync_info, size=mtime)
async for cmd_id, _, data in self._filesync_read_until([], [constants.OKAY, constants.FAIL], adb_info, filesync_info):
if cmd_id == constants.OKAY:
return
raise exceptions.PushFailedError(data)
[docs] async def stat(self, device_filename, timeout_s=None, total_timeout_s=constants.DEFAULT_TOTAL_TIMEOUT_S):
"""Get a file's ``stat()`` information.
Parameters
----------
device_filename : str
The file on the device for which we will get information.
timeout_s : float, None
Expected timeout for any part of the pull.
total_timeout_s : float
The total time in seconds to wait for a ``b'CLSE'`` or ``b'OKAY'`` command in :meth:`AdbDevice._read`
Returns
-------
mode : int
The octal permissions for the file
size : int
The size of the file
mtime : int
The last modified time for the file
"""
if not self.available:
raise exceptions.AdbConnectionError("ADB command not sent because a connection to the device has not been established. (Did you call `AdbDevice.connect()`?)")
adb_info = _AdbTransactionInfo(None, None, timeout_s, total_timeout_s)
await self._open(b'sync:', adb_info)
filesync_info = _FileSyncTransactionInfo(constants.FILESYNC_STAT_FORMAT)
await self._filesync_send(constants.STAT, adb_info, filesync_info, data=device_filename)
_, (mode, size, mtime), _ = await self._filesync_read([constants.STAT], adb_info, filesync_info, read_data=False)
await self._close(adb_info)
return mode, size, mtime
# ======================================================================= #
# #
# Hidden Methods #
# #
# ======================================================================= #
[docs] async def _close(self, adb_info):
"""Send a ``b'CLSE'`` message.
.. warning::
This is not to be confused with the :meth:`AdbDevice.close` method!
Parameters
----------
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
"""
msg = AdbMessage(constants.CLSE, adb_info.local_id, adb_info.remote_id)
await self._send(msg, adb_info)
await self._read_until([constants.CLSE], adb_info)
[docs] async def _okay(self, adb_info):
"""Send an ``b'OKAY'`` mesage.
Parameters
----------
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
"""
msg = AdbMessage(constants.OKAY, adb_info.local_id, adb_info.remote_id)
await self._send(msg, adb_info)
[docs] async def _open(self, destination, adb_info):
"""Opens a new connection to the device via an ``b'OPEN'`` message.
1. :meth:`~AdbDevice._send` an ``b'OPEN'`` command to the device that specifies the ``local_id``
2. :meth:`~AdbDevice._read` a response from the device that includes a command, another local ID (``their_local_id``), and ``remote_id``
* If ``local_id`` and ``their_local_id`` do not match, raise an exception.
* If the received command is ``b'CLSE'``, :meth:`~AdbDevice._read` another response from the device
* If the received command is not ``b'OKAY'``, raise an exception
* Set the ``adb_info.local_id`` and ``adb_info.remote_id`` attributes
Parameters
----------
destination : bytes
``b'SERVICE:COMMAND'``
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
Raises
------
adb_shell.exceptions.InvalidResponseError
Wrong local_id sent to us.
"""
adb_info.local_id = 1
msg = AdbMessage(constants.OPEN, adb_info.local_id, 0, destination + b'\0')
await self._send(msg, adb_info)
_, adb_info.remote_id, their_local_id, _ = await self._read([constants.OKAY], adb_info)
if adb_info.local_id != their_local_id:
raise exceptions.InvalidResponseError('Expected the local_id to be {}, got {}'.format(adb_info.local_id, their_local_id))
[docs] async def _read(self, expected_cmds, adb_info):
"""Receive a response from the device.
1. Read a message from the device and unpack the ``cmd``, ``arg0``, ``arg1``, ``data_length``, and ``data_checksum`` fields
2. If ``cmd`` is not a recognized command in :const:`adb_shell.constants.WIRE_TO_ID`, raise an exception
3. If the time has exceeded ``total_timeout_s``, raise an exception
4. Read ``data_length`` bytes from the device
5. If the checksum of the read data does not match ``data_checksum``, raise an exception
6. Return ``command``, ``arg0``, ``arg1``, and ``bytes(data)``
Parameters
----------
expected_cmds : list[bytes]
We will read packets until we encounter one whose "command" field is in ``expected_cmds``
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
Returns
-------
command : bytes
The received command, which is in :const:`adb_shell.constants.WIRE_TO_ID` and must be in ``expected_cmds``
arg0 : int
TODO
arg1 : int
TODO
bytes
The data that was read
Raises
------
adb_shell.exceptions.InvalidCommandError
Unknown command *or* never got one of the expected responses.
adb_shell.exceptions.InvalidChecksumError
Received checksum does not match the expected checksum.
"""
start = time.time()
while True:
msg = await self._handle.bulk_read(constants.MESSAGE_SIZE, adb_info.timeout_s)
_LOGGER.debug("bulk_read(%d): %s", constants.MESSAGE_SIZE, repr(msg))
cmd, arg0, arg1, data_length, data_checksum = unpack(msg)
command = constants.WIRE_TO_ID.get(cmd)
if not command:
raise exceptions.InvalidCommandError('Unknown command: %x' % cmd, cmd, (arg0, arg1))
if command in expected_cmds:
break
if time.time() - start > adb_info.total_timeout_s:
raise exceptions.InvalidCommandError('Never got one of the expected responses (%s)' % expected_cmds, cmd, (adb_info.timeout_s, adb_info.total_timeout_s))
if data_length > 0:
data = bytearray()
while data_length > 0:
temp = await self._handle.bulk_read(data_length, adb_info.timeout_s)
_LOGGER.debug("bulk_read(%d): %s", data_length, repr(temp))
data += temp
data_length -= len(temp)
actual_checksum = checksum(data)
if actual_checksum != data_checksum:
raise exceptions.InvalidChecksumError('Received checksum {0} != {1}'.format(actual_checksum, data_checksum))
else:
data = bytearray()
return command, arg0, arg1, bytes(data)
[docs] async def _read_until(self, expected_cmds, adb_info):
"""Read a packet, acknowledging any write packets.
1. Read data via :meth:`AdbDevice._read`
2. If a ``b'WRTE'`` packet is received, send an ``b'OKAY'`` packet via :meth:`AdbDevice._okay`
3. Return the ``cmd`` and ``data`` that were read by :meth:`AdbDevice._read`
Parameters
----------
expected_cmds : list[bytes]
:meth:`AdbDevice._read` with look for a packet whose command is in ``expected_cmds``
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
Returns
-------
cmd : bytes
The command that was received by :meth:`AdbDevice._read`, which is in :const:`adb_shell.constants.WIRE_TO_ID` and must be in ``expected_cmds``
data : bytes
The data that was received by :meth:`AdbDevice._read`
Raises
------
adb_shell.exceptions.InterleavedDataError
We don't support multiple streams...
adb_shell.exceptions.InvalidResponseError
Incorrect remote id.
adb_shell.exceptions.InvalidCommandError
Never got one of the expected responses.
"""
start = time.time()
while True:
cmd, remote_id2, local_id2, data = await self._read(expected_cmds, adb_info)
if local_id2 not in (0, adb_info.local_id):
raise exceptions.InterleavedDataError("We don't support multiple streams...")
if remote_id2 in (0, adb_info.remote_id):
break
if time.time() - start > adb_info.total_timeout_s:
raise exceptions.InvalidCommandError('Never got one of the expected responses (%s)' % expected_cmds, cmd, (adb_info.timeout_s, adb_info.total_timeout_s))
# Ignore CLSE responses to previous commands
# https://github.com/JeffLIrion/adb_shell/pull/14
if cmd != constants.CLSE:
raise exceptions.InvalidResponseError('Incorrect remote id, expected {0} got {1}'.format(adb_info.remote_id, remote_id2))
# Acknowledge write packets
if cmd == constants.WRTE:
await self._okay(adb_info)
return cmd, data
[docs] async def _read_until_close(self, adb_info):
"""Yield packets until a ``b'CLSE'`` packet is received.
1. Read the ``cmd`` and ``data`` fields from a ``b'CLSE'`` or ``b'WRTE'`` packet via :meth:`AdbDevice._read_until`
2. If ``cmd`` is ``b'CLSE'``, then send a ``b'CLSE'`` message and stop
3. If ``cmd`` is not ``b'WRTE'``, raise an exception
* If ``cmd`` is ``b'FAIL'``, raise :class:`~adb_shell.exceptions.AdbCommandFailureException`
* Otherwise, raise :class:`~~adb_shell.exceptions.InvalidCommandError`
4. Yield ``data`` and repeat
Parameters
----------
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
Yields
------
data : bytes
The data that was read by :meth:`AdbDevice._read_until`
"""
while True:
cmd, data = await self._read_until([constants.CLSE, constants.WRTE], adb_info)
if cmd == constants.CLSE:
msg = AdbMessage(constants.CLSE, adb_info.local_id, adb_info.remote_id)
await self._send(msg, adb_info)
break
yield data
[docs] async def _send(self, msg, adb_info):
"""Send a message to the device.
1. Send the message header (:meth:`adb_shell.adb_message.AdbMessage.pack <AdbMessage.pack>`)
2. Send the message data
Parameters
----------
msg : AdbMessage
The data that will be sent
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
"""
_LOGGER.debug("bulk_write: %s", repr(msg.pack()))
await self._handle.bulk_write(msg.pack(), adb_info.timeout_s)
_LOGGER.debug("bulk_write: %s", repr(msg.data))
await self._handle.bulk_write(msg.data, adb_info.timeout_s)
[docs] async def _streaming_command(self, service, command, adb_info):
"""One complete set of USB packets for a single command.
1. :meth:`~AdbDevice._open` a new connection to the device, where the ``destination`` parameter is ``service:command``
2. Read the response data via :meth:`AdbDevice._read_until_close`
.. note::
All the data is held in memory, and thus large responses will be slow and can fill up memory.
Parameters
----------
service : bytes
The ADB service (e.g., ``b'shell'``, as used by :meth:`AdbDevice.shell`)
command : bytes
The service command
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
Yields
------
bytes
The responses from the service.
"""
await self._open(b'%s:%s' % (service, command), adb_info)
async for data in self._read_until_close(adb_info):
yield data
[docs] async def _write(self, data, adb_info):
"""Write a packet and expect an Ack.
Parameters
----------
data : bytes
The data that will be sent
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
"""
msg = AdbMessage(constants.WRTE, adb_info.local_id, adb_info.remote_id, data)
await self._send(msg, adb_info)
# Expect an ack in response.
await self._read_until([constants.OKAY], adb_info)
# ======================================================================= #
# #
# FileSync Hidden Methods #
# #
# ======================================================================= #
[docs] async def _filesync_flush(self, adb_info, filesync_info):
"""Write the data in the buffer up to ``filesync_info.send_idx``, then set ``filesync_info.send_idx`` to 0.
Parameters
----------
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
filesync_info : _FileSyncTransactionInfo
Data and storage for this FileSync transaction
"""
await self._write(filesync_info.send_buffer[:filesync_info.send_idx], adb_info)
filesync_info.send_idx = 0
[docs] async def _filesync_read(self, expected_ids, adb_info, filesync_info, read_data=True):
"""Read ADB messages and return FileSync packets.
Parameters
----------
expected_ids : tuple[bytes]
If the received header ID is not in ``expected_ids``, an exception will be raised
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
filesync_info : _FileSyncTransactionInfo
Data and storage for this FileSync transaction
read_data : bool
Whether to read the received data
Returns
-------
command_id : bytes
The received header ID
tuple
The contents of the header
data : bytearray, None
The received data, or ``None`` if ``read_data`` is False
Raises
------
adb_shell.exceptions.AdbCommandFailureException
Command failed
adb_shell.exceptions.InvalidResponseError
Received response was not in ``expected_ids``
"""
if filesync_info.send_idx:
await self._filesync_flush(adb_info, filesync_info)
# Read one filesync packet off the recv buffer.
header_data = await self._filesync_read_buffered(filesync_info.recv_message_size, adb_info, filesync_info)
header = struct.unpack(filesync_info.recv_message_format, header_data)
# Header is (ID, ...).
command_id = constants.FILESYNC_WIRE_TO_ID[header[0]]
if command_id not in expected_ids:
if command_id == constants.FAIL:
reason = ''
if filesync_info.recv_buffer:
reason = filesync_info.recv_buffer.decode('utf-8', errors='ignore')
raise exceptions.AdbCommandFailureException('Command failed: {}'.format(reason))
raise exceptions.InvalidResponseError('Expected one of %s, got %s' % (expected_ids, command_id))
if not read_data:
return command_id, header[1:], None
# Header is (ID, ..., size).
size = header[-1]
data = await self._filesync_read_buffered(size, adb_info, filesync_info)
return command_id, header[1:-1], data
[docs] async def _filesync_read_buffered(self, size, adb_info, filesync_info):
"""Read ``size`` bytes of data from ``self.recv_buffer``.
Parameters
----------
size : int
The amount of data to read
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
filesync_info : _FileSyncTransactionInfo
Data and storage for this FileSync transaction
Returns
-------
result : bytearray
The read data
"""
# Ensure recv buffer has enough data.
while len(filesync_info.recv_buffer) < size:
_, data = await self._read_until([constants.WRTE], adb_info)
filesync_info.recv_buffer += data
result = filesync_info.recv_buffer[:size]
filesync_info.recv_buffer = filesync_info.recv_buffer[size:]
return result
[docs] async def _filesync_read_until(self, expected_ids, finish_ids, adb_info, filesync_info):
"""Useful wrapper around :meth:`AdbDevice._filesync_read`.
Parameters
----------
expected_ids : tuple[bytes]
If the received header ID is not in ``expected_ids``, an exception will be raised
finish_ids : tuple[bytes]
We will read until we find a header ID that is in ``finish_ids``
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
filesync_info : _FileSyncTransactionInfo
Data and storage for this FileSync transaction
Yields
------
cmd_id : bytes
The received header ID
header : tuple
TODO
data : bytearray
The received data
"""
while True:
cmd_id, header, data = await self._filesync_read(expected_ids + finish_ids, adb_info, filesync_info)
yield cmd_id, header, data
# These lines are not reachable because whenever this method is called and `cmd_id` is in `finish_ids`, the code
# either breaks (`list` and `_pull`), returns (`_push`), or raises an exception (`_push`)
if cmd_id in finish_ids: # pragma: no cover
break
[docs] async def _filesync_send(self, command_id, adb_info, filesync_info, data=b'', size=0):
"""Send/buffer FileSync packets.
Packets are buffered and only flushed when this connection is read from. All
messages have a response from the device, so this will always get flushed.
Parameters
----------
command_id : bytes
Command to send.
adb_info : _AdbTransactionInfo
Info and settings for this ADB transaction
filesync_info : _FileSyncTransactionInfo
Data and storage for this FileSync transaction
data : str, bytes
Optional data to send, must set data or size.
size : int
Optionally override size from len(data).
"""
if data:
if not isinstance(data, bytes):
data = data.encode('utf8')
size = len(data)
if not filesync_info.can_add_to_send_buffer(len(data)):
await self._filesync_flush(adb_info, filesync_info)
buf = struct.pack(b'<2I', constants.FILESYNC_ID_TO_WIRE[command_id], size) + data
filesync_info.send_buffer[filesync_info.send_idx:filesync_info.send_idx + len(buf)] = buf
filesync_info.send_idx += len(buf)
[docs] @staticmethod
def _handle_progress(progress_callback):
"""Calls the callback with the current progress and total bytes written/received.
Parameters
----------
progress_callback : function
Callback method that accepts ``filename``, ``bytes_written``, and ``total_bytes``; total_bytes will be -1 for file-like
objects.
"""
current = 0
while True:
current += yield
try:
progress_callback(current)
except Exception: # pylint: disable=broad-except
continue
[docs]class AdbDeviceTcp(AdbDevice):
"""A class with methods for connecting to a device via TCP and executing ADB commands.
Parameters
----------
host : str
The address of the device; may be an IP address or a host name
port : int
The device port to which we are connecting (default is 5555)
default_timeout_s : float, None
Default timeout in seconds for TCP packets, or ``None``
banner : str, bytes, None
The hostname of the machine where the Python interpreter is currently running; if
it is not provided, it will be determined via ``socket.gethostname()``
Attributes
----------
_available : bool
Whether an ADB connection to the device has been established
_banner : bytearray, bytes
The hostname of the machine where the Python interpreter is currently running
_handle : TcpHandle
The handle that is used to connect to the device
"""
def __init__(self, host, port=5555, default_timeout_s=None, banner=None):
handle = TcpHandle(host, port, default_timeout_s)
super(AdbDeviceTcp, self).__init__(handle, banner)