# Source code for openva_pipeline.runPipeline

# ------------------------------------------------------------------------------#
#    Copyright (C) 2018  Jason Thomas, Samuel J. Clark, & Martin Bratschi
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------------#

import sys
import os
import requests
from pysqlcipher3 import dbapi2 as sqlcipher

from openva_pipeline.pipeline import Pipeline
from openva_pipeline.exceptions import PipelineError
from openva_pipeline.exceptions import DatabaseConnectionError
from openva_pipeline.exceptions import PipelineConfigurationError
from openva_pipeline.exceptions import ODKError
from openva_pipeline.exceptions import OpenVAError
from openva_pipeline.exceptions import SmartVAError
from openva_pipeline.exceptions import DHISError


def createTransferDB(database_file_name, database_directory, database_key):
    """Create the (SQLite encrypted) Transfer Database.

    Connects to the database file with SQLCipher, sets the encryption key,
    and runs the ``sql/pipelineDB.sql`` script located next to this module.
    The connection is closed before returning, whether or not the setup
    succeeded.

    :param database_file_name: File name for the Transfer Database.
    :param database_directory: Path of the Transfer Database.
    :param database_key: Encryption key for the Transfer Database.
    :raises DatabaseConnectionError: If connecting to the database, setting
        the encryption key, or running the SQL script fails.
    """

    dbPath = os.path.join(database_directory, database_file_name)
    sqlPath = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "sql/pipelineDB.sql")
    )
    dbErrors = (sqlcipher.DatabaseError, sqlcipher.OperationalError)

    try:
        conn = sqlcipher.connect(dbPath)
    except dbErrors as e:
        raise DatabaseConnectionError(
            "Unable to create database..." + str(e)
        ) from e

    # From here on the connection is open: make sure it is released on every
    # path, including the error paths below.
    try:
        try:
            # The key is passed as a double-quoted PRAGMA value.
            parSetKey = '"' + database_key + '"'
            conn.execute("PRAGMA key = " + parSetKey)
        except dbErrors as e:
            raise DatabaseConnectionError(
                "Unable to set encryption key..." + str(e)
            ) from e

        try:
            with open(sqlPath, "r", newline="\n", encoding="utf-8") as sqlScript:
                conn.executescript(sqlScript.read())
        except dbErrors as e:
            raise DatabaseConnectionError(
                "Problem running script (pipelineDB.sql)..." + str(e)
            ) from e
    finally:
        conn.close()
def runPipeline(
    database_file_name, database_directory, database_key, export_to_DHIS=True
):
    """Runs through all steps of the OpenVA Pipeline

    This function is a wrapper for the Pipeline class, which runs through all
    steps of the OpenVA Pipeline -- (1) connect to Transfer Database (to
    retrieve configuration settings); (2) connect to ODK Aggregate to download
    a CSV file with VA records; (3) run openVA (or SmartVA) to assign cause of
    death; and (4) store CoD results and VA data in the Transfer Database as
    well as a DHIS2 VA Program (if requested).

    Every step that fails with one of its expected exceptions is logged via
    ``Pipeline.logEvent`` and terminates the process with ``sys.exit(1)``.
    When the openVA step reports zero records, the function logs that and
    terminates with ``sys.exit(0)`` before the DHIS2 and storage steps.

    :param database_file_name: File name for the Transfer Database.
    :param database_directory: Path of the Transfer Database.
    :param database_key: Encryption key for the Transfer Database
    :param export_to_DHIS: Indicator for posting VA records to a DHIS2 server.
    :type export_to_DHIS: (Boolean)
    """

    pl = Pipeline(
        dbFileName=database_file_name,
        dbDirectory=database_directory,
        dbKey=database_key,
        useDHIS=export_to_DHIS,
    )

    # Step 1: read the configuration settings.
    try:
        settings = pl.config()
    except PipelineConfigurationError as e:
        pl.logEvent(str(e), "Error")
        sys.exit(1)

    settingsPipeline = settings["pipeline"]
    settingsODK = settings["odk"]
    settingsOpenVA = settings["openVA"]
    settingsDHIS = settings["dhis"]

    # Step 2: export VA records from ODK.
    try:
        pl.runODK(settingsODK, settingsPipeline)
        pl.logEvent("ODK Export Completed Successfully", "Event")
    except ODKError as e:
        pl.logEvent(str(e), "Error")
        sys.exit(1)

    # Step 3: assign causes of death.
    try:
        rOut = pl.runOpenVA(
            settingsOpenVA, settingsPipeline, settingsODK.odkID, pl.pipelineRunDate
        )
        pl.logEvent("OpenVA Analysis Completed Successfully", "Event")
    except (OpenVAError, SmartVAError) as e:
        pl.logEvent(str(e), "Error")
        sys.exit(1)

    # NOTE(review): the explicit comparison is kept on purpose -- the type of
    # the "zeroRecords" flag is decided by Pipeline.runOpenVA, which is not
    # visible from this module.
    if rOut["zeroRecords"] == True:  # noqa: E712
        pl.logEvent("No new VA records from ODK (now exiting)", "Event")
        sys.exit(0)

    # Step 4a (optional): post results to DHIS2.
    if export_to_DHIS:
        try:
            pl.runDHIS(settingsDHIS, settingsPipeline)
            pl.logEvent("Posted Events to DHIS2 Successfully", "Event")
        except DHISError as e:
            pl.logEvent(str(e), "Error")
            sys.exit(1)

    # Step 4b: store results in the Transfer Database.
    try:
        pl.storeResultsDB()
        pl.logEvent("Stored Records to Xfer Database Successfully", "Event")
    except (PipelineError, DatabaseConnectionError, PipelineConfigurationError) as e:
        pl.logEvent(str(e), "Error")
        sys.exit(1)

    # Final step: close the pipeline run.
    try:
        pl.closePipeline()
        pl.logEvent("Successfully Completed Run of Pipeline", "Event")
    except DatabaseConnectionError as e:
        pl.logEvent(str(e), "Error")
        sys.exit(1)
