"""
openva_pipeline.pipeline
------------------------
This module defines the primary API for the openVA Pipeline.
"""
import os
import csv
import datetime
from .transferDB import TransferDB
from .transferDB import DatabaseConnectionError
from .odk import ODK
from .openVA import OpenVA
from .dhis import DHIS
[docs]class Pipeline:
"""Primary API for the openVA pipeline.
This class calls three others to move verbal autopsy data from an ODK
Aggregate server (using the ODK class), through the openVA R package to
assign cause of death (using the OpenVA class), and deposits the VA records
with assigned causes to either/both a DHIS server (using the DHIS class) or
the Transfer database -- a local database which also contains configuration
settings for the pipeline. The TransferDB class performs the final step of
storing the results locally as well as accessing the configuration settings.
:param dbFileName: File name of the Tranfser database.
:type dbFileName: string
:param dbDirectory: str
Path of folder containing the Transfer database.
:type dbDirectory: string
:param dbKey: Encryption key for the Transfer database.
:type dbKey: string
"""
def __init__(self, dbFileName, dbDirectory, dbKey, useDHIS=True):
self.dbFileName = dbFileName
self.dbDirectory = dbDirectory
self.dbKey = dbKey
self.dbPath = os.path.join(dbDirectory, dbFileName)
nowDate = datetime.datetime.now()
self.pipelineRunDate = nowDate.strftime("%Y-%m-%d_%H:%M:%S")
self.useDHIS = useDHIS
[docs] def logEvent(self, eventDesc, eventType):
"""Commit event or error message into EventLog table of transfer database.
:param eventDesc: Description of the event.
:type eventDesc: string
:param eventType: Type of event (error or information)
:type evenType: string
"""
errorFile = os.path.join(self.dbDirectory, "dbErrorLog.csv")
timeFMT = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
if os.path.isfile(errorFile) == False:
try:
with open(errorFile, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(
["Date"] + ["Description"] + ["Additional Information"]
)
except (OSError) as e:
print(str(e) + "...Can't create dbErrorLog.csv")
sys.exit(1)
try:
xferDB = TransferDB(
dbFileName=self.dbFileName,
dbDirectory=self.dbDirectory,
dbKey=self.dbKey,
plRunDate=self.pipelineRunDate,
)
conn = xferDB.connectDB()
c = conn.cursor()
sql = "INSERT INTO EventLog \
(eventDesc, eventType, eventTime) VALUES (?, ?, ?)"
par = (eventDesc, eventType, timeFMT)
c.execute(sql, par)
conn.commit()
conn.close()
except (DatabaseConnectionError) as e:
errorMsg = [timeFMT, str(e), "Committed by Pipeline.logEvent"]
try:
with open(errorFile, "a", newline="") as f:
writer = csv.writer(f)
writer.writerow(errorMsg)
except (PermissionError, OSError) as exc:
print("Can't write to dbErrorLog.csv")
print(errorMsg)
[docs] def config(self):
"""Fetch configuration settings from Transfer DB.
This method queries the Transfer database (DB) and returns objects that
can be used as the arguments for other methods in this class, i.e.,
:meth:`Pipeline.runODK() <runODK>`,
:meth:`Pipeline.runOpenVA() <runOpenVA>`, and
:meth:`Pipeline.runDHIS() <runDHIS>`.
:param dbFileName: File name of the Transfer DB.
(e.g., Pipeline.db)
:type dbFileName: str
:param dbDirectory: Path to the location of the Transfer DB.
:type dbDirectory: str
:param dbKey: Encryption key for the Transfer DB
:type dbKey: str
:param plRunDate: Date when pipeline started latest
run (YYYY-MM-DD_hh:mm:ss).
:type plRunDate: date
:returns: Configuration settings for pipeline steps (e.g. connecting
to ODK Aggregate, running openVA, or posting records to DHIS)
:rtype: dictionary
"""
xferDB = TransferDB(
dbFileName=self.dbFileName,
dbDirectory=self.dbDirectory,
dbKey=self.dbKey,
plRunDate=self.pipelineRunDate,
)
conn = xferDB.connectDB()
settingsPipeline = xferDB.configPipeline(conn)
settingsODK = xferDB.configODK(conn)
settingsOpenVA = xferDB.configOpenVA(
conn, settingsPipeline.algorithm, settingsPipeline.workingDirectory
)
settings = {
"pipeline": settingsPipeline,
"odk": settingsODK,
"openVA": settingsOpenVA,
}
if self.useDHIS:
settingsDHIS = xferDB.configDHIS(conn, settingsPipeline.algorithm)
settings["dhis"] = settingsDHIS
conn.close()
return settings
[docs] def runODK(self, argsODK, argsPipeline):
"""Run check duplicates, copy file, and briefcase.
This method downloads data from either (1) an ODK Central server,
using :meth:`ODK.central() <openva_pipeline.odk.ODK.central>`, or
(2) an ODK Aggregate server using the Java application ODK Briefcase,
by calling the method
:meth:`ODK.briefcase() <openva_pipeline.odk.ODK.briefcase>`. The
configuration settings are taken from the argument
argsODK (see :meth:`Pipeline.config() <config>`)
, and downloads verbal autopsy (VA)
records as a (csv) export from an ODK Central/Aggregate server. If there
is a previous ODK export file, this method merges the files by
keeping only the unique VA records.
:param argsODK: Arguments passed to connect and download records
from the ODK Central/Aggregate server.
:type argsODK: named tuple
:param argsPipeline: Arguments for configuration the openva pipeline.
:type argsPipeline: named tuple
:return: Return value from method subprocess.run()
:rtype: subprocess.CompletedProcess
"""
pipelineODK = ODK(argsODK, argsPipeline.workingDirectory)
pipelineODK.mergeToPrevExport()
if argsODK.odkUseCentral == "True":
odkCentral = pipelineODK.central()
else:
odkBC = pipelineODK.briefcase()
xferDB = TransferDB(
dbFileName=self.dbFileName,
dbDirectory=self.dbDirectory,
dbKey=self.dbKey,
plRunDate=self.pipelineRunDate,
)
conn = xferDB.connectDB()
xferDB.configPipeline(conn)
xferDB.checkDuplicates(conn)
conn.close()
if argsODK.odkUseCentral == "True":
return odkCentral
else:
return odkBC
[docs] def runOpenVA(self, argsOpenVA, argsPipeline, odkID, runDate):
"""Create & run script or run smartva.
This method runs the through the suite of methods in the
:class:`OpenVA <openva_pipeline.openVA.OpenVA>`.
class. The list of tasks performed (in order) are: (1) call the method
:meth:`OpenVA.copyVA() <openva_pipeline.openVA.OpenVA.copyVA>`
to copy over CSV files with VA data (retrieved from ODK Aggregate);
(2) use the method
:meth:`OpenVA.rScript() <openva_pipeline.openVA.OpenVA.rScript>`
to create an R script; and (3) call the method
:meth:`OpenVA.getCOD() <openva_pipeline.openVA.OpenVA.getCOD>` to
run the R script that estimates the causes of death and stores the
results in "OpenVAFiles/recordStorage.csv" and
"OpenVAFiles/entityAttributeValue.csv" (the former serving as the
blob posted to DHIS2).
:param argsOpenVA: Configuration settings for openVA.
:type argsOpenVA: named tuple
:param argsPipeline: Configuration settings for OpenVA Pipeline
:type argsPipeline: named tuple
:param odkID: column/variable name of VA record ID in ODK export
:type odkID: string
:param runDate: date and time when OpenVA Pipeline ran
:type runDate: nowDate.strftime("%Y-%m-%d_%H:%M:%S")
:return: an indicator of zero VA records in the ODK export
:rtype: dictionary
"""
pipelineOpenVA = OpenVA(
vaArgs=argsOpenVA, pipelineArgs=argsPipeline, odkID=odkID, runDate=runDate
)
zeroRecords = pipelineOpenVA.copyVA()
rOut = {"zeroRecords": zeroRecords}
if not zeroRecords:
pipelineOpenVA.rScript()
completed = pipelineOpenVA.getCOD()
# rOut["completed"] = completed
rOut["returncode"] = completed.returncode
return rOut
[docs] def runDHIS(self, argsDHIS, argsPipeline):
"""Connect to API and post events.
This method first calls the method
:meth:`DHIS.connect() <openva_pipeline.dhis.DHIS.connect>`
to establish a connection with a DHIS2 server and, second
calls the method
:meth:`DHIS.postVA() <openva_pipeline.dhis.DHIS.postVA>` to
post VA data, the assigned causes of death, and associated
metadata (concerning cause assignment).
:param argsDHIS: Configuration settings for connecting to DHIS2 server.
:type argsOpenVA: named tuple
:param argsPipeline: Configuration settings for OpenVA Pipeline
:type argsPipeline: named tuple
:return: VA Program ID from the DHIS2 server, the log from
the DHIS2 connection, and the number of records posted to DHIS2
:rtype: dictionary
"""
pipelineDHIS = DHIS(argsDHIS, argsPipeline.workingDirectory)
apiDHIS = pipelineDHIS.connect()
postLog = pipelineDHIS.postVA(apiDHIS)
pipelineDHIS.verifyPost(postLog, apiDHIS)
dhisOut = {
"vaProgramUID": pipelineDHIS.vaProgramUID,
"postLog": postLog,
"nPostedRecords": pipelineDHIS.nPostedRecords,
}
return dhisOut
[docs] def storeResultsDB(self):
"""Store VA results in Transfer database."""
xferDB = TransferDB(
dbFileName=self.dbFileName,
dbDirectory=self.dbDirectory,
dbKey=self.dbKey,
plRunDate=self.pipelineRunDate,
)
conn = xferDB.connectDB()
xferDB.configPipeline(conn)
xferDB.storeVA(conn)
conn.close()
[docs] def closePipeline(self):
"""Update ODK_Conf ODKLastRun in Transfer DB and clean up files.
This method calls methods in the
:class:`TransferDB <openva_pipeline.transferDB.TransferDB>`
class to remove the data files created at each step of the
pipeline. More specifically, it runs
:meth:`TransferDB.cleanODK() <openva_pipeline.transferDB.TransferDB.cleanODK>`
to remove the ODK Briefcase export files ("ODKFiles/odkBCExportNew.csv"
and "ODKFiles/odkBCExportPrev.csv") if they exist;
:meth:`TransferDB.cleanOpenVA() <openva_pipeline.transferDB.TransferDB.cleanOpenVA>`
to remove the input data file ("OpenVAFiles/openVA_input.csv") and the
output files ("OpenVAFiles/recordStorage.csv",
"OpenVAFiles/newStorage.csv", and
"OpenVAFiles/entityAttributeValue.csv") -- note that all of these
results are stored in either/both of the Transfer DB and the DHIS2
server's VA program; and, third, the method
:meth:`TransferDB.cleanDHIS() <openva_pipeline.transferDB.TransferDB.cleanDHIS>`
is called to remove the blobs posted to the DHIS2 server and stored in the
folder "DHIS/blobs". Finally, this method updates the Transfer DB's
value in the ODK_Conf table's variable odkLastRun so the next ODK
Export file does not include VA records already processed through the
pipeline.
"""
xferDB = TransferDB(
dbFileName=self.dbFileName,
dbDirectory=self.dbDirectory,
dbKey=self.dbKey,
plRunDate=self.pipelineRunDate,
)
conn = xferDB.connectDB()
xferDB.configPipeline(conn)
xferDB.cleanODK()
xferDB.cleanOpenVA()
xferDB.cleanDHIS()
xferDB.updateODKLastRun(conn, self.pipelineRunDate)
conn.close()