Documentation for classes, functions, and methods¶
Running the OpenVA Pipeline¶
The openva_pipeline package includes two convenience functions for (1) creating the Transfer Database – a database that holds configuration settings, VA data and results, and a table for logging events and errors; and (2) running through all of the steps in the openVA Pipeline.
- openva_pipeline.run_pipeline.create_transfer_db(database_file_name, database_directory, database_key)[source]¶
Create the (SQLite encrypted) Transfer Database.
- Parameters:
database_file_name – File name for the Transfer Database.
database_directory – Path of the Transfer Database.
database_key – Encryption key for the Transfer Database
- openva_pipeline.run_pipeline.run_pipeline(database_file_name, database_directory, database_key, export_to_dhis=True)[source]¶
Runs through all steps of the OpenVA Pipeline
This function is a wrapper for the Pipeline class, which runs through all steps of the OpenVA Pipeline – (1) connect to Transfer Database (to retrieve configuration settings); (2) connect to ODK Aggregate to download a CSV file with VA records; (3) run openVA (or SmartVA) to assign cause of death; and (4) store CoD results and VA data in the Transfer Database as well as a DHIS2 VA Program (if requested).
- Parameters:
database_file_name – File name for the Transfer Database.
database_directory – Path of the Transfer Database.
database_key – Encryption key for the Transfer Database
export_to_dhis (bool) – Indicator for posting VA records to a DHIS2 server.
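A minimal sketch of how the two convenience functions might be called together. The file name, directory, and key are placeholders, and the helpers transfer_db_args and create_and_run are hypothetical (they are not part of the package); only the two imported functions and their argument names come from the signatures above.

```python
from pathlib import Path


def transfer_db_args(file_name, directory, key):
    """Bundle the three arguments shared by both convenience functions."""
    return {
        "database_file_name": file_name,
        "database_directory": str(Path(directory)),
        "database_key": key,
    }


def create_and_run(args, export_to_dhis=False):
    """Create the Transfer Database, then run the pipeline once."""
    # Imported lazily so transfer_db_args() works without the package installed.
    from openva_pipeline.run_pipeline import create_transfer_db, run_pipeline

    create_transfer_db(**args)
    run_pipeline(**args, export_to_dhis=export_to_dhis)


# Placeholder values (assumptions): replace with your own file name, folder, and key.
ARGS = transfer_db_args("Pipeline.db", ".", "my-secret-key")
```

Calling create_and_run(ARGS) would perform both steps. The configuration settings in a newly created database may need to be filled in before the pipeline can run, in which case the two calls would be made separately.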
Main Interface¶
The OpenVA Pipeline is run using the following class.
- class openva_pipeline.pipeline.Pipeline(db_file_name: str, db_directory: str, db_key: str, use_dhis: bool = True)[source]¶
Primary API for the openVA pipeline.
This class calls three others to move verbal autopsy data from an ODK Aggregate server (using the ODK class), through the openVA R package to assign cause of death (using the OpenVA class), and then to deposit the VA records with assigned causes in a DHIS server (using the DHIS class), in the Transfer database, or in both. The Transfer database is a local database that also contains the configuration settings for the pipeline; a fourth class, TransferDB, stores the results locally and provides access to those configuration settings.
- Parameters:
db_file_name (str) – File name of the Transfer database.
db_directory (str) – Path of folder containing the Transfer database.
db_key (str) – Encryption key for the Transfer database.
use_dhis (bool) – Indicator for telling pipeline to post records/tracked entity instances to DHIS
- close_pipeline()[source]¶
Update ODK_Conf ODKLastRun in Transfer DB and clean up files.
This method calls methods in the TransferDB class to remove the data files created at each step of the pipeline. More specifically, it runs (1) TransferDB.clean_odk() to remove the ODK Briefcase export files (“ODKFiles/odkBCExportNew.csv” and “ODKFiles/odkBCExportPrev.csv”) if they exist; (2) TransferDB.clean_openva() to remove the input data file (“OpenVAFiles/openva_input.csv”) and the output files (“OpenVAFiles/record_storage.csv”, “OpenVAFiles/new_storage.csv”, and “OpenVAFiles/entity_attribute_value.csv”), noting that all of these results are stored in the Transfer DB, the DHIS2 server’s VA program, or both; and (3) TransferDB.clean_dhis() to remove the blobs posted to the DHIS2 server and stored in the folder “DHIS/blobs”. Finally, this method updates the Transfer DB’s value in the ODK_Conf table’s variable odk_last_run so the next ODK Export file does not include VA records already processed through the pipeline.
- fix_no_org_unit(va_id: str, org_unit: str) Dict[source]¶
Post a VA event to DHIS2 with provided organisation unit. If the post is successful, the corresponding record will be removed from the Transfer database table VA_Org_Unit_Not_Found and added to the VA_Storage table.
- Parameters:
va_id (str) – ID for VA record to post
org_unit (str) – New DHIS2 organisation unit (display name or ID) where VA event will be posted.
- Returns:
Message indicating successful or unsuccessful post.
- Return type:
str
- get_dhis_org_units(va_program: bool = True) Dict[source]¶
Get DHIS organisation unit IDs and display names.
- Parameters:
va_program (bool) – Indicator for returning only organisation units in the DHIS VA Program (as opposed to all organisation units).
- Returns:
displayName: id of DHIS organisation units.
- Return type:
dict
- get_no_org_unit(va_id: str | None = None) Dict[source]¶
Get VA record IDs that do not have a valid organisation unit for posting to DHIS2; or, if va_id is provided, get the eventBlob and evaBlob for that VA.
- Parameters:
va_id (str) – VA’s ID for which the eventBlob and evaBlob will be returned.
- Returns:
VA record ID and data used to find DHIS2 org unit; or, if va_id is provided, the eventBlob and evaBlob for that VA.
- Return type:
dictionary
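The three organisation-unit methods above could be combined as in the following sketch. The helper repost_with_org_unit and the stand-in class FakeOrgUnitPipeline are hypothetical, as are the organisation unit name, its ID, and the returned message; the sketch only assumes the documented behaviour that get_dhis_org_units() maps display names to IDs.

```python
def repost_with_org_unit(pipeline, va_id, org_unit_name):
    """Re-post one VA record under a known DHIS2 organisation unit."""
    org_units = pipeline.get_dhis_org_units(va_program=True)  # displayName -> id
    if org_unit_name not in org_units:
        raise KeyError(f"Unknown organisation unit: {org_unit_name!r}")
    return pipeline.fix_no_org_unit(va_id, org_units[org_unit_name])


class FakeOrgUnitPipeline:
    """Stand-in used for illustration; it is NOT the real Pipeline class."""

    def get_dhis_org_units(self, va_program=True):
        return {"District A": "ou_id_001"}  # made-up name and ID

    def fix_no_org_unit(self, va_id, org_unit):
        return f"posted {va_id} to {org_unit}"  # made-up message format


message = repost_with_org_unit(FakeOrgUnitPipeline(), "va-1", "District A")
```

With a real Pipeline instance, get_no_org_unit() would first be used to list the VA record IDs that still need a valid organisation unit.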
- log_event(event_desc, event_type)[source]¶
Commit event or error message into EventLog table of transfer database.
- Parameters:
event_desc (string) – Description of the event.
event_type (string) – Type of event (error or information)
- run_dhis() Dict[str, str][source]¶
Connect to API and post events.
This method first calls the method DHIS.connect() to establish a connection with a DHIS2 server and then calls the method DHIS.post_va() to post VA data, the assigned causes of death, and associated metadata (concerning cause assignment).
- Returns:
VA Program ID from the DHIS2 server, the log from the DHIS2 connection, and the number of records posted to DHIS2
- Return type:
dictionary
- run_odk()[source]¶
Run check duplicates, copy file, and briefcase.
This method downloads verbal autopsy (VA) records as a CSV export from either (1) an ODK Central server, using ODK.central(), or (2) an ODK Aggregate server, using the Java application ODK Briefcase via the method ODK.briefcase(). The configuration settings are taken from the argument argsODK (see Pipeline.config()). If there is a previous ODK export file, this method merges the files by keeping only the unique VA records.
- Returns:
Summary of results from ODK step
- Return type:
tuple
- run_openva()[source]¶
Create & run script or run smartva.
This method runs through the suite of methods in the OpenVA class. The tasks performed (in order) are: (1) call the method OpenVA.prep_va_data() to copy over CSV files with VA data (retrieved from ODK Aggregate); (2) use the method OpenVA.r_script() to create an R script; and (3) call the method OpenVA.get_cod() to run the R script that estimates the causes of death and stores the results in “OpenVAFiles/recordStorage.csv” and “OpenVAFiles/entityAttributeValue.csv” (the former serving as the blob posted to DHIS2).
- Returns:
an indicator of zero VA records in the ODK export
- Return type:
dictionary
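The order in which the documented Pipeline methods fit together can be sketched as follows. The function run_all_steps and the class FakePipelineSteps are hypothetical stand-ins that only record calls; a real run may involve further steps (for example the Pipeline.config() method mentioned above, or storing results in the Transfer database) that are not shown here.

```python
def run_all_steps(pipeline, use_dhis=True):
    """Call the documented Pipeline methods in pipeline order."""
    results = {}
    results["odk"] = pipeline.run_odk()        # download VA records
    results["openva"] = pipeline.run_openva()  # assign causes of death
    if use_dhis:
        results["dhis"] = pipeline.run_dhis()  # post events to DHIS2
    pipeline.close_pipeline()                  # remove files, update last run
    return results


class FakePipelineSteps:
    """Stand-in that records calls; it is NOT the real Pipeline class."""

    def __init__(self):
        self.calls = []

    def run_odk(self):
        self.calls.append("run_odk")
        return ("ok",)

    def run_openva(self):
        self.calls.append("run_openva")
        return {}

    def run_dhis(self):
        self.calls.append("run_dhis")
        return {}

    def close_pipeline(self):
        self.calls.append("close_pipeline")


fake = FakePipelineSteps()
run_all_steps(fake, use_dhis=False)
print(fake.calls)  # ['run_odk', 'run_openva', 'close_pipeline']
```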
API for Transfer Database¶
- class openva_pipeline.transfer_db.TransferDB(db_file_name: str, db_directory: str, db_key: str, pl_run_date: str)[source]¶
This class handles interactions with the Transfer database.
The Pipeline accesses configuration information from the Transfer database, and also stores log messages and verbal autopsy records in the DB. The Transfer database is encrypted using sqlcipher3 (and the pysqlcipher3 module is imported to establish DB connection).
- Parameters:
db_file_name (str) – File name of the Transfer database.
db_directory (str) – Path of folder containing the Transfer database.
db_key (str) – Encryption key for the Transfer database.
pl_run_date (str) – Date when pipeline started latest run (YYYY-MM-DD_hh:mm:ss).
- check_duplicates(use_dhis: bool) dict[source]¶
Search for duplicate VA records.
This method searches for duplicate VA records in ODK export file and the Transfer DB. If duplicates are found, a warning message is logged to the EventLog table in the Transfer database and the duplicate records are removed from the ODK export file.
- Parameters:
use_dhis (bool) – Indicator for posting records to DHIS2. If True, then check VA_Org_Unit_Not_Found table for additional duplicate VA records.
- Raises:
DatabaseConnectionError, PipelineError
- Returns:
Number of duplicates found and number of VA records sent to openVA.
- Return type:
dict
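The idea behind the duplicate check can be illustrated with plain Python. This is a sketch of the general technique, not the method's actual implementation: the function drop_duplicates, the field name record_id, and the keys of the returned summary are all assumptions made for the example.

```python
def drop_duplicates(export_rows, stored_ids, id_field="record_id"):
    """Remove rows whose ID is already stored or repeated in the export."""
    seen = set(stored_ids)
    kept = []
    n_duplicates = 0
    for row in export_rows:
        if row[id_field] in seen:
            n_duplicates += 1  # already stored, or seen earlier in this export
            continue
        seen.add(row[id_field])
        kept.append(row)
    return kept, {"n_duplicates": n_duplicates, "n_to_openva": len(kept)}


rows = [
    {"record_id": "a"},
    {"record_id": "b"},
    {"record_id": "a"},  # repeated within the export
    {"record_id": "c"},  # already in the database
]
kept, summary = drop_duplicates(rows, stored_ids={"c"})
```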
- config_dhis(algorithm: str) List[NamedTuple | Dict][source]¶
Query DHIS configuration settings from database.
This method is intended to be used in conjunction with (1) TransferDB.connect_db(), which establishes a connection to a database with the Pipeline configuration settings; and (2) DHIS.connect(), which establishes a connection to a DHIS server. Thus, TransferDB.config_dhis() gets its input from TransferDB.connect_db() and the output from TransferDB.config_dhis() is a valid argument for DHIS.connect().
- Parameters:
algorithm (str) – VA algorithm used by R package openVA
- Returns:
First item contains all parameters for DHIS.connect(), and the second item contains the causes of death used by the VA Program (in DHIS2).
- Return type:
list [named tuple, dict]
- Raises:
DHISConfigurationError
- config_odk() NamedTuple[source]¶
Query ODK configuration settings from database.
This method is intended to be used in conjunction with (1) TransferDB.connect_db(), which establishes a connection to a database with the Pipeline configuration settings; and (2) ODK.briefcase(), which establishes a connection to an ODK Aggregate server. Thus, TransferDB.config_odk() gets its input from TransferDB.connect_db() and the output from TransferDB.config_odk() is a valid argument for ODK.briefcase().
- Returns:
Contains all parameters for ODK.briefcase().
- Return type:
(named) tuple
- Raises:
ODKConfigurationError
- config_openva(algorithm: str) NamedTuple[source]¶
Query OpenVA configuration settings from database.
This method is intended to receive its input (a Connection object) from TransferDB.connect_db(), which establishes a connection to a database with the Pipeline configuration settings. It sets up the configuration for the VA algorithms included in the R package openVA. The output from config_openva() serves as an input to the method OpenVA.setAlgorithmParameters(). This is a wrapper function that calls _config_interva(), _config_insilicova(), and _config_smartva() to actually pull configuration settings from the database.
- Parameters:
algorithm (str) – VA algorithm used by R package openVA
- Returns:
Contains all parameters needed for OpenVA.setAlgorithmParameters().
- Return type:
(named) tuple
- Raises:
OpenVAConfigurationError
- config_pipeline() NamedTuple[source]¶
Grabs Pipeline configuration settings.
This method queries the Pipeline_Conf table in Transfer database and returns a tuple with attributes (1) algorithmMetadataCode; (2) codSource; (3) algorithm; and (4) working_directory.
- Returns:
Arguments needed to configure the OpenVA Pipeline:
algorithmMetadataCode - attribute describing VA data
codSource - attribute detailing the source of the Cause of Death list
algorithm - attribute indicating which VA algorithm to use
working_directory - attribute indicating the working directory
- Return type:
(named) tuple
- Raises:
PipelineConfigurationError
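The shape of the returned named tuple can be pictured with the sketch below. The four field names are the attributes listed above; the type name PipelineSettings and every value shown are placeholders chosen for illustration, not values taken from a real Pipeline_Conf table.

```python
from collections import namedtuple

# Field names follow the attributes listed above; the type name is hypothetical.
PipelineSettings = namedtuple(
    "PipelineSettings",
    ["algorithmMetadataCode", "codSource", "algorithm", "working_directory"],
)

# Placeholder values (assumptions), shown only to illustrate attribute access.
settings = PipelineSettings(
    algorithmMetadataCode="example-metadata-code",
    codSource="example-cod-source",
    algorithm="InSilicoVA",
    working_directory=".",
)
print(settings.algorithm)
```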
- get_event_log(n_messages: int, recent: bool) list[source]¶
Get rows from EventLog table in Transfer Database
- Parameters:
n_messages (int) – Number of messages to retrieve
recent (bool) – Get messages starting from the most recent
- Return type:
list
- get_fields(table: List[str]) List[Tuple[str, str]][source]¶
Get field names from table in Transfer Database
- Parameters:
table (str) – Name of table
- Return type:
list of (field name, data type)
- get_no_ou_va(va_id: str | None = None) Dict[source]¶
Get the VA IDs and org unit data columns for records that do not have a valid DHIS2 org unit assignment; or select the eventBlob and evaBlob if the va_id is provided.
- get_schema(table: str) list[source]¶
Get schema from table in Transfer Database
- Parameters:
table (str) – Name of table
- Return type:
list
- get_table_conf(table_name: str) list[source]¶
Get values in a configuration table (e.g., ODK_Conf) from Transfer Database
- Parameters:
table_name (str) – name of table in Transfer Database
- Return type:
list
- insert_event_log(values: Tuple[str, str, str]) None[source]¶
Insert new row in Transfer Database table EventLog
- Parameters:
values (Tuple of 3 strings) – Event description, event type (e.g., Event, Error, Warning, Summary), and date and time of entry
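The event-log methods can be pictured with the standard sqlite3 module. The sketch below uses an unencrypted in-memory database as a stand-in for the Transfer Database; the column names, the helper names insert_event and recent_events, and the timestamp format are assumptions, and only the three-string tuple comes from the description above.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")  # unencrypted stand-in for the Transfer DB
conn.execute(
    "CREATE TABLE EventLog (event_desc TEXT, event_type TEXT, event_time TEXT)"
)


def insert_event(conn, values):
    """Insert one (description, type, date and time) row."""
    conn.execute("INSERT INTO EventLog VALUES (?, ?, ?)", values)
    conn.commit()


def recent_events(conn, n_messages, recent=True):
    """Return up to n_messages rows, newest first when recent is True."""
    order = "DESC" if recent else "ASC"
    cur = conn.execute(
        "SELECT event_desc, event_type, event_time FROM EventLog "
        f"ORDER BY rowid {order} LIMIT ?",
        (n_messages,),
    )
    return cur.fetchall()


stamp = datetime(2024, 1, 2, 3, 4, 5).strftime("%Y-%m-%d_%H:%M:%S")
insert_event(conn, ("Pipeline started", "Event", stamp))
insert_event(conn, ("ODK export failed", "Error", stamp))
latest = recent_events(conn, n_messages=1)
```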
- make_pipeline_dirs() None[source]¶
Create directories for storing files (if they don’t exist).
The method creates the following folders in the working directory (as set in the Transfer database table Pipeline_Conf): (1) ODKFiles for files containing verbal autopsy records from the ODK Aggregate server; (2) OpenVAFiles containing R scripts and results from the cause assignment algorithms; and (3) DHIS for holding blobs that will be stored in a data repository (DHIS2 server and/or the local Transfer database).
- Raises:
PipelineError
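Creating the three folders only when they are missing can be done with pathlib, as in this sketch. The helper make_dirs is hypothetical and, unlike the documented method, takes the working directory as an argument instead of reading it from Pipeline_Conf and does not raise PipelineError.

```python
import tempfile
from pathlib import Path

PIPELINE_DIRS = ("ODKFiles", "OpenVAFiles", "DHIS")


def make_dirs(working_directory):
    """Create the pipeline folders, leaving existing ones untouched."""
    created = []
    for name in PIPELINE_DIRS:
        path = Path(working_directory) / name
        path.mkdir(parents=True, exist_ok=True)  # no error if already present
        created.append(path)
    return created


with tempfile.TemporaryDirectory() as tmp:
    dirs = make_dirs(tmp)
    make_dirs(tmp)  # a second call is harmless
    all_exist = all(p.is_dir() for p in dirs)
    names = sorted(p.name for p in dirs)
```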
- remove_no_ou_va(va_id: str) None[source]¶
Remove the VA record from Transfer database table VA_Org_Unit_Not_Found.
- store_no_ou_va(va_record: dict, eav: DataFrame, data_ou: str) None[source]¶
Store VA record without valid organisation unit (ou) in Transfer database table VA_Org_Unit_Not_Found.
- Parameters:
va_record (dict) – VA record processed by openVA along with cause and metadata
eav (DataFrame) – VA record in EAV format (Entity, Attribute, Value) prepared by openVA
data_ou (str) – Organisation unit (for DHIS2) found in data
dhis_ou (str) – Organisation unit the pipeline found and wanted to use for posting to DHIS2
- Raises:
PipelineError, DatabaseConnectionError
- store_single_va(va_dict: Dict, org_unit_id: str, log_summary: Dict, dhis_tracker: bool = False) None[source]¶
Store a single VA record in Transfer database table VA_Storage.
This method is intended to be used in conjunction with the DHIS class, which prepares the records into the proper format for storage in the Transfer database.
- Parameters:
va_dict (dict) – VA record
org_unit_id (str) – DHIS2 organisation unit ID
log_summary (dict) – Parsed log from DHIS2 post
dhis_tracker (bool) – Indicator of using DHIS2 VA tracker program
- Raises:
PipelineError, DatabaseConnectionError
- store_va(dhis_tracker: bool = False) None[source]¶
Store VA records in Transfer database.
This method is intended to be used in conjunction with the DHIS class, which prepares the records into the proper format for storage in the Transfer database.
- Parameters:
dhis_tracker (bool) – Indicator of using DHIS2 VA tracker program
- Raises:
PipelineError, DatabaseConnectionError
- update_table(table_name: str, field: str | list, value: str | list) None[source]¶
Update value(s) into Transfer Database table_name.field(s)
- Parameters:
table_name (str) – name of table in Transfer Database
field (str or list of str) – field name(s) in table_name
value (str or list of str) – new values to update in table_name.fields
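Accepting either a single field or a list of fields might look like the following sketch. It uses an unencrypted in-memory sqlite3 database; the helper update_fields and the column names odk_url and odk_user are assumptions for illustration. Table and field names cannot be bound as SQL parameters, so the sketch checks that they are plain identifiers before building the statement.

```python
import sqlite3


def update_fields(conn, table_name, field, value):
    """Set one or more fields of table_name to new values."""
    fields = [field] if isinstance(field, str) else list(field)
    values = [value] if isinstance(value, str) else list(value)
    if len(fields) != len(values):
        raise ValueError("field and value must have the same length")
    for name in [table_name, *fields]:
        if not name.isidentifier():  # guard against SQL injection via names
            raise ValueError(f"Invalid identifier: {name!r}")
    assignments = ", ".join(f"{f} = ?" for f in fields)
    conn.execute(f"UPDATE {table_name} SET {assignments}", values)
    conn.commit()


conn = sqlite3.connect(":memory:")  # stand-in for the Transfer DB
conn.execute("CREATE TABLE ODK_Conf (odk_url TEXT, odk_user TEXT)")
conn.execute("INSERT INTO ODK_Conf VALUES ('old-url', 'old-user')")
update_fields(conn, "ODK_Conf", ["odk_url", "odk_user"], ["new-url", "new-user"])
row = conn.execute("SELECT odk_url, odk_user FROM ODK_Conf").fetchone()
```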
API for ODK Briefcase¶
- class openva_pipeline.odk.ODK(settings)[source]¶
Manages Pipeline’s interaction with ODK Aggregate.
This class handles the segment of the pipeline related to ODK. The ODK.connect() method calls ODK Briefcase to connect with an ODK Aggregate server and export VA records. It also checks for previously exported files and updates them as needed. Finally, it logs messages and errors to the pipeline database.
- Parameters:
settings (dictionary of named tuples) – Configuration settings for pipeline steps (which is returned from Pipeline.config()).
- briefcase()[source]¶
Calls ODK Briefcase.
This method spawns a new process that runs the ODK Briefcase Java application (via a command-line interface) to download a CSV file with verbal autopsy records from an ODK Aggregate server.
- Returns:
Return value from method subprocess.run()
- Return type:
subprocess.CompletedProcess
- Raises:
ODKError
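Spawning an external program and inspecting the subprocess.CompletedProcess it returns can be sketched as below. The command is a stand-in that runs the Python interpreter, since the actual ODK Briefcase command line is not given here; the local ODKError class, the helper run_export, and the choice to raise on a non-zero exit code are assumptions.

```python
import subprocess
import sys


class ODKError(Exception):
    """Local stand-in for the pipeline's ODKError."""


def run_export(command):
    """Run an external command and fail loudly on a non-zero exit code."""
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        raise ODKError(completed.stderr.strip())
    return completed


# Stand-in command; a real call would launch the ODK Briefcase Java application.
result = run_export([sys.executable, "-c", "print('export finished')"])
```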
API for OpenVA¶
- class openva_pipeline.openva.OpenVA(settings, pipeline_run_date)[source]¶
Assign cause of death (COD) to verbal autopsies (VA) using the R package openVA.
This class creates and executes an R script that copies (and merges) ODK Briefcase exports, runs openVA to assign CODs, and creates outputs for depositing in the Transfer DB and posting to a DHIS server.
- Parameters:
settings (dictionary of named tuples) – Configuration settings for pipeline steps (which is returned from Pipeline.config()).
pipeline_run_date (datetime.datetime.now() with formatting strftime("%Y-%m-%d_%H:%M:%S")) – Date and time when instance of Pipeline was created (instance attribute).
- Raises:
OpenVAError
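The pipeline_run_date string described above can be produced with the standard datetime module. The constant RUN_DATE_FORMAT and the helper format_run_date are hypothetical names; the format string itself is the one given in the parameter description.

```python
from datetime import datetime

RUN_DATE_FORMAT = "%Y-%m-%d_%H:%M:%S"


def format_run_date(moment):
    """Format a datetime the way pipeline_run_date is described above."""
    return moment.strftime(RUN_DATE_FORMAT)


pipeline_run_date = format_run_date(datetime.now())
example = format_run_date(datetime(2024, 1, 2, 3, 4, 5))
print(example)  # 2024-01-02_03:04:05
```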
- get_cod()[source]¶
Create and execute R script to assign a COD with openVA; or call the SmartVA CLI to assign COD.
- get_summary() dict[source]¶
Get summary of openVA step.
- Returns:
Number of records passed to openVA and number of records without an assigned cause of death (CoD).
- Return type:
dict
API for DHIS2¶
- class openva_pipeline.dhis.DHIS(dhis_args, working_directory)[source]¶
Class for transferring VA records (with assigned CODs) to DHIS2.
This class includes methods for importing VA results (i.e. assigned causes of death from openVA or SmartVA) as CSV files, connecting to a DHIS2 server with the Verbal Autopsy Program, and posting the results to the DHIS2 server and/or the local Transfer database.
- Parameters:
dhis_args (list of namedtuple and dictionary with COD codes) – Contains parameter values for connecting to DHIS2, as returned by TransferDB.config_dhis().
working_directory (string) – Working directory for the openVA Pipeline
- Raises:
DHISError
- post_single_va(va_dict: Dict, eav: DataFrame, org_unit: str) Dict[source]¶
Post a single event to DHIS2
- Parameters:
va_dict (dict) – VA record with cause of death and metadata
eav (DataFrame) – VA record in Entity-Attribute-Value format
org_unit (str) – DHIS2 organisation unit where the event will be posted
- post_va(xfer_db: TransferDB) Dict[source]¶
Post VA records to DHIS.
This method reads in a CSV file (“entity_attributes_value.csv”) with cause of death results (from openVA) then formats events and posts them to a VA Program (installed on DHIS2 server).
- Parameters:
xfer_db (openva_pipeline.transfer_db.TransferDB) – Transfer Database instance
- Returns:
Log information received after posting events to the VA Program on a DHIS2 server (see API.post).
- Return type:
dict
- Raises:
DHISError
- verify_post(post_log)[source]¶
Verify that VA records were posted to DHIS2 server.
- Parameters:
post_log (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.post_va.
- Raises:
DHISError
- verify_single_va(va_id: str, post_log: Dict) bool[source]¶
Verify that VA tracked entity instance (tei) was posted to DHIS2 server.
- Parameters:
va_id (str) – Verbal Autopsy ID
post_log (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.post_va.
- Raises:
DHISError
- verify_tei_post(post_log)[source]¶
Verify that VA tracked entity instances (tei) were posted to DHIS2 server.
- Parameters:
post_log (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.post_va.
- Raises:
DHISError
- class openva_pipeline.dhis.API(dhis_url, dhis_user, dhis_password)[source]¶
This class provides methods for interacting with the DHIS2 API.
This class is called by an instance of the DHIS class to retrieve information from and post verbal autopsy records (and results) to a DHIS2 server that has the Verbal Autopsy program installed.
- Parameters:
dhis_url (string) – Web address for DHIS2 server (e.g. “play.dhis2.org/demo”).
dhis_user (string) – Username for DHIS2 account.
dhis_password (string) – Password for DHIS2 account.
- Raises:
DHISError
- class openva_pipeline.dhis.VerbalAutopsyEvent(va_id, program, dhis_org_unit, event_date, sex, dob, age, cod_code, algorithm_metadata, odk_id, file_id)[source]¶
Create DHIS2 event + a BLOB file resource
- Parameters:
va_id (string) – UID for verbal autopsy record (used as a DHIS2 data element)
program (string) – UID of the DHIS2’s Verbal Autopsy program
dhis_org_unit (string) – UID for the DHIS2 Organization Unit where the event (death) should be registered.
event_date (date) – Date of death with YYYY-MM-DD format
sex (string or integer) – Sex of the deceased (used as a DHIS2 data element). Possible values must fit to an option in the VA Program’s “Sex” optionSet (female, male, missing, unknown). If SmartVA is used to assign cause of death, then sex is an integer with 1 = male and 2 = female.
dob (date) – Date of birth of the deceased with YYYY-MM-DD format (used as a DHIS2 data element)
age (integer) – Age (in years) at time of death
cod_code (string) – Coded cause of death (must fit to an option in the VA Program’s “CoD codes” optionSet).
algorithm_metadata (string) – Code for how the CoD was obtained (must fit in VA Program’s “Algorithm Metadata” optionSet).
odk_id (string) – UID for the VA record assigned by the ODK Aggregate server
file_id (string) – UID for the blob file (containing the VA data and results) posted to (and assigned by) DHIS2 server.
- format_se_to_dhis2(dhis_user, dhis_org_unit)[source]¶
Format object to DHIS2 (single event) compatible event for DHIS2 API
- Parameters:
dhis_user – DHIS2 username for account posting the event
dhis_org_unit – code for DHIS2 organization unit where the death will be posted
- Returns:
DHIS2 event
- Return type:
dict
- format_tea_to_dhis2(dhis_user, dhis_org_unit, tei_id=None)[source]¶
Format object to DHIS2 (tracker) compatible event for DHIS2 API
- Parameters:
dhis_user – DHIS2 username for account posting the event
dhis_org_unit – code for DHIS2 organization unit where the death will be posted
tei_id – ID for the registered tracked entity instance (None if the tei has not been registered)
- Returns:
DHIS2 event
- Return type:
dict
- openva_pipeline.dhis.create_db(file_name, eva_list)[source]¶
Create a SQLite database with VA data + COD
- Parameters:
file_name (str) – Name (including path) of sqlite3 db file (blob) that will be posted to DHIS2
eva_list (list) – Event-Value-Attribute data structure with verbal autopsy data, cause of death result, and VA metadata.
- Return type:
None
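Writing a list of records to a small SQLite file can be sketched with the standard sqlite3 module. The helper write_blob_db, the table name va_data, its column names, and the sample rows are all assumptions; the real function's schema is not described here.

```python
import os
import sqlite3
import tempfile


def write_blob_db(file_name, eva_list):
    """Write (entity, attribute, value) rows to a new SQLite file."""
    conn = sqlite3.connect(file_name)
    try:
        conn.execute(
            "CREATE TABLE va_data (entity TEXT, attribute TEXT, value TEXT)"
        )
        conn.executemany("INSERT INTO va_data VALUES (?, ?, ?)", eva_list)
        conn.commit()
    finally:
        conn.close()


# Made-up rows for illustration only.
sample_rows = [("va-1", "sex", "female"), ("va-1", "cause", "example cause")]
with tempfile.TemporaryDirectory() as tmp:
    blob_path = os.path.join(tmp, "va-1.db")
    write_blob_db(blob_path, sample_rows)
    check = sqlite3.connect(blob_path)
    n_rows = check.execute("SELECT COUNT(*) FROM va_data").fetchone()[0]
    check.close()
```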
Exceptions¶
- exception openva_pipeline.exceptions.PipelineError[source]¶
Base class for exceptions in the openva_pipeline module.
- exception openva_pipeline.exceptions.DatabaseConnectionError[source]¶
An error occurred connecting to the Transfer database.
- exception openva_pipeline.exceptions.PipelineConfigurationError[source]¶
An error occurred accessing the Pipeline_Conf table in the DB.
- exception openva_pipeline.exceptions.ODKConfigurationError[source]¶
An error occurred accessing the ODK_Conf table in the DB.
- exception openva_pipeline.exceptions.OpenVAConfigurationError[source]¶
An error occurred accessing the OpenVA_Conf table in the DB.
- exception openva_pipeline.exceptions.DHISConfigurationError[source]¶
An error occurred accessing the DHIS_Conf table in the DB.
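If the more specific exceptions inherit from PipelineError, a single except clause can handle all of them. The sketch below defines local stand-in classes to show the pattern; the inheritance relationship is an assumption suggested by the "base class" description above, and load_odk_settings is a hypothetical function.

```python
class PipelineError(Exception):
    """Stand-in for the documented base exception."""


class ODKConfigurationError(PipelineError):
    """Stand-in; inheriting from PipelineError is an assumption."""


def load_odk_settings():
    """Hypothetical step that fails while reading ODK_Conf."""
    raise ODKConfigurationError("Problem reading the ODK_Conf table")


try:
    load_odk_settings()
except PipelineError as err:  # also catches the subclass raised above
    caught = type(err).__name__
    detail = str(err)
```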