# -*- coding: utf-8 -*-
"""Code for parsing investigation files.
"""
from __future__ import generator_stop

import csv
from datetime import date, datetime
import os
from pathlib import Path
from typing import Iterator, Optional, TextIO
import warnings

from ..constants import investigation_headers
from ..exceptions import ParseIsatabException, ParseIsatabWarning
from . import models
from .helpers import list_strip
__author__ = "Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>"
# Helper function to extract comment headers and values from a section dict
def _parse_comments(section, comment_keys, i=None):
def _parse_comment_header(val):
# key might start with "Comment[" but NOT "Comment ["
tok = val[len("Comment") :]
if not tok or tok[0] != "[" or tok[-1] != "]": # pragma: no cover
tpl = 'Problem parsing comment header "{}"'
msg = tpl.format(val)
raise ParseIsatabException(msg)
return tok[1:-1]
if i is not None:
comments = tuple(
models.Comment(_parse_comment_header(k), section[k][i]) for k in comment_keys
)
else:
comments = tuple(models.Comment(_parse_comment_header(k), section[k]) for k in comment_keys)
return comments
# Helper function to extract protocol parameters
def _split_study_protocols_parameters(
    names, name_term_accs, name_term_srcs
) -> Iterator[models.FreeTextOrTermRef]:
    """Split the ``;``-separated protocol parameter fields and yield one term per parameter.

    :param names: Parameter names, separated by ``;``.
    :param name_term_accs: Term accessions of the names, separated by ``;``.
    :param name_term_srcs: Term sources of the names, separated by ``;``.
    :returns: Iterator of ``models.OntologyTermRef``; positions at which all three
        fields are empty are skipped.
    :raises ParseIsatabException: If the three fields split into different numbers
        of entries, or if a name occurs more than once.
    """
    name_list = names.split(";")
    acc_list = name_term_accs.split(";")
    src_list = name_term_srcs.split(";")

    # All three fields must describe the same number of parameters
    if len({len(name_list), len(acc_list), len(src_list)}) != 1:  # pragma: no cover
        raise ParseIsatabException(
            'Unequal protocol parameter splits; found: "{}", "{}", "{}"'.format(
                name_list, acc_list, src_list
            )
        )

    # Names are used as identifiers by the caller, so they must be unique
    if len(set(name_list)) < len(name_list):  # pragma: no cover
        raise ParseIsatabException(
            "Repeated protocol parameter; found: {}".format(name_list)
        )

    for triple in zip(name_list, acc_list, src_list):
        if not any(triple):
            continue  # skips empty parameters
        yield models.OntologyTermRef(*triple)
# Helper function to extract protocol components
def _split_study_protocols_components(
    names, types, type_term_accs, type_term_srcs
) -> Iterator[models.ProtocolComponentInfo]:
    """Split the ``;``-separated protocol component fields and yield one entry per component.

    :param names: Component names, separated by ``;``.
    :param types: Component types, separated by ``;``.
    :param type_term_accs: Term accessions of the types, separated by ``;``.
    :param type_term_srcs: Term sources of the types, separated by ``;``.
    :returns: Iterator of ``models.ProtocolComponentInfo``; positions at which all
        four fields are empty are skipped.
    :raises ParseIsatabException: If the fields split into different numbers of
        entries, if a name occurs more than once, or if type information is given
        for a component without a name.
    """
    name_list = names.split(";")
    type_list = types.split(";")
    acc_list = type_term_accs.split(";")
    src_list = type_term_srcs.split(";")

    # All four fields must describe the same number of components
    lengths = {len(name_list), len(type_list), len(acc_list), len(src_list)}
    if len(lengths) != 1:  # pragma: no cover
        tpl = "Unequal protocol component splits; " 'found: "{}", "{}", "{}", "{}"'
        raise ParseIsatabException(tpl.format(name_list, type_list, acc_list, src_list))

    if len(set(name_list)) < len(name_list):  # pragma: no cover
        raise ParseIsatabException(
            "Repeated protocol components; found: {}".format(name_list)
        )

    for name, ctype, acc, src in zip(name_list, type_list, acc_list, src_list):
        if not name:
            if any((ctype, acc, src)):  # pragma: no cover
                tpl = "Missing protocol component name; " 'found: "{}", "{}", "{}", "{}"'
                raise ParseIsatabException(tpl.format(name, ctype, acc, src))
            continue  # skips empty components
        yield models.ProtocolComponentInfo(name, models.OntologyTermRef(ctype, acc, src))
