# -*- coding: utf-8 -*-
"""This module contains code for the parsing of assay and study files.
"""
from __future__ import generator_stop
import csv
from datetime import datetime
from pathlib import Path
from typing import List, TextIO
from ..constants import table_tokens
from ..constants import table_headers
from ..exceptions import ParseIsatabException
from .headers import ColumnHeader, StudyHeaderParser, AssayHeaderParser, LabeledColumnHeader
from .helpers import list_strip
from . import models
__author__ = "Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>"
class _NodeBuilderBase:
    """Shared implementation of the material and process node builders.

    On construction, the given column headers are sorted into their roles
    (name, protocol reference, characteristics, comments, ...).  The helper
    methods then turn the cells of one split table line into annotation
    values.
    """

    #: Column types providing the node name (set by subclasses)
    name_headers = None
    #: Column types that may occur for the node (set by subclasses)
    allowed_column_types = None

    def __init__(
        self, column_headers: List[ColumnHeader], filename: str, study_id: str, assay_id: str
    ):
        #: Column headers describing the node to build
        self.column_headers = column_headers
        #: "Protocol REF" header, if present
        self.protocol_ref_header = None
        #: Header used for naming the node, if present
        self.name_header = None
        #: Headers that may occur several times per node
        self.characteristic_headers = []
        self.comment_headers = []
        self.factor_value_headers = []
        self.parameter_value_headers = []
        #: Not written anywhere else in this class; kept for compatibility
        self.array_design_ref = None
        #: Headers that may occur at most once per node
        self.array_design_ref_header = None
        self.first_dimension_header = None
        self.second_dimension_header = None
        self.extract_label_header = None
        self.material_type_header = None
        self.performer_header = None
        self.date_header = None
        self.unit_header = None
        #: Number of values handed out by ``_next_counter`` so far
        self.counter_value = 0
        # Distribute the column headers over the attributes above; this
        # happens before the identifiers below are stored.
        self._assign_column_headers()
        #: Study and assay identifiers used for building unique node names
        self.study_id = study_id
        self.assay_id = assay_id
        #: Name of the file the headers were read from
        self.filename = filename

    def _next_counter(self):
        """Advance the internal counter by one and return the new value."""
        self.counter_value += 1
        return self.counter_value

    def _assign_column_headers(self):  # noqa: C901
        """Sort ``self.column_headers`` into the role attributes.

        :raises ParseIsatabException: on a column type that is neither a name
            header nor allowed, on a repeated single-occurrence header, or on
            a misplaced "Term Source REF" / "Unit" column.
        """
        # Column types that are simply collected in a list.
        collected = {
            table_headers.CHARACTERISTICS: self.characteristic_headers,
            table_headers.COMMENT: self.comment_headers,
            table_headers.FACTOR_VALUE: self.factor_value_headers,
            table_headers.PARAMETER_VALUE: self.parameter_value_headers,
        }
        # Column types allowed once only: attribute name and display name.
        singletons = {
            table_headers.MATERIAL_TYPE: ("material_type_header", "Material Type"),
            table_headers.ARRAY_DESIGN_REF: ("array_design_ref_header", "Array Design REF"),
            table_headers.FIRST_DIMENSION: ("first_dimension_header", "First Dimension"),
            table_headers.SECOND_DIMENSION: ("second_dimension_header", "Second Dimension"),
            table_headers.LABEL: ("extract_label_header", "Label"),
            table_headers.DATE: ("date_header", "Date"),
            table_headers.PERFORMER: ("performer_header", "Performer"),
        }
        # Last header that was not a "Term Source REF"; this is the header
        # that a following "Term Source REF" or "Unit" column annotates.
        annotated = None
        for header in self.column_headers:
            column_type = header.column_type
            if not (column_type in self.name_headers or column_type in self.allowed_column_types):
                tpl = 'Invalid column type occured "{}" not in {}'
                raise ParseIsatabException(tpl.format(column_type, self.allowed_column_types))
            secondary = False
            if column_type == table_headers.PROTOCOL_REF:
                assert not self.protocol_ref_header
                self.protocol_ref_header = header
            elif column_type in self.name_headers:
                assert not self.name_header
                self.name_header = header
            elif column_type in collected:
                collected[column_type].append(header)
            elif column_type in singletons:
                attribute, display_name = singletons[column_type]
                if getattr(self, attribute):  # pragma: no cover
                    self._raise_seen_before(display_name, header.col_no)
                else:
                    setattr(self, attribute, header)
            elif column_type == table_headers.TERM_SOURCE_REF:
                self._attach_term_source_ref(annotated, header)
                secondary = True
            elif column_type == table_headers.UNIT:
                if annotated.unit_header or annotated.column_type == table_headers.UNIT:
                    self._raise_seen_before("Unit", header.col_no)  # pragma: no cover
                else:
                    # The unit qualifies the preceding annotation.
                    annotated.unit_header = header
            # "Unit" is deliberately not secondary: a "Term Source REF" that
            # follows it annotates the unit itself.
            if not secondary:
                annotated = header

    @staticmethod
    def _attach_term_source_ref(target, header):
        """Register ``header`` as the "Term Source REF" column of ``target``.

        :raises ParseIsatabException: if there is no ``target``, if its column
            type cannot carry an ontology reference, or if it already has a
            "Term Source REF" column.
        """
        tpl = None
        if not target:  # pragma: no cover
            tpl = "No primary annotation to annotate with term in col {}"
        elif target.column_type not in (
            # According to the ISA-tab specs, Characteristics, Factor Values,
            # Parameter Values and Units as well as the special cases First
            # Dimension and Second Dimension may be annotated with
            # ontologies.  Official examples and configurations also feature
            # Label and Material Type with ontologies.  Whether COMMENT
            # belongs here is unclear, so it is left out.
            table_headers.CHARACTERISTICS,
            table_headers.FACTOR_VALUE,
            table_headers.FIRST_DIMENSION,
            table_headers.MATERIAL_TYPE,
            table_headers.LABEL,
            table_headers.PARAMETER_VALUE,
            table_headers.SECOND_DIMENSION,
            table_headers.UNIT,
        ):  # pragma: no cover
            # Fill in the column type now, keep a placeholder for the column.
            tpl = "Ontologies not supported for primary annotation '{}' (in col {}).".format(
                target.column_type, "{}"
            )
        elif target.term_source_ref_header:  # pragma: no cover
            tpl = 'Seen "Term Source REF" header for same entity in col {}'
        if tpl:  # pragma: no cover
            raise ParseIsatabException(tpl.format(header.col_no))
        target.term_source_ref_header = header

    @staticmethod
    def _raise_seen_before(name, col_no):  # pragma: no cover
        """Raise ``ParseIsatabException`` for a repeated header ``name``."""
        raise ParseIsatabException(
            'Seen "{}" header for same entity in col {}'.format(name, col_no)
        )

    def _build_complex(self, header, line, klass, allow_list=False):
        """Build an annotation of type ``klass`` made of label, value and unit.

        ``allow_list`` is only applied to the value, never to the unit.
        """
        value = self._build_freetext_or_term_ref(header, line, allow_list=allow_list)
        unit = self._build_freetext_or_term_ref(header.unit_header, line)
        return klass(header.label, value, unit)

    def _build_freetext_or_term_ref(
        self, header, line: List[str], allow_list=False
    ) -> models.FreeTextOrTermRef:
        """Return the value of ``header`` in ``line``.

        Returns ``None`` without a header, the plain cell text (or its list
        of ``;``-separated tokens if ``allow_list``) without a "Term Source
        REF" column, and otherwise one ``OntologyTermRef`` (or a list of them
        if ``allow_list``).

        :raises ParseIsatabException: if ``allow_list`` is set and the name,
            ontology and accession cells hold different numbers of tokens.
        """
        if not header:
            return None
        ref_header = header.term_source_ref_header
        if not ref_header:
            cell = line[header.col_no]
            return self._token_with_escape(cell) if allow_list else cell
        name = line[header.col_no]
        ontology_name = line[ref_header.col_no]
        # The accession is read from the column right after "Term Source REF".
        accession = line[ref_header.col_no + 1]
        if not allow_list:
            return models.OntologyTermRef(name, accession, ontology_name)
        names = self._token_with_escape(name)
        ontology_names = self._token_with_escape(ontology_name)
        accessions = self._token_with_escape(accession)
        # Every name needs exactly one ontology name and one accession.
        if len(names) == len(ontology_names) == len(accessions):
            return [
                models.OntologyTermRef(n, a, o)
                for n, a, o in zip(names, accessions, ontology_names)
            ]
        tpl = (  # pragma: no cover
            "Irregular numbers of fields in ontology term columns"
            "(i.e. ';'-separated fields): {}"
        )
        raise ParseIsatabException(  # pragma: no cover
            tpl.format(line[header.col_no : ref_header.col_no + 2])
        )

    def _build_simple_headers_list(self) -> List[str]:
        """Return the simple strings of all column headers as one flat list."""
        simple_headers = []
        for column_header in self.column_headers:
            simple_headers.extend(column_header.get_simple_string())
        return simple_headers

    @staticmethod
    def _token_with_escape(string, escape="\\", separator=";"):
        """Split ``string`` at ``separator``, honouring ``escape``.

        A character following ``escape`` is taken literally; the escape
        character itself is dropped.
        """
        # Based on https://rosettacode.org/wiki/Tokenize_a_string_with_escaping#Python
        tokens = []
        current = []
        escaped = False
        for char in string:
            if escaped:
                current.append(char)
                escaped = False
            elif char == escape:
                escaped = True
            elif char == separator:
                tokens.append("".join(current))
                current = []
            else:
                current.append(char)
        tokens.append("".join(current))
        return tokens
