Documentation for classes, functions, and methods

Running the OpenVA Pipeline

The openva_pipeline package includes two convenience functions for

  1. creating the Transfer Database – a database that holds configuration settings, VA data and results, and a table for logging events and errors; and
  2. running through all of the steps in the openVA Pipeline
openva_pipeline.runPipeline.createTransferDB(database_file_name, database_directory, database_key)[source]

Create the (SQLite encrypted) Transfer Database.

Parameters:
  • database_file_name – File name for the Transfer Database.
  • database_directory – Path of the Transfer Database.
  • datatbase_key – Encryption key for the Transfer Database
  • export_to_DHIS – Indicator for posting VA records to a DHIS2 server.
openva_pipeline.runPipeline.runPipeline(database_file_name, database_directory, database_key, export_to_DHIS=True)[source]

Runs through all steps of the OpenVA Pipeline

This function is a wrapper for the Pipeline class, which runs through all steps of the OpenVA Pipeline – (1) connect to Transfer Database (to retrieve configuration settings); (2) connect to ODK Aggregate to download a CSV file with VA records; (3) run openVA (or SmartVA) to assign cause of death; and (4) store CoD results and VA data in the Transfer Database as well as a DHIS2 VA Program (if requested).

Parameters:
  • database_file_name – File name for the Transfer Database.
  • database_directory – Path of the Transfer Database.
  • datatbase_key – Encryption key for the Transfer Database
  • export_to_DHIS ((Boolean)) – Indicator for posting VA records to a DHIS2 server.

Main Interface

The OpenVA Pipeline is run using the following function

class openva_pipeline.pipeline.Pipeline(dbFileName, dbDirectory, dbKey, useDHIS=True)[source]

Primary API for the openVA pipeline.

This class calls three others to move verbal autopsy data from an ODK Aggregate server (using the ODK class), through the openVA R package to assign cause of death (using the OpenVA class), and deposits the VA records with assigned causes to either/both a DHIS server (using the DHIS class) or the Transfer database – a local database which also contains configuration settings for the pipeline. The TransferDB class performs the final step of storing the results locally as well as accessing the configuration settings.

Parameters:
  • dbFileName (string) – File name of the Tranfser database.
  • dbDirectory (string) – str Path of folder containing the Transfer database.
  • dbKey (string) – Encryption key for the Transfer database.
closePipeline()[source]

Update ODK_Conf ODKLastRun in Transfer DB and clean up files.

This method calls methods in the TransferDB class to remove the data files created at each step of the pipeline. More specifically, it runs TransferDB.cleanODK() to remove the ODK Briefcase export files (“ODKFiles/odkBCExportNew.csv” and “ODKFiles/odkBCExportPrev.csv”) if they exist; TransferDB.cleanOpenVA() to remove the input data file (“OpenVAFiles/openVA_input.csv”) and the output files (“OpenVAFiles/recordStorage.csv”, “OpenVAFiles/newStorage.csv”, and “OpenVAFiles/entityAttributeValue.csv”) – note that all of these results are stored in either/both of the Transfer DB and the DHIS2 server’s VA program; and, third, the method TransferDB.cleanDHIS() is called to remove the blobs posted to the DHIS2 server and stored in the folder “DHIS/blobs”. Finally, this method updates the Transfer DB’s value in the ODK_Conf table’s variable odkLastRun so the next ODK Export file does not include VA records already processed through the pipeline.

config()[source]

Fetch configuration settings from Transfer DB.

This method queries the Transfer database (DB) and returns objects that can be used as the arguments for other methods in this class, i.e., Pipeline.runODK(), Pipeline.runOpenVA(), and Pipeline.runDHIS().

Parameters:
  • dbFileName (str) – File name of the Transfer DB. (e.g., Pipeline.db)
  • dbDirectory (str) – Path to the location of the Transfer DB.
  • dbKey (str) – Encryption key for the Transfer DB
  • plRunDate (date) – Date when pipeline started latest run (YYYY-MM-DD_hh:mm:ss).
Returns:

Configuration settings for pipeline steps (e.g. connecting to ODK Aggregate, running openVA, or posting records to DHIS)

Return type:

dictionary

logEvent(eventDesc, eventType)[source]

Commit event or error message into EventLog table of transfer database.

