# Copyright (c) 2023 Adolfo Gómez
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
This module contains Device and Device related classes
:author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
import enum
from . import _parted # type: ignore
from . import exceptions, disk as mdisk
from .util import ensure_obj, ensure_root, ensure_obj_or_default, make_destroyable, OpenContext
if typing.TYPE_CHECKING:
import cffi
from . import geom
from . import timer
from . import disk
logger = logging.getLogger(__name__)
[docs]class DeviceType(enum.IntEnum):
"""Device types"""
UNKNOWN = 0
SCSI = 1
IDE = 2
DAC960 = 3
CPQARRAY = 4
FILE = 5
ATARAID = 6
I2O = 7
UBD = 8
DASD = 9
VIODASD = 10
SX8 = 11
DM = 12
XVD = 13
SDMMC = 14
VIRTBLK = 15
AOE = 16
MD = 17
LOOP = 18
NVME = 19
RAM = 20
PMEM = 21
[docs] @staticmethod
def from_string(s: str) -> 'DeviceType':
"""Returns a DeviceType from a string
Args:
s (str): String to convert
Returns:
DeviceType: DeviceType from string
Raises:
ValueError: If string is not a valid DeviceType
"""
return DeviceType.__members__[s.upper()]
def __str__(self) -> str:
return f"DeviceType.{self.name}"
def __repr__(self) -> str:
return self.__str__()
[docs]class CHSGeometry:
"""Cylinders, heads and sectors geometry of a device"""
cylinders: int
heads: int
sectors: int
def __init__(self, cylinders: int, heads: int, sectors: int):
"""Creates a new CHSGeometry object
Args:
cylinders (int): Number of cylinders
heads (int): Number of heads
sectors (int): Number of sectors
"""
self.cylinders = cylinders
self.heads = heads
self.sectors = sectors
@property
def total_sectors(self) -> int:
"""Total number of sectors in the device"""
return self.cylinders * self.heads * self.sectors
[docs] @staticmethod
def from_partitiontable_msdos(data: bytes) -> 'CHSGeometry':
"""
Creates a CHSGeometry from a partition table msdos data
"""
return CHSGeometry(cylinders=data[0], heads=data[1] & 0x3F, sectors=((data[1] & 0xC0) << 2) | data[2])
def __str__(self) -> str:
return f"CHSGeometry(cylinders={self.cylinders}, heads={self.heads}, sectors={self.sectors})"
def __repr__(self) -> str:
return str(self)
[docs]class Device:
"""Wrapper for PedDevice*
This class is a wrapper for the PedDevice* type, and provides a more pythonic interface to it.
"""
_device: typing.Any # PedDevice*
def __init__(self, device: typing.Union[str, 'cffi.FFI.CData', 'Device', None] = None) -> None:
"""Creates a new Device object, if device is a string, it will be used as the path and create a new device
Args:
device (typing.Union[str, 'cffi.FFI.CData'], optional): Device path or PedDevice*. Defaults to None.
"""
if isinstance(device, str):
self._device = _parted.lib.ped_device_get(device.encode())
elif isinstance(device, Device):
self._device = device._device
else:
self._device = device if device else _parted.ffi.NULL
def __bool__(self) -> bool:
"""Returns True if the device is valid, False otherwise
Returns:
bool: True if the device is valid, False otherwise
"""
return bool(self._device)
def __eq__(self, other: typing.Any) -> bool:
"""Returns True if the other object is a Device and has the same underlying PedDevice* or it other is a string and
the path of the device is the same as the other string
Args:
other (typing.Any): Other object to compare (must be a Device or a string)
Returns:
bool: True if the other object is a Device and has the same underlying PedDevice* or it other is a string and
the path of the device is the same as the other string
"""
if isinstance(other, Device):
return self._device == other._device
elif isinstance(other, str):
return self.path == other
return False
@property
def obj(self) -> 'cffi.FFI.CData':
"""Wrapped ``PedDevice*`` object"""
return self._device
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default('Unknown')
def model(self) -> str:
"""device model(e.g. ""ATA ST4000VN008-2DR1", "Linux device-mapper (linear)", "QEMU HARDDISK", "Unknown")"""
return _parted.ffi.string(self._device.model).decode()
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default('')
def path(self) -> str:
"""device path (e.g. /dev/sda, file.img, etc)"""
return _parted.ffi.string(self._device.path).decode()
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(DeviceType.UNKNOWN)
def type(self) -> DeviceType:
"""Device type"""
return DeviceType(self._device.type)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(0)
def sector_size(self) -> int:
"""The device sector size (e.g. 512, 4096, etc)"""
return self._device.sector_size
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(0)
def phys_sector_size(self) -> int:
"""The device physical sector size (e.g. 512, 4096, etc)"""
return self._device.phys_sector_size
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(0)
def length(self) -> int:
"""Device length in sectors"""
return self._device.length
@property
def size(self) -> int:
"""Device size in bytes"""
return self.sector_size * self.length
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(0)
def open_count(self) -> int:
"""number of times the device has been opened"""
return self._device.open_count
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(True)
def read_only(self) -> bool:
"""if the device is read only"""
return bool(self._device.read_only)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(False)
def external_mode(self) -> bool:
"""if the device is in external access mode"""
return bool(self._device.external_mode)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(False)
def dirty(self) -> bool:
"""if the device is dirty"""
return bool(self._device.dirty)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(False)
def boot_dirty(self) -> bool:
"""Returns:
bool: True if the device is set to boot dirty
"""
return bool(self._device.boot_dirty)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(lambda: CHSGeometry(0, 0, 0))
def hw_geom(self) -> CHSGeometry:
"""Returns:
CHSGeometry: The device hardware geometry
"""
return CHSGeometry(
self._device.hw_geom.cylinders,
self._device.hw_geom.heads,
self._device.hw_geom.sectors,
)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(lambda: CHSGeometry(0, 0, 0))
def bios_geom(self) -> CHSGeometry:
"""Returns:
CHSGeometry: The device BIOS geometry
"""
return CHSGeometry(
self._device.bios_geom.cylinders,
self._device.bios_geom.heads,
self._device.bios_geom.sectors,
)
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(0)
def host(self) -> int:
"""The host number of the device"""
return self._device.host
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(0)
def did(self) -> int:
"""The device id of the device"""
return self._device.did
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(False)
def is_busy(self) -> bool:
"""If the device
Note:
This is a wrapper around the ``ped_device_is_busy`` function
"""
return bool(_parted.lib.ped_device_is_busy(self._device))
@property # type: ignore # (mypy does not recognizes properties and decorators)
@ensure_obj_or_default(False)
def is_opened(self) -> bool:
"""Returns True if the device is opened
Returns:
bool: True if the device is opened
"""
return bool(self._device.open_count)
[docs] @ensure_obj
def wants_access(self, for_writing: bool = False) -> None:
"""Raises an exception if the device cannot be accesed
The device is "accesible" if it's opened and not is in external access mode.
if "for_writing" is True, the device must be opened not read only.
Args:
for_writing (bool, optional): If the device is opened for writing. Defaults to False.
Raises:
exceptions.InvalidDeviceError: If the device is not accesible
"""
if not self.is_opened:
raise exceptions.NotOpenedError(
"Device is not opened", self.path
)
if self.external_mode != 0:
raise exceptions.InvalidDeviceError(
"Device is in external access mode", self.path
)
if for_writing and self.read_only:
raise exceptions.ReadOnlyError(
"Device is opened read only", self.path
)
[docs] def next(self) -> 'Device':
"""Returns:
Device: The next device in the list, if any, or a Device with _device set to NULL
Note:
This is a wrapper around the ``ped_device_get_next`` function
"""
if not self._device:
return Device()
return Device(_parted.lib.ped_device_get_next(self._device))
[docs] @ensure_obj
def open(self, mode: typing.Literal['r', 'rw'] = 'rw') -> 'OpenContext':
"""Opens the device, allowing read/write access
Args:
for_writing (bool): True if the device should be opened for writing (default: True)
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.IOError: If the device cannot be opened
Note:
This is a wrapper around the ``ped_device_open`` function
"""
if _parted.lib.ped_device_open(self._device) == 0: # pragma: no cover
raise exceptions.IOError("Failed to open device")
self._device.read_only = mode == 'r'
logger.debug('Opened device %s', self.path)
return OpenContext(self)
[docs] @ensure_obj
def close(self) -> None:
"""Closes the device, disallowing read/write access.
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.IOError: If the device cannot be closed
Note:
This is a wrapper around the ``ped_device_close`` function
"""
if not self.is_opened:
return
if _parted.lib.ped_device_close(self._device) == 0: # pragma: no cover
raise exceptions.IOError("Failed to close device")
logger.debug('Closed device %s', self.path)
return
[docs] @ensure_obj
def read(self, sector_start: int, sector_count: int) -> bytes:
"""Reads the specified number of sectors from the device, starting at the specified sector
Args:
sector_start (int): The sector to start reading from
sector_count (int): The number of sectors to read
Returns:
bytes: The data read from the device
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.IOError: If the read fails
Note:
This is a wrapper around the ``ped_device_read`` function
"""
self.wants_access()
if sector_start < 0:
sector_start = self.length + sector_start # from end
buffer = _parted.ffi.new("char[]", sector_count * self.sector_size)
if _parted.lib.ped_device_read(self._device, buffer, sector_start, sector_count) == 0:
raise exceptions.IOError("Failed to read from device")
return _parted.ffi.buffer(buffer)[:]
[docs] @ensure_obj
def write(self, buffer: bytes, sector_start: int, sector_count: int) -> None:
"""Write buffer to device.
Args:
buffer (bytes): Buffer to write to device.
sector_start (int): Sector to start writing at.
sector_count (int): Number of sectors to write.
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.IOError: If the write fails
exceptions.NotOpenedError: If the device is not opened
Note:
This is a wrapper around the ``ped_device_write`` function
"""
self.wants_access(for_writing=True)
if sector_start < 0:
sector_start = self.length + sector_start # from end
# If buffer size is less than sector_count * sector_size, pad with zeros
buf = buffer
if len(buf) < sector_count * self.sector_size:
buf += b"\0" * (sector_count * self.sector_size - len(buf))
if _parted.lib.ped_device_write(self._device, buf, sector_start, sector_count) == 0:
raise exceptions.IOError("Failed to write to device")
[docs] @ensure_obj
def copy(
self, source: 'geom.Geometry', to: int, tm: typing.Optional['timer.Timer'] = None, sectors_block: int = 32
) -> None:
"""Copies the data from one region to another
Args:
from (geom.Geometry): The geometry to copy from
to (int): The sector to start copying to
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.NotOpenedError: If the device is not opened
exceptions.IOError: If the copy fails
Important:
The device needs to be opened before calling this function.
"""
if source.start == to: # same region
return
self.wants_access(for_writing=True)
# Determine direction of the copy (from start to end or end to start)
# so we don't overwrite data we're copying
def do_copy(sector_num: int, size: int):
if tm:
tm.update(float(sector_num) / source.length)
self.write(
self.read(source.start + sector_num, size),
to + sector_num,
size,
)
block_count = source.length // sectors_block # for copying block_size sectors per block
block_rest = source.length % sectors_block # for copying the rest of sectors
if source.start > to:
range_block = range(block_count)
else:
range_block = range(block_count - 1, -1, -1)
for blk in range_block:
do_copy(blk * sectors_block, sectors_block)
# Last block
if block_rest:
do_copy(block_count * sectors_block, block_rest)
[docs] @ensure_obj
def sync(self) -> None:
"""Flushes the device's write cache
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.NotOpenedError: If the device is not opened
exceptions.IOError: If the sync fails
Important:
The device needs to be opened before calling this function.
Note:
This is a wrapper around the ``ped_device_sync`` function
"""
self.wants_access(for_writing=True)
if _parted.lib.ped_device_sync(self._device) == 0: # pragma: no cover
raise exceptions.IOError("Failed to sync device")
[docs] @ensure_obj
def sync_fast(self) -> None:
"""Flushes the device's write cache
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
Important:
The device needs to be opened before calling this function
Note:
This is a wrapper around the ``ped_device_sync_fast`` function
"""
self.wants_access(for_writing=True)
if _parted.lib.ped_device_sync_fast(self._device) == 0: # pragma: no cover
raise exceptions.IOError("Failed to fast sync device")
[docs] @ensure_obj
def begin_external_access(self, read_only: bool = False) -> None: # pragma: no cover
"""Begins external access to the device
Invokes the ``ped_device_begin_external_access`` function
Args:
read_only (bool): Whether the device is opened read-only
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.PartedException: If the access could not be granted
Note:
This is a wrapper around the ``ped_device_begin_external_access`` function
"""
if _parted.lib.ped_device_begin_external_access(self._device, read_only) == 0:
raise exceptions.PartedException("Failed to begin external access")
[docs] @ensure_obj
def end_external_access(self) -> None:
"""Ends external access to the device
Invokes the ``ped_device_end_external_access`` function
Raises:
exceptions.InvalidObjectError: If the wrapped device is not valid
exceptions.PartedException: If the access could not be ended
Note:
This is a wrapper around the ``ped_device_end_external_access`` function
"""
if _parted.lib.ped_device_end_external_access(self._device) == 0: # pragma: no cover
raise exceptions.PartedException("Failed to end external access")
[docs] @ensure_obj
def probe(self) -> 'mdisk.DiskType':
"""Get the partition table type of the device.
Args:
device (device.Device): The device to probe
Returns:
DiskType: The disk type
Note:
This is a wrapper around the ``ped_disk_probe`` function
"""
return mdisk.DiskType(_parted.lib.ped_disk_probe(self._device))
[docs] @ensure_obj
def clobber(self) -> None:
"""Clobber the partition table on the device. Does not need to be opened before.
As a protection, the device must be opened for writing before calling this function.
Args:
device (device.Device): The device to clobber
Returns:
bool: True on success, False on error
Warning:
Handle with care! will destroy all the partions on the device!!.
Data will be written to the device as soon as this function is called.
Note:
This is a wrapper around the ``ped_disk_clobber`` function
"""
self.wants_access(for_writing=True)
if _parted.lib.ped_disk_clobber(self._device) == 0:
raise exceptions.PartedException("Failed to clobber device")
[docs] @ensure_obj
def read_table(self) -> 'mdisk.Disk':
"""Reads the partition table from the device and returns a new Disk object.
Args:
dev (device.Device): The device to read the partition table from.
Returns:
Disk: A new Disk object.
Warning:
The device object cilinders, heads and sectors can be modified by this call
if the partition table indicates that the existing values are incorrect.
Note:
This is a wrapper around the ``ped_disk_new`` function
"""
return mdisk.Disk(self)
[docs] @ensure_obj
def new_table(self, type: typing.Union[str, 'mdisk.DiskType', 'mdisk.DiskType.WNT']) -> 'mdisk.Disk':
"""
Creates a new partition table on the disk.
Args:
dev (device.Device): The device to create the partition table on.
type (DiskType): The type of partition table to create. (e.g. 'gpt', 'msdos', 'bsd', ...)
Returns:
Partition: The new partition.
Important:
The new table is created "in memory" only. You must call the commit_to_dev() method to write it to the device.
Note:
This is a wrapper around the ``ped_disk_new_fresh`` function
Raises:
exceptions.InvalidDeviceError: if the device is invalid
"""
type = mdisk.DiskType(type) if not isinstance(type, mdisk.DiskType) else type
if not self:
raise exceptions.InvalidDeviceError('Invalid device: {}'.format(self))
dsk = make_destroyable(mdisk.Disk(_parted.lib.ped_disk_new_fresh(self._device, type.obj)))
return dsk
[docs] @staticmethod
def get(name: str) -> 'Device':
"""Get device by name.
Args:
name (str): Name of device. (i.e. filename, '/dev/sda', etc.)
Returns:
Device: Device object.
Note:
This is a wrapper around the ``ped_device_get`` function
"""
return Device(_parted.lib.ped_device_get(name.encode()))
[docs] @staticmethod
@ensure_root
def enumerate() -> typing.Iterator['Device']:
"""Enumerate all devices.
Yields:
Device: Device object.
Raises:
exceptions.PartedException: If the device could not be enumerated (e.g. permission denied)
Note:
This is a wrapper around the ``ped_device_get_next`` function
"""
device = _parted.lib.ped_device_get_next(_parted.ffi.NULL)
while device:
yield Device(device)
device = _parted.lib.ped_device_get_next(device)
[docs] @staticmethod
@ensure_root
def probe_all() -> None:
"""Probe all devices. (needs to be root)
Raises:
exceptions.PartedException: If the device could not be probed (e.g. permission denied)
Note:
This is a wrapper around the ``ped_device_probe_all`` function
"""
_parted.lib.ped_device_probe_all()
[docs] @staticmethod
def free_all() -> None:
"""Clear all devices from memory
Note:
This is a wrapper around the ``ped_device_free_all`` function
"""
_parted.lib.ped_device_free_all()
def __str__(self) -> str:
return (
f'Device: path: {self.path}, type: {self.type.name}, model: '
f'"{self.model}", length: {self.length}, '
f'size: {self.sector_size*self.hw_geom.sectors*self.hw_geom.heads*self.hw_geom.cylinders}'
)
def __repr__(self) -> str:
return self.__str__()