class _MaterialBuilder(_NodeBuilderBase):
    """Builds one ``Material`` object per table line."""

    name_headers = table_headers.MATERIAL_NAME_HEADERS

    allowed_column_types = (
        # Primary annotations (not parametrized)
        table_headers.MATERIAL_TYPE,
        # Primary annotations (parametrized)
        table_headers.CHARACTERISTICS,
        table_headers.COMMENT,
        table_headers.FACTOR_VALUE,
        # Secondary annotations
        table_headers.LABEL,
        table_headers.TERM_SOURCE_REF,
        table_headers.UNIT,
    )

    def build(self, line: List[str]) -> models.Material:
        """Create the ``Material`` described by the cells of ``line``."""
        # The counter advances once per line, whether or not it is used.
        counter_value = self._next_counter()
        assert self.name_header or self.protocol_ref_header
        name_header = self.name_header
        column_type = name_header.column_type
        assay_suffix = "-{}".format(self.assay_id) if self.assay_id else ""
        name = line[name_header.col_no]
        if not name:
            # No name in the cell: generate one and mark it as such.
            generated = "{}{}-{} {}-{}-{}".format(
                self.study_id,
                assay_suffix,
                table_tokens.TOKEN_EMPTY,
                column_type,
                name_header.col_no + 1,
                counter_value,
            )
            unique_name = models.AnnotatedStr(generated, was_empty=True)
        elif column_type == table_headers.SOURCE_NAME:
            unique_name = "{}-{}-{}".format(self.study_id, "source", name)
        elif column_type == table_headers.SAMPLE_NAME:
            # Static "sample" marker instead of the column number, since the
            # same samples occur in different columns in study and assay.
            unique_name = "{}-{}-{}".format(self.study_id, "sample", name)
        else:
            # All other material/data names are made unique by column.
            unique_name = "{}{}-{}-COL{}".format(
                self.study_id, assay_suffix, name, name_header.col_no + 1
            )

        extract_label = self._build_freetext_or_term_ref(self.extract_label_header, line)
        characteristics = tuple(
            [
                self._build_complex(header, line, models.Characteristics, allow_list=True)
                for header in self.characteristic_headers
            ]
        )
        comments = tuple(
            [models.Comment(header.label, line[header.col_no]) for header in self.comment_headers]
        )
        factor_values = tuple(
            [
                self._build_complex(header, line, models.FactorValue)
                for header in self.factor_value_headers
            ]
        )
        material_type = self._build_freetext_or_term_ref(self.material_type_header, line)

        return models.Material(
            column_type,
            unique_name,
            name,
            extract_label,
            characteristics,
            comments,
            factor_values,
            material_type,
            self._build_simple_headers_list(),
        )