Parameters:
  • eventDesc (string) – Description of the event.
  • eventType – Type of event (error or information)
runDHIS(argsDHIS, argsPipeline)[source]

Connect to API and post events.

This method first calls the method DHIS.connect() to establish a connection with a DHIS2 server and, second calls the method DHIS.postVA() to post VA data, the assigned causes of death, and associated metadata (concerning cause assignment).

Parameters:
  • argsDHIS – Configuration settings for connecting to DHIS2 server.
  • argsPipeline (named tuple) – Configuration settings for OpenVA Pipeline
Returns:

VA Program ID from the DHIS2 server, the log from the DHIS2 connection, and the number of records posted to DHIS2

Return type:

dictionary

runODK(argsODK, argsPipeline)[source]

Run check duplicates, copy file, and briefcase.

This method downloads data from either (1) an ODK Central server, using ODK.central(), or (2) an ODK Aggregate server using the Java application ODK Briefcase, by calling the method ODK.briefcase(). The configuration settings are taken from the argument argsODK (see Pipeline.config()) , and downloads verbal autopsy (VA) records as a (csv) export from an ODK Central/Aggregate server. If there is a previous ODK export file, this method merges the files by keeping only the unique VA records.

Parameters:
  • argsODK (named tuple) – Arguments passed to connect and download records from the ODK Central/Aggregate server.
  • argsPipeline (named tuple) – Arguments for configuration the openva pipeline.
Returns:

Return value from method subprocess.run()

Return type:

subprocess.CompletedProcess

runOpenVA(argsOpenVA, argsPipeline, odkID, runDate)[source]

Create & run script or run smartva.

This method runs the through the suite of methods in the OpenVA. class. The list of tasks performed (in order) are: (1) call the method OpenVA.copyVA() to copy over CSV files with VA data (retrieved from ODK Aggregate); (2) use the method OpenVA.rScript() to create an R script; and (3) call the method OpenVA.getCOD() to run the R script that estimates the causes of death and stores the results in “OpenVAFiles/recordStorage.csv” and “OpenVAFiles/entityAttributeValue.csv” (the former serving as the blob posted to DHIS2).

Parameters:
  • argsOpenVA (named tuple) – Configuration settings for openVA.
  • argsPipeline (named tuple) – Configuration settings for OpenVA Pipeline
  • odkID (string) – column/variable name of VA record ID in ODK export
  • runDate (nowDate.strftime("%Y-%m-%d_%H:%M:%S")) – date and time when OpenVA Pipeline ran
Returns:

an indicator of zero VA records in the ODK export

Return type:

dictionary

storeResultsDB()[source]

Store VA results in Transfer database.

API for Transfer Database

class openva_pipeline.transferDB.TransferDB(dbFileName, dbDirectory, dbKey, plRunDate)[source]

This class handles interactions with the Transfer database.

The Pipeline accesses configuration information from the Transfer database, and also stores log messages and verbal autopsy records in the DB. The Transfer database is encrypted using sqlcipher3 (and the pysqlcipher3 module is imported to establish DB connection).

Parameters

Parameters:
  • dbFileName (str) – File name of the Tranfser database.
  • dbDirectory (str) – Path of folder containing the Transfer database.
  • dbKey (str) – Encryption key for the Transfer database.
  • plRunDate (date) – Date when pipeline started latest run (YYYY-MM-DD_hh:mm:ss).
checkDuplicates(conn)[source]

Search for duplicate VA records.

This method searches for duplicate VA records in ODK Briefcase export file and the Tranfser DB. If duplicates are found, a warning message is logged to the EventLog table in the Transfer database and the duplicate records are removed from the ODK Briefcase export file.

Parameters:conn (sqlite3 Connection object) – A connection to the Transfer Database (e.g. the object returned from TransferDB.connectDB().)
Raises:DatabaseConnectionError, PipelineError
cleanDHIS()[source]

Remove DHIS2 blob files.

cleanODK()[source]

Remove ODK Briefcase Export files.

cleanOpenVA()[source]

Remove openVA files with COD results.

configDHIS(conn, algorithm)[source]

Query DHIS configuration settings from database.

