Coverage for encodermap/trajinfo/info_all.py: 70%
383 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-07 11:05 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-07 11:05 +0000
1# -*- coding: utf-8 -*-
2# encodermap/trajinfo/info_all.py
3################################################################################
4# Encodermap: A python library for dimensionality reduction.
5#
6# Copyright 2019-2022 University of Konstanz and the Authors
7#
8# Authors:
9# Kevin Sawade
10#
11# Encodermap is free software: you can redistribute it and/or modify
12# it under the terms of the GNU Lesser General Public License as
13# published by the Free Software Foundation, either version 2.1
14# of the License, or (at your option) any later version.
15# This package is distributed in the hope that it will be useful to other
16# researches. IT DOES NOT COME WITH ANY WARRANTY WHATSOEVER; without even the
17# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18# See the GNU Lesser General Public License for more details.
19#
20# See <http://www.gnu.org/licenses/>.
21################################################################################
22"""Classes to work with ensembles of trajectories.
24The statistics of a protein can be better described by an ensemble of proteins,
25rather than a single long trajectory. Treating a protein in such a way opens great
26possibilities and changes the way one can treat molecular dynamics data.
27Trajectory ensembles allow:
28 * Faster convergence via adaptive sampling.
31This subpackage contains two classes which are containers of trajecotry data.
32The SingleTraj trajecotry contains information about a single trajecotry.
33The TrajEnsemble class contains information about multiple trajectories. This adds
34a new dimension to MD data. The time and atom dimension are already established.
35Two frames can be appended along the time axis to get a trajectory with multiple
36frames. If they are appended along the atom axis, the new frame contains the
37atoms of these two. The trajectory works in a similar fashion. Adding two trajectories
38along the trajectory axis returns a trajectory ensemble, represented as an TrajEnsemble
39class in this package.
41See also:
42 http://statisticalbiophysicsblog.org/?p=92
44"""
46################################################################################
47# Imports
48################################################################################
51from __future__ import annotations
53import copy
54import glob
55import os
56import sys
57from io import StringIO
58from pathlib import Path
59from typing import TYPE_CHECKING, Callable, Iterator, Literal, Optional, Union
61import numpy as np
63from .._optional_imports import _optional_import
64from ..misc.errors import BadError
65from ..misc.misc import (
66 _TOPOLOGY_EXTS,
67 _can_be_feature,
68 _datetime_windows_and_linux_compatible,
69 get_full_common_str_and_ref,
70)
72################################################################################
73# Typing
74################################################################################
77if TYPE_CHECKING: 77 ↛ 78line 77 didn't jump to line 78, because the condition on line 77 was never true
78 import mdtraj as md
79 import pandas as pd
80 import xarray as xr
82 from .info_single import SingleTraj
83 from .trajinfo_utils import TrajEnsembleFeatureType
86################################################################################
87# Optional Imports
88################################################################################
91md = _optional_import("mdtraj")
92pd = _optional_import("pandas")
93xr = _optional_import("xarray")
96################################################################################
97# Globals
98################################################################################
101__all__ = ["TrajEnsemble"]
104################################################################################
105# Utilities
106################################################################################
109class Capturing(list):
110 """Class to capture print statements from function calls.
112 Examples:
113 >>> # write a function
114 >>> def my_func(arg='argument'):
115 ... print(arg)
116 ... return('fin')
117 >>> # use capturing context manager
118 >>> with Capturing() as output:
119 ... my_func('new_argument')
120 >>> print(output)
121 ['new_argument', "'fin'"]
123 """
125 def __enter__(self):
126 self._stdout = sys.stdout
127 sys.stdout = self._stringio = StringIO()
128 return self
130 def __exit__(self, *args):
131 self.extend(self._stringio.getvalue().splitlines())
132 del self._stringio # free up some memory
133 sys.stdout = self._stdout
136##############################################################################
137# Functions
138##############################################################################
141class TrajEnsemble:
142 """This class contains the info about many trajectories.
143 Topologies can be mismatching.
145 This class is a fancy list of `encodermap.trajinfo.SingleTraj` objects. Trajectories can have different topologies and will
146 be grouped by the `common_str` argument.
148 `TrajEnsemble` supports fancy indexing. You can slice to your liking trajs[::5] returns an `TrajEnsemble`
149 object that only consideres every fifth frame. Besides indexing by slices and integers you can pass
150 a 2 dimensional np.array. np.array([[0, 5], [1, 10], [5, 20]]) will return a `TrajEnsemble` object with
151 frame 5 of trajectory 0, frame 10 of trajectory 1 and frame 20 of trajectory 5. Simply passing an integer
152 as index returns the corresponding `SingleTraj` object.
154 The `TrajEnsemble` class also contains an iterator to iterate over trajectores. You could do::
155 >>> for traj in trajs:
156 ... for frame in traj:
157 ... print(frame)
159 Attributes:
160 CVs (dict): The collective variables of the `SingleTraj` classes. Only CVs with matching names in all
161 `SingleTraj` classes are returned. The data is stacked along a hypothetical time axis along the trajs.
162 _CVs (xarray.Dataset): The same data as in CVs but with labels. Additionally, the xarray is not stacked along the
163 time axis. It contains an extra dimension for trajectories.
164 n_trajs (int): Number of individual trajectories in this class.
165 n_frames (int): Number of frames, sum over all trajectories.
166 locations (list of str): A list with the locations of the trajectories.
167 top (list of mdtraj.Topology): A list with the reference pdb for each trajecotry.
168 basenames (list of str): A list with the names of the trajecotries.
169 The leading path and the file extension is omitted.
170 name_arr (np.ndarray of str): An array with len(name_arr) == n_frames.
171 This array keeps track of each frame in this object by identifying each
172 frame with a filename. This can be useful, when frames are mixed inside
173 an TrajEnsemble class.
174 index_arr (np.ndarray of str): index_arr.shape = (n_frames, 2). This array keeps track
175 of each frame with two ints. One giving the number of the trajectory, the other the frame.
177 Examples:
178 >>> # Create a trajectory ensemble from a list of files
179 >>> import encodermap as em
180 >>> trajs = em.TrajEnsemble(['https://files.rcsb.org/view/1YUG.pdb', 'https://files.rcsb.org/view/1YUF.pdb'])
181 >>> # trajs are inernally numbered
182 >>> print([traj.traj_num for traj in trajs])
183 [0, 1]
184 >>> # Build a new traj from random frames
185 >>> # Let's say frame 2 of traj 0, frame 5 of traj 1 and again frame 2 of traj 0
186 >>> # Doing this every frame will now be its own trajectory for easier bookkepping
187 >>> arr = np.array([[0, 2], [1, 5], [0, 2]])
188 >>> new_trajs = trajs[arr]
189 >>> print(new_trajs.n_trajs)
190 3
191 >>> # trace back a single frame
192 >>> frame_num = 28
193 >>> index = trajs.index_arr[frame_num]
194 >>> print('Frame {}, originates from trajectory {}, frame {}.'.format(frame_num, trajs.basenames[index[0]], index[1]))
195 Frame 28, originates from trajectory 1YUF, frame 13.
197 """
199 def __init__(
200 self,
201 trajs: Union[list[str], list[md.Trajectory], list[SingleTraj], list[Path]],
202 tops: Optional[list[str]] = None,
203 backend: Literal["mdtraj", "no_load"] = "no_load",
204 common_str: Optional[list[str]] = None,
205 basename_fn: Optional[Callable] = None,
206 ) -> None:
207 """Initialize the Info class with two lists of files.
209 Args:
210 trajs (Union[list[str], list[md.Trajectory], list[SingleTraj], list[Path]]):
211 List of strings with paths to trajectories.
212 tops (Optional[list[str]]): List of strings with paths to reference pdbs.
213 backend (str, optional): Chooses the backend to load trajectories.
214 * 'mdtraj' uses mdtraj which loads all trajecoties into RAM.
215 * 'no_load' creates an empty trajectory object.
216 Defaults to 'no_load'.
217 common_str (list of str, optional): If you want to include trajectories with
218 different topology. The common string is used to pair traj-files
219 (.xtc, .dcd, .lammpstrj) with their topology (.pdb, .gro, ...). The common-string
220 should be a substring of matching trajs and topologies.
221 basename_fn (Union[None, function], optional): A function to apply to the `traj_file` string to return the
222 basename of the trajectory. If None is provided, the filename without extension will be used. When
223 all files are named the same and the folder they're in defines the name of the trajectory you can supply
224 `lambda x: split('/')[-2]` as this argument. Defaults to None.
226 Raises:
227 TypeError: If some of your inputs are mismatched. If your input lists
228 contain other types than str or mdtraj.Trajecotry.
230 """
231 # defaults
232 if not trajs: 232 ↛ 233line 232 didn't jump to line 233, because the condition on line 232 was never true
233 raise BadError("You provided an empty list for `trajs`.")
235 self.backend = backend
237 # basename function
238 if basename_fn is None:
239 basename_fn = lambda x: os.path.basename(x).split(".")[0]
240 self.basename_fn = basename_fn
242 # common string
243 if common_str is None:
244 common_str = []
245 if isinstance(common_str, str): 245 ↛ 246line 245 didn't jump to line 246, because the condition on line 245 was never true
246 self.common_str = [common_str]
247 else:
248 self.common_str = common_str
250 # loading with setters
251 if tops is None:
252 tops = []
253 self._top_files = tops
254 if all([isinstance(traj, str) for traj in trajs]):
255 if self._top_files == [] and all( 255 ↛ exit, 255 ↛ 2582 missed branches: 1) line 255 didn't jump to the function exit, 2) line 255 didn't jump to line 258, because the condition on line 255 was never true
256 ["." + top.split(".")[-1] in _TOPOLOGY_EXTS for top in trajs]
257 ):
258 self._top_files = trajs
259 if isinstance(tops, str):
260 self._top_files = [tops]
261 self.traj_files = trajs
263 @classmethod
264 def from_textfile(
265 cls,
266 fname,
267 basename_fn=None,
268 ) -> TrajEnsemble:
269 """Creates an `TrajEnsemble` object from a textfile.
271 The textfile needs to be space-separated with two or three columns.
272 Column 1: The trajectory file.
273 Column 2: The corresponding topology file (If you are using .h5 trajs,
274 column 1 and 2 will be identical).
275 Column 3: The common string of the trajectory. This column can be left
276 out, which will result in an `TrajEnsemble` without common_strings.
278 Args:
279 fname (str): File to be read.
280 basename_fn (Union[None, function], optional): A function to apply to the `traj_file` string to return the
281 basename of the trajectory. If None is provided, the filename without extension will be used. When
282 all files are named the same and the folder they're in defines the name of the trajectory you can supply
283 `lambda x: split('/')[-2]` as this argument. Defaults to None.
285 Returns:
286 TrajEnsemble: An instantiated TrajEnsemble class.
288 """
289 from ..trajinfo import info_single
291 traj_files = []
292 top_files = []
293 common_str = []
295 with open(fname, "r") as f:
296 for row in f:
297 traj_files.append(row.split()[0])
298 top_files.append(row.split()[1])
299 try:
300 common_str.append(row.split()[2])
301 except IndexError:
302 common_str.append("")
304 trajs = []
305 for i, (traj_file, top_file, cs) in enumerate(
306 zip(traj_files, top_files, common_str)
307 ):
308 trajs.append(info_single.SingleTraj(traj_file, top_file, cs))
310 return cls(trajs, common_str=np.unique(common_str), basename_fn=basename_fn)
312 @classmethod
313 def from_xarray(
314 cls,
315 fnames,
316 basename_fn=None,
317 ) -> TrajEnsemble:
318 from ..trajinfo import SingleTraj
320 if isinstance(fnames, str):
321 fnames = glob.glob(fnames)
322 ds = xr.open_mfdataset(
323 fnames,
324 group="CVs",
325 concat_dim="traj_num",
326 combine="nested",
327 engine="h5netcdf",
328 )
329 trajs = []
330 for traj_num in ds.traj_num:
331 sub_ds = ds.sel(traj_num=traj_num)
332 fname = [
333 fname for fname in fnames if str(sub_ds.traj_name.values) in fname
334 ][0]
335 traj = SingleTraj(fname, traj_num=traj_num)
336 traj._CVs = sub_ds
337 trajs.append(traj)
338 return cls(trajs, basename_fn=basename_fn)
340 @property
341 def traj_files(self) -> list[str]:
342 """list: A list of the traj_files of the individual SingleTraj classes."""
343 return self._traj_files
345 @traj_files.setter
346 def traj_files(self, trajs):
347 from ..trajinfo import info_single
349 # fill this lists
350 self.trajs = []
352 if all([isinstance(traj, Path) for traj in trajs]):
353 trajs = [str(traj) for traj in trajs]
355 if all([isinstance(i, md.Trajectory) for i in trajs]):
356 self.backend = "mdtraj"
357 self.trajs = [
358 info_single.SingleTraj(traj, traj_num=i, basename_fn=self.basename_fn)
359 for i, traj in enumerate(trajs)
360 ]
361 self._traj_files = []
362 self._top_files = []
363 elif all([i.__class__.__name__ == "SingleTraj" for i in trajs]):
364 self.trajs = trajs
365 self._top_files = [traj.top_file for traj in self.trajs]
366 self._traj_files = [traj.traj_file for traj in self.trajs]
367 # check backends and common str
368 if (
369 not all([traj.backend == "no_load" for traj in trajs])
370 or self.backend == "mdtraj"
371 ):
372 (traj.load_traj() for traj in trajs) 372 ↛ exitline 372 didn't run the generator expression on line 372
373 for i, traj in enumerate(trajs):
374 if traj.traj_num is None:
375 traj.traj_num = i
376 elif all([isinstance(i, str) for i in trajs]) and self.top_files:
377 # find common_str matches in top_files and traj_files
378 (
379 self._traj_files,
380 self._top_files,
381 self._common_str,
382 ) = get_full_common_str_and_ref(trajs, self._top_files, self.common_str)
383 for i, (t, top, cs) in enumerate(
384 zip(self._traj_files, self._top_files, self._common_str)
385 ):
386 self.trajs.append(
387 info_single.SingleTraj(
388 traj=t,
389 top=top,
390 backend=self.backend,
391 common_str=cs,
392 traj_num=i,
393 basename_fn=self.basename_fn,
394 )
395 )
396 elif all([isinstance(i, str) for i in trajs]) and not self.top_files: 396 ↛ 406line 396 didn't jump to line 406, because the condition on line 396 was never false
397 for i, traj_file in enumerate(trajs):
398 self.trajs.append(
399 info_single.SingleTraj(
400 traj=traj_file,
401 basename_fn=self.basename_fn,
402 traj_num=i,
403 )
404 )
405 else:
406 raise TypeError(
407 "The objects in the list are not of the correct type or inconsistent. "
408 f"You provided {[c.__class__.__name__ for c in trajs]}. "
409 "Please provide a list of `str`, list of `mdtraj.Trajectory` or list of `SingleTraj`."
410 )
412 @property
413 def top(self) -> list[md.Topology]:
414 """list: Returns a minimal set of mdtraj.Topologies.
416 If all trajectories share the same topology a list
417 with len 1 will be returned.
419 """
420 out = []
421 for traj in self.trajs:
422 try:
423 if traj.top not in out:
424 out.append(traj.top)
425 except IOError:
426 print(self.trajs)
427 print("A rather peculiar error")
428 raise
429 return out
431 @property
432 def id(self) -> np.ndarray:
433 """np.ndarray: Duplication of self.index_arr"""
434 return self.index_arr
436 @property
437 def top_files(self) -> list[str]:
438 """list: Returns minimal set of topology files.
440 If yoy want a list of top files with the same
441 length as self.trajs use self._top_files and
442 self._traj_files.
444 """
445 return list(dict.fromkeys(self._top_files))
447 @property
448 def n_residues(self) -> int:
449 """list: List of number of residues of the SingleTraj classes"""
450 return [traj.n_residues for traj in self.trajs]
452 @property
453 def basenames(self) -> list[str]:
454 """list: List of the basenames in the Info single classes."""
455 return [traj.basename for traj in self.trajs]
457 @property
458 def traj_nums(self) -> list[int]:
459 """list: Number of info single classes in self."""
460 return [traj.traj_num for traj in self.trajs]
462 @property
463 def n_trajs(self) -> int:
464 """int: Number of trajectories in this encemble."""
465 return len(self.trajs)
467 @property
468 def _CVs(self) -> xr.Dataset:
469 """xarray.Dataset: Returns x-array Dataset of matching CVs. stacked along the trajectory-axis."""
470 if len(self.top) > 1:
471 print(
472 "This `TrajEnsemble` object contains mulitple topologies. The "
473 "output of _CVs can contain nans for some features."
474 )
475 return xr.combine_nested(
476 [traj._CVs for traj in self.trajs],
477 concat_dim="traj_num",
478 compat="broadcast_equals",
479 fill_value=np.nan,
480 coords="all",
481 join="outer",
482 )
483 # return xr.concat([traj._CVs for traj in self.trajs], dim='traj_num')
484 # except ValueError:
485 # # when ensemble is used, concatenation is more difficult
486 # # some values cannot be combined into a smooth array (with nan)
487 # DAs = {}
488 # matching_keys = list(
489 # set.intersection(*[set(traj.CVs.keys()) for traj in self.trajs])
490 # )
492 # for key in matching_keys:
493 # data = []
494 # feat_name = key.upper()
495 # for traj in self.trajs:
496 # data.append(traj._CVs[key].values.squeeze())
497 # data = np.stack(data)
498 # da = make_ensemble_xarray(key, data)
499 # DAs[key] = da
500 # ds = xr.Dataset(DAs)
502 # return ds
504 @property
505 def CVs(self) -> dict[str, np.ndarray]:
506 """dict: Returns dict of CVs in SingleTraj classes. Only CVs with the same names
507 in all SingleTraj classes are loaded.
509 """
510 if (
511 not all([traj.CVs for traj in self.trajs])
512 or [traj.CVs for traj in self.trajs] == []
513 ):
514 return {}
515 else:
516 CVs = {}
517 matching_keys = list(
518 set.intersection(*[set(traj.CVs.keys()) for traj in self.trajs])
519 )
520 dropping_keys = set(matching_keys).difference(
521 *[set(traj.CVs.keys()) for traj in self.trajs]
522 )
523 if dropping_keys: 523 ↛ 524line 523 didn't jump to line 524, because the condition on line 523 was never true
524 print(
525 f"The CVs {dropping_keys} will not be in the `CVs` dictionary, "
526 f"as they are only present in some, but not all of the {len(self.trajs)} "
527 f"trajectories. You can access them with "
528 f"`TrajEnsemble([t for t in trajs if any([cv in {dropping_keys} for cv in t.CVs.keys()])])`"
529 )
530 if matching_keys != []: 530 ↛ 553line 530 didn't jump to line 553, because the condition on line 530 was never false
531 for key in matching_keys:
532 data = []
533 for traj in self.trajs:
534 data.append(traj._CVs[key].values)
535 # check if all shapes are the same
536 shapes = [d.shape[2:] for d in data]
537 if not len(set(shapes)) == 1: 537 ↛ 538line 537 didn't jump to line 538, because the condition on line 537 was never true
538 print(
539 f"I am not returning the CVs {key}. As, some trajectories have different "
540 f"shapes for these CVs. The shapes are {set(shapes)}."
541 )
542 continue
543 if np.all(
544 [
545 any([isinstance(ind, int) for ind in traj.index])
546 for traj in self.trajs
547 ]
548 ):
549 data = np.vstack([d for d in data]).squeeze()
550 else:
551 data = np.concatenate([d.squeeze() for d in data], axis=0)
552 CVs[key] = data
553 return CVs
555 @property
556 def locations(self) -> list[str]:
557 """list: Duplication of self.traj_files but using the trajs own traj_file attribute.
558 Ensures that traj files are always returned independent from current load state."""
559 return [traj.traj_file for traj in self.trajs]
561 @property
562 def index_arr(self) -> np.ndarray:
563 """np.ndarray: Returns np.ndarray with ndim = 2. Clearly assigning every loaded frame an identifier of
564 traj_num (self.index_arr[:,0]) and frame_num (self.index_arr[:,1]). Can be used to create
565 a unspecified subset of frames and can be useful when used with clustering.
567 """
568 # can also be made to use the SingleTraj.index_arr attribute
569 # but doing it this way the traj is loaded.
570 # which might slow down thing significantly
571 return np.vstack([traj.id for traj in self.trajs])
573 @property
574 def name_arr(self) -> np.ndarray:
575 """np.ndarray: Trajectory names with the same length as self.n_frames."""
576 name_arr = []
577 if not np.all([traj.n_frames for traj in self.trajs]):
578 return np.array(name_arr)
579 else:
580 for x, traj in enumerate(self.trajs):
581 names = [traj.basename for i in range(traj.n_frames)]
582 name_arr.extend(names)
583 return np.array(name_arr)
585 @property
586 def n_frames(self) -> int:
587 """int: Sum of the loaded frames."""
588 return sum([traj.n_frames for traj in self.trajs])
590 @property
591 def frames(self) -> list[int]:
592 """list: Frames of individual trajectories."""
593 return [traj.n_frames for traj in self.trajs]
595 @property
596 def CVs_in_file(self) -> bool:
597 """bool: Is true, if CVs can be loaded from file. Can be used to build a data generator from."""
598 return all([traj.CVs_in_file for traj in self.trajs])
600 @property
601 def traj_joined(self) -> md.Trajectory:
602 """mdtraj.Trajectory: Returns a mdtraj Trajectory with every frame of this class appended along the time axis.
604 Can also work if different topologies (with the same number of atoms) are loaded.
605 In that case, the first frame in self will be used as topology parent and the remaining frames'
606 xyz coordinates are used to position the parents' atoms accordingly.
609 Examples:
610 >>> import encodermap as em
611 >>> single_mdtraj = trajs.split_into_frames().traj_joined
612 >>> print(single_mdtraj)
613 <mdtraj.Trajectory with 31 frames, 720 atoms, 50 residues, without unitcells>
615 """
616 # use traj[0] of the trajs list as the traj from which the topology will be used
617 parent_traj = self.trajs[0].traj
619 # join the correct number of trajs
620 # by use of the divmod method, the frames parent_traj traj will be
621 # appended for a certain amount, until the remainder of the division
622 # is met by that time, the parent traj will be sliced to fill the correct number of frames
623 try:
624 no_of_iters, rest = divmod(self.n_frames, parent_traj.n_frames)
625 except Exception:
626 print(parent_traj.n_frames)
627 raise
628 for i in range(no_of_iters + 1):
629 if i == 0:
630 dummy_traj = copy.deepcopy(parent_traj)
631 elif i == no_of_iters:
632 dummy_traj = dummy_traj.join(copy.deepcopy(parent_traj)[:rest])
633 else:
634 dummy_traj = dummy_traj.join(copy.deepcopy(parent_traj))
636 # some checks
637 assert self.n_frames == dummy_traj.n_frames
638 assert self.n_frames == len(self.trajs)
640 # change the xyz coordinates of dummy_traj according to the frames in joined trajs
641 for i, traj in enumerate(self.trajs):
642 dummy_traj.xyz[i] = traj.xyz
644 return dummy_traj
646 @property
647 def xyz(self) -> np.ndarray:
648 """np.ndarray: xyz coordinates of all atoms stacked along the traj-time axis. Only works if all trajs share the same topology."""
649 if len(self.top) == 1: 649 ↛ 653line 649 didn't jump to line 653, because the condition on line 649 was never false
650 xyz = np.vstack([traj.xyz for traj in self.trajs])
651 return xyz
652 else:
653 try:
654 xyz = np.vstack([traj.xyz for traj in self.trajs])
655 return xyz
656 except Exception as e:
657 msg = (
658 "Non consistent topologies don't allow to return a "
659 "common xyz. This could be achived by implementing a "
660 "high-dimensional masked numpy array with nans at "
661 "non-defined positions."
662 )
663 e2 = Exception(msg)
664 raise e2 from e
666 def split_into_frames(self, inplace: bool = False) -> None:
667 """Splits self into separate frames.
669 Args:
670 inplace (bool, optionale): Whether to do the split inplace or not.
671 Defaults to False and thus, returns a new `TrajEnsemble` class.
673 """
674 frames = []
675 for i, frame in self.iterframes():
676 frames.append(frame)
677 if inplace: 677 ↛ 678line 677 didn't jump to line 678, because the condition on line 677 was never true
678 self = TrajEnsemble(frames)
679 else:
680 return TrajEnsemble(frames)
682 def save_CVs(self, path: Union[str, Path]) -> None:
683 """Saves the CVs to a NETCDF file using xarray."""
684 self._CVs.to_netcdf(path, format="NETCDF4", engine="h5netcdf")
686 def load_CVs(
687 self,
688 data: TrajEnsembleFeatureType,
689 attr_name: Optional[str] = None,
690 cols: Optional[list[int]] = None,
691 labels: Optional[list[str]] = None,
692 directory: Optional[Union[str, Path]] = None,
693 ensemble: bool = False,
694 ) -> None:
695 """Loads CVs in various ways. Easiest way is to provide a single numpy array and a name for that array.
697 Besides np.ndarrays, files (.txt and .npy) can be loaded. Features or Featurizers can be
698 provided. An xarray.Dataset can be provided. A str can be provided that either
699 is the name of one of encodermap's features (encodermap.loading.features) or the
700 string can be 'all', which loads all features required for encodermap's
701 `AngleDihedralCarteisanEncoderMap` class.
703 Args:
704 data (Union[str, list, np.ndarray, 'all', xr.Dataset]): The CV to load. When a numpy array is provided,
705 it needs to have a shape matching n_frames. The data is distributed to the trajs.
706 When a list of files is provided, len(data) needs to match n_trajs. The first file
707 will be loaded by the first traj and so on. If a list of np.arrays is provided,
708 the first array will be assigned to the first traj. If a None is provided,
709 the arg directory will be used to construct
710 fname = directory + traj.basename + '_' + attr_name. The filenames will be used.
711 These files will then be loaded and put into the trajs. Defaults to None.
712 attr_name (Optional[str]): The name under which the CV should be found in the class.
713 Choose whatever you like. `highd`, `lowd`, `dists`, etc...
714 cols (Optional[list[int]]): A list of integers indexing the columns of the data to be loaded.
715 This is useful, if a file contains feature1, feature1, ..., feature1_err, feature2_err
716 formatted data. This option will only be used, when loading multiple .txt files. If None is
717 provided all columns will be loaded. Defaults to None.
718 labels (list): A list containing the labels for the dimensions of the data.
719 Defaults to None.
720 directory (Optional[str]): The directory to save the data at, if data is an instance of `em.Featurizer`
721 and this featurizer has in_memory set to Fase. Defaults to ''.
722 ensemble (bool): Whether the trajs in this class belong to an ensemble. This implies that
723 they contain either the same topology or are very similar (think wt, and mutant). Setting this
724 option True will try to match the CVs of the trajs onto a same dataset. If a VAL residue has been replaced
725 by LYS in the mutant, the number of sidechain dihedrals will increase. The CVs of the trajs with
726 VAL will thus contain some NaN values. Defaults to False.
728 Raises:
729 TypeError: When wrong Type has been provided for data.
731 """
732 from .trajinfo_utils import load_CVs_ensembletraj
734 load_CVs_ensembletraj(self, data, attr_name, cols, labels, directory, ensemble)
736 def save(self):
737 raise NotImplementedError()
739 def _return_trajs_by_index(self, index: list[int]) -> TrajEnsemble:
740 """Creates a TrajEnsemble object with the trajs specified by index."""
741 # new_common_str = []
742 # new_trajs = []
743 # new_refs = []
744 # new_lowd = []
745 # new_highd = []
746 # for i, traj in enumerate(self.trajs):
747 # if i not in index:
748 # continue
749 # new_common_str.append(traj.common_str)
750 # new_trajs.append(traj.traj_file)
751 # new_refs.append(traj.top_file)
752 # new_lowd.append(traj.lowd)
753 # new_highd.append(traj.highd)
754 # new_common_str = list(set(new_common_str))
755 # trajs_subset = TrajEnsemble(new_trajs, tops=new_refs, backend=self.backend, common_str=new_common_str)
756 # return trajs_subset
758 # is this faster?
759 new_common_str = []
760 for i, traj in enumerate(self.trajs):
761 if i not in index:
762 continue
763 new_common_str.append(traj.common_str)
764 new_common_str = list(set(new_common_str))
765 for i, ind in enumerate(index):
766 if i == 0:
767 trajs_subset = self.trajs[ind]._gen_ensemble()
768 else:
769 new_traj = self.trajs[ind]._gen_ensemble()
770 trajs_subset = trajs_subset + new_traj
771 trajs_subset.common_str = new_common_str
772 trajs_subset.basename_fn = self.basename_fn
773 return trajs_subset
775 def _pyemma_indexing(self, key: np.ndarray) -> TrajEnsemble:
776 """Returns a new TrajEnsemble by giving the indices of traj and frame"""
777 if key.ndim == 1: 777 ↛ 778line 777 didn't jump to line 778, because the condition on line 777 was never true
778 key = key.reshape(len(key), 1).T
779 trajs = []
780 for i, (num, frame) in enumerate(key):
781 trajs.append(self.trajs[num][frame])
782 return TrajEnsemble(
783 trajs, basename_fn=self.basename_fn, common_str=self.common_str
784 )
786 def subsample(
787 self,
788 stride: int,
789 inplace: bool = False,
790 ) -> Union[None, TrajEnsemble]:
791 """Returns a subset of this TrajEnsemble class given the provided stride.
793 This is a faster alternative than using the trajs[trajs.index_arr[::1000]]
794 when HDF5 trajs are used, because the slicing information is saved in the
795 respective SingleTraj classes and loading of single frames is faster in
796 HDF5 formatted trajs.
798 Note:
799 The result from `subsample()` is different from `trajs[trajs.index_arr[::1000]]`.
800 With subsample every trajectory is subsampled independently. Cosnider
801 a TrajEnsemble with two SingleTraj trajectories with 18 frames each.
802 `subsampled = trajs.subsample(5)` would return an TrajEnsemble with two
803 trajs with 3 frames each (`subsampled.n_frames` is 6). Whereas
804 `subsampled = trajs[trajs.index_arr[::5]]` would return an TrajEnsemble
805 with 7 SingleTrajs with 1 frame each (`subsampled.n_frames` is 7).
806 Because the times and frame numbers are saved all the time this should not
807 be too much of a problem.
810 """
811 trajs = []
812 for i, traj in enumerate(self.trajs):
813 _ = traj[slice(None, None, stride)]
814 trajs.append(_)
815 if inplace: 815 ↛ 816line 815 didn't jump to line 816, because the condition on line 815 was never true
816 self = TrajEnsemble(
817 trajs, common_str=self.common_str, basename_fn=self.basename_fn
818 )
819 else:
820 return TrajEnsemble(
821 trajs, common_str=self.common_str, basename_fn=self.basename_fn
822 )
824 def get_single_frame(self, key: int) -> SingleTraj:
825 """Returns a single frame from all loaded trajectories.
827 Consider a TrajEnsemble class with two SingleTraj classes. One has 10 frames,
828 the other 5 (`trajs.n_frames` is 15). Calling `trajs.get_single_frame(12)`
829 is equal to calling `trajs[1][1]`.
831 Args:
832 key (int): The frame to return.
834 Returns:
835 encodermap.SingleTraj: The frame.
837 """
838 # some input checks
839 if self.n_frames == 0: 839 ↛ 840line 839 didn't jump to line 840, because the condition on line 839 was never true
840 raise BadError(
841 "Indexing a no_load backend does not work. I need some information about the frames in each trajectory. Please load either highd or lowd."
842 )
843 if key >= self.n_frames: 843 ↛ 844line 843 didn't jump to line 844, because the condition on line 843 was never true
844 raise IndexError(
845 "index {} is out of bounds for trajectory with {} frames".format(
846 key, self.n_frames
847 )
848 )
849 if not isinstance(key, (int, np.int32, np.int64)): 849 ↛ 850line 849 didn't jump to line 850, because the condition on line 849 was never true
850 raise IndexError(
851 "if you want a single frame, please provide an integer. If you want multiple frames use ep.TrajEnsemble[]"
852 )
854 if len(self.trajs) == 1: 854 ↛ 855line 854 didn't jump to line 855, because the condition on line 854 was never true
855 return self.trajs[0][key]
856 else:
857 num, frame = np.hstack(
858 [
859 np.array([np.full(t.n_frames, t.traj_num), np.arange(t.n_frames)])
860 for t in self.trajs
861 ]
862 ).T[key]
863 traj_out = self.trajs[num][frame]
864 return traj_out
866 def unload(self) -> None:
867 """Unloads all trajs in self."""
868 [traj.unload() for traj in self]
869 self.backend = "no_load"
871 def load_trajs(self) -> None:
872 """Loads all trajs in self."""
873 [traj.load_traj() for traj in self]
874 self.backend = "mdtraj"
876 def itertrajs(self) -> Iterator[tuple[int, SingleTraj]]:
877 """Generator over the SingleTraj classes.
879 Yields:
880 tuple: A tuple containing the following:
881 int: A loop-counter integer. Is identical with traj.traj_num.
882 encodermap.SingleTraj: An SingleTraj object.
884 Examples:
885 >>> import encodermap as em
886 >>> trajs = em.TrajEnsemble(['https://files.rcsb.org/view/1YUG.pdb', 'https://files.rcsb.org/view/1YUF.pdb'])
887 >>> for i, traj in trajs.itertrajs():
888 ... print(traj.basename)
889 1YUG
890 1YUF
892 """
893 for i, traj in enumerate(self.trajs):
894 yield i, traj
896 def iterframes(self) -> Iterator[tuple[int, SingleTraj]]:
897 """Generator over the frames in this class.
899 Yields:
900 tuple: A tuple containing the following:
901 int: A loop-counter integer.
902 encodermap.SingleTraj: An SingleTraj object.
904 Examples:
905 >>> import encodermap as em
906 >>> trajs = em.TrajEnsemble(['https://files.rcsb.org/view/1YUG.pdb', 'https://files.rcsb.org/view/1YUF.pdb'])
907 >>> for i, frame in trajs.iterframes():
908 ... print(frame.basename)
909 ... print(frame.n_frames)
910 ... break
911 1YUG
912 1
914 """
915 iter_ = 0
916 for traj in self.trajs:
917 for frame in traj:
918 yield iter_, frame
919 iter_ += 1
921 def __copy__(self):
922 cls = self.__class__
923 result = cls.__new__(cls)
924 result.__dict__.update(self.__dict__)
925 return result
927 def __deepcopy__(self, memo):
928 from copy import deepcopy
930 cls = self.__class__
931 result = cls.__new__(cls)
932 memo[id(self)] = result
933 for k, v in self.__dict__.items():
934 setattr(result, k, deepcopy(v, memo))
935 return result
937 def __getitem__(self, key):
938 if isinstance(key, (int, np.int32, np.int64)):
939 return self.trajs[key]
940 elif isinstance(key, list): 940 ↛ 941line 940 didn't jump to line 941, because the condition on line 940 was never true
941 new_class = self._return_trajs_by_index(key)
942 return new_class
943 elif isinstance(key, np.ndarray): 943 ↛ 956line 943 didn't jump to line 956, because the condition on line 943 was never false
944 if key.ndim == 1: 944 ↛ 945line 944 didn't jump to line 945, because the condition on line 944 was never true
945 new_class = self._return_trajs_by_index(key)
946 return new_class
947 elif key.ndim == 2: 947 ↛ 951line 947 didn't jump to line 951, because the condition on line 947 was never false
948 new_class = self._pyemma_indexing(key)
949 return new_class
950 else:
951 raise IndexError(
952 "Passing a key with more than 2 dims makes no sense. One dim for trajs, one for frames. Your key has {} dims.".format(
953 key.ndims
954 )
955 )
956 elif isinstance(key, slice):
957 start, stop, step = key.indices(self.n_trajs)
958 list_ = list(range(start, stop, step))
959 new_class = self[list_]
960 return new_class
961 else:
962 raise IndexError("Invalid argument for slicing.")
964 def __reversed__(self):
965 raise NotImplementedError()
967 def __eq__(self, other):
968 # check if traj_files and ids are the same
969 if len(self) != len(other): 969 ↛ 970line 969 didn't jump to line 970, because the condition on line 969 was never true
970 return False
971 else:
972 import functools
974 same_strings = functools.reduce(
975 lambda x, y: x and y,
976 map(
977 lambda a, b: a == b,
978 [traj.traj_file for traj in self.trajs],
979 [traj2.traj_file for traj2 in other.trajs],
980 ),
981 True,
982 )
983 same_ids = all(
984 [
985 np.array_equal(traj1.id, traj2.id)
986 for traj1, traj2 in zip(self.trajs, other.trajs)
987 ]
988 )
989 same_CVs = self._CVs.equals(other._CVs)
990 return same_strings and same_ids and same_CVs
992 def __iter__(self):
993 self._index = 0
994 return self
996 def __next__(self):
997 if self._index >= self.n_trajs:
998 raise StopIteration
999 else:
1000 self._index += 1
1001 return self.trajs[self._index - 1]
1003 def __add__(self, y):
1004 """Addition of two TrajEnsemble objects returns new TrajEnsemble with
1005 trajectories joined along the traj axis.
1007 """
1008 # decide on the new backend
1009 if self.backend != y.backend: 1009 ↛ 1010line 1009 didn't jump to line 1010, because the condition on line 1009 was never true
1010 print("Mismatch between the backends. Using 'mdtraj'.")
1011 y.load_trajs()
1012 self.load_trajs()
1013 # build a common_str_ array with the correct number of entries
1014 # use this to create a new class
1015 # if there are no references in self or y. One of them was created from mdtraj.Trajectories
1016 if not any([self._top_files + y._top_files]): 1016 ↛ 1017line 1016 didn't jump to line 1017, because the condition on line 1016 was never true
1017 new_class = self.__class__(self.trajs + y.trajs, backend=self.backend)
1018 else:
1019 common_str_ = (
1020 get_full_common_str_and_ref(
1021 self.traj_files, self._top_files, self.common_str
1022 )[2]
1023 + get_full_common_str_and_ref(y.traj_files, y._top_files, y.common_str)[
1024 2
1025 ]
1026 )
1027 common_str_ = list(dict.fromkeys(common_str_))
1028 new_class = self.__class__(
1029 self.traj_files + y.traj_files,
1030 self._top_files + y._top_files,
1031 backend=self.backend,
1032 common_str=common_str_,
1033 )
1034 # put the trajs directly in the new class. This way the frames of the SingleTraj classes are preserved
1035 new_class.trajs = self.trajs + y.trajs
1037 return new_class
1039 def __getattr__(self, attr: str):
1040 if attr in self.CVs: 1040 ↛ 1043line 1040 didn't jump to line 1043, because the condition on line 1040 was never false
1041 return self.CVs[attr]
1042 else:
1043 return self.__getattribute__(attr)
1045 def _string_summary(self) -> str:
1046 if all([i.trajectory for i in self.trajs]): 1046 ↛ 1047line 1046 didn't jump to line 1047, because the condition on line 1046 was never true
1047 types = "frames"
1048 amount = self.n_frames
1049 else:
1050 types = "trajs"
1051 amount = self.n_trajs
1052 s = f"encodermap.TrajEnsemble object. Current backend is {self.backend}. Containing {amount} {types}."
1053 if "n_frames" in self.__dict__.keys(): 1053 ↛ 1054line 1053 didn't jump to line 1054, because the condition on line 1053 was never true
1054 s += f" In {self.n_frames} frames total."
1055 if self.common_str: 1055 ↛ 1056line 1055 didn't jump to line 1056, because the condition on line 1055 was never true
1056 s += f" Common str is {self.common_str}."
1057 if self.CVs: 1057 ↛ 1058line 1057 didn't jump to line 1058, because the condition on line 1057 was never true
1058 for key, value in self.CVs.items():
1059 s += f" CV {key} with shape {value.shape} loaded."
1060 else:
1061 s += " Not containing any CVs."
1062 return s
1064 def __len__(self) -> int:
1065 return self.n_frames
1067 def __str__(self) -> str:
1068 return self._string_summary()
1070 def __repr__(self) -> str:
1071 return f"<{self._string_summary()} Object at 0x{id(self):02x}>"