class _ProcessBuilder(_NodeBuilderBase):
    """Helper class to construct ``Process`` objects."""

    name_headers = table_headers.PROCESS_NAME_HEADERS

    allowed_column_types = (
        table_headers.PROTOCOL_REF,
        # Primary annotations (not parametrized)
        table_headers.PERFORMER,
        table_headers.DATE,
        # Special annotations (not parametrized)
        table_headers.ARRAY_DESIGN_REF,
        table_headers.FIRST_DIMENSION,
        table_headers.SECOND_DIMENSION,
        # Primary annotations (parametrized)
        table_headers.PARAMETER_VALUE,
        table_headers.COMMENT,
        # Secondary annotations
        table_headers.TERM_SOURCE_REF,
        table_headers.UNIT,
    )

    def build(self, line: List[str]) -> models.Process:
        """Build and return a ``Process`` from one split table line.

        :param line: Cell values of the line, indexed by column number.
        :raises ParseIsatabException: if the date cell is not formatted as
            ``YYYY-MM-DD`` or if the protocol reference is missing.
        """
        # First, build the individual attributes of ``Process``
        protocol_ref, unique_name, name, name_type = self._build_protocol_ref_and_name(line)
        # ``date`` is ``None`` without a date column, ``""`` for an empty
        # cell and a ``datetime.date`` otherwise.
        if self.date_header:
            raw_date = line[self.date_header.col_no]
            if raw_date:
                try:
                    date = datetime.strptime(raw_date, "%Y-%m-%d").date()
                except ValueError as e:  # pragma: no cover
                    # The coverage pragma belongs in the comment above, not
                    # in the message shown to the user.
                    tpl = 'Invalid ISO8601 date "{}"'
                    msg = tpl.format(raw_date)
                    raise ParseIsatabException(msg) from e
            else:
                date = ""
        else:
            date = None
        if self.performer_header:
            performer = line[self.performer_header.col_no]
        else:
            performer = None
        comments = tuple(
            models.Comment(hdr.label, line[hdr.col_no]) for hdr in self.comment_headers
        )
        parameter_values = tuple(
            self._build_complex(hdr, line, models.ParameterValue, allow_list=True)
            for hdr in self.parameter_value_headers
        )
        # Check for special case annotations
        array_design_ref = (
            line[self.array_design_ref_header.col_no] if self.array_design_ref_header else None
        )
        first_dimension = self._build_freetext_or_term_ref(self.first_dimension_header, line)
        second_dimension = self._build_freetext_or_term_ref(self.second_dimension_header, line)
        # Then, constructing ``Process`` is easy
        return models.Process(
            protocol_ref,
            unique_name,
            name,
            name_type,
            date,
            performer,
            parameter_values,
            comments,
            array_design_ref,
            first_dimension,
            second_dimension,
            self._build_simple_headers_list(),
        )

    def _build_protocol_ref_and_name(self, line: List[str]):
        """Return ``(protocol_ref, unique_name, name, name_type)`` for ``line``.

        Three cases are distinguished, depending on whether only a
        "Protocol REF" column, only a name column, or both are present.
        Generated unique names are wrapped in ``AnnotatedStr`` with
        ``was_empty=True``.

        :raises ParseIsatabException: if the resulting protocol reference is
            empty.
        """
        # At least one of these headers has to be specified
        assert self.name_header or self.protocol_ref_header
        # The counter advances once per line, whether or not it is used.
        counter_value = self._next_counter()
        assay_id = "-{}".format(self.assay_id) if self.assay_id else ""
        name = None
        name_type = None
        if not self.name_header:
            # Name header is not given, will use auto-generated unique name
            # based on protocol ref.
            protocol_ref = line[self.protocol_ref_header.col_no]
            name_val = "{}{}-{}-{}-{}".format(
                self.study_id,
                assay_id,
                protocol_ref,
                self.protocol_ref_header.col_no + 1,
                counter_value,
            )
            unique_name = models.AnnotatedStr(name_val, was_empty=True)
        elif not self.protocol_ref_header:
            # Name header is given, but protocol ref header is not
            protocol_ref = table_tokens.TOKEN_UNKNOWN
            name = line[self.name_header.col_no]
            name_type = self.name_header.column_type
            if name:  # Use name if available
                unique_name = "{}{}-{}-{}".format(
                    self.study_id, assay_id, name, self.name_header.col_no + 1
                )
            else:  # Empty!  # pragma: no cover
                name_val = "{}{}-{} {}-{}-{}".format(
                    self.study_id,
                    assay_id,
                    table_tokens.TOKEN_ANONYMOUS,
                    self.name_header.column_type.replace(" Name", ""),
                    self.name_header.col_no + 1,
                    counter_value,
                )
                unique_name = models.AnnotatedStr(name_val, was_empty=True)
        else:  # Both headers are given
            protocol_ref = line[self.protocol_ref_header.col_no]
            name = line[self.name_header.col_no]
            name_type = self.name_header.column_type
            if name:
                unique_name = "{}{}-{}-{}".format(
                    self.study_id, assay_id, name, self.name_header.col_no + 1
                )
            else:
                # Fall back to a generated name based on the protocol ref.
                name_val = "{}{}-{}-{}-{}".format(
                    self.study_id,
                    assay_id,
                    protocol_ref,
                    self.protocol_ref_header.col_no + 1,
                    counter_value,
                )
                unique_name = models.AnnotatedStr(name_val, was_empty=True)
        if not protocol_ref:  # pragma: no cover
            tpl = "Missing protocol reference in column {} of file {} "
            msg = tpl.format(self.protocol_ref_header.col_no + 1, self.filename)
            raise ParseIsatabException(msg)
        return protocol_ref, unique_name, name, name_type
