Source code for sugar.core.cane

# (C) 2024, Tom Eulenfeld, MIT license
"""
Several core helper classes and functions, such as `~.cane.translate()`, `~.cane.match()`, and `~.cane.find_orfs()`
"""

import collections
import warnings

from sugar.data import gcode
from sugar.core.fts import FeatureList


def _keyfuncs(objs, keys, attr=None):
    from collections.abc import Iterable
    if isinstance(keys, str):
        keys = keys.split()
    if not isinstance(keys, Iterable):
        keys = [keys]
    keyfuncs = [
        (lambda ft, key=key: (getattr(ft, key, None) if attr is None else
                              getattr(getattr(ft, attr), key, None))) if isinstance(key, str) else
        key
        for key in keys
        ]
    return keyfuncs


def _flatten(d, pk=()):
    d2 = {}
    for k, v in d.items():
        k = pk + (k,)
        if isinstance(v, dict):
            d2.update(_flatten(v, k).items())
        else:
            d2[k] = v
    return d2


def _groupby(objs, keys, attr=None, flatten=False):
    """
    Group objects, used by several objects in sugar.core

    :param keys: Tuple of keys or functions to use for grouping.
        May also be a single string or callable.
    :param attr: Attribute where to look for keys
    :param flatten: Return a flattened instead of a nested dict
    :return: Nested dict structure
    """
    if keys is None:
        return {None: objs}
    keyfuncs = _keyfuncs(objs, keys, attr=attr)
    d = {}
    cls = objs.__class__
    for obj in objs:
        d2 = d
        for keyfunc in keyfuncs[:-1]:
            d2 = d2.setdefault(keyfunc(obj), {})
        d2.setdefault(keyfuncs[-1](obj), cls()).append(obj)
    if flatten:
        d = _flatten(d)
    return d


def _sorted(objs, keys=None, reverse=False, attr=None):
    """
    Sort objects, used by several objects in sugar.core

    :param keys: Tuple of keys or functions to use for sorting.
        None can be used as a single value or in the tuple
        to apply the default sorting.
        May also be a single string or callable.
    :param reverse: Use reversed order (default: False)
    :param attr: Attribute where to look for keys

    :return: Sorted objects
    """
    keyfuncs = _keyfuncs(objs, keys, attr=attr)
    for keyfunc in keyfuncs[::-1]:
        objs = sorted(objs, key=keyfunc, reverse=reverse)
    return objs


def _getattr_warn(obj, key, default=None):
    """
    Like getattr with default value, but warns if obj does not have the attribute
    """
    if not hasattr(obj, key):
        warnings.warn(f'Attribute "{key}" not present in {obj.__class__.__name__} object')
    return getattr(obj, key, default)



def _select(objs, attr='meta', **kwargs):
    r"""
    Select objects, used by several objects in sugar.core

    :param attr: Attribute where to look for keys
    :param \*\*kw: All kwargs need to be of the form
        ``key_op=value``, where op is one of
        the operators from the `python:operator` module.
        Additionally, the operators
        ``'in'`` (membership),
        ``'lowerin'`` (membership of lower case word),
        ``'lowereq'`` (equality of lower case word),
        ``'max'`` (alias for le),
        ``'min'`` (alias for ge)
        are supported.
        The different selection criteria are combined with
        the *and* operator.
    :return: Selected objects
    """
    import operator
    ops = {'max': operator.le,
           'min': operator.ge,
           'in': lambda a, b: a in b,
           }
    allowed_funcs = {'len': len}
    for kw, value in kwargs.items():
        if '_' in kw:
            key, kop = kw.rsplit('_', 1)
        else:
            key, kop = kw, 'eq'
        applynot = kop.startswith('not')
        kop = kop.removeprefix('not')
        applylower = kop.startswith('lower')
        kop = kop.removeprefix('lower')
        op = ops.get(kop)
        if op is None:
            op = getattr(operator, kop)
        getv = ((lambda obj: allowed_funcs[key](obj)) if key in allowed_funcs else
                (lambda obj: _getattr_warn(obj, key, None)) if attr is None else
                (lambda obj: _getattr_warn(getattr(obj, attr), key, None)))
        match (applynot, applylower):
            case (False, False): filt = lambda obj: op(getv(obj), value)
            case (True, False): filt = lambda obj: not op(getv(obj), value)
            case (False, True): filt = lambda obj: op(getv(obj).lower(), value)
            case (True, True): filt = lambda obj: not op(getv(obj).lower(), value)
        objs = [obj for obj in objs if filt(obj)]
    return objs


