Documentation for classes, functions, and methods

Running the OpenVA Pipeline

The openva_pipeline package includes two convenience functions for

  1. creating the Transfer Database – a database that holds configuration settings, VA data and results, and a table for logging events and errors; and

  2. running through all of the steps in the openVA Pipeline.

openva_pipeline.run_pipeline.create_transfer_db(database_file_name, database_directory, database_key)[source]

Create the (SQLite encrypted) Transfer Database.

Parameters:
  • database_file_name – File name for the Transfer Database.

  • database_directory – Path of the Transfer Database.

  • database_key – Encryption key for the Transfer Database

openva_pipeline.run_pipeline.run_pipeline(database_file_name, database_directory, database_key, export_to_dhis=True)[source]

Runs through all steps of the OpenVA Pipeline

This function is a wrapper for the Pipeline class, which runs through all steps of the OpenVA Pipeline – (1) connect to Transfer Database (to retrieve configuration settings); (2) connect to ODK Aggregate to download a CSV file with VA records; (3) run openVA (or SmartVA) to assign cause of death; and (4) store CoD results and VA data in the Transfer Database as well as a DHIS2 VA Program (if requested).

Parameters:
  • database_file_name – File name for the Transfer Database.

  • database_directory – Path of the Transfer Database.

  • database_key – Encryption key for the Transfer Database

  • export_to_dhis (bool) – Indicator for posting VA records to a DHIS2 server.
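The two convenience functions share their first three arguments, so a small helper can assemble them once. The following is a minimal sketch, assuming the package is installed and that the encryption key is supplied through an environment variable. The helper names (`pipeline_args`, `create_database`, `run_once`), the default file name `Pipeline.db`, and the variable name `OPENVA_DB_KEY` are illustrative assumptions and not part of the package.

```python
import os


def pipeline_args(db_dir, db_name="Pipeline.db", key_env="OPENVA_DB_KEY"):
    """Build the three arguments shared by both convenience functions.

    The default file name and the environment variable name are
    hypothetical choices made for this sketch.
    """
    key = os.environ.get(key_env)
    if not key:
        raise ValueError(f"set {key_env} to the Transfer Database key")
    return db_name, db_dir, key


def create_database(db_dir):
    # Imported lazily so pipeline_args() works without the package installed.
    from openva_pipeline.run_pipeline import create_transfer_db

    create_transfer_db(*pipeline_args(db_dir))


def run_once(db_dir, export_to_dhis=False):
    # Imported lazily for the same reason as above.
    from openva_pipeline.run_pipeline import run_pipeline

    # Run every pipeline step; the DHIS2 export is skipped unless requested.
    run_pipeline(*pipeline_args(db_dir), export_to_dhis=export_to_dhis)
```

Reading the key from the environment keeps it out of scripts and shell history; any other secret store would serve equally well.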

Main Interface

The OpenVA Pipeline is run using the following class:

class openva_pipeline.pipeline.Pipeline(db_file_name: str, db_directory: str, db_key: str, use_dhis: bool = True)[source]

Primary API for the openVA pipeline.

This class calls three others to move verbal autopsy (VA) data through the pipeline: the ODK class downloads records from an ODK Aggregate server, the OpenVA class runs the openVA R package to assign causes of death, and the DHIS class posts the VA records with assigned causes to a DHIS server. The records can be deposited on the DHIS server, in the Transfer database, or both. The Transfer database is a local database that also contains the configuration settings for the pipeline; the TransferDB class performs the final step of storing the results locally and provides access to the configuration settings.

Parameters:
  • db_file_name (str) – File name of the Transfer database.

  • db_directory (str) – Path of folder containing the Transfer database.

  • db_key (str) – Encryption key for the Transfer database.

  • use_dhis (bool) – Indicator for telling pipeline to post records/tracked entity instances to DHIS

close_pipeline()[source]

Update ODK_Conf odk_last_run in Transfer DB and clean up files.

This method calls methods in the TransferDB class to remove the data files created at each step of the pipeline:

  1. TransferDB.clean_odk() removes the ODK Briefcase export files (“ODKFiles/odkBCExportNew.csv” and “ODKFiles/odkBCExportPrev.csv”) if they exist.

  2. TransferDB.clean_openva() removes the input data file (“OpenVAFiles/openva_input.csv”) and the output files (“OpenVAFiles/record_storage.csv”, “OpenVAFiles/new_storage.csv”, and “OpenVAFiles/entity_attribute_value.csv”). All of these results are already stored in the Transfer DB, the DHIS2 server’s VA program, or both.

  3. TransferDB.clean_dhis() removes the blobs that were posted to the DHIS2 server and stored in the folder “DHIS/blobs”.

Finally, this method updates the value of odk_last_run in the Transfer DB’s ODK_Conf table so that the next ODK export file does not include VA records already processed through the pipeline.

fix_no_org_unit(va_id: str, org_unit: str) → Dict[source]

Post a VA event to DHIS2 with provided organisation unit. If the post is successful, the corresponding record will be removed from the Transfer database table VA_Org_Unit_Not_Found and added to the VA_Storage table.

Parameters:
  • va_id (str) – ID for VA record to post

  • org_unit (str) – New DHIS2 organisation unit (display name or ID) where VA event will be posted.

Returns:

Message indicating successful or unsuccessful post.

Return type:

str

get_dhis_org_units(va_program: bool = True) → Dict[source]

Get DHIS organisation unit IDs and display names.

Parameters:

va_program (bool) – Indicator for returning only organisation units in the DHIS VA Program (as opposed to all organisation units).

Returns:

Mapping of displayName to id for the DHIS organisation units.

Return type:

dict

get_no_org_unit(va_id: str | None = None) → Dict[source]

Get VA record IDs that do not have a valid organisation unit for posting to DHIS2; or, if va_id is provided, get the eventBlob and evaBlob for that VA.

Parameters:

va_id (str) – VA’s ID for which the eventBlob and evaBlob will be returned.

Returns:

VA record ID and data used to find DHIS2 org unit; or, if va_id is provided, the eventBlob and evaBlob for that VA.

Return type:

dictionary
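The three organisation unit methods above can be combined to repost records that could not be matched. This is a sketch under stated assumptions: it assumes the dictionary returned by get_no_org_unit() is keyed by VA ID (the documentation does not spell out its layout), and `repost_unmatched`, `corrections`, and `StubPipeline` are hypothetical names used only for illustration. The stub stands in for a configured Pipeline instance.

```python
def repost_unmatched(pipeline, corrections):
    """Post VA records that lacked a valid organisation unit.

    corrections maps a VA ID to the display name of the org unit to use
    (a hypothetical, manually curated lookup).
    """
    org_units = pipeline.get_dhis_org_units()  # displayName -> id
    pending = pipeline.get_no_org_unit()       # assumed to be keyed by VA ID
    messages = {}
    for va_id in pending:
        name = corrections.get(va_id)
        if name in org_units:
            # Only post when the corrected name is known to the DHIS2 server.
            messages[va_id] = pipeline.fix_no_org_unit(va_id, name)
    return messages


class StubPipeline:
    """Hypothetical stand-in exposing the three documented methods."""

    def __init__(self):
        self.pending = {"va-001": "Clinic A", "va-002": "Unknown"}
        self.posted = []

    def get_dhis_org_units(self, va_program=True):
        return {"Clinic A": "ou111", "Clinic B": "ou222"}

    def get_no_org_unit(self, va_id=None):
        return dict(self.pending)

    def fix_no_org_unit(self, va_id, org_unit):
        self.posted.append((va_id, org_unit))
        self.pending.pop(va_id)
        return f"posted {va_id} to {org_unit}"


stub = StubPipeline()
messages = repost_unmatched(stub, {"va-001": "Clinic A", "va-002": "Clinic Z"})
```

Records whose corrected name is still unknown to DHIS2 are left in place so they can be reviewed again later.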

log_event(event_desc, event_type)[source]

Commit event or error message into EventLog table of transfer database.

Parameters:
  • event_desc (string) – Description of the event.

  • event_type (string) – Type of event (error or information)

run_dhis() → Dict[str, str][source]

Connect to API and post events.

This method first calls DHIS.connect() to establish a connection with a DHIS2 server and then calls DHIS.post_va() to post VA data, the assigned causes of death, and associated metadata (concerning cause assignment).

Returns:

VA Program ID from the DHIS2 server, the log from the DHIS2 connection, and the number of records posted to DHIS2

Return type:

dictionary

run_odk()[source]

Check for duplicates, copy files, and run ODK Briefcase.

This method downloads verbal autopsy (VA) records as a CSV export from either (1) an ODK Central server, using ODK.central(), or (2) an ODK Aggregate server, using the Java application ODK Briefcase via ODK.briefcase(). The configuration settings are taken from the argument argsODK (see Pipeline.config()). If there is a previous ODK export file, this method merges the files by keeping only the unique VA records.

Returns:

Summary of results from ODK step

Return type:

tuple
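The merge of a previous and a new export described for run_odk() can be pictured as a union of rows keyed on a record identifier. The sketch below is not the package's implementation; it uses only the standard library, and the column name `meta-instanceID` is an assumption about the export layout.

```python
import csv
import io


def merge_exports(prev_csv, new_csv, id_field="meta-instanceID"):
    """Return rows from both CSV texts, keeping the first copy of each ID."""
    merged = {}
    for text in (prev_csv, new_csv):
        for row in csv.DictReader(io.StringIO(text)):
            # setdefault keeps the earlier row when an ID appears twice.
            merged.setdefault(row[id_field], row)
    return list(merged.values())


prev_export = "meta-instanceID,age\nuuid:1,40\nuuid:2,65\n"
new_export = "meta-instanceID,age\nuuid:2,65\nuuid:3,30\n"
rows = merge_exports(prev_export, new_export)
merged_ids = [row["meta-instanceID"] for row in rows]
```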

run_openva()[source]

Create and run an R script, or run SmartVA.

This method runs through the suite of methods in the OpenVA class. The tasks performed (in order) are: (1) call OpenVA.prep_va_data() to copy over CSV files with VA data (retrieved from ODK Aggregate); (2) use OpenVA.r_script() to create an R script; and (3) call OpenVA.get_cod() to run the R script that estimates the causes of death and stores the results in “OpenVAFiles/record_storage.csv” and “OpenVAFiles/entity_attribute_value.csv” (the latter serving as the blob posted to DHIS2).

Returns:

an indicator of zero VA records in the ODK export

Return type:

dictionary

store_results_db()[source]

Store VA results in Transfer database.

update_db()[source]

Update transfer database created by previous version of the pipeline.
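The Pipeline methods documented above can be chained by hand when more control is needed than run_pipeline() offers. The following sketch assumes the methods take no arguments, as listed in this reference, and that they are called in the order of the steps described for run_pipeline(). `run_steps` and `StubPipeline` are hypothetical names; the stub only records calls so the ordering can be inspected without a configured Transfer database.

```python
def run_steps(pipeline, use_dhis=True):
    """Call the documented Pipeline methods in order and collect results."""
    results = {}
    results["odk"] = pipeline.run_odk()
    results["openva"] = pipeline.run_openva()
    if use_dhis:
        results["dhis"] = pipeline.run_dhis()
    pipeline.store_results_db()
    # Reached only if every step above succeeded, so odk_last_run is not
    # advanced past records that failed to process.
    pipeline.close_pipeline()
    return results


class StubPipeline:
    """Hypothetical stand-in that records calls instead of doing real work."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Invoked only for attributes that are not defined, i.e. the steps.
        def method():
            self.calls.append(name)
            return {"step": name}

        return method


stub = StubPipeline()
results = run_steps(stub, use_dhis=False)
```

Leaving close_pipeline() outside any `finally` clause is deliberate in this sketch: cleaning up after a failed step would discard the intermediate files needed to diagnose it.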

API for Transfer Database

class openva_pipeline.transfer_db.TransferDB(db_file_name: str, db_directory: str, db_key: str, pl_run_date: str)[source]

This class handles interactions with the Transfer database.

The Pipeline accesses configuration information from the Transfer database, and also stores log messages and verbal autopsy records in the DB. The Transfer database is encrypted using sqlcipher3 (and the pysqlcipher3 module is imported to establish DB connection).

Parameters:
  • db_file_name (str) – File name of the Transfer database.

  • db_directory (str) – Path of folder containing the Transfer database.

  • db_key (str) – Encryption key for the Transfer database.

  • pl_run_date (str) – Date when pipeline started latest run (YYYY-MM-DD_hh:mm:ss).

check_duplicates(use_dhis: bool) → dict[source]

Search for duplicate VA records.

This method searches for duplicate VA records in the ODK export file and the Transfer DB. If duplicates are found, a warning message is logged to the EventLog table in the Transfer database and the duplicate records are removed from the ODK export file.

Parameters:

use_dhis (bool) – Indicator for posting records to DHIS2. If True, then check VA_Org_Unit_Not_Found table for additional duplicate VA records.

Raises:

DatabaseConnectionError, PipelineError

Returns:

Number of duplicates found and number of VA records sent to openVA.

Return type:

dict
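The core of the duplicate check can be illustrated as filtering export rows against the IDs already held in the Transfer DB. This is a simplified sketch, not the method's actual code: it works on in-memory rows rather than the export file, does no logging, and the names `drop_known_records`, `n_duplicates`, and `n_to_openva` are hypothetical.

```python
def drop_known_records(export_rows, stored_ids, id_field="id"):
    """Split export rows into new records and a summary of duplicates."""
    kept = [row for row in export_rows if row[id_field] not in stored_ids]
    summary = {
        "n_duplicates": len(export_rows) - len(kept),
        "n_to_openva": len(kept),
    }
    return summary, kept


export_rows = [{"id": "va-001"}, {"id": "va-002"}, {"id": "va-003"}]
stored_ids = {"va-002"}  # IDs assumed to be present in the Transfer DB already
summary, kept = drop_known_records(export_rows, stored_ids)
```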

clean_dhis() → None[source]

Remove DHIS2 blob files.

clean_odk() → None[source]

Remove ODK Briefcase Export files.

clean_openva() → None[source]

Remove openVA files with COD results.

config_dhis(algorithm: str) → List[NamedTuple | Dict][source]

Query DHIS configuration settings from database.

This method is intended to be used in conjunction with (1) TransferDB.connect_db(), which establishes a connection to a database with the Pipeline configuration settings; and (2) DHIS.connect(), which establishes a connection to a DHIS server. Thus, TransferDB.config_dhis() gets its input from TransferDB.connect_db() and the output from TransferDB.config_dhis() is a valid argument for DHIS.connect().

Parameters:

algorithm (str) – VA algorithm used by R package openVA

Returns:

First item contains all parameters for DHIS.connect(), and the second item contains the causes of death used by the VA Program (in DHIS2)

Return type:

list [named tuple, dict]

Raises:

DHISConfigurationError

config_odk() → NamedTuple[source]

Query ODK configuration settings from database.

This method is intended to be used in conjunction with (1) TransferDB.connect_db(), which establishes a connection to a database with the Pipeline configuration settings; and (2) ODK.briefcase(), which establishes a connection to an ODK Aggregate server. Thus, TransferDB.config_odk() gets its input from TransferDB.connect_db() and the output from TransferDB.config_odk() is a valid argument for ODK.briefcase().

Returns:

Contains all parameters for ODK.briefcase().

Return type:

(named) tuple

Raises:

ODKConfigurationError

config_openva(algorithm: str) → NamedTuple[source]

Query OpenVA configuration settings from database.

This method is intended to receive its input (a Connection object) from TransferDB.connect_db(), which establishes a connection to a database with the Pipeline configuration settings. It sets up the configuration for the VA algorithms included in the R package openVA. The output from config_openva() serves as an input to the method OpenVA.setAlgorithmParameters(). This is a wrapper function that calls _config_interva(), _config_insilicova(), and _config_smartva() to actually pull configuration settings from the database.

Parameters:

algorithm (str) – VA algorithm used by R package openVA

Returns:

Contains all parameters needed for OpenVA.setAlgorithmParameters().

Return type:

(named) tuple

Raises:

OpenVAConfigurationError

config_pipeline() → NamedTuple[source]

Grabs Pipeline configuration settings.

This method queries the Pipeline_Conf table in Transfer database and returns a tuple with attributes (1) algorithmMetadataCode; (2) codSource; (3) algorithm; and (4) working_directory.

Returns:

Arguments needed to configure the OpenVA Pipeline:
  • algorithmMetadataCode – attribute describing VA data

  • codSource – attribute detailing the source of the Cause of Death list

  • algorithm – attribute indicating which VA algorithm to use

  • working_directory – attribute indicating the working directory

Return type:

(named) tuple

Raises:

PipelineConfigurationError
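A named tuple with the four attributes listed for config_pipeline() can be read by name or by position. The sketch below builds one by hand to show the access pattern; the class name `PipelineSettings` and all field values are made up for illustration and do not come from a real Pipeline_Conf table.

```python
from typing import NamedTuple


class PipelineSettings(NamedTuple):
    """Hypothetical mirror of the tuple returned by config_pipeline()."""

    algorithmMetadataCode: str
    codSource: str
    algorithm: str
    working_directory: str


settings = PipelineSettings(
    algorithmMetadataCode="example-metadata-code",  # placeholder value
    codSource="example-source",                     # placeholder value
    algorithm="InSilicoVA",
    working_directory="/tmp/openva",
)

# Attribute access reads better than positional indexing.
chosen_algorithm = settings.algorithm
# Named tuples are immutable; _replace returns a modified copy.
relocated = settings._replace(working_directory="/data/openva")
```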

get_event_log(n_messages: int, recent: bool) → list[source]

Get rows from EventLog table in Transfer Database.

Parameters:
  • n_messages (int) – Number of messages to retrieve

  • recent (bool) – Get messages starting from the most recent

Return type:

list

get_fields(table: List[str]) → List[Tuple[str, str]][source]

Get field names from table in Transfer Database

Parameters:

table (str) – Name of table

Return type:

list of (field name, data type)

get_no_ou_va(va_id: str | None = None) → Dict[source]

Get the VA IDs and org unit data columns for records that do not have a valid DHIS2 org unit assignment; or select the eventBlob and evaBlob if the va_id is provided.

get_schema(table: str) → list[source]

Get schema from table in Transfer Database

Parameters:

table (str) – Name of table

Return type:

list

get_table_conf(table_name: str) → list[source]

Get values in ODK_Conf table from Transfer Database

Parameters:

table_name (str) – name of table in Transfer Database

Return type:

list

get_tables() → list[source]

Get table names from Transfer Database

Return type:

list

insert_event_log(values: Tuple[str, str, str]) → None[source]

Insert new row in Transfer Database table EventLog

Parameters:

values (Tuple of 3 strings) – Event description, event type (e.g., Event, Error, Warning, Summary), and date and time of entry
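The pairing of insert_event_log() and get_event_log() amounts to appending three-string rows and reading the newest ones back. The sketch below shows that pattern with the standard sqlite3 module on an unencrypted in-memory database; the real Transfer database is encrypted with SQLCipher, and the column names `eventDesc`, `eventType`, and `eventTime` as well as the function names are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema; the real EventLog columns may differ.
conn.execute(
    "CREATE TABLE EventLog (eventDesc TEXT, eventType TEXT, eventTime TEXT)"
)


def insert_event(conn, values):
    """Insert one (description, type, timestamp) row."""
    conn.execute("INSERT INTO EventLog VALUES (?, ?, ?)", values)
    conn.commit()


def recent_events(conn, n_messages):
    """Return up to n_messages rows, newest first."""
    cur = conn.execute(
        "SELECT eventDesc, eventType, eventTime FROM EventLog "
        "ORDER BY rowid DESC LIMIT ?",
        (n_messages,),
    )
    return cur.fetchall()


insert_event(conn, ("Pipeline started", "Event", "2024-01-01_00:00:00"))
insert_event(conn, ("2 duplicates removed", "Warning", "2024-01-01_00:00:05"))
insert_event(conn, ("Pipeline finished", "Summary", "2024-01-01_00:01:00"))
latest = recent_events(conn, 2)
```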

make_pipeline_dirs() → None[source]

Create directories for storing files (if they don’t exist).

The method creates the following folders in the working directory (as set in the Transfer database table Pipeline_Conf): (1) ODKFiles for files containing verbal autopsy records from the ODK Aggregate server; (2) OpenVAFiles containing R scripts and results from the cause assignment algorithms; and (3) DHIS for holding blobs that will be stored in a data repository (DHIS2 server and/or the local Transfer database).

Raises:

PipelineError
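Creating the three folders only when they are missing can be sketched with the standard library as follows. The folder names come from the description above; the function name `make_dirs` and its return value are illustrative, and the real method reads the working directory from the Pipeline_Conf table rather than taking it as an argument.

```python
import os
import tempfile

PIPELINE_DIRS = ("ODKFiles", "OpenVAFiles", "DHIS")


def make_dirs(working_directory):
    """Create any missing pipeline folders and return the names created."""
    created = []
    for name in PIPELINE_DIRS:
        path = os.path.join(working_directory, name)
        if not os.path.isdir(path):
            os.makedirs(path)
            created.append(name)
    return created


with tempfile.TemporaryDirectory() as tmp:
    first_run = make_dirs(tmp)   # all three folders are created
    second_run = make_dirs(tmp)  # nothing left to create
```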

remove_no_ou_va(va_id: str) → None[source]

Remove the VA record from Transfer database table VA_Org_Unit_Not_Found.

store_no_ou_va(va_record: dict, eav: DataFrame, data_ou: str) → None[source]

Store VA record without valid organisation unit (ou) in Transfer database table VA_Org_Unit_Not_Found.

Parameters:
  • va_record (dict) – VA record processed by openVA along with cause and metadata

  • eav (DataFrame) – VA record in EAV format (Entity, Attribute, Value) prepared by openVA

  • data_ou (str) – Organisation unit (for DHIS2) found in data

  • dhis_ou (str) – Organisation unit the pipeline found and wanted to use for posting to DHIS2

Raises:

PipelineError, DatabaseConnectionError

store_single_va(va_dict: Dict, org_unit_id: str, log_summary: Dict, dhis_tracker: bool = False) → None[source]

Store a single VA record in Transfer database table VA_Storage.

This method is intended to be used in conjunction with the DHIS class, which prepares the records into the proper format for storage in the Transfer database.

Parameters:
  • va_dict (dict) – VA record

  • org_unit_id (str) – DHIS2 organisation unit ID

  • log_summary (dict) – Parsed log from DHIS2 post

  • dhis_tracker (bool) – Indicator of using DHIS2 VA tracker program

Raises:

PipelineError, DatabaseConnectionError

store_va(dhis_tracker: bool = False) → None[source]

Store VA records in Transfer database.

This method is intended to be used in conjunction with the DHIS class, which prepares the records into the proper format for storage in the Transfer database.

Parameters:

dhis_tracker (bool) – Indicator of using DHIS2 VA tracker program

Raises:

PipelineError, DatabaseConnectionError

update_odk_last_run() → None[source]

Update Transfer Database table ODK_Conf.odk_last_run

update_table(table_name: str, field: str | list, value: str | list) → None[source]

Update value(s) in Transfer Database table_name.field(s)

Parameters:
  • table_name (str) – name of table in Transfer Database

  • field (str or list of str) – field name(s) in table_name

  • value (str or list of str) – new values to update in table_name.fields
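Accepting either a single field or a list of fields, as update_table() does, usually means normalising both arguments to lists before building the statement. The sketch below shows one way to do that with the standard sqlite3 module. It is not the package's code: `update_fields` is a hypothetical name, the example table has only illustrative columns (`odk_user` is invented, `odk_last_run` is taken from this reference), and the statement updates every row on the assumption that a configuration table holds a single row.

```python
import sqlite3


def update_fields(conn, table_name, field, value):
    """Update one or several columns of a (single-row) table."""
    fields = [field] if isinstance(field, str) else list(field)
    values = [value] if isinstance(value, str) else list(value)
    if len(fields) != len(values):
        raise ValueError("field and value must have the same length")
    # Identifiers cannot be bound as parameters, so validate them instead.
    for name in [table_name, *fields]:
        if not name.isidentifier():
            raise ValueError(f"unsafe identifier: {name!r}")
    assignments = ", ".join(f"{name} = ?" for name in fields)
    conn.execute(f"UPDATE {table_name} SET {assignments}", values)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ODK_Conf (odk_user TEXT, odk_last_run TEXT)")
conn.execute("INSERT INTO ODK_Conf VALUES ('demo', '1900-01-01_00:00:01')")
update_fields(conn, "ODK_Conf", "odk_last_run", "2024-05-01_12:00:00")
update_fields(
    conn,
    "ODK_Conf",
    ["odk_user", "odk_last_run"],
    ["analyst", "2024-05-02_08:30:00"],
)
row = conn.execute("SELECT odk_user, odk_last_run FROM ODK_Conf").fetchone()
```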

API for ODK Briefcase

class openva_pipeline.odk.ODK(settings)[source]

Manages Pipeline’s interaction with ODK Aggregate.

This class handles the segment of the pipeline related to ODK. The ODK.briefcase() method calls ODK Briefcase to connect with an ODK Aggregate server and export VA records. The class also checks for previously exported files and updates them as needed. Finally, it logs messages and errors to the pipeline database.

Parameters:

settings (dictionary of named tuples) – Configuration settings for pipeline steps (which is returned from Pipeline.config()).

briefcase()[source]

Calls ODK Briefcase.

This method spawns a new process that runs the ODK Briefcase Java application (via a command-line interface) to download a CSV file with verbal autopsy records from an ODK Aggregate server.

Returns:

Return value from method subprocess.run()

Return type:

subprocess.CompletedProcess

Raises:

ODKError
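Spawning an external program and turning a failed exit status into an exception, as briefcase() is described as doing, follows a common subprocess pattern. The sketch below shows that pattern only. It deliberately runs a stand-in Python command rather than the Briefcase jar, because the Briefcase command-line flags are not given in this reference; `ExportError` and `run_export` are hypothetical names, with `ExportError` standing in for the package's ODKError.

```python
import subprocess
import sys


class ExportError(Exception):
    """Hypothetical stand-in for the package's ODKError."""


def run_export(command):
    """Run a command and raise if it exits with a non-zero status."""
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        raise ExportError(completed.stderr.strip() or "export failed")
    return completed


# Stand-in command; a real call would launch the Briefcase jar with java.
result = run_export([sys.executable, "-c", "print('export finished')"])
```

Returning the CompletedProcess object unchanged lets the caller inspect `stdout`, `stderr`, and `returncode` for logging.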

central()[source]

Connects to ODK Central through its API.

This method calls requests.get to download a CSV file with verbal autopsy records from an ODK Central server.

Returns:

Returns a string indicating the number of downloaded records.

Return type:

string

Raises:

ODKError

merge_to_prev_export()[source]

Merge previous ODK Briefcase export files.

API for OpenVA

class openva_pipeline.openva.OpenVA(settings, pipeline_run_date)[source]

Assign cause of death (COD) to verbal autopsies (VA) using the R package openVA.

This class creates and executes an R script that copies (and merges) ODK Briefcase exports, runs openVA to assign CODs, and creates outputs for depositing in the Transfer DB and on a DHIS server.

Parameters:
  • settings (dictionary of named tuples) – Configuration settings for pipeline steps (which is returned from Pipeline.config()).

  • pipeline_run_date (datetime.datetime.now() with formatting strftime("%Y-%m-%d_%H:%M:%S")) – Date and time when instance of Pipeline was created (instance attribute).

Raises:

OpenVAError

get_cod()[source]

Create and execute R script to assign a COD with openVA; or call the SmartVA CLI to assign COD.

get_summary() → dict[source]

Get summary of openVA step.

Returns:

Number of records passed to openVA and number of records without an assigned cause of death (CoD).

Return type:

dict

prep_va_data()[source]

Create data file for openVA by merging ODK export files and converting with pycrossva.

Returns:

Summary of the number of VA records at each step – previous ODK export (0 if there isn’t one), new ODK export, and number of VA records sent to openVA.

Return type:

dict

r_script()[source]

Create an R script for running openVA and assigning CODs.

smartva_to_csv()[source]

Write two CSV files: (1) the Entity-Attribute-Value blob pushed to DHIS2 (entity_attribute_value.csv); and (2) the table for the Transfer database (record_storage.csv).

Both CSV files are stored in the OpenVAFiles folder.

API for DHIS2

class openva_pipeline.dhis.DHIS(dhis_args, working_directory)[source]

Class for transferring VA records (with assigned CODs) to DHIS2.

This class includes methods for importing VA results (i.e. assigned causes of death from openVA or SmartVA) as CSV files, connecting to a DHIS2 server with the Verbal Autopsy Program, and posting the results to the DHIS2 server and/or the local Transfer database.

Parameters:
  • dhis_args (list of namedtuple and dictionary with COD codes) – Contains parameter values for connecting to DHIS2, as returned by TransferDB.config_dhis().

  • working_directory (string) – Working directory for the openVA Pipeline

Raises:

DHISError

post_single_va(va_dict: Dict, eav: DataFrame, org_unit: str) → Dict[source]

Post a single event to DHIS2

Parameters:
  • va_dict (dict) – VA record with cause of death and metadata

  • eav (DataFrame) – VA record in Entity-Attribute-Value format

  • org_unit (str) – DHIS2 organisation unit where the event will be posted

post_va(xfer_db: TransferDB) → Dict[source]

Post VA records to DHIS.

This method reads in a CSV file (“entity_attribute_value.csv”) with cause of death results (from openVA), then formats events and posts them to a VA Program (installed on a DHIS2 server).

Parameters:

xfer_db (openva_pipeline.transfer_db.TransferDB) – Transfer Database instance

Returns:

Log information received after posting events to the VA Program on a DHIS2 server (see API.post).

Return type:

dict

Raises:

DHISError

verify_post(post_log)[source]

Verify that VA records were posted to DHIS2 server.

Parameters:

post_log (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.post_va.

Raises:

DHISError

verify_single_va(va_id: str, post_log: Dict) → bool[source]

Verify that VA tracked entity instance (tei) was posted to DHIS2 server.

Parameters:
  • va_id (str) – Verbal Autopsy ID

  • post_log (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.post_va.

Raises:

DHISError

verify_tei_post(post_log)[source]

Verify that VA tracked entity instances (tei) were posted to DHIS2 server.

Parameters:

post_log (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.post_va.

Raises:

DHISError

class openva_pipeline.dhis.API(dhis_url, dhis_user, dhis_password)[source]

This class provides methods for interacting with the DHIS2 API.

This class is called by an instance of the DHIS class to retrieve information from, and post verbal autopsy records (and results) to, a DHIS2 server that has the Verbal Autopsy program installed.

Parameters:
  • dhis_url (string) – Web address for DHIS2 server (e.g. “play.dhis2.org/demo”).

  • dhis_user (string) – Username for DHIS2 account.

  • dhis_password (string) – Password for DHIS2 account.

Raises:

DHISError

get(endpoint, params=None)[source]

GET method for DHIS2 API.

Return type:

dict

post(endpoint, data, params=None)[source]

POST method for DHIS2 API.

Return type:

dict

post_blob(db_file)[source]

Post file to DHIS2 and return created UID for that file

Return type:

str

put(endpoint, data)[source]

PUT method for DHIS2 API.

Return type:

dict

class openva_pipeline.dhis.VerbalAutopsyEvent(va_id, program, dhis_org_unit, event_date, sex, dob, age, cod_code, algorithm_metadata, odk_id, file_id)[source]

Create DHIS2 event + a BLOB file resource

Parameters:
  • va_id (string) – UID for verbal autopsy record (used as a DHIS2 data element)

  • program (string) – UID of the DHIS2’s Verbal Autopsy program

  • dhis_org_unit (string) – UID for the DHIS2 Organization Unit where the event (death) should be registered.

  • event_date (date) – Date of death with YYYY-MM-DD format

  • sex (string or integer) – Sex of the deceased (used as a DHIS2 data element). Possible values must fit an option in the VA Program’s “Sex” optionSet (female, male, missing, unknown). If SmartVA is used to assign cause of death, then sex is an integer (1 = male, 2 = female).

  • dob (date) – Date of birth of the deceased with YYYY-MM-DD format (used as a DHIS2 data element)

  • age (integer) – Age (in years) at time of death

  • cod_code (string) – Coded cause of death (must fit an option in the VA Program’s “CoD codes” optionSet).

  • algorithm_metadata (string) – Code for how the CoD was obtained (must fit an option in the VA Program’s “Algorithm Metadata” optionSet).

  • odk_id (string) – UID for the VA record assigned by the ODK Aggregate server

  • file_id (string) – UID for the blob file (containing the VA data and results) posted to (and assigned by) DHIS2 server.

format_se_to_dhis2(dhis_user, dhis_org_unit)[source]

Format object to DHIS2 (single event) compatible event for DHIS2 API

Parameters:
  • dhis_user – DHIS2 username for account posting the event

  • dhis_org_unit – code for DHIS2 organization unit where the death will be posted

Returns:

DHIS2 event

Return type:

dict
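For orientation, a single-event payload for the DHIS2 events endpoint is generally a dictionary with program, organisation unit, date, and a list of data values. The sketch below builds such a dictionary. It reflects the general shape of a DHIS2 event as commonly documented, not necessarily the exact fields that format_se_to_dhis2() emits; `build_single_event` is a hypothetical name and every UID shown is a placeholder.

```python
def build_single_event(program, org_unit, event_date, stored_by, data_values):
    """Assemble a DHIS2-style single event from plain values.

    data_values maps a data element UID to the value recorded for it.
    """
    return {
        "program": program,
        "orgUnit": org_unit,
        "eventDate": event_date,  # YYYY-MM-DD
        "status": "COMPLETED",
        "storedBy": stored_by,
        "dataValues": [
            {"dataElement": uid, "value": value}
            for uid, value in data_values.items()
        ],
    }


event = build_single_event(
    program="prog123",      # placeholder UID
    org_unit="ou111",       # placeholder UID
    event_date="2024-03-15",
    stored_by="va_user",
    data_values={"de001": "va-001", "de002": "female"},  # placeholder UIDs
)
```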

format_tea_to_dhis2(dhis_user, dhis_org_unit, tei_id=None)[source]

Format object to DHIS2 (tracker) compatible event for DHIS2 API

Parameters:
  • dhis_user – DHIS2 username for account posting the event

  • dhis_org_unit – code for DHIS2 organization unit where the death will be posted

  • tei_id – ID for the registered tracked entity instance (None if the tei has not been registered)

Returns:

DHIS2 event

Return type:

dict

openva_pipeline.dhis.create_db(file_name, eva_list)[source]

Create a SQLite database with VA data + COD

Parameters:
  • file_name (str) – Name (including path) of sqlite3 db file (blob) that will be posted to DHIS2

  • eva_list (list) – Event-Value-Attribute data structure with verbal autopsy data, cause of death result, and VA metadata.

Return type:

None

openva_pipeline.dhis.get_cod_code(my_dict, search_for)[source]

Return COD label expected by (DHIS2) VA Program.

Parameters:
  • my_dict (dict) – Dictionary to search

  • search_for (string) – Cause of Death label returned by openVA.

Return type:

str

openva_pipeline.dhis.find_key_value(key, my_dict)[source]

Return a key’s value in a nested dictionary.
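Looking up a key anywhere inside a nested dictionary is typically done with a short depth-first recursion. The sketch below is one possible version, written independently of the package; `find_nested_value` is a hypothetical name, and unlike a production helper it cannot distinguish a missing key from a key whose value is None.

```python
def find_nested_value(key, data):
    """Return the first value stored under key, searching depth-first."""
    if key in data:
        return data[key]
    for value in data.values():
        if isinstance(value, dict):
            found = find_nested_value(key, value)
            if found is not None:
                return found
    return None


nested = {"a": {"b": {"c": 1}}, "d": 2}
deep = find_nested_value("c", nested)
shallow = find_nested_value("d", nested)
missing = find_nested_value("x", nested)
```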

Exceptions

exception openva_pipeline.exceptions.PipelineError[source]

Base class for exceptions in the openva_pipeline module.

exception openva_pipeline.exceptions.DatabaseConnectionError[source]

An error occurred connecting to the Transfer database.

exception openva_pipeline.exceptions.PipelineConfigurationError[source]

An error occurred accessing the Pipeline_Conf table in the DB.

exception openva_pipeline.exceptions.ODKConfigurationError[source]

An error occurred accessing the ODK_Conf table in the DB.

exception openva_pipeline.exceptions.OpenVAConfigurationError[source]

An error occurred accessing the OpenVA_Conf table in the DB.

exception openva_pipeline.exceptions.DHISConfigurationError[source]

An error occurred accessing the DHIS_Conf table in the DB.

exception openva_pipeline.exceptions.ODKError[source]

An error occurred with the odk module.

exception openva_pipeline.exceptions.OpenVAError[source]

An error occurred with the openVA module.

exception openva_pipeline.exceptions.SmartVAError[source]

An error occurred with SmartVA.

exception openva_pipeline.exceptions.DHISError[source]

An error occurred with the dhis module.
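Because PipelineError is described as the base class, callers can handle a specific failure first and fall back to the base class for everything else. The sketch below defines local stand-in classes with the same names so it runs without the package installed; that the other exceptions subclass PipelineError is an assumption drawn from the "base class" wording, and `describe_failure` is a hypothetical helper.

```python
class PipelineError(Exception):
    """Local stand-in for openva_pipeline.exceptions.PipelineError."""


class ODKError(PipelineError):
    """Local stand-in; assumed to derive from PipelineError."""


class DHISError(PipelineError):
    """Local stand-in; assumed to derive from PipelineError."""


def describe_failure(step):
    """Run a step and report which kind of pipeline error, if any, occurred."""
    try:
        step()
    except ODKError as exc:
        # Most specific handler first.
        return f"ODK step failed: {exc}"
    except PipelineError as exc:
        # Catches every other pipeline exception through the base class.
        return f"pipeline failed: {exc}"
    return "ok"


def _raise(exc):
    raise exc


odk_message = describe_failure(lambda: _raise(ODKError("no connection")))
dhis_message = describe_failure(lambda: _raise(DHISError("post rejected")))
ok_message = describe_failure(lambda: None)
```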