Source code for parted.geom

# Copyright (c) 2023 Adolfo Gómez
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""

import typing
import logging

from . import _parted  # type: ignore

from . import exceptions, timer
from .util import ensure_obj, ensure_obj_or_default, make_destroyable, cache_on, OpenContext

from . import device

if typing.TYPE_CHECKING:
    import cffi

logger = logging.getLogger(__name__)


[docs]class Geometry: """This class represents a Geometry""" _geometry: typing.Any _destroyable: bool def __init__( self, geom_or_device: typing.Union['device.Device', 'cffi.FFI.CData', None] = None, start: int = 0, length: int = 0, ) -> None: """Creates a new Geometry object Args: geom_or_device (typing.Optional[typing.Union['device.Device', 'cffi.FFI.CData']], optional): A geometry or a device to create the geometry from. Defaults to None. start (int, optional): Start of the geometry. Defaults to 0. length (int, optional): Length of the geometry. Defaults to 0. """ """""" self._destroyable = False if isinstance(geom_or_device, device.Device): # If not a velid device, return an empty geometry if not geom_or_device: self._geometry = _parted.ffi.NULL return self._geometry = _parted.lib.ped_geometry_new(geom_or_device.obj, start, length) self._destroyable = True else: geom = typing.cast(typing.Any, _parted.ffi.NULL if not geom_or_device else geom_or_device) if geom and _parted.ffi.typeof(geom).cname == 'struct _PedGeometry': # If we get a PedGeometry, we need to copy it (may come from "partition" or "disk") self._geometry = _parted.lib.ped_geometry_new(geom.dev, geom.start, geom.length) self._destroyable = True else: self._geometry = geom def __del__(self) -> None: if self._geometry and self._destroyable: _parted.lib.ped_geometry_destroy(self._geometry) self._geometry = _parted.ffi.NULL def __bool__(self) -> bool: return bool(self._geometry) def __eq__(self, other: typing.Any) -> bool: if not isinstance(other, Geometry): return False if not self and not other: return True if not self or not other: return False return bool(_parted.lib.ped_geometry_test_equal(self._geometry, other._geometry)) @property def obj(self) -> 'cffi.FFI.CData': """Wrapped ``PedGeometry*`` object""" return self._geometry @property # type: ignore # mypy does not like property decorators @cache_on('_cached_device') @ensure_obj_or_default(lambda: device.Device()) def dev(self) -> 'device.Device': """Device of the geometry""" return device.Device(self._geometry.dev) @property # type: ignore # mypy does not like property decorators @ensure_obj_or_default(0) def start(self) -> int: """Start of the geometry""" return self._geometry.start @start.setter def start(self, value: int) -> None: """Sets the start of the geometry""" if not self._geometry: return # Nothing to do if _parted.lib.ped_geometry_set_start(self._geometry, value) == 0: raise exceptions.PartedException("Invalid start sector") @property # type: ignore # mypy does not like property decorators @ensure_obj_or_default(0) def length(self) -> int: """Length of the geometry""" return self._geometry.length @length.setter def length(self, value: int) -> None: """Sets the length of the geometry""" if not self._geometry: return # Nothing to do if _parted.lib.ped_geometry_set(self._geometry, self.start, value) == 0: raise exceptions.PartedException("Invalid length") @property # type: ignore # mypy does not like property decorators @ensure_obj_or_default(0) def end(self) -> int: """End of the geometry""" return self._geometry.end @end.setter def end(self, value: int) -> None: """Sets the end of the geometry""" if not self._geometry: return # Nothing to do if _parted.lib.ped_geometry_set_end(self._geometry, value) == 0: raise exceptions.PartedException("Invalid end sector") @ensure_obj def __contains__(self, other: typing.Any) -> bool: """Checks if a geometry is contained in this geometry Args: other (typing.Any): The geometry to check Raises: exceptions.PartedException: If the geometry is not valid Returns: bool: True if the geometry is contained in this geometry, False otherwise Note: This is a wrapper for ``ped_geometry_test_inside`` """ if not isinstance(other, (Geometry, int)): return False if isinstance(other, Geometry): return bool(_parted.lib.ped_geometry_test_inside(self._geometry, other._geometry)) elif isinstance(other, int): return bool(_parted.lib.ped_geometry_test_sector_inside(self._geometry, other)) raise exceptions.PartedException("Invalid type {}".format(type(other))) @ensure_obj def __xor__(self, other: 'Geometry') -> 'Geometry': return self.intersect(other)
[docs] @ensure_obj def intersect(self, other: 'Geometry') -> 'Geometry': """Returns the intersection of this geometry with another one Args: other (Geometry): The other geometry Returns: Geometry: The intersection geometry Note: This is a wrapper around ``ped_geometry_intersect``. """ return make_destroyable(Geometry(_parted.lib.ped_geometry_intersect(self._geometry, other._geometry)))
[docs] @ensure_obj def overlap(self, other: 'Geometry') -> bool: """Returns True if this geometry overlaps with another one Args: other (Geometry): The other geometry Returns: bool: True if the geometries overlap Note: This is a wrapper around the ``ped_geometry_test_overlap`` function. """ return bool(_parted.lib.ped_geometry_test_overlap(self._geometry, other._geometry))
[docs] @ensure_obj def duplicate(self) -> 'Geometry': """Returns a copy of this geometry Returns: Geometry: The copy Note: This is a wrapper around the ``ped_geometry_duplicate`` function. """ return make_destroyable(Geometry(_parted.lib.ped_geometry_duplicate(self._geometry)))
[docs] @ensure_obj def map(self, other: 'Geometry', sector: int) -> int: """ This function takes a sector inside the region described by src, and returns that sector's address inside dst. The two geometries must be on the same device and, must overlap. Args: other: The geometry to map to sector: The sector to map Returns: The mapped sector or -1 if error """ return _parted.lib.ped_geometry_map(other._geometry, self._geometry, sector)
[docs] @ensure_obj def check( self, sector_offset: int, granularity: int = 1, # Sectors to group on error count: int = 1, buffer_size: int = 1024, # Buffer size, in sectors tmr: typing.Optional['timer.Timer'] = None, ) -> int: """Checks for physical disk errors. granularity specificies how sectors should be grouped together. The first bad sector to be returned will always be in the form: * offset + n * granularity return the first bad sector, or 0 if there were no physical errors Args: sector_offset (int): The sector offset to start checking granularity (int, optional): The granularity of how to group sectors. Defaults to 1. buffer_size (int, optional): The buffer size to use in sectors. Defaults to 1024. insectorstmr (typing.Optional["timer.Timer"], optional): The timer to use. Defaults to None. Returns: The first bad sector, or 0 if there were no physical errors Raises: exceptions.InvalidObjectError: If "self" geometry is invalid exceptions.NotOpenedError: If device is not opened exceptions.PartedException: If error Note: This is a wrapper around the ``ped_geometry_check`` function. """ self.dev.wants_access() tmr = tmr or timer.Timer() buffer = _parted.ffi.new('char[]', buffer_size * self.dev.sector_size) # 32K buffer return _parted.lib.ped_geometry_check( self._geometry, buffer, buffer_size, sector_offset, granularity, count, tmr.obj )
[docs] @ensure_obj def set(self, start: int = -1, length: int = -1) -> None: """Sets the geometry Args: start (int): The start sector length (int): The length in sectors Raises: exceptions.InvalidDeviceError: If the device is invalid exceptions.PartedException: If any other error """ if not self.dev: raise exceptions.InvalidDeviceError("Invalid device") if start == -1: start = self.start if length == -1: length = self.length # any other start will fail if start <= 0: raise exceptions.PartedException("Invalid start sector") if length <= 0: raise exceptions.PartedException("Invalid length") if _parted.lib.ped_geometry_set(self._geometry, start, length) == 0: raise exceptions.PartedException("Invalid geometry")
[docs] @ensure_obj def sync(self) -> None: """Flushes the cache on geom. From the parted documentation: This function flushes all write-behind caches that might be holding writes made by ped_geometry_write() to geom. It is slow, because it guarantees cache coherency among all relevant caches. Raises: exceptions.NotOpenedError: If the device is not opened exceptions.ReadOnlyError: If the device is read-only exceptions.PartedException: If any other error Note: This is a wrapper around the ``ped_geometry_sync`` function. """ self.dev.wants_access(for_writing=True) if _parted.lib.ped_geometry_sync(self._geometry) == 0: raise exceptions.PartedException("Failed to sync geometry")
[docs] @ensure_obj def sync_fast(self) -> None: """Flushes the cache on geom. ("Fast" version) From the parted documentation: This function flushes all write-behind caches that might be holding writes made by ped_geometry_write() to geom. It does NOT ensure cache coherency with other caches that cache data in the region described by geom. If you need cache coherency, use sync() instead. Raises: exceptions.NotOpenedError: _description_ exceptions.ReadOnlyError: _description_ exceptions.IOError: _description_ Note: This is a wrapper around the ``ped_geometry_sync_fast`` function. """ self.dev.wants_access(for_writing=True) if _parted.lib.ped_geometry_sync_fast(self._geometry) == 0: raise exceptions.IOError("Failed to sync geometry")
[docs] @ensure_obj def open(self) -> 'OpenContext': """Opens to read/write the geometry Returns: OpenContext: The context """ self.dev.open() return OpenContext(self)
[docs] @ensure_obj def close(self) -> None: """Closes the stream Raises: exceptions.NotOpenedError: If the device is not opened """ self.dev.close()
[docs] @ensure_obj def read(self, sector_offset: int, sector_count: int = 1) -> bytes: """Reads data from the geometry Args: sector_offset (int): The sector offset to start reading sector_count (int, optional): The number of sectors to read. Defaults to 1. Raises: exceptions.NotOpenedError: if the device is not opened exceptions.PartedException: if any other error Returns: bytes: _description_ Note: This is a wrapper around the ``ped_geometry_read`` function. """ self.dev.wants_access() if sector_offset < 0: raise exceptions.PartedException("Invalid sector offset") if sector_count < 0: raise exceptions.PartedException("Invalid sector count") buffer = _parted.ffi.new('char[]', sector_count * self.dev.sector_size) if _parted.lib.ped_geometry_read(self._geometry, buffer, sector_offset, sector_count) == 0: raise exceptions.PartedException("Failed to read geometry") return _parted.ffi.buffer(buffer)[:] # a copy of the buffer
[docs] @ensure_obj def write(self, data: bytes, sector_offset: int) -> None: """Writes data to the geometry Args: data (bytes): The data to write sector_offset (int): The sector offset to start writing (from the beginning of the geometry) Raises: exceptions.PartedException: If any error """ self.dev.wants_access(for_writing=True) if sector_offset < 0: raise exceptions.PartedException("Invalid sector offset") dev = self.dev sector_count = (len(data) + dev.sector_size - 1) // dev.sector_size buf = data[:] # If buf size is less than sector_count * sector_size, do a read-modify-write if len(buf) < sector_count * dev.sector_size: # Read last sector buffer = self.read(sector_offset + sector_count - 1) # add data to buffer buf += buffer[len(buf) % dev.sector_size:] _parted.lib.ped_geometry_write(self._geometry, buf, sector_offset, sector_count)
[docs] @staticmethod def new(device: 'device.Device', start: int, length: int) -> 'Geometry': """ Create a new PedGeometry object on disk, starting at start with a size of length sectors. Args: device (device.Device): The device to create the geometry on start (int): The start sector length (int): The length in sectors Returns: Geometry: The new geometry object """ return make_destroyable(Geometry(_parted.lib.ped_geometry_new(device._device, start, length)))
def __str__(self) -> str: return f'Geometry(start={self.start}, end={self.end}, length={self.length})' def __repr__(self) -> str: return self.__str__()