# Helper function to validate and convert string dates to date objects
def _parse_date(date_string) -> datetime.date:
if date_string:
try:
date = datetime.strptime(date_string, "%Y-%m-%d").date()
except ValueError as e: # pragma: no cover
tpl = 'Invalid ISO8601 date "{}"'
msg = tpl.format(date_string)
raise ParseIsatabException(msg) from e
else:
date = None
return date
class InvestigationReader:
    """
    Main class to read an investigation file into an ``InvestigationInfo`` object.

    The reader keeps one look-ahead line (``self._line``) and consumes the file
    section by section in a fixed order.

    :type input_file: TextIO
    :param input_file: ISA-Tab investigation file
    """

    @classmethod
    def from_stream(cls, input_file: TextIO, filename=None):
        """Construct from file-like object"""
        # Instantiate through ``cls`` so that subclasses get instances of themselves
        return cls(input_file, filename)

    def __init__(self, input_file: TextIO, filename=None):
        """Set up the tab-separated reader and load the first relevant line.

        :param input_file: File-like object with the investigation file content.
        :param filename: Name to report for the file; defaults to ``input_file.name``
            or ``"<no file>"`` if the object has no ``name`` attribute.
        """
        self._filename = filename or getattr(input_file, "name", "<no file>")
        self._reader = csv.reader(input_file, delimiter="\t", quotechar='"')
        self._line = None
        self._read_next_line()

    def _read_next_line(self):
        """Read next line, skipping comments starting with ``'#'``.

        Advances the look-ahead line ``self._line`` to the next row that is neither
        empty (after ``list_strip``) nor starts with ``'#'``; it becomes ``None``
        when the input is exhausted.

        :returns: The look-ahead line that was current before this call.
        """
        prev_line = self._line
        for row in self._reader:
            row = list_strip(row)
            if row is None or (row and not row[0].startswith("#")):
                self._line = row
                break
        else:
            # Input exhausted without finding another relevant row
            self._line = None
        return prev_line

    def _next_line_startswith_comment(self):
        """Return whether the look-ahead line starts with ``Comment``"""
        return self._next_line_startswith("Comment")

    def _next_line_startswith(self, token):
        """Return whether line starts with ``token``"""
        return bool(self._line) and self._line[0].startswith(token)

    def _read_section_header(self, expected):
        """Consume the look-ahead line and check that it is the ``expected`` heading.

        :param expected: Section heading that must be in the first cell of the line.
        :raises ParseIsatabException: If the input ended before the heading or the
            first cell of the line differs from ``expected``.
        """
        line = self._read_next_line()
        # ``line`` is ``None`` if the file ended before this section started
        if not line or line[0] != expected:  # pragma: no cover
            tpl = "Expected {} but got {}"
            msg = tpl.format(expected, line)
            raise ParseIsatabException(msg)

    def read(self) -> models.InvestigationInfo:
        """
        Read the investigation file

        :rtype: models.InvestigationInfo
        :returns: Investigation model including all information from the investigation file
        """
        # Read sections in fixed order
        # ("section headings MUST appear in the Investigation file (in order)")
        ontology_refs = {o.name: o for o in self._read_ontology_source_reference()}
        info = self._read_basic_info()
        publications = list(self._read_publications())
        contacts = list(self._read_contacts())
        studies = list(self._read_studies())
        investigation = models.InvestigationInfo(
            ontology_refs, info, publications, contacts, studies
        )
        return investigation

    @staticmethod
    def _register_key(line, ref_keys, section, comment_keys):
        """Validate the key (first cell) of a section row and return it.

        Comment keys are appended to ``comment_keys``.

        :raises ParseIsatabException: If the key is neither a comment nor in
            ``ref_keys``, or if it is already present in ``section``.
        """
        key = line[0]
        if key.startswith("Comment"):
            comment_keys.append(key)
        elif key not in ref_keys:  # pragma: no cover
            tpl = "Line must start with one of {} but is {}"
            msg = tpl.format(ref_keys, line)
            raise ParseIsatabException(msg)
        if key in section:  # pragma: no cover
            tpl = 'Key {} repeated, previous value "{}"'
            msg = tpl.format(key, section[key])
            raise ParseIsatabException(msg)
        return key

    @staticmethod
    def _check_section_complete(section, ref_keys, comment_keys, section_name):
        """Raise ``ParseIsatabException`` unless all ``ref_keys`` were found in ``section``."""
        if len(section) != len(ref_keys) + len(comment_keys):  # pragma: no cover
            tpl = "Missing entries in section {}; only found: {}"
            msg = tpl.format(section_name, list(sorted(section)))
            raise ParseIsatabException(msg)  # TODO: should be warning?

    # reader for content of sections with possibly multiple columns
    # i.e. ONTOLOGY SOURCE REFERENCE, INVESTIGATION PUBLICATIONS,
    # INVESTIGATION CONTACTS, STUDY DESIGN DESCRIPTORS, STUDY PUBLICATIONS,
    # STUDY FACTORS, STUDY ASSAYS, STUDY PROTOCOLS, STUDY CONTACTS
    def _read_multi_column_section(self, prefix, ref_keys, section_name):
        """Read the rows of a section that may hold several value columns.

        Rows are consumed while the look-ahead line starts with ``prefix`` or with
        ``Comment``.

        :param prefix: Common prefix of the row keys of this section.
        :param ref_keys: Keys that must all occur in the section.
        :param section_name: Section heading, used in error messages.
        :returns: Tuple ``(section, comment_keys)``; ``section`` maps each row key
            to the list of its values, ``comment_keys`` lists the comment row keys.
        :raises ParseIsatabException: On unknown, repeated or missing keys, or if
            the rows do not all have the same number of values.
        """
        section = {}
        comment_keys = []
        while self._next_line_startswith(prefix) or self._next_line_startswith_comment():
            line = self._read_next_line()
            key = self._register_key(line, ref_keys, section, comment_keys)
            section[key] = line[1:]
        # Check that all keys are given and all contain the same number of entries
        self._check_section_complete(section, ref_keys, comment_keys, section_name)
        if len({len(values) for values in section.values()}) != 1:  # pragma: no cover
            tpl = "Inconsistent entry lengths in section {}"
            msg = tpl.format(section_name)
            raise ParseIsatabException(msg)
        return section, comment_keys

    # reader for content of a section with only one column
    # i.e. INVESTIGATION and STUDY
    def _read_single_column_section(self, prefix, ref_keys, section_name):
        """Read the rows of a section that holds at most one value per row.

        Rows are consumed while the look-ahead line starts with ``prefix`` or with
        ``Comment``.

        :param prefix: Common prefix of the row keys of this section.
        :param ref_keys: Keys that must all occur in the section.
        :param section_name: Section heading, used in error messages.
        :returns: Tuple ``(section, comment_keys)``; ``section`` maps each row key
            to its value (``""`` if the row has no value cell).
        :raises ParseIsatabException: If a row has more than one value, or on
            unknown, repeated or missing keys.
        """
        section = {}
        comment_keys = []
        while self._next_line_startswith(prefix) or self._next_line_startswith_comment():
            line = self._read_next_line()
            if len(line) > 2:  # pragma: no cover
                tpl = "Line {} contains more than one value: {}"
                msg = tpl.format(line[0], line[1:])
                raise ParseIsatabException(msg)
            key = self._register_key(line, ref_keys, section, comment_keys)
            # read value if field is available, empty string else
            section[key] = line[1] if len(line) > 1 else ""
        # Check that all keys are given
        self._check_section_complete(section, ref_keys, comment_keys, section_name)
        return section, comment_keys

    def _read_columns(self, header, prefix, ref_keys):
        """Read heading and rows of a multi-column section and transpose the rows.

        :param header: Section heading expected on the look-ahead line.
        :param prefix: Common prefix of the row keys of this section.
        :param ref_keys: Keys that must all occur in the section.
        :returns: Tuple ``(section, comment_keys, columns)`` where ``columns``
            iterates over tuples holding, for one column, the values of the
            ``ref_keys`` rows in ``ref_keys`` order.
        :raises ParseIsatabException: If heading or rows are not as expected.
        """
        self._read_section_header(header)
        section, comment_keys = self._read_multi_column_section(prefix, ref_keys, header)
        columns = zip(*(section[k] for k in ref_keys))
        return section, comment_keys, columns

    def _read_ontology_source_reference(self) -> Iterator[models.OntologyRef]:
        """Read the ONTOLOGY SOURCE REFERENCE section, yielding one entry per column.

        Columns without any content are skipped with a ``ParseIsatabWarning``.
        """
        section, comment_keys, columns = self._read_columns(
            investigation_headers.ONTOLOGY_SOURCE_REFERENCE,
            "Term Source",
            investigation_headers.ONTOLOGY_SOURCE_REF_KEYS,
        )
        # Create resulting objects
        for i, (name, file_, version, desc) in enumerate(columns):
            comments = _parse_comments(section, comment_keys, i)
            # If ontology source is empty, skip it
            # (since ISAcreator always adds a last empty ontology column)
            # NOTE(review): ``any(comments)`` tests the truthiness of the Comment
            # objects, not of their values -- confirm this is intended.
            if not any((name, file_, version, desc, any(comments))):
                tpl = "Skipping empty ontology source: {}, {}, {}, {}"
                msg = tpl.format(name, file_, version, desc)
                warnings.warn(msg, ParseIsatabWarning)
                continue
            yield models.OntologyRef(name, file_, version, desc, comments, list(section.keys()))

    def _read_basic_info(self) -> models.BasicInfo:
        """Read the INVESTIGATION section into a ``models.BasicInfo`` object."""
        self._read_section_header(investigation_headers.INVESTIGATION)
        # Read the other lines in this section.
        section, comment_keys = self._read_single_column_section(
            "Investigation",
            investigation_headers.INVESTIGATION_INFO_KEYS,
            investigation_headers.INVESTIGATION,
        )
        # Create resulting object
        # TODO: do we really need the name of the investigation file?
        comments = _parse_comments(section, comment_keys)
        return models.BasicInfo(
            Path(os.path.basename(self._filename)),
            section[investigation_headers.INVESTIGATION_IDENTIFIER],
            section[investigation_headers.INVESTIGATION_TITLE],
            section[investigation_headers.INVESTIGATION_DESCRIPTION],
            _parse_date(section[investigation_headers.INVESTIGATION_SUBMISSION_DATE]),
            _parse_date(section[investigation_headers.INVESTIGATION_PUBLIC_RELEASE_DATE]),
            comments,
            list(section.keys()),
        )

    def _iter_publications(self, header, prefix, ref_keys) -> Iterator[models.PublicationInfo]:
        """Read a publications section, yielding one ``PublicationInfo`` per column."""
        section, comment_keys, columns = self._read_columns(header, prefix, ref_keys)
        for (
            i,
            (pubmed_id, doi, authors, title, status_term, status_term_acc, status_term_src),
        ) in enumerate(columns):
            status = models.OntologyTermRef(status_term, status_term_acc, status_term_src)
            comments = _parse_comments(section, comment_keys, i)
            yield models.PublicationInfo(
                pubmed_id, doi, authors, title, status, comments, list(section.keys())
            )

    def _iter_contacts(self, header, prefix, ref_keys) -> Iterator[models.ContactInfo]:
        """Read a contacts section, yielding one ``ContactInfo`` per column."""
        section, comment_keys, columns = self._read_columns(header, prefix, ref_keys)
        for (
            i,
            (
                last_name,
                first_name,
                mid_initial,
                email,
                phone,
                fax,
                address,
                affiliation,
                role_term,
                role_term_acc,
                role_term_src,
            ),
        ) in enumerate(columns):
            role = models.OntologyTermRef(role_term, role_term_acc, role_term_src)
            comments = _parse_comments(section, comment_keys, i)
            yield models.ContactInfo(
                last_name,
                first_name,
                mid_initial,
                email,
                phone,
                fax,
                address,
                affiliation,
                role,
                comments,
                list(section.keys()),
            )

    def _read_publications(self) -> Iterator[models.PublicationInfo]:
        """Read the INVESTIGATION PUBLICATIONS section."""
        yield from self._iter_publications(
            investigation_headers.INVESTIGATION_PUBLICATIONS,
            "Investigation Pub",
            investigation_headers.INVESTIGATION_PUBLICATIONS_KEYS,
        )

    def _read_contacts(self) -> Iterator[models.ContactInfo]:
        """Read the INVESTIGATION CONTACTS section."""
        yield from self._iter_contacts(
            investigation_headers.INVESTIGATION_CONTACTS,
            "Investigation Person",
            investigation_headers.INVESTIGATION_CONTACTS_KEYS,
        )

    def _read_studies(self) -> Iterator[models.StudyInfo]:
        """Read STUDY sections until the input is exhausted, yielding one ``StudyInfo`` each."""
        while self._line:
            # Read STUDY header
            self._read_section_header(investigation_headers.STUDY)
            # Read the other lines in this section.
            section, comment_keys = self._read_single_column_section(
                "Study", investigation_headers.STUDY_INFO_KEYS, investigation_headers.STUDY
            )
            # From this, parse the basic information from the study
            comments = _parse_comments(section, comment_keys)
            study_file = section[investigation_headers.STUDY_FILE_NAME]
            basic_info = models.BasicInfo(
                Path(study_file) if study_file else None,
                section[investigation_headers.STUDY_IDENTIFIER],
                section[investigation_headers.STUDY_TITLE],
                section[investigation_headers.STUDY_DESCRIPTION],
                _parse_date(section[investigation_headers.STUDY_SUBMISSION_DATE]),
                _parse_date(section[investigation_headers.STUDY_PUBLIC_RELEASE_DATE]),
                comments,
                list(section.keys()),
            )
            # Read the remaining sections for this study in fixed order
            # (though the study specification says the "order MAY vary", the overall investigation
            # specification demands that "section headings MUST appear in the Investigation file
            # (in order)", which we perceive as higher priority.)
            design_descriptors = tuple(self._read_study_design_descriptors())
            publications = tuple(self._read_study_publications())
            factors = {f.name: f for f in self._read_study_factors()}
            assays = tuple(self._read_study_assays())
            protocols = {p.name: p for p in self._read_study_protocols()}
            contacts = tuple(self._read_study_contacts())
            # Create study object
            yield models.StudyInfo(
                basic_info, design_descriptors, publications, factors, assays, protocols, contacts
            )

    def _read_study_design_descriptors(self) -> Iterator[models.FreeTextOrTermRef]:
        """Read the STUDY DESIGN DESCRIPTORS section, one ``DesignDescriptorsInfo`` per column."""
        section, comment_keys, columns = self._read_columns(
            investigation_headers.STUDY_DESIGN_DESCRIPTORS,
            "Study Design",
            investigation_headers.STUDY_DESIGN_DESCR_KEYS,
        )
        for i, (type_term, type_term_acc, type_term_src) in enumerate(columns):
            otype = models.OntologyTermRef(type_term, type_term_acc, type_term_src)
            comments = _parse_comments(section, comment_keys, i)
            yield models.DesignDescriptorsInfo(otype, comments, list(section.keys()))

    def _read_study_publications(self) -> Iterator[models.PublicationInfo]:
        """Read the STUDY PUBLICATIONS section."""
        yield from self._iter_publications(
            investigation_headers.STUDY_PUBLICATIONS,
            "Study Pub",
            investigation_headers.STUDY_PUBLICATIONS_KEYS,
        )

    def _read_study_factors(self) -> Iterator[models.FactorInfo]:
        """Read the STUDY FACTORS section, yielding one ``FactorInfo`` per column."""
        section, comment_keys, columns = self._read_columns(
            investigation_headers.STUDY_FACTORS,
            "Study Factor",
            investigation_headers.STUDY_FACTORS_KEYS,
        )
        for i, (name, type_term, type_term_acc, type_term_src) in enumerate(columns):
            otype = models.OntologyTermRef(type_term, type_term_acc, type_term_src)
            comments = _parse_comments(section, comment_keys, i)
            yield models.FactorInfo(name, otype, comments, list(section.keys()))

    def _read_study_assays(self) -> Iterator[models.AssayInfo]:
        """Read the STUDY ASSAYS section, yielding one ``AssayInfo`` per non-empty column."""
        section, comment_keys, columns = self._read_columns(
            investigation_headers.STUDY_ASSAYS,
            "Study Assay",
            investigation_headers.STUDY_ASSAYS_KEYS,
        )
        for i, column in enumerate(columns):
            (
                file_,
                meas_type,
                meas_type_term_acc,
                meas_type_term_src,
                tech_type,
                tech_type_term_acc,
                tech_type_term_src,
                tech_plat,
            ) = column
            if not any(column):
                continue  # all assay fields are empty --> Nothing
            meas = models.OntologyTermRef(meas_type, meas_type_term_acc, meas_type_term_src)
            tech = models.OntologyTermRef(tech_type, tech_type_term_acc, tech_type_term_src)
            comments = _parse_comments(section, comment_keys, i)
            yield models.AssayInfo(
                meas,
                tech,
                tech_plat,
                Path(file_) if file_ else None,
                comments,
                list(section.keys()),
            )

    def _read_study_protocols(self) -> Iterator[models.ProtocolInfo]:
        """Read the STUDY PROTOCOLS section, yielding one ``ProtocolInfo`` per column.

        :raises ParseIsatabException: If a column has no protocol name, or if its
            parameter or component fields cannot be split consistently.
        """
        section, comment_keys, columns = self._read_columns(
            investigation_headers.STUDY_PROTOCOLS,
            "Study Protocol",
            investigation_headers.STUDY_PROTOCOLS_KEYS,
        )
        for (
            i,
            (
                name,
                type_term,
                type_term_acc,
                type_term_src,
                description,
                uri,
                version,
                para_names,
                para_name_term_accs,
                para_name_term_srcs,
                comp_names,
                comp_types,
                comp_type_term_accs,
                comp_type_term_srcs,
            ),
        ) in enumerate(columns):
            if not name:  # don't allow unnamed protocol columns  # pragma: no cover
                tpl = 'Expected protocol name in line {}; found: "{}"'
                msg = tpl.format(investigation_headers.STUDY_PROTOCOL_NAME, name)
                raise ParseIsatabException(msg)
            type_ont = models.OntologyTermRef(type_term, type_term_acc, type_term_src)
            # Parameters and components are keyed by their name
            paras = {
                p.name if hasattr(p, "name") else p: p
                for p in _split_study_protocols_parameters(
                    para_names, para_name_term_accs, para_name_term_srcs
                )
            }
            comps = {
                c.name: c
                for c in _split_study_protocols_components(
                    comp_names, comp_types, comp_type_term_accs, comp_type_term_srcs
                )
            }
            comments = _parse_comments(section, comment_keys, i)
            yield models.ProtocolInfo(
                name,
                type_ont,
                description,
                uri,
                version,
                paras,
                comps,
                comments,
                list(section.keys()),
            )

    def _read_study_contacts(self) -> Iterator[models.ContactInfo]:
        """Read the STUDY CONTACTS section."""
        yield from self._iter_contacts(
            investigation_headers.STUDY_CONTACTS,
            "Study Person",
            investigation_headers.STUDY_CONTACTS_KEYS,
        )