class _RowBuilderBase:
    """Base class for row builders from study and assay files.

    The header is cut into consecutive column ranges, one per node, and a
    node builder is created for each range.  ``build`` then applies all node
    builders to a line.
    """

    #: Registry of column header to node builder
    node_builders = None

    def __init__(
        self, header: List[ColumnHeader], filename: str, study_id: str, assay_id: str = None
    ):
        self.header = header
        self.filename = filename
        self.study_id = study_id
        self.assay_id = assay_id
        self._builders = list(self._make_builders())

    def _make_builders(self):
        """Yield one node builder per column range given by ``_make_breaks``.

        :raises ParseIsatabException: if a range holds duplicated annotations.
        """
        breaks = list(self._make_breaks())
        for start, end in zip(breaks, breaks[1:]):
            self._intercept_duplicates(start, end)
            # The first column of a range decides which builder is used.
            klass = self.node_builders[self.header[start].column_type]
            yield klass(self.header[start:end], self.filename, self.study_id, self.assay_id)

    def _intercept_duplicates(self, start, end):
        """Check for duplicate primary annotations per node/builder.

        I.e. for duplicated Characteristics, Parameter Values, Comments,
        Factor Values, ... within ``self.header[start:end]``.

        :raises ParseIsatabException: if an annotation occurs more than once;
            the duplicates are listed in sorted order.
        """
        column_types_to_check = (
            table_headers.CHARACTERISTICS,
            table_headers.COMMENT,
            table_headers.FACTOR_VALUE,
            table_headers.PARAMETER_VALUE,
            table_headers.DATE,
            table_headers.LABEL,
            table_headers.MATERIAL_TYPE,
            table_headers.PERFORMER,
            table_headers.ARRAY_DESIGN_REF,
            table_headers.FIRST_DIMENSION,
            table_headers.SECOND_DIMENSION,
        )
        names = [
            "{}[{}]".format(h.column_type, h.label)
            if isinstance(h, LabeledColumnHeader)
            else h.column_type
            for h in self.header[start:end]
            if h.column_type in column_types_to_check
        ]
        # Single pass instead of ``names.count`` for every element.
        seen = set()
        duplicates = set()
        for name in names:
            if name in seen:
                duplicates.add(name)
            seen.add(name)
        if duplicates:
            assay = " assay {}".format(self.assay_id) if self.assay_id else ""
            tpl = "Found duplicated column types in header of study {}{}: {}"
            # Sorted, so that the message does not depend on set ordering.
            msg = tpl.format(self.study_id, assay, ", ".join(sorted(duplicates)))
            raise ParseIsatabException(msg)

    def _make_breaks(self):
        """Build indices to break the columns at.

        Life would be simpler if ISA-Tab would require a "Protocol REF"
        before generic or specialized assay names (e.g., "Assay Name" or
        "MS Assay Name") or at least define what happens if we see
        ("Protocol REF", "Assay Name", "Assay Name").

        Our interpretation is that in the case above the first "Assay Name"
        further qualifies (=annotates) the "Protocol REF" and the second
        leads to an implicit "Protocol REF" creation with all cell values
        set to "unknown".  This somewhat emulates what the official ISA-Tab
        API does.

        The last index yielded is ``len(self.header)``.
        """
        # Record whether we have seen a "Protocol REF" but no "Assay Name".
        noname_protocol_ref = False
        for i, col_hdr in enumerate(self.header):
            if col_hdr.column_type in table_headers.MATERIAL_NAME_HEADERS:
                noname_protocol_ref = False
                yield i
            elif col_hdr.column_type in self.node_builders:
                # Column type has an associated node builder, can be
                # "Protocol REF", an annotating assay name, or implicitly
                # start a new process node.
                if col_hdr.column_type == "Protocol REF":
                    noname_protocol_ref = True
                    yield i
                else:
                    if not noname_protocol_ref:
                        # This one does not annotate a previous "Protocol
                        # REF" because we have already seen a name (it
                        # does not matter whether standalone or giving a
                        # name to a "Protocol REF").
                        yield i
                    # In any case, we have seen a name for a protocol now
                    noname_protocol_ref = False
        yield len(self.header)  # index to end of list

    def build(self, line):
        """Return the nodes built from ``line``, one per node builder."""
        return [b.build(line) for b in self._builders]