[docs] class BioMatch(object): """ The BioMatch object is returned by `~.cane.match()` and the different match methods. It is designed to behave like the original `python:re.Match` object. See there for available methods. It also has the `BioMatch.rf` attribute, which holds the reading frame (between -3 and 2, inclusive) of the match. .. rubric:: Example: >>> match = read()[0].match('AT.') >>> match <sugar.BioMatch object; seqid=AB047639; rf=2; span=(11, 14); match=ATA> >>> print(match.group(), match.rf) ATA 2 """ def __init__(self, match, rf=None, seqlen=None, seqid=None): #: original Match object self._match = match #: reading frame (-3 to 2) self.rf = rf self.seqlen = seqlen #: sequence id self.seqid = seqid def __getattr__(self, attr): return getattr(self._match, attr) def __repr__(self): return f'<sugar.BioMatch object; match={self.group()}; span={self.span()}; rf={self.rf}; seqid={self.seqid}>'
[docs] def start(self): if self.rf is not None and self.rf < 0: return self.seqlen - self._match.end() else: return self._match.start()
[docs] def end(self): if self.rf is not None and self.rf < 0: return self.seqlen - self._match.start() else: return self._match.end()
[docs] def span(self): return self.start(), self.end()
[docs] class BioMatchList(collections.UserList): """ List of `BioMatch` objects """
[docs] def groupby(self, keys='rf'): """ Group matches :param keys: Tuple of meta keys or functions to use for grouping. Can also be a single string or a callable. By default the method groups by rf. :return: Nested dict structure """ return _groupby(self, keys)
[docs] def select(self, **kw): """ Select matches :param keys: Tuple of meta keys or functions to use for grouping. Can also be a single string or a callable. By default the method groups by seqid only. :return: Nested dict structure """ return _select(self, attr=None, **kw)
@property def d(self): """ Group matches by seqid, alias for ``BioMatchList.groupby('seqid')`` """ return self.groupby('seqid')
[docs] def tostr(self): lines = [f'{len(self)} BioMatches'] for m in self: lines.append(f'match={m.group()} span={m.span()} rf={m.rf} seqid={m.seqid}') return '\n'.join(lines)
def __str__(self): return self.tostr()
[docs] def match(seq, sub, *, rf='fwd', start=0, gap='-', matchall=False): """ Return `BioMatch` object for first found match of regex sub, None if not found. Args: sub (str): regex or ``'start'`` or ``'stop'`` to find start/stop codon, please specify different codons like rf (int): Can be set to an integer between -3 and 2 inclusive to respect the corresponding reading frame. Rfs 0 to 2 are on the forward strand, rfs -3 to -1 are on the backward strand, You can also specify a set or tuple of reading frames. Additionally you can use one of ('fwd', 'bwd', 'all') to select all reading frames on the specified strands. Defaults to ``'fwd'`` -- all three reading frames on the forward strand. You may set rf to ``None`` to ignore reading frames (i.e. for aa seqs) start (int): Index of the nucleobase to start matching. Defaults to 0. gap (str): Consider gaps of given character, Defaults to '-'. The character is inserted between each two letters of the regex. Be careful, this approach does not work for arbitrary regexes. matchall (bool): False will return first match of type `BioMatch`, True will return all matches in a `BioMatchList`. Defaults to False. Returns: match (`BioMatch` or `BioMatchList` of matches or None), the list will be sorted by match position, matches on the forward strand first, then matches on the backward strand. """ from bisect import bisect import re from sugar.core.seq import BioSeq if isinstance(rf, int): rf = (rf,) elif isinstance(rf, str): assert rf in ('fwd', 'bwd', 'all', 'both') if rf == 'fwd': rf = (0, 1, 2) elif rf == 'bwd': rf = (-1, -2, -3) else: rf = (0, 1, 2, -1, -2, -3) if isinstance(sub, BioSeq): sub = sub.data if sub == 'start': sub = 'AUG|ATG' # if gap is not None: # sub = f'(A[{gap}]*U[{gap}]*G|A[{gap}]*T[{gap}]*G)' elif sub == 'stop': sub = 'UAG|UAA|UGA|TAG|TAA|TGA' if gap is not None: gapstr = f'[{gap}]*' sub = ''.join(ch + (gapstr if ((ch.isalpha() or ch=='.') and i+1 < len(sub) and (sub[i+1].isalpha() or sub[i+1] == '.')) else '') for i, ch in enumerate(sub)) if gap is None or rf is None: gaps = None else: gaps = [i for i, nt in enumerate(str(seq)) if nt in gap if i >= start] matches = [] if rf is not None: fwd_rfs = set(rf) & {0, 1, 2} bwd_rfs = set(rf) & {-1, -2, -3} if rf is None or len(fwd_rfs) > 0: for m in re.finditer(sub, str(seq)): if (i := m.start()) >= start: # bisect(gaps, i) gives number of gaps before index i this_rf = (i - start - (bisect(gaps, i) if gaps else 0)) % 3 if rf is not None else None if this_rf is None or this_rf in rf: m = BioMatch(m, rf=this_rf, seqlen=len(seq), seqid=seq.id) if matchall: matches.append(m) else: return m if rf is not None and len(bwd_rfs) > 0: seq = seq.copy().rc() for m in re.finditer(sub, str(seq)): if (i := m.start()) >= start: # bisect(gaps, i) gives number of gaps before index i this_rf = (i - start - (bisect(gaps, i) if gaps else 0)) % 3 if -1*this_rf-1 in rf: m = BioMatch(m, rf=-1*this_rf-1, seqlen=len(seq), seqid=seq.id) if matchall: matches.append(m) else: return m if matchall: return BioMatchList(matches)
[docs] class ORFList(FeatureList): """ List of open reading frames (ORFs) """ pass
def _inds2orf(i1, i2, rf, lensec, ftype='ORF', seqid=None, has_start=None, has_stop=None): """ Create ORF feature from indices """ from sugar import Feature if rf is None or rf >= 0: strand = '+' else: i1, i2 = lensec - i2, lensec - i1 strand = '-' assert i1 < i2 ft = Feature(ftype, start=i1, stop=i2, strand=strand) ft.seqid = seqid ft.meta.rf = rf ft.meta.has_start = has_start ft.meta.has_stop = has_stop return ft def _get_from_list(l, i, default=None): try: return l[i] except IndexError: return default
[docs] def find_orfs(seq, rf='all', start='start', stop='stop', need_start='always', need_stop=True, nested='other', gap=None, len_ge=0, ftype='ORF'): """ Find open reading frames (ORFs) :param seq: `.BioSeq` sequence :param rf: reading frame, possible values: int, string or tuple. See also `~match`. Default is ``'all'``. :param start: regular expression defining the start codons, defaults to ATG/AUG :param stop: regular expression defining the stop codons, defaults to stop codons in the default translation table :param need_start: One of ``('always', 'once', 'never')``. Always: Each ORF starts with a start codon. Once: Only the first ORF on the forward and backward strand starts with a start codon. Never: ORFs can start at each codon. :param need_stop: Whether the last ORF in each RF must end with a stop codon. :param nested: Allow nested ORFs fully contained within other ORFs, one of ``('no', 'other', 'all')``. No: No nested ORFs. Other: Nested ORFs allowed in other reading frames (default). All: Nested ORFs allowed in all reading frames. :param gap: Gap character inserted into the start and stop codon regexes, default is None. :param len_ge: Return only ORFs with length greater equal, default: 0. :param ftype: Feature type for found ORFs, default is ``'ORF'`` :returns: Returns a `~ORFList` of all found ORFs. You can attach these features to sequences using `.BioSeq.add_fts()` or `.BioBasket.add_fts()`. Use the `.BioSeq.fts` and `.BioBasket.fts` properties to overwrite features with the found ORFs. .. note:: Python's :func:`python:re.finditer` is used internally to find start and stop codons. The limitations of this function apply; for example, matches cannot overlap. Care must be taken in special cases. For instance, if ORFs do not need to start with a start codon, do not use the regular expression ``start='...'``; use the ``need_start='never'`` option instead. """ # rf 0, 1, 2, -1, -2, -3, 'fwd', 'bwd', 'all' assert need_start in ('never', 'always', 'once') assert nested in ('no', 'other', 'all') if isinstance(rf, int): rf = (rf,) if rf == 'fwd': rf = (0, 1, 2) elif rf == 'bwd': rf = (-1, -2, -3) elif rf in ('all', 'both'): rf = (0, 1, 2, -1, -2, -3) rfsort = lambda i: [0, 1, 2, -1, -2, -3].index(i) rf = sorted(rf, key=rfsort) all_starts = seq.matchall(start, rf=rf, gap=gap) all_starts.reverse() starts = all_starts.groupby('rf') all_stops = seq.matchall(stop, rf=rf, gap=gap) all_stops.reverse() stops = all_stops.groupby('rf') orfs = [] for frame in rf: start_pos = {m._match.start() for m in starts.get(frame, [])} framefwd = frame if frame >= 0 else abs(frame) - 1 i2 = None while i2 is None or i2 < len(seq) - 2: # find start position of ORF if i2 is None and need_start in ('never', 'once'): if need_start == 'once': kw = dict(rf=rf, start=start, stop=stop, need_start='always', need_stop=need_stop, nested='other', gap=gap, len_ge=len_ge, ftype='ORF') # TODO this can be enhanced, we only need the first ORF on the fwd and bwd strand. start_orfs = seq.find_orfs(**kw).select(rf_in=((0, 1, 2) if frame >= 0 else (-1, -2, -3))) if len(start_orfs) == 0: break first_start = start_orfs[0] i1 = first_start.loc.start - 1 frame0 = first_start.meta.rf if frame0 < 0: i1 = len(seq) - first_start.loc.stop - 1 frame0 = abs(frame0) - 1 else: i1 = -1 frame0 = 0 # find in-frame start position accounting for gaps for i in range(i1+1, len(seq)): if gap is None or seq.data[i if frame >= 0 else len(seq) - 1 - i] not in gap: i1 += 1 if i1 % 3 == framefwd: break else: break elif i2 is not None and need_start in ('never', 'once'): i1 = i2 else: try: i1 = starts[frame].pop()._match.start() except (KeyError, IndexError): # no new start codon found break # find end position of ORF if i2 is not None and i1 < i2: # start codon before last stop codon (nested ORF) if nested in ('other', 'no'): continue else: while stops.get(frame): i2 = stops[frame].pop()._match.end() if i1 < i2: # stop codons before ORF starts are ignored has_stop = True break else: # did not found any stop codon if need_stop: break has_stop = False if gap is not None and any(g in seq.data for g in gap): warnings.warn('The stop position of the last ORF is set to the end of the sequence for `need_stop=False` in gap mode') i2 = len(seq) # TODO: need to account for gaps # num_chars_from_end = (len(seq) - sum(seq.str.count(g) for g in gap) - framefwd) % 3 # while num_chars_from_end > 0: # i2 -= 1 # if seq.data[i2] not in gap: # num_chars_from_end -= 1 else: i2 = len(seq) - (len(seq) - framefwd) % 3 orf = _inds2orf(i1, i2, frame, len(seq), seqid=seq.id, ftype=ftype, has_start=i1 in start_pos, has_stop=has_stop) if len(orf) >= len_ge: orfs.append(orf) orfs = ORFList(orfs) if nested == 'no': orfs.remove_nested() return orfs
[docs] def translate(seq, *, complete=False, check_start=None, check_stop=False, final_stop=None, warn=False, astop='X', gap='-', gap_after=2, tt=1): """ Translate a string or `.BioSeq` object into an amino acid string :param bool complete: If set to ``True`` ignore stop codons, otherwise the translation is stopped at the first stop codon :param bool check_start: Check that the first codon is a start codon, default is True for ``complete=False`` otherwise False :param bool check_stop: Check that the sequence ends with the first stop codon, default is False :param bool final_stop: Append * for the final stop character, defaults to False for ``complete=False`` and True for ``complete=True`` :param bool warn: Warn if the first codon might not be a start codon, warn for ambiguous stop codons, warn if the sequence does not end with a stop codon, default is False :param str astop: Symbol for ambiguous stop codons :param str gap: Gap character, default ``'-'``, set to ``None`` to raise an error for non-nucleotide characters :param int gap_after: A single gap in the amino acid string is written after the first ``gap_after`` gaps in the nucleotide sequence and after every third gap thereafter, default is 2 :param int tt: the number of the translation table, default is 1 :return: Translated string """ gc = gcode(tt) aas = [] ngap = 0 check_start = check_start if check_start is not None else not complete check_start_warn = warn final_stop = final_stop if final_stop is not None else complete codon = '' for i, nt in enumerate(str(seq).replace('U', 'T')): if nt == gap: ngap += 1 else: codon = codon + nt if gap and gap_after is not None and ngap == gap_after: aas.append(gap) ngap -= 3 if len(codon) == 3: if check_start_warn or check_start: if codon not in gc.starts and codon not in gc.astarts: msg = f'Codon {codon} is not a start codon {gc.starts}' if check_start: raise ValueError(msg) else: warnings.warn(msg) check_start_warn = False check_start = False if check_start_warn: check_start_warn = False if codon not in gc.starts: msg = f'Codon {codon} possibly is not a start codon.' warnings.warn(msg) try: aa = gc.tt[codon] except (KeyError): aa = 'X' if codon in gc.astops: aa = astop if warn: warnings.warn(f'Codon {codon} might be a stop codon.') if codon in gc.stops: if (check_stop or warn) and i < len(seq) - 3: msg = 'First stop codon is not at the end of the sequence.' if check_stop: raise ValueError(msg) else: warnings.warn(msg) if i >= len(seq) - 3 or not complete: if final_stop: aas.append(aa) break aas.append(aa) codon = '' else: if (check_stop or warn) and codon not in gc.astops: msg = f'Last codon {codon} is not a stop codon {gc.stops}' if check_stop: raise ValueError(msg) else: warnings.warn(msg) elif warn and codon in gc.astops: msg = f'Last codon {codon} possibly is not a stop codon' warnings.warn(msg) return ''.join(aas)