"""
openva_pipeline.pipeline
------------------------
This module defines the primary API for the openVA Pipeline.
"""
import os
import csv
import datetime
import sys
from pandas import read_csv
from typing import Union, Dict
from .transfer_db import TransferDB
from .transfer_db import DatabaseConnectionError
from .odk import ODK
from .openva import OpenVA
from .dhis import DHIS
from .exceptions import PipelineError
class Pipeline:
    """Primary API for the openVA pipeline.

    This class uses four other classes to move verbal autopsy (VA) data from
    an ODK Central/Aggregate server (using the ODK class), through the openVA
    R package to assign causes of death (using the OpenVA class), and to
    deposit the VA records with assigned causes in a DHIS2 server (using the
    DHIS class) and/or the Transfer database -- a local database which also
    contains the configuration settings for the pipeline.  The TransferDB
    class performs the final step of storing the results locally and also
    provides access to the configuration settings.

    :parameter db_file_name: File name of the Transfer database.
    :type db_file_name: str
    :parameter db_directory: Path of folder containing the Transfer database.
    :type db_directory: str
    :parameter db_key: Encryption key for the Transfer database.
    :type db_key: str
    :parameter use_dhis: Indicator for telling pipeline to post
      records/tracked entity instances to DHIS2.  When False, the DHIS2
      settings are not loaded and the DHIS2-related methods raise
      PipelineError.
    :type use_dhis: bool
    """

    def __init__(self,
                 db_file_name: str,
                 db_directory: str,
                 db_key: str,
                 use_dhis: bool = True):
        self.db_file_name = db_file_name
        self.db_directory = db_directory
        self.db_key = db_key
        self.db_path = os.path.join(db_directory, db_file_name)
        # Timestamp identifying this run; it is handed to TransferDB here
        # and to OpenVA in run_openva().
        now_date = datetime.datetime.now()
        self.pipeline_run_date = now_date.strftime("%Y-%m-%d_%H:%M:%S")
        self.use_dhis = use_dhis
        self.xfer_db = TransferDB(
            db_file_name=self.db_file_name,
            db_directory=self.db_directory,
            db_key=self.db_key,
            pl_run_date=self.pipeline_run_date,
        )
        # Filled in by _config() from the Transfer database.
        self.settings = None
        self._config()
        # DHIS object; created on demand by _check_use_dhis().
        self.dhis = None
        self.no_org_units = None

    def log_event(self, event_desc, event_type):
        """Commit an event or error message to the EventLog table of the
        Transfer database.

        If the Transfer database raises a DatabaseConnectionError, the
        message is appended instead to the fallback file
        ``db_error_log.csv`` in the Transfer database's directory.  That
        file is created, with a header row, if it does not exist yet.

        :parameter event_desc: Description of the event.
        :type event_desc: string
        :parameter event_type: Type of event (error or information)
        :type event_type: string
        """
        error_file = os.path.join(self.db_directory, "db_error_log.csv")
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
        if not os.path.isfile(error_file):
            try:
                with open(error_file, "w", newline="") as f:
                    csv.writer(f).writerow(
                        ["Date", "Description", "Additional Information"]
                    )
            except OSError as e:
                print(str(e) + "...Can't create db_error_log.csv")
                # Give up (exit status 1) if even the fallback log cannot
                # be created.
                sys.exit(1)
        try:
            par = (event_desc, event_type, timestamp)
            self.xfer_db.insert_event_log(par)
        except DatabaseConnectionError as e:
            error_msg = [timestamp, str(e), "Committed by Pipeline.log_event"]
            try:
                with open(error_file, "a", newline="") as f:
                    csv.writer(f).writerow(error_msg)
            except OSError as exc:  # also covers PermissionError
                print("Can't write to db_error_log.csv")
                # list.append returns None, so append first, then print.
                error_msg.append(str(exc))
                print(error_msg)

    def _config(self) -> None:
        """Fetch configuration settings from the Transfer DB.

        Queries the Transfer database (DB) and stores the results in
        ``self.settings``.  This is a dict with the keys ``"pipeline"``,
        ``"odk"`` and ``"openva"``, plus ``"dhis"`` only when ``use_dhis``
        is truthy.  These settings are used by :meth:`run_odk`,
        :meth:`run_openva`, and :meth:`run_dhis`.
        """
        settings_pipeline = self.xfer_db.config_pipeline()
        settings = {
            "pipeline": settings_pipeline,
            "odk": self.xfer_db.config_odk(),
            "openva": self.xfer_db.config_openva(settings_pipeline.algorithm),
        }
        if self.use_dhis:
            settings["dhis"] = self.xfer_db.config_dhis(
                settings_pipeline.algorithm)
        self.settings = settings

    def update_db(self):
        """Update a Transfer database created by a previous version of the
        pipeline.

        Adds whatever is missing from the following:

        * the VA_Org_Unit_Not_Found table;
        * the DHIS_Conf.dhisPostRoot field, filled with 'False';
        * the ODK_Conf.odkUseCentral field, filled with 'False';
        * the ODK_Conf.odkProjectNumber field.

        All changes are committed before the connection is closed.  The
        connection is closed even if a statement fails.
        """
        # Read the existing schema before opening the connection that makes
        # the changes (none of the changes affect these lookups).
        table_names = self._get_tables()
        dhis_fields = [entry[0] for entry in self._get_fields("DHIS_Conf")]
        odk_fields = [entry[0] for entry in self._get_fields("ODK_Conf")]
        conn = self.xfer_db.connect_db()
        try:
            c = conn.cursor()
            if "VA_Org_Unit_Not_Found" not in table_names:
                c.execute(
                    "CREATE TABLE VA_Org_Unit_Not_Found "
                    "(id char(100) NOT NULL, "
                    "outcome char(100), "
                    "eventBlob blob, "
                    "evaBlob blob, "
                    "dhisOrgUnit char(500), "
                    "dateEntered date,"
                    "fixed char(5));"
                )
            if "dhisPostRoot" not in dhis_fields:
                c.execute("ALTER TABLE DHIS_Conf ADD dhisPostRoot char(5);")
                c.execute("UPDATE DHIS_Conf SET dhisPostRoot = 'False';")
            if "odkUseCentral" not in odk_fields:
                c.execute("ALTER TABLE ODK_Conf ADD odkUseCentral char(5);")
                c.execute("UPDATE ODK_Conf SET odkUseCentral = 'False';")
            if "odkProjectNumber" not in odk_fields:
                c.execute(
                    "ALTER TABLE ODK_Conf ADD odkProjectNumber char(6);")
            # DB-API connections roll back uncommitted work on close, which
            # would discard the UPDATEs (and anything run after them).
            conn.commit()
        finally:
            conn.close()

    def _update_odk(self, field, value):
        """Update ODK_Conf.field(s) with value(s) in the Transfer DB and
        refresh the ``settings`` attribute."""
        self.xfer_db.update_table("ODK_Conf", field, value)
        self._config()

    def _update_dhis(self, field, value):
        """Update DHIS_Conf.field(s) with value(s) in the Transfer DB and
        refresh the ``settings`` attribute.

        Any cached DHIS object is dropped so that the next DHIS2 call is
        made with the updated settings.
        """
        self.xfer_db.update_table("DHIS_Conf", field, value)
        self._config()
        self.dhis = None

    def _update_pipeline(self, field, value):
        """Update Pipeline_Conf.field(s) with value(s) in the Transfer DB and
        refresh the ``settings`` attribute.

        Any cached DHIS object is dropped as well.  The DHIS object is built
        from the pipeline's working directory, and the DHIS settings depend
        on the pipeline's algorithm.
        """
        self.xfer_db.update_table("Pipeline_Conf", field, value)
        self._config()
        self.dhis = None

    def _get_odk_conf(self):
        """Get values from the ODK_Conf table in the Transfer DB."""
        return self.xfer_db.get_table_conf("ODK_Conf")

    def _get_dhis_conf(self):
        """Get values from the DHIS_Conf table in the Transfer DB."""
        return self.xfer_db.get_table_conf("DHIS_Conf")

    def _get_pipeline_conf(self):
        """Get values from the Pipeline_Conf table in the Transfer DB."""
        return self.xfer_db.get_table_conf("Pipeline_Conf")

    def _get_tables(self):
        """Get table names from the Transfer DB."""
        return self.xfer_db.get_tables()

    def _get_fields(self, table):
        """Get field information for ``table`` in the Transfer DB.

        Callers in this class use the first element of each entry as the
        field name.
        """
        return self.xfer_db.get_fields(table)

    def _get_schema(self, table):
        """Get the schema of ``table`` in the Transfer DB."""
        return self.xfer_db.get_schema(table)

    def _get_log(self, n: int = 5, last: bool = True):
        """Get n rows from the EventLog table in the Transfer DB.

        ``last`` is forwarded as TransferDB.get_event_log's ``recent``
        argument (presumably selecting the most recent messages).
        """
        return self.xfer_db.get_event_log(n_messages=n, recent=last)

    def run_odk(self):
        """Download VA records from ODK and check them for duplicates.

        This method first calls :meth:`ODK.merge_to_prev_export()
        <openva_pipeline.odk.ODK.merge_to_prev_export>`.  If there is a
        previous ODK export file, the files are merged by keeping only the
        unique VA records.  The method then downloads VA records as a (csv)
        export from one of two sources:

        1. an ODK Central server, using
           :meth:`ODK.central() <openva_pipeline.odk.ODK.central>`;
        2. an ODK Aggregate server, using the Java application ODK
           Briefcase via
           :meth:`ODK.briefcase() <openva_pipeline.odk.ODK.briefcase>`.

        The ODK setting ``odk_use_central`` (the string ``"True"`` selects
        Central) is taken from ``self.settings`` (see :meth:`_config`).
        Finally, TransferDB.check_duplicates is called.

        :return: Result of the download step and the summary returned by
          TransferDB.check_duplicates
        :rtype: tuple
        """
        args_odk = self.settings["odk"]
        pipeline_odk = ODK(self.settings)
        pipeline_odk.merge_to_prev_export()
        if args_odk.odk_use_central == "True":
            odk_result = pipeline_odk.central()
        else:
            odk_result = pipeline_odk.briefcase()
        self.xfer_db.config_pipeline()
        odk_summary = self.xfer_db.check_duplicates(self.use_dhis)
        return odk_result, odk_summary

    def run_openva(self):
        """Create & run the R script that assigns causes of death.

        This method runs through the suite of methods in the
        :class:`OpenVA <openva_pipeline.openva.OpenVA>` class, in order:

        1. :meth:`OpenVA.prep_va_data()
           <openva_pipeline.openva.OpenVA.prep_va_data>` copies over the CSV
           files with VA data (retrieved from ODK).
        2. When there are records to process,
           :meth:`OpenVA.r_script() <openva_pipeline.openva.OpenVA.r_script>`
           creates an R script.
        3. :meth:`OpenVA.get_cod() <openva_pipeline.openva.OpenVA.get_cod>`
           runs that script to estimate the causes of death.  The results
           are stored in "OpenVAFiles/record_storage.csv" and
           "OpenVAFiles/entity_attribute_value.csv".

        :return: The dict returned by OpenVA.prep_va_data() (including
          ``n_to_openva``).  If ``n_to_openva`` > 0 it also holds the R
          run's ``return_code`` and the entries from OpenVA.get_summary();
          otherwise ``n_processed`` and ``n_cod_missing`` are set to 0.
        :rtype: dictionary
        """
        pipeline_openva = OpenVA(
            settings=self.settings,
            pipeline_run_date=self.pipeline_run_date,
        )
        r_out = pipeline_openva.prep_va_data()
        if r_out["n_to_openva"] > 0:
            pipeline_openva.r_script()
            completed = pipeline_openva.get_cod()
            r_out["return_code"] = completed.returncode
            r_out.update(pipeline_openva.get_summary())
        else:
            r_out["n_processed"] = 0
            r_out["n_cod_missing"] = 0
        return r_out

    def _connect_dhis(self):
        """Create the DHIS object from the DHIS settings and the pipeline's
        working directory."""
        args_dhis = self.settings["dhis"]
        args_pipeline = self.settings["pipeline"]
        self.dhis = DHIS(args_dhis, args_pipeline.working_directory)

    def _check_use_dhis(self) -> None:
        """Check that this Pipeline is set to use DHIS and, if so, create
        the DHIS object when it does not exist yet.

        :raises PipelineError: if the instance was created with a falsy
          ``use_dhis`` (so no DHIS settings were loaded by _config).
        """
        if not self.use_dhis:
            raise PipelineError("This Pipeline instance was created with the "
                                "argument use_dhis=False (so cannot access "
                                "configuration settings for DHIS used by "
                                "Pipeline.run_dhis)")
        if self.dhis is None:
            self._connect_dhis()

    def run_dhis(self) -> Dict:
        """Post VA records to DHIS2 and verify the posts.

        This method makes sure a DHIS object exists (creating it if
        necessary), then calls
        :meth:`DHIS.post_va() <openva_pipeline.dhis.DHIS.post_va>` to post
        the VA data, the assigned causes of death, and associated metadata
        (concerning cause assignment).  Finally, the post log is checked
        with DHIS.verify_tei_post (when posting to the tracker) or
        DHIS.verify_post.

        :return: VA Program ID from the DHIS2 server, the post log, the
          number of events posted to DHIS2, and the number of records
          without a valid organisation unit
        :rtype: dictionary
        :raises PipelineError: if DHIS is not in use (see use_dhis).
        """
        self._check_use_dhis()
        post_log = self.dhis.post_va(self.xfer_db)
        if self.dhis.post_to_tracker:
            self.dhis.verify_tei_post(post_log)
        else:
            self.dhis.verify_post(post_log)
        return {
            "va_program_uid": self.dhis.va_program_uid,
            "post_log": post_log,
            "n_posted_events": self.dhis.n_posted_events,
            "n_no_valid_org_unit": self.dhis.n_no_valid_org_unit,
        }

    def get_no_org_unit(self,
                        va_id: Union[str, None] = None) -> Dict:
        """Get VA record IDs that do not have a valid organisation unit
        for posting to DHIS2.  If va_id is provided, get the eventBlob and
        evaBlob for that VA instead.

        :parameter va_id: VA's ID for which the eventBlob and evaBlob will be
          returned.
        :type va_id: str
        :return: VA record ID and data used to find DHIS2 org unit; or, if
          va_id is provided, the eventBlob and evaBlob for that VA.
        :rtype: dictionary
        :raises PipelineError: if DHIS is not in use (see use_dhis).
        """
        self._check_use_dhis()
        return self.xfer_db.get_no_ou_va(va_id)

    def fix_no_org_unit(self,
                        va_id: str,
                        org_unit: str) -> Dict:
        """Post a VA event to DHIS2 with the provided organisation unit.

        If the post is verified, the corresponding record is removed from
        the Transfer database table VA_Org_Unit_Not_Found and added to the
        VA_Storage table.

        :parameter va_id: ID for VA record to post
        :type va_id: str
        :parameter org_unit: New DHIS2 organisation unit (display name or
          ID) where VA event will be posted.
        :type org_unit: str
        :return: The parsed post-log entry when the post is verified;
          otherwise the raw log returned by DHIS.post_single_va (a message
          is also printed).  NOTE(review): the concrete type of these
          values is defined by the DHIS class -- confirm there.
        :raises PipelineError: if va_id or org_unit is invalid, or DHIS is
          not in use.
        """
        org_unit_id = self._check_fix_no_ou(va_id, org_unit)
        if org_unit_id == "fail":
            raise PipelineError("Post to DHIS2 was unsuccessful "
                                "(invalid va_id or org_unit)")
        va_data = self.xfer_db.get_no_ou_va(va_id)
        log = self.dhis.post_single_va(va_dict=va_data["va_dict"],
                                       eav=va_data["eav_dataframe"],
                                       org_unit=org_unit_id)
        if self.dhis.post_to_tracker:
            parsed_log = self.dhis._parse_tei_post_log(log=log)
        else:
            parsed_log = self.dhis._parse_post_log(log=log)
        log_summary = list(parsed_log.values())[0]
        verify_post = self.dhis.verify_single_va(va_id=va_id,
                                                 post_log=log)
        if not verify_post:
            print("Unable to verify the post to DHIS2 was successful.")
            return log
        self.xfer_db.remove_no_ou_va(va_id=va_id)
        self.xfer_db.store_single_va(va_dict=va_data["va_dict"],
                                     org_unit_id=org_unit_id,
                                     log_summary=log_summary,
                                     dhis_tracker=self.dhis.post_to_tracker)
        return log_summary

    def _check_fix_no_ou(self, va_id: str, org_unit: str) -> str:
        """Check that va_id is in the Transfer database table
        VA_Org_Unit_Not_Found (and not already in VA_Storage) and that
        org_unit is a valid DHIS2 organisation unit.

        :parameter va_id: ID for VA record to post
        :type va_id: str
        :parameter org_unit: New DHIS2 organisation unit (display name or
          ID) where VA event will be posted.
        :type org_unit: str
        :return: DHIS2 organisation unit ID, or the string "fail" (after
          printing the reason)
        :rtype: str
        """
        self._check_use_dhis()
        va_no_org_unit = self.xfer_db.get_no_ou_va()
        if va_id not in va_no_org_unit:
            print(f"{va_id} is not in Transfer database table "
                  "VA_Org_Unit_Not_Found")
            return "fail"
        if va_id in self.xfer_db._get_va_storage_ids():
            print(f"{va_id} is already stored in VA_Storage table!")
            return "fail"
        # Mapping of display name -> ID (see get_dhis_org_units).
        valid_org_units = self.get_dhis_org_units()
        if org_unit in valid_org_units:
            return valid_org_units[org_unit]
        if org_unit in valid_org_units.values():
            return org_unit
        print(f"{org_unit} is not a valid DHIS2 organisation unit")
        return "fail"

    def get_dhis_org_units(self, va_program: bool = True) -> Dict:
        """Get DHIS organisation unit IDs and display names.

        :parameter va_program: Indicator for returning only organisation
          units in the DHIS VA Program (as opposed to all organisation
          units).
        :type va_program: bool
        :returns: displayName: id of DHIS organisation units.
        :rtype: dict
        :raises PipelineError: if DHIS is not in use (see use_dhis).
        """
        self._check_use_dhis()
        return self.dhis._get_org_units(va_program=va_program)

    def store_results_db(self):
        """Store VA results in the Transfer database.

        When DHIS2 is not used, this method first writes
        "OpenVAFiles/new_storage.csv" in the working directory.  It is a
        copy of "OpenVAFiles/record_storage.csv" with an added
        ``pipelineOutcome`` column.  That column reads "No cause assigned"
        where ``cod`` is "MISSING", and "Assigned a cause of death"
        otherwise.  TransferDB.store_va is then called in every case.
        """
        self.xfer_db.config_pipeline()
        if not self.use_dhis:
            working_directory = self.settings["pipeline"].working_directory
            record_storage_path = os.path.join(
                working_directory,
                "OpenVAFiles/record_storage.csv")
            new_storage_path = os.path.join(
                working_directory,
                "OpenVAFiles/new_storage.csv")
            record_storage = read_csv(record_storage_path)
            record_storage["pipelineOutcome"] = "Assigned a cause of death"
            missing_cod = record_storage["cod"] == "MISSING"
            record_storage.loc[missing_cod,
                               "pipelineOutcome"] = "No cause assigned"
            record_storage.to_csv(new_storage_path)
        # self.dhis is None when DHIS2 is not used (or no DHIS object has
        # been created yet); there is no post_to_tracker flag to read then,
        # so False is passed.
        dhis_tracker = (False if self.dhis is None
                        else self.dhis.post_to_tracker)
        self.xfer_db.store_va(dhis_tracker)

    def close_pipeline(self):
        """Clean up the files created by this run and update ODK_Conf's
        last-run value in the Transfer DB.

        This method calls methods in the
        :class:`TransferDB <openva_pipeline.transfer_db.TransferDB>`
        class, in order:

        1. :meth:`TransferDB.clean_odk()
           <openva_pipeline.transfer_db.TransferDB.clean_odk>` removes the
           ODK Briefcase export files ("ODKFiles/odkBCExportNew.csv" and
           "ODKFiles/odkBCExportPrev.csv") if they exist.
        2. :meth:`TransferDB.clean_openva()
           <openva_pipeline.transfer_db.TransferDB.clean_openva>` removes the
           input data file ("OpenVAFiles/openva_input.csv") and the output
           files ("OpenVAFiles/record_storage.csv",
           "OpenVAFiles/new_storage.csv", and
           "OpenVAFiles/entity_attribute_value.csv").  All of these results
           are stored in the Transfer DB and/or the DHIS2 server's VA
           program.
        3. Only when DHIS2 is used, :meth:`TransferDB.clean_dhis()
           <openva_pipeline.transfer_db.TransferDB.clean_dhis>` removes the
           blobs posted to the DHIS2 server and stored in the folder
           "DHIS/blobs".
        4. TransferDB.update_odk_last_run updates the odk_last_run value in
           the ODK_Conf table, so that the next ODK export does not include
           VA records already processed through the pipeline.
        """
        self.xfer_db.config_pipeline()
        self.xfer_db.clean_odk()
        self.xfer_db.clean_openva()
        if self.use_dhis:
            self.xfer_db.clean_dhis()
        self.xfer_db.update_odk_last_run()