class _StudyRowBuilder(_RowBuilderBase):
    """Row builder for the node columns of an ISA-TAB study file."""

    node_builders = {
        # Columns starting a material node
        **dict.fromkeys(
            (table_headers.SOURCE_NAME, table_headers.SAMPLE_NAME),
            _MaterialBuilder,
        ),
        # Columns starting a process node
        **dict.fromkeys((table_headers.PROTOCOL_REF,), _ProcessBuilder),
    }
class _AssayRowBuilder(_RowBuilderBase):
    """Row builder for the node columns of an ISA-TAB assay file."""

    node_builders = {
        # Columns starting a material or data node
        **dict.fromkeys(
            (
                # Material nodes
                table_headers.SAMPLE_NAME,
                table_headers.EXTRACT_NAME,
                table_headers.LABELED_EXTRACT_NAME,
                table_headers.LIBRARY_NAME,
                # Data nodes
                table_headers.ARRAY_DATA_FILE,
                table_headers.ARRAY_DATA_MATRIX_FILE,
                table_headers.ARRAY_DESIGN_FILE,
                table_headers.DERIVED_ARRAY_DATA_FILE,
                table_headers.DERIVED_ARRAY_DATA_MATRIX_FILE,
                table_headers.DERIVED_DATA_FILE,
                table_headers.DERIVED_SPECTRAL_DATA_FILE,
                table_headers.IMAGE_FILE,
                table_headers.METABOLITE_ASSIGNMENT_FILE,
                table_headers.PEPTIDE_ASSIGNMENT_FILE,
                table_headers.POST_TRANSLATIONAL_MODIFICATION_ASSIGNMENT_FILE,
                table_headers.PROTEIN_ASSIGNMENT_FILE,
                table_headers.RAW_DATA_FILE,
                table_headers.RAW_SPECTRAL_DATA_FILE,
                table_headers.SPOT_PICKING_FILE,
            ),
            _MaterialBuilder,
        ),
        # Columns starting a process node
        **dict.fromkeys(
            (
                table_headers.ASSAY_NAME,
                table_headers.DATA_TRANSFORMATION_NAME,
                table_headers.GEL_ELECTROPHORESIS_ASSAY_NAME,
                table_headers.HYBRIDIZATION_ASSAY_NAME,
                table_headers.MS_ASSAY_NAME,
                table_headers.NORMALIZATION_NAME,
                table_headers.PROTOCOL_REF,
                table_headers.SCAN_NAME,
            ),
            _ProcessBuilder,
        ),
    }
class _AssayAndStudyBuilder:
"""Helper for building ``Assay`` and ``Study`` objects."""
def __init__(self, file_name, header, klass):
self.file_name = file_name
self.header = header
self.klass = klass
def build(self, rows):
return self._construct(self._postprocess_rows(rows))
def _postprocess_rows(self, rows):
"""Postprocess the ``rows``.
Right now we are looking for nodes (material of process) without an
original name (which would be equivalent to unique_name being an
``AnnotatedString`` and having the ``was_empty`` attribute set to
``True``) and their previous and next nodes with an original name. We
then assign the same unique names for all where the unique names of the
previous and next nodes with an original name is the same.
It is yet unclear whether this postprocessing is sufficient but this is
the place to build upon the postprocessing for further refinement.
"""
node_context = {}
for row in rows:
for idx, entry in enumerate(row):
# Skip first entry
if idx == 0:
continue
# Process nodes without an original name
if not entry.name:
# Find previous originally named node
ext = 0
while not row[idx - ext - 1].name:
ext += 1
start_entry = row[idx - ext - 1].unique_name
# Find next originally named node
# (may stay None, if a bubble is not closed at the end)
end_entry = None
ext = 0
while idx + ext + 1 < len(row) and not row[idx + ext + 1].name:
ext += 1
if idx + ext + 1 < len(row) and row[idx + ext + 1].name:
end_entry = row[idx + ext + 1].unique_name
# Compare idx, start and end with previous rows
# and perform the change if appropriate
key = (idx, start_entry, end_entry)
if key in node_context:
# TODO: complain if annotations differ?
row[idx] = node_context[key]
else:
node_context[key] = row[idx]
return rows
def _construct(self, rows):
"""Construct the ``Assay`` or ``Study`` object."""
materials = {}
processes = {}
arcs = []
arc_set = set()
for row in rows:
for i, entry in enumerate(row):
# Collect processes and materials
if isinstance(entry, models.Process):
if (
entry.unique_name in processes and entry != processes[entry.unique_name]
): # pragma: no cover
tpl = (
"Found processes with same name but different "
"annotation:\nprocess 1: {}\nprocess 2: {}"
)
msg = tpl.format(entry, processes[entry.unique_name])
raise ParseIsatabException(msg)
processes[entry.unique_name] = entry
else:
assert isinstance(entry, models.Material)
if (
entry.unique_name in materials and entry != materials[entry.unique_name]
): # pragma: no cover
tpl = (
"Found materials with same name but different "
"annotation:\nmaterial 1: {}\nmaterial 2: {}"
)
msg = tpl.format(entry, materials[entry.unique_name])
raise ParseIsatabException(msg)
materials[entry.unique_name] = entry
# Collect arc
if i > 0:
arc = models.Arc(row[i - 1].unique_name, row[i].unique_name)
if arc not in arc_set:
arc_set.add(arc)
arcs.append(arc)
return self.klass(Path(self.file_name), self.header, materials, processes, tuple(arcs))
class StudyRowReader:
    """Read an ISA-TAB study file (``s_*.txt``) into a tabular/object
    representation.

    This is a more low-level part of the interface.  Please prefer
    using ``StudyReader`` over using this class.

    :type study_id: str
    :param study_id: Unique identifier for the study, needed to disambiguate nodes between files.
    :type input_file: TextIO
    :param input_file: ISA-Tab study file
    :type filename: str
    :param filename: Optional file name override; falls back to the ``name``
        attribute of ``input_file`` or ``"<no file>"``.
    :raises ParseIsatabException: if the file contains no header line.
    """

    @classmethod
    def from_stream(klass, study_id: str, input_file: TextIO, filename: str = None):
        """Construct from file-like object"""
        return StudyRowReader(study_id, input_file, filename)

    def __init__(self, study_id: str, input_file: TextIO, filename: str):
        self.study_id = study_id
        self.input_file = input_file
        self.filename = filename or getattr(input_file, "name", "<no file>")
        #: Tab-joined lines seen so far, used to detect duplicated rows
        self.unique_rows = set()
        #: Tab-joined lines that occurred more than once
        self.duplicate_rows = []
        self._reader = csv.reader(input_file, delimiter="\t", quotechar='"')
        #: Look-ahead buffer holding the line read most recently
        self._line = None
        # Fill the look-ahead buffer, then take the header from it.
        self._read_next_line()
        self.header = self._read_header()

    def _read_header(self):
        """Read first line with header.

        :raises ParseIsatabException: if there is no line left to read.
        """
        line = self._read_next_line()
        # ``_read_next_line`` never lets ``StopIteration`` escape; it signals
        # the end of the file by returning ``None``.
        if line is None:  # pragma: no cover
            msg = "Study file has no header!"
            raise ParseIsatabException(msg)
        return list(StudyHeaderParser(line).run())

    def _read_next_line(self):
        """Read next line, skipping empty lines and comments starting with ``'#'``.

        Returns the previously buffered line (``None`` if there was none) and
        stores the newly read one in the buffer.  The buffer becomes ``None``
        once the file is exhausted.
        """
        prev_line = self._line
        try:
            self._line = list_strip(next(self._reader))
            while self._line is not None and (not self._line or self._line[0].startswith("#")):
                self._line = list_strip(next(self._reader))
            # Test and collect row duplicates
            row_key = "\t".join(self._line)
            if row_key in self.unique_rows:
                self.duplicate_rows.append(row_key)
            else:
                self.unique_rows.add(row_key)
        except StopIteration:
            self._line = None
        return prev_line

    def read(self):
        """
        Read the study rows

        :returns: Nodes per row of the study file
        :raises ParseIsatabException: after the last row, if duplicated rows
            were found.
        """
        builder = _StudyRowBuilder(self.header, self.filename, self.study_id)
        line = self._read_next_line()
        while line:
            yield builder.build(line)
            line = self._read_next_line()
        # Check if duplicated rows exist
        if self.duplicate_rows:
            lines = "\n{}" * len(self.duplicate_rows)
            tpl = "Found duplicated rows in study {}:{}"
            msg = tpl.format(self.study_id, lines.format(*self.duplicate_rows))
            raise ParseIsatabException(msg)