This method is intended to be used in conjunction with (1) TransferDB.connectDB(), which establishes a connection to a database with the Pipeline configuration settings; and (2) DHIS.connect(), which establishes a connection to a DHIS server. Thus, TransferDB.configDHIS() gets its input from TransferDB.connectDB() and the output from TransferDB.config() is a valid argument for DHIS.connect()

Parameters:
  • conn (sqlite3 Connection object) – A connection to the Transfer Database (e.g. the object returned from TransferDB.connectDB().)
  • algorithm (str) – VA algorithm used by R package openVA
Returns:

Contains all parameters for DHIS.connect().

Return type:

tuple

Raises:

DHISConfigurationError

configODK(conn)[source]

Query ODK configuration settings from database.

This method is intended to be used in conjunction with (1) TransferDB.connectDB(), which establishes a connection to a database with the Pipeline configuration settings; and (2) ODK.briefcase(), which establishes a connection to an ODK Aggregate server. Thus, TransferDB.configODK() gets its input from TransferDB.connectDB() and the output from TransferDB.configODK() is a valid argument for ODK.briefcase().

Parameters:conn (sqlite3 Connection object) – A connection to the Transfer Database (e.g. the object returned from TransferDB.connectDB().)
Returns:Contains all parameters for ODK.briefcase().
Return type:tuple
Raises:ODKConfigurationError
configOpenVA(conn, algorithm, pipelineDir)[source]

Query OpenVA configuration settings from database.

This method is intended to receive its input (a Connection object) from TransferDB.connectDB(), which establishes a connection to a database with the Pipeline configuration settings. It sets up the configuration for all of the VA algorithms included in the R package openVA. The output from configOpenVA() serves as an input to the method OpenVA.setAlgorithmParameters(). This is a wrapper function that calls configInterVA(), configInSilicoVA(), and configSmartVA() to actually pull configuration settings from the database.

Parameters:
  • conn (sqlite3 Connection object) – A connection to the Transfer Database (e.g. the object returned from TransferDB.connectDB().)
  • algorithm (str) – VA algorithm used by R package openVA
  • pipelineDir (str) – Working directory for the Pipeline
Returns:

Contains all parameters needed for OpenVA.setAlgorithmParameters().

Rtypes:

tuple

Raises:

OpenVAConfigurationError

configPipeline(conn)[source]

Grabs Pipline configuration settings.

This method queries the Pipeline_Conf table in Transfer database and returns a tuple with attributes (1) algorithmMetadataCode; (2) codSource; (3) algorithm; and (4) workingDirectory.

Returns:Arguments needed to configure the OpenVA Pipeline alogrithmMetadataCode - attribute describing VA data codSource - attribute detailing the source of the Cause of Death list algorithm - attribute indicating which VA algorithm to use workingDirectory - attribute indicating the working directory
Return type:tuple
Raises:PipelineConfigurationError
connectDB()[source]

Connect to Transfer database.

Uses parameters supplied to the parent class, TransferDB, to connect to the (encrypted) Transfer database.

Returns:Used to query (encrypted) SQLite database.
Return type:SQLite database connection object
Raises:DatabaseConnectionError
makePipelineDirs()[source]

Create directories for storing files (if they don’t exist).

The method creates the following folders in the working directory (as set in the Transfer database table Pipeline_Conf): (1) ODKFiles for files containing verbal autopsy records from the ODK Aggregate server; (2) OpenVAFiles containing R scripts and results from the cause assignment algorithms; and (3) DHIS for holding blobs that will be stored in a data repository (DHIS2 server and/or the local Transfer database).

Raises:PipelineError
storeVA(conn)[source]

Store VA records in Transfer database.

This method is intended to be used in conjunction with the DHIS class, which prepares the records into the proper format for storage in the Transfer database.

Parameters:conn (sqlite3 Connection object) – A connection to the Transfer Database (e.g. the object returned from TransferDB.connectDB().)
Raises:PipelineError, DatabaseConnectionError
updateODKLastRun(conn, plRunDate)[source]

Update Transfer Database table ODK_Conf.odkLastRun

Parameters:
  • conn (sqlite3 Connection object) – A connection to the Transfer Database (e.g. the object returned from TransferDB.connectDB().)
  • plRunDate (date (YYYY-MM-DD_hh:mm:ss)) – Date when pipeline started latest run

API for ODK Briefcase