# (connect, read) timeouts in seconds for the release downloads; without a
# timeout ``requests.get`` can block indefinitely on an unresponsive server.
_DOWNLOAD_TIMEOUT = (10, 60)


def _downloadFile(url, fileName, mode):
    """Download ``url`` into ``fileName`` (current directory) and chmod it.

    The response is fetched and its HTTP status checked *before* the output
    file is opened, so a failed request neither leaves an empty file behind
    nor stores an HTTP error page as if it were the requested artifact.

    :param url: Address of the file to download.
    :param fileName: Name of the local file to write.
    :param mode: Permission bits applied to the file with ``os.chmod``.
    :raises requests.RequestException: If the request fails, times out, or
        returns an HTTP error status.
    :raises OSError: If the file cannot be written or its mode changed.
    """
    response = requests.get(url, timeout=_DOWNLOAD_TIMEOUT)
    response.raise_for_status()
    with open(fileName, "wb") as outFile:
        outFile.write(response.content)
    os.chmod(fileName, mode)


def downloadBriefcase():
    """Download the ODK Briefcase (v1.18.0) jar file from GitHub.

    The jar is written to ``ODK-Briefcase-v1.18.0.jar`` in the current
    working directory with mode ``0o744``.

    :raises ODKError: If the download or writing the file fails.
    """

    bcURL = "https://github.com/getodk/briefcase/releases/download/v1.18.0/ODK-Briefcase-v1.18.0.jar"
    try:
        _downloadFile(bcURL, "ODK-Briefcase-v1.18.0.jar", 0o744)
    except (requests.RequestException, OSError) as e:
        raise ODKError("Error downloading Briefcase: {}".format(str(e))) from e


def downloadSmartVA():
    """Download the smartva (linux) binary application file from GitHub.

    The binary is written to ``smartva`` in the current working directory.

    :raises SmartVAError: If the download or writing the file fails.
    """

    smartvaURL = (
        "https://github.com/ihmeuw/SmartVA-Analyze/releases/download/v2.0.0/smartva"
    )
    try:
        # NOTE(review): 0o777 leaves the executable world-writable; kept to
        # preserve existing behavior, but 0o755 is probably sufficient.
        _downloadFile(smartvaURL, "smartva", 0o777)
    except (requests.RequestException, OSError) as e:
        raise SmartVAError("Error downloading smartva: {}".format(str(e))) from e


# if __name__ == "__main__":
#     runPipeline(database_file_name= "run_Pipeline.db",
#                 database_directory = "tests",
#                 database_key = "enilepiP")