class StudyReader:
    """Facade for reading an ISA-TAB study file (``s_*.txt``) into a ``Study``.

    Prefer this class over the more low-level row reader.

    :type study_id: str
    :param study_id: Unique identifier for the study, needed to disambiguate nodes between files.
    :type input_file: TextIO
    :param input_file: ISA-Tab study file
    """

    @classmethod
    def from_stream(klass, study_id: str, input_file: TextIO, filename=None):
        """Create a reader for the given file-like object."""
        return StudyReader(study_id, input_file, filename)

    def __init__(self, study_id: str, input_file: TextIO, filename=None):
        # Low-level reader; it parses the header on construction
        self.row_reader = StudyRowReader.from_stream(study_id, input_file, filename)
        # The file object the study is read from
        self.input_file = input_file
        # File name, with fallback to the name of the file object
        self._filename = filename or getattr(input_file, "name", "<no file>")
        # Header of the ISA study file, as parsed by the row reader
        self.header = self.row_reader.header

    def read(self):
        """
        Parse the study file

        :rtype: models.Study
        :returns: Study model including graph of material and process nodes
        """
        rows = list(self.row_reader.read())
        builder = _AssayAndStudyBuilder(self._filename, self.header, models.Study)
        return builder.build(rows)
# TODO: extract common parts of {Assay,Study}[Row]Reader into two base classes
class AssayRowReader:
    """Read an ISA-TAB assay file (``a_*.txt``) into a tabular/object
    representation.

    This is a more low-level part of the interface.  Please prefer
    using ``AssayReader`` over using this class.

    :type study_id: str
    :param study_id: Unique identifier for the study, needed to disambiguate nodes between files.
    :type assay_id: str
    :param assay_id: Unique identifier for the assay, needed to disambiguate nodes between files.
    :type input_file: TextIO
    :param input_file: ISA-Tab assay file
    :type filename: str
    :param filename: Optional file name override; falls back to the ``name``
        attribute of ``input_file`` or ``"<no file>"``.
    :raises ParseIsatabException: if the file contains no header line.
    """

    @classmethod
    def from_stream(klass, study_id: str, assay_id: str, input_file: TextIO, filename: str = None):
        """Construct from file-like object"""
        return AssayRowReader(study_id, assay_id, input_file, filename)

    def __init__(self, study_id: str, assay_id: str, input_file: TextIO, filename: str):
        self.study_id = study_id
        self.assay_id = assay_id
        self.input_file = input_file
        self.filename = filename or getattr(input_file, "name", "<no file>")
        #: Tab-joined lines seen so far, used to detect duplicated rows
        self.unique_rows = set()
        #: Tab-joined lines that occurred more than once
        self.duplicate_rows = []
        self._reader = csv.reader(input_file, delimiter="\t", quotechar='"')
        #: Look-ahead buffer holding the line read most recently
        self._line = None
        # Fill the look-ahead buffer, then take the header from it.
        self._read_next_line()
        self.header = self._read_header()

    def _read_header(self):
        """Read first line with header.

        :raises ParseIsatabException: if there is no line left to read.
        """
        line = self._read_next_line()
        # ``_read_next_line`` never lets ``StopIteration`` escape; it signals
        # the end of the file by returning ``None``.
        if line is None:  # pragma: no cover
            msg = "Assay file has no header!"
            raise ParseIsatabException(msg)
        return list(AssayHeaderParser(line).run())

    def _read_next_line(self):
        """Read next line, skipping empty lines and comments starting with ``'#'``.

        Returns the previously buffered line (``None`` if there was none) and
        stores the newly read one in the buffer.  The buffer becomes ``None``
        once the file is exhausted.
        """
        prev_line = self._line
        try:
            self._line = list_strip(next(self._reader))
            while self._line is not None and (not self._line or self._line[0].startswith("#")):
                self._line = list_strip(next(self._reader))
            # Test and collect row duplicates
            row_key = "\t".join(self._line)
            if row_key in self.unique_rows:
                self.duplicate_rows.append(row_key)
            else:
                self.unique_rows.add(row_key)
        except StopIteration:
            self._line = None
        return prev_line

    def read(self):
        """
        Read assays rows

        :return: Nodes per row of the assay file
        :raises ParseIsatabException: after the last row, if duplicated rows
            were found.
        """
        builder = _AssayRowBuilder(self.header, self.filename, self.study_id, self.assay_id)
        line = self._read_next_line()
        while line:
            yield builder.build(line)
            line = self._read_next_line()
        # Check if duplicated rows exist
        if self.duplicate_rows:
            lines = "\n{}" * len(self.duplicate_rows)
            tpl = "Found duplicated rows in assay {} of study {}:{}"
            msg = tpl.format(self.assay_id, self.study_id, lines.format(*self.duplicate_rows))
            raise ParseIsatabException(msg)
class AssayReader:
    """Facade for reading an ISA-TAB assay file (``a_*.txt``) into an ``Assay``.

    Prefer this class over the more low-level row reader.

    :type study_id: str
    :param study_id: Unique identifier for the study, needed to disambiguate nodes between files.
    :type assay_id: str
    :param assay_id: Unique identifier for the assay, needed to disambiguate nodes between files.
    :type input_file: TextIO
    :param input_file: ISA-Tab assay file
    """

    @classmethod
    def from_stream(klass, study_id: str, assay_id: str, input_file: TextIO, filename=None):
        """Create a reader for the given file-like object."""
        return AssayReader(study_id, assay_id, input_file, filename)

    def __init__(self, study_id: str, assay_id: str, input_file: TextIO, filename=None):
        # Low-level reader; it parses the header on construction
        self.row_reader = AssayRowReader.from_stream(study_id, assay_id, input_file, filename)
        # The file object the assay is read from
        self.input_file = input_file
        # File name, with fallback to the name of the file object
        self._filename = filename or getattr(input_file, "name", "<no file>")
        # Header of the ISA assay file, as parsed by the row reader
        self.header = self.row_reader.header

    def read(self):
        """
        Parse the assay file

        :rtype: models.Assay
        :returns: Assay model including graph of material and process nodes
        """
        rows = list(self.row_reader.read())
        builder = _AssayAndStudyBuilder(self._filename, self.header, models.Assay)
        return builder.build(rows)