class openva_pipeline.odk.ODK(odkSettings, workingDirectory)[source]

Manages Pipeline’s interaction with ODK Aggregate.

This class handles the segment of the pipeline related to ODK. The ODK.connect() method calls ODK Briefcase to connect with an ODK Aggregate server and export VA records. It also checks for previously exported files and updates them as needed. Finally, it logs messages and errors to the pipeline database.

Parameters:
  • odkSettings (named tuple) – A named tuple with all of configuration settings as attributes.
  • workingDirectory (string) – Directory where openVA Pipeline should create files.
briefcase()[source]

Calls ODK Briefcase.

This method spawns a new process that runs the ODK Briefcase Java application (via a command-line interface) to download a CSV file with verbal autopsy records from an ODK Aggregate server.

Returns:Return value from method subprocess.run()
Return type:subprocess.CompletedProcess
Raises:ODKError
central()[source]

Connects to ODK Central through api.

This method calls requests.get to download a CSV file with verbal autopsy records from an ODK Collect server.

Returns:Returns a string indicating the number of downloaded records.
Return type:string
Raises:ODKError
mergeToPrevExport()[source]

Merge previous ODK Briefcase export files.

API for OpenVA

class openva_pipeline.openVA.OpenVA(vaArgs, pipelineArgs, odkID, runDate)[source]

Assign cause of death (COD) to verbal autopsies (VA) R package openVA.

This class creates and executes an R script that copies (and merges) ODK Briefcase exports, runs openVA to assign CODs, and creates outputs for depositing in the Transfers DB and to a DHIS server.

Parameters:algorithm (str) – Which VA algorithm should be used to assign COD.
Raises:OpenVAError
copyVA()[source]
Create data file for openVA by merging ODK export files & converting
with pycrossva.
Returns:Indicator of an empty (i.e. no records) ODK export file
Return type:logical
getCOD()[source]

Create and execute R script to assign a COD with openVA; or call the SmartVA CLI to assign COD.

rScript()[source]

Create an R script for running openVA and assigning CODs.

smartVA_to_csv()[source]
Write two CSV files: (1) Entity Value Attribute blob pushed to
DHIS2 (entityAttributeValue.csv)
  1. table for transfer database (recordStorage.csv)

Both CSV files are stored in the OpenVA folder.

API for DHIS2

class openva_pipeline.dhis.DHIS(dhisArgs, workingDirectory)[source]

Class for transfering VA records (with assigned CODs) to the DHIS2 server.

This class includes methods for importing VA results (i.e. assigned causes of death from openVA or SmartVA) as CSV files, connecting to a DHIS2 server with the Verbal Autopsy Program, and posting the results to the DHIS2 server and/or the local Transfer database.

Parameters:
  • dhisArgs ((named) tuple) – Contains parameter values for connected to DHIS2, as returned by transferDB.configDHIS().
  • workingDirectory (string) – Workind direcotry for the openVA Pipeline
Raises:

DHISError

connect()[source]

Setup connection to DHIS2 server.

This creates a connection to DHIS2’s VA Program ID by creating an instance of API. This method also checks that the VA Program and the organization unit can both be found on the DHIS2 server. The configuration settings for connecting to the DHIS2 (e.g., URL, username, password, etc.) are taken from the arguments passed to this method’s class DHIS (these settings can be created using the method Pipeline.config).

Returns:A class instance for interacting with the DHIS2 API.
Return type:Instance of the API class
postVA(apiDHIS)[source]

Post VA records to DHIS.

This method reads in a CSV file (“entityAttribuesValue.csv”) with cause of death results (from openVA) then formats events and posts them to a VA Program (installed on DHIS2 server).

Parameters:apiDHIS (Instance of the API class) – A class instance for interacting with the DHIS2 API created by the method DHIS.connect
Returns:Log information receieved after posting events to the VA Program on a DHIS2 server (see API.post).
Return type:dict
Raises:DHISError
verifyPost(postLog, apiDHIS)[source]

Verify that VA records were posted to DHIS2 server.

Parameters:
  • postLog (dictionary) – Log information retrieved after posting events to a VA Program on a DHIS2 server; this is the return object from DHIS.postVA.
  • apiDHIS (Instance of the API class) – A class instance for interacting with the DHIS2 API created by the method DHIS.connect
Raises:

DHISError

class openva_pipeline.dhis.API(dhisURL, dhisUser, dhisPass)[source]

This class provides methods for interacting with the DHIS2 API.

This class is called by an instance of the DHIS to retrieve information from and post verbal autopsy records (and results) to a DHIS2 server that has the Verbal Autopsy program installed.

Parameters:
  • dhisURL (string) – Web address for DHIS2 server (e.g., “play.dhis2.org/demo”).
  • dhisUser (string) – Username for DHIS2 account.
  • dhisPassword (string) – Password for DHIS2 account.
Raises:

DHISError

get(endpoint, params=None)[source]

GET method for DHIS2 API.

Return type:dict
post(endpoint, data)[source]

POST method for DHIS2 API.

Return type:dict
post_blob(f)[source]

Post file to DHIS2 and return created UID for that file

Return type:str
class openva_pipeline.dhis.VerbalAutopsyEvent(va_id, program, dhis_orgunit, event_date, sex, dob, age, cod_code, algorithm_metadata, odk_id, file_id)[source]

Create DHIS2 event + a BLOB file resource

Parameters:
  • va_id (string) – UID for verbal autopsy record (used as a DHIS2 data element)
  • program (string) – UID of the DHIS2’s Verbal Autopsy program
  • dhis_orgunit (string) – UID for the DHIS2 Organization Unit where the event (death) should be registered.
  • event_date (datetime.date) – Date of death with YYYY-MM-DD format
  • sex (string or integer) – Sex of the deceased (used as a DHIS2 data element). Possible values must fit to an option in the VA Program’s “Sex” optionSet: female, male, missing, unknown). If SmartVA is used to assign cause of death, then sex is an integer with 1 = male and 2 = female).
  • dob (datetime.date) – Date of birth of the deceased with YYYY-MM-DD format (used as a DHIS2 data element)
  • age (integer) – Age (in years) at time of death
  • cod_code (string) – Coded cause of death (must fit to an option in the VA Program’s “CoD codes” optionSet.
  • algorithm_metadata (string) – Code for how the CoD was obtained (must fit in VA Program’s “Algorithm Metadata” optionSet.
  • odk_id (string) – UID for the VA record assigned by the ODK Aggregate server
  • file_id (string) – UID for the blob file (containing the VA data and results) posted to (and assigned by) DHIS2 server.
format_to_dhis2(dhisUser)[source]

Format object to DHIS2 compatible event for DHIS2 API

Parameters:dhisUser – DHIS2 username for account posting the event
Returns:DHIS2 event
Return type:dict
openva_pipeline.dhis.create_db(fName, evaList)[source]

Create a SQLite database with VA data + COD

Parameters:evaList (list) – Event-Value-Attribute data structure with verbal autopsy data, cause of death result, and VA metadata.
Return type:None
openva_pipeline.dhis.getCODCode(myDict, searchFor)[source]

Return COD label expected by (DHIS2) VA Program.

Parameters:searchFor (string) – Cause of Death label returned by openVA.
Return type:str
openva_pipeline.dhis.findKeyValue(key, d)[source]

Return a key’s value in a nested dictionary.

Exceptions

exception openva_pipeline.exceptions.PipelineError[source]

Base class for exceptions in the openva_pipeline module.

exception openva_pipeline.exceptions.DatabaseConnectionError[source]

An error occurred connecting to the Transfer database.

exception openva_pipeline.exceptions.PipelineConfigurationError[source]

An error occurred accessing the Pipeline_Conf table in the DB.

exception openva_pipeline.exceptions.ODKConfigurationError[source]

An error occurred accessing the ODK_Conf table in the DB.

exception openva_pipeline.exceptions.OpenVAConfigurationError[source]

An error occurred accessing the OpenVA_Conf table in the DB.

exception openva_pipeline.exceptions.DHISConfigurationError[source]

An error occurred accessing the DHIS_Conf table in the DB.

exception openva_pipeline.exceptions.ODKError[source]

An error occurred with the odk module.

exception openva_pipeline.exceptions.OpenVAError[source]

An error occurred with the openVA module.

exception openva_pipeline.exceptions.SmartVAError[source]

An error occurred with the openVA module.

exception openva_pipeline.exceptions.DHISError[source]

An error occurred with the dhis module.