Step 5: Edit parser.py
This file implements a class that parses the data of a single file and then inserts the parsed data into the relational database.
Explanation
The parser.py file should include an implementation of a parser class (in our example, that would be a class named InterproTsvParser). As previously stated, this parser class is responsible for parsing the data of a single file and inserting that data into the relational database.
The template includes the boilerplate for this class and, most importantly, a
placeholder for the to_dataframe
method. Even if you are dealing with a file
format that is not tabular, converting it to a DataFrame first can be awfully
helpful for parsing the data.
The parsomics
team recommends using the polars
library
for this (we already added it as a dependency in the Step 2: Initialize a
package section), but feel free to use
whichever you are most comfortable with (e.g. pandas
). Just make sure you
install whichever additional libraries you need with poetry add
, so they are
properly registered as dependencies.
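If the file you are parsing is not tabular, the idea is to first transform whatever you extract from it into rows and columns. As a minimal sketch (the record structure and column names below are made up for illustration), a list of parsed dictionaries converts directly into a polars DataFrame:

import polars as pl

# Hypothetical records collected while walking a custom, non-tabular file format
records = [
    {"protein_name": "PROT_0001", "accession": "PF00001", "score": 1.3e-10},
    {"protein_name": "PROT_0002", "accession": "PF00512", "score": 2.7e-35},
]

# polars builds a typed, tabular DataFrame from the list of dictionaries,
# which can then be cleaned, renamed, and converted back to mappings
df = pl.DataFrame(records)
print(df.schema)  # inspect the inferred column types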
The template also includes a placeholder for the parse method, which should already be pretty close to its final implementation. The parse method takes the parsed DataFrame, converts it to a list of dictionaries (mappings), and adds them to the database in batches.
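As a quick illustration (with made-up values), to_dicts() turns each DataFrame row into a dictionary keyed by column name, and that list of dictionaries is what gets bulk-inserted:

import polars as pl

# Hypothetical two-row DataFrame, just to show the shape of the mappings
df = pl.DataFrame({"protein_key": [1, 2], "accession": ["PF00001", "PF00512"]})
mappings = df.to_dicts()
# mappings == [
#     {"protein_key": 1, "accession": "PF00001"},
#     {"protein_key": 2, "accession": "PF00512"},
# ]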
The parser.py
file is the heart and soul of your plugin! It is the file that
performs the crucial step of adapting the format of the files to the format
accepted by the relational database. It will likely be the file with the most
code in your plugin.
In the to_dataframe
method, you need to:
- Use the right data types: ensure the values in each column have the right data types. For example, you should check that numeric values are not being interpreted as strings.
- Tidy the data: make sure each row represents a single observation and each column represents a single property. You can read more about the concept of tidy data here.
- Match the database schema: rename all the columns in your DataFrame to match the names of the properties of the proteinannotationentry table of the relational database. Columns that don't match any proteinannotationentry property can be dropped or included as key-value pairs in the JSONB details property.
Important: Consider the details property a last resort. JSONB properties are more flexible, but you pay a price: they don't benefit from the standardization or the performance optimizations that regular database properties have.
As a reminder, this is the schema of the proteinannotationentry table:
You will also need to write helper methods that find the necessary foreign
keys. These are the foreign keys of the proteinannotationentry
table:
- protein_key: the protein referenced by each annotation (mandatory)
- file_key: the file that contains each annotation (mandatory)
- source_key: the source of each annotation (optional)
Your implementation of the method that looks for the source_key
by the
source's name should add a new source
entry to the database if no source
with that name is found.
That is a lot of stuff! Hopefully it will become clearer in the "Hands on" section.
Hands on
- Open the parser.py file and remove the triple quotes. You should see this:

import logging
import polars as pl
from parsomics_core.entities import ProteinAnnotationEntry, ProteinAnnotationFile
from parsomics_core.entities.workflow.source import (
Source,
SourceCreate,
SourceTransactions,
)
from parsomics_core.plugin_utils import search_protein_by_name
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session
class InterproTsvParser(BaseModel):
    file: ProteinAnnotationFile
    assembly_key: int
    tool_key: int

    def to_dataframe(self) -> pl.DataFrame:
        pass

    def parse(self, engine) -> None:
        df = self.to_dataframe()
        mappings = df.to_dicts()

        with Session(engine) as session:
            try:
                session.bulk_insert_mappings(ProteinAnnotationEntry, mappings)
                session.commit()
                logging.info(
                    f"Added Interpro entries from {self.file.path} to the database."
                )
            except IntegrityError as e:
                logging.warning(
                    f"Failed to add Interpro entries from {self.file.path} to "
                    f"the database. Exception caught: {e}"
                )
- Let's start with the to_dataframe method. First, define the types of the columns of the DataFrame:

schema: dict[str, pl.PolarsDataType] = {
    "protein_name": pl.String,
    "sequence_hash": pl.String,
    "sequence_length": pl.Int32,
    "source_name": pl.String,
    "signature_accession": pl.String,
    "signature_description": pl.String,
    "coord_start": pl.Int32,
    "coord_stop": pl.Int32,
    "score": pl.Float64,
    "status": pl.String,
    "date": pl.String,
    "interpro_annotation_accession": pl.String,
    "interpro_annotation_description": pl.String,
}

Important: The spec of column types for polars DataFrames is called "schema". Bear in mind that this is completely separate from the schema of the relational database. It's just the same name for altogether different things.
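Note: if a column still needs converting after it has been read (for example, a date that arrives as a string), polars expressions handle that too. A minimal sketch, using hypothetical conversions just for illustration:

# Hypothetical follow-up conversions (the actual InterproScan columns are
# dealt with later in this guide)
df = df.with_columns(
    pl.col("score").cast(pl.Float64),        # force a numeric type
    pl.col("date").str.to_date("%d-%m-%Y"),  # parse a date stored as text
)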
- Read the data into the DataFrame using pl.read_csv. This is also where you can define which values should be considered null. In InterproScan's case, null entries have a "-", so our code for reading the data ends up like this:

df = pl.read_csv(
    self.file.path,
    separator="\t",
    schema=schema,
    infer_schema_length=0,
    has_header=False,
    null_values=["-"],  # values that should be considered null
    quote_char=None,
)
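It can be worth sanity-checking the result at this point, for instance from a quick script or a REPL. This is optional and only uses standard polars introspection:

print(df.head())        # eyeball the first few parsed rows
print(df.schema)        # column names and data types
print(df.null_count())  # how many nulls ended up in each column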
- Drop columns that don't match any property of the proteinannotationentry table:

df = df.drop(
    [
        "sequence_hash",
        "sequence_length",
        "status",
        "date",
    ]
)
- Rename the remaining columns to match the names of the properties of the proteinannotationentry table:

df = df.rename(
    {
        "signature_accession": "accession",
        "signature_description": "description",
    }
)
- Now, tackle the methods that add the foreign keys to the DataFrame. The most complex among them is most certainly _add_source_key_to_df, since it not only looks for matching sources to obtain the needed source_key, but also adds a new source to the database if it doesn't find one.
Since the source table typically only has a few dozen entries, we can avoid querying the database and drastically improve performance by "caching" results in a dictionary. The implementation below does that, and should serve as a reference for when you write your own plugins (note that it also uses select, so import it from sqlmodel alongside Session):

def _add_source_key_to_df(self, engine, df) -> pl.DataFrame:
    with Session(engine) as session:
        # First, add all sources that are already in the database (and,
        # thus, already have a primary key) to the dictionary that relates
        # source name to primary key
        sources_in_db = session.exec(select(Source)).all()
        source_name_to_key = {source.name: source.key for source in sources_in_db}

    # Then, iterate over the sources in the DataFrame and add them
    # to the database if they are not present in the source_name_to_key
    # dictionary. Add them to the dictionary once they have been added
    # to the database and have a primary key
    source_names_in_df = df.select(pl.col("source_name")).unique().to_series()
    for source_name in source_names_in_df:
        if source_name not in source_name_to_key:
            source_create_model = SourceCreate(
                name=source_name,
                tool_key=self.tool_key,
            )
            with Session(engine) as session:
                source_key = (
                    SourceTransactions()
                    .create(
                        session,
                        source_create_model,
                    )
                    .key
                )
            source_name_to_key[source_name] = source_key

    # Finally, use source_name_to_key to add source_key to the DataFrame
    df = df.with_columns(
        source_key=pl.col("source_name").replace(
            source_name_to_key,
            default=None,
        )
    )
    df = df.drop("source_name")

    return df
- Implement the method that adds the file_key to the DataFrame. This is much easier compared to the previous one:

def _add_file_key_to_df(self, df):
    return df.with_columns(pl.lit(self.file.key).alias("file_key"))
- polars DataFrames don't support values of certain data types. For example, you can't have a dictionary as a value within a DataFrame. Because of that, you will have to include properties like details after converting the DataFrame to mappings.
I chose to save only these two fields in details, though I could just as easily have considered them to be separate annotations when I normalized the data.

def _add_details_to_mappings(self, mappings):
    for mapping in mappings:
        mapping["details"] = {}
        for k in [
            "interpro_annotation_accession",
            "interpro_annotation_description",
        ]:
            mapping["details"][k] = mapping[k]
            mapping.pop(k)
- Write the method that finds the protein_key for each annotation. For that, the search_protein_by_name function from the parsomics-core plugin utils comes in handy.

def _add_protein_key_to_mappings(self, mappings):
    protein_name_to_key = {}
    for mapping in mappings:
        protein_name = mapping["protein_name"]
        if protein_name not in protein_name_to_key:
            protein_key = search_protein_by_name(protein_name, self.assembly_key)
            protein_name_to_key[protein_name] = protein_key
        protein_key = protein_name_to_key[protein_name]
        mapping["protein_key"] = protein_key
        mapping.pop("protein_name")
- At last, finish the implementation of the parse method:

def parse(self, engine) -> None:
    df = self.to_dataframe()
    df = self._add_source_key_to_df(engine, df)
    df = self._add_file_key_to_df(df)

    mappings = df.to_dicts()
    self._add_details_to_mappings(mappings)
    self._add_protein_key_to_mappings(mappings)

    # Below unchanged...
- Stage parser.py and commit it.
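For reference, this is roughly how the finished parser ends up being used. In a real plugin this call is made for you by the rest of the plugin machinery (covered in the following sections); the engine URL, file object, and keys below are placeholders:

from sqlmodel import create_engine

engine = create_engine("postgresql://user:password@localhost/parsomics")  # placeholder URL

parser = InterproTsvParser(
    file=protein_annotation_file,  # a ProteinAnnotationFile record (placeholder)
    assembly_key=assembly_key,     # placeholder foreign keys
    tool_key=tool_key,
)
parser.parse(engine)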
Good news
Phew! That was a handful. Rest assured, it's smooth sailing from here on out.
The remaining files are processor.py
, populate.py
, plugin_initializer.py
,
and __init__.py
. All of them were most likely correctly generated by the
template. You will only need to remove the triple quotes and they will be good
to go.
Read the next four sections if you want to learn more about the role of these files in your plugin. Otherwise, feel free to skip to Step 10 - Format the codebase.
Result
parser.py
import logging
import polars as pl
from parsomics_core.entities import ProteinAnnotationEntry, ProteinAnnotationFile
from parsomics_core.entities.workflow.source import (
Source,
SourceCreate,
SourceTransactions,
)
from parsomics_core.plugin_utils import search_protein_by_name
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, select
class InterproTsvParser(BaseModel):
    file: ProteinAnnotationFile
    assembly_key: int
    tool_key: int

    def to_dataframe(self) -> pl.DataFrame:
        schema: dict[str, pl.PolarsDataType] = {
            "protein_name": pl.String,
            "sequence_hash": pl.String,
            "sequence_length": pl.Int32,
            "source_name": pl.String,
            "signature_accession": pl.String,
            "signature_description": pl.String,
            "coord_start": pl.Int32,
            "coord_stop": pl.Int32,
            "score": pl.Float64,
            "status": pl.String,
            "date": pl.String,
            "interpro_annotation_accession": pl.String,
            "interpro_annotation_description": pl.String,
        }

        df = pl.read_csv(
            self.file.path,
            separator="\t",
            schema=schema,
            infer_schema_length=0,
            has_header=False,
            null_values=["-"],  # values that should be considered null
            quote_char=None,
        )

        # Only consider annotations with status "T" for True
        df = df.with_columns(pl.col("status").eq("T"))
        df = df.filter(pl.col("status"))

        # Convert the date string to a date object
        df = df.with_columns(pl.col("date").str.to_date("%d-%m-%Y"))

        # Drop columns that don't match any properties of the
        # proteinannotationentry table
        df = df.drop(
            [
                "sequence_hash",
                "sequence_length",
                "status",
                "date",
            ]
        )

        # Rename columns to match the names of properties of the
        # proteinannotationentry table
        df = df.rename(
            {
                "signature_accession": "accession",
                "signature_description": "description",
            }
        )

        return df

    def _add_source_key_to_df(self, engine, df) -> pl.DataFrame:
        with Session(engine) as session:
            # First, add all sources that are already in the database (and,
            # thus, already have a primary key) to the dictionary that relates
            # source name to primary key
            sources_in_db = session.exec(select(Source)).all()
            source_name_to_key = {source.name: source.key for source in sources_in_db}

        # Then, iterate over the sources in the DataFrame and add them
        # to the database if they are not present in the source_name_to_key
        # dictionary. Add them to the dictionary once they have been added
        # to the database and have a primary key
        source_names_in_df = df.select(pl.col("source_name")).unique().to_series()
        for source_name in source_names_in_df:
            if source_name not in source_name_to_key:
                source_create_model = SourceCreate(
                    name=source_name,
                    tool_key=self.tool_key,
                )
                with Session(engine) as session:
                    source_key = (
                        SourceTransactions()
                        .create(
                            session,
                            source_create_model,
                        )
                        .key
                    )
                source_name_to_key[source_name] = source_key

        # Finally, use source_name_to_key to add source_key to the DataFrame
        df = df.with_columns(
            source_key=pl.col("source_name").replace(
                source_name_to_key,
                default=None,
            )
        )
        df = df.drop("source_name")

        return df

    def _add_file_key_to_df(self, df):
        return df.with_columns(pl.lit(self.file.key).alias("file_key"))

    def _add_details_to_mappings(self, mappings):
        for mapping in mappings:
            mapping["details"] = {}
            for k in [
                "interpro_annotation_accession",
                "interpro_annotation_description",
            ]:
                mapping["details"][k] = mapping[k]
                mapping.pop(k)

    def _add_protein_key_to_mappings(self, mappings):
        protein_name_to_key = {}
        for mapping in mappings:
            protein_name = mapping["protein_name"]
            if protein_name not in protein_name_to_key:
                protein_key = search_protein_by_name(protein_name, self.assembly_key)
                protein_name_to_key[protein_name] = protein_key
            protein_key = protein_name_to_key[protein_name]
            mapping["protein_key"] = protein_key
            mapping.pop("protein_name")

    def parse(self, engine) -> None:
        df = self.to_dataframe()
        df = self._add_source_key_to_df(engine, df)
        df = self._add_file_key_to_df(df)

        mappings = df.to_dicts()
        self._add_details_to_mappings(mappings)
        self._add_protein_key_to_mappings(mappings)

        with Session(engine) as session:
            try:
                session.bulk_insert_mappings(ProteinAnnotationEntry, mappings)
                session.commit()
                logging.info(
                    f"Added Interpro entries from {self.file.path} to the database."
                )
            except IntegrityError as e:
                logging.warning(
                    f"Failed to add Interpro entries from {self.file.path} to "
                    f"the database. Exception caught: {e}"
                )