*! version 1.15.8.19 19agol2015 *! PARALLEL: Stata module for parallel computing *! by George G. Vega [cre,aut], Brian Quistorff [ctb] *! *! Project website: *! https://github.com/gvegayon/parallel *! Bug reports: *! https://github.com/gvegayon/parallel/issues /* Copyright (c) 2014 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ // Syntax parser cap program drop parallel program def parallel version 11.0 // Checks wether if is parallel prefix or not if (regexm(`"`0'"', "^(do|clean|setclusters|break|version|append|printlog|viewlog)")) { /* If not prefix */ *If user initiated, clean the logs if regexs(0)=="clean" local add_on = cond(regexm(`"`0'"',","), "logs", ", logs") parallel_`0' `add_on' } else if (regexm(`"`0'"', "^(bs|sim)[,]?[\s ]?")) { /* Prefix bootstrap or simulate */ local cmd = regexs(1) local 0 = regexr(`"`0'"', "^(bs|sim)", "") gettoken x 0 : 0, parse(":") bind local 0 = regexr(`"`0'"', "^[:]", "") gettoken x options : x, parse(",") bind gettoken 0 argopt : 0, parse(",") bind parallel_`cmd' `0', argopt(`argopt') `options' } else if (regexm(`"`0'"',"^([,]?.*[:])")) { /* if prefix */ gettoken x 0 : 0, parse(":") bind local 0 = regexr(`"`0'"', "^[:]", "") // Gets the options (if these exists) of parallel gettoken x options : x, parse(",") bind gettoken 0 argopt : 0, parse(",") bind parallel_do `0', `options' prefix argopt(`argopt') } else { local val = regexm(`"`0'"', "^([a-zA-Z0-9_]*).*") local subcommand = regexs(1) di as result `"-`subcommand'-"' as error " invalid parallel subcommand" as text " ({stata h parallel:{it:help parallel}})" exit 198 } end /* Returns the version of parallel */ program def parallel_version, rclass version 11.0 di as result "parallel" as text " Stata module for parallel computing" di as result "vers" as text " 1.15.8.19 (19agol2015)" di as result "auth" as text " George G. Vega [cre,aut], Brian Quistorff [ctb]" return local pll_vers = "1.15.8.19" end /* Take a look to logfiles */ program def parallel_printlog version 11.0 syntax [anything(name=pll_instance)] , [Event(string)] parallel_checklog `pll_instance', e(`event') action(print) end program def parallel_viewlog version 11.0 syntax [anything(name=pll_instance)] , [Event(string)] parallel_checklog `pll_instance', e(`event') action(view) end program def parallel_checklog version 11.0 syntax [anything(name=pll_instance)] , [Event(string)] action(string) if ("`event'"=="") local event = "$LAST_PLL_ID" if ("`event'"!=".") { /* By default uses 1 */ if ("`pll_instance'" == "") local pll_instance 1 local pll_instance : di %04.0f `pll_instance' /* Does de file exists */ local logname = "__pll`event'_do`pll_instance'.log" if (c(os) == "Windows") local logname = "`c(tmpdir)'`logname'" else local logname = "`c(tmpdir)'/`logname'" /* If the logfile does not exists, do nothing*/ cap confirm file "`logname'" if (_rc) { di as result "No logfile for instance -`pll_instance'- of parallel process -`event'- found" exit } /* Showing the log in screen */ if ("`action'"=="print") { di as result "{hline 80}" di as result %~80s "beginning of file -`logname'-" di as result "{hline 80}" type `"`logname'"' di as result "{hline 80}" di as result %~80s "end of file -`logname'-" di as result "{hline 80}" } else view `"`logname'"' } else { di as error "It seems that you haven't use -parallel- yet." exit 601 } end //////////////////////////////////////////////////////////////////////////////// // Splits the dataset into clusters cap program drop parallel_spliter program def parallel_spliter version 11.0 syntax [namelist(name=xtstructure)] [,parallelid(string) sorting(integer 0) force(integer 0) keepusing(varlist)] //args xtstructure parallelid Sorting Force if length("$PLL_CLUSTERS") == 0 { di as err "Number of clusters not fixed." as text " Set the number of clusters with " _newline as err "-{cmd:parallel setclusters #}-" exit 198 } quietly { gen _`parallelid'cut = . if (length("`xtstructure'")) { /* Checks wheather if the data is in the correct sorting */ if !`sorting' & !`force' { error 5 } /* Checking the data types */ foreach var of varlist `xtstructure' { capture confirm string var `var' if (`=_rc') local numvars `numvars' `var' else local strvars `strvars' `var' } } /* Checking types of data */ if (length("`numvars'")) local numvars st_data(.,"`numvars'") else local numvars J(0,0,.) if (length("`strvars'")) local strvars st_data(.,"`strvars'") else local strvars J(0,0,"") /* Processing in MATA */ mata: st_store(., "_`parallelid'cut", parallel_divide_index(`numvars', `strvars')) if (length("`keepusing'")) { keep _`parallelid'cut `keepusing' gen __pllnobs`parallelid' = _n } save __pll`parallelid'_dataset, replace drop _all } end //////////////////////////////////////////////////////////////////////////////// // MAIN PROGRAM *cap program drop parallel_do program def parallel_do, rclass version 11.0 #delimit ; syntax anything(name=dofile equalok everything) [, by(string) Keep KEEPLast prefix Force PROGrams(namelist) Mata NOGlobals KEEPTiming Seeds(string) NOData Randtype(string) Timeout(integer 60) PROCessors(integer 0) argopt(string) KEEPUsing(string) SETparallelid(string) ]; #delimit cr if length("$PLL_CLUSTERS") == 0 { di "{error:You haven't set the number of clusters}" _n "{error:Please set it with: {cmd:parallel setclusters} {it:#}}" exit 198 } // Initial checks foreach opt in macrolist keep keeplast prefix force mata noglobals keeptiming nodata { local `opt' = length("``opt''") > 0 } /* Randtype */ if ("`randtype'" == "") local randtype = "datetime" /* If no data parsing has to be done (because of no data!) */ if (!(c(N)*c(k))) local nodata 1 if (!`keeptiming') { timer clear 98 timer clear 99 } timer on 98 // Delets last parallel instance ran if (`keeplast' & length("`r(pll_id)'")) cap parallel_clean, e(`r(pll_id)') // Gets some global values local sfn = "$S_FN" // Gets the directory where to work at if (!`prefix') { /* First checks if the file exists */ mata: parallel_normalizepath(`"`dofile'"',1) local pll_dir = "`filedir'" local dofile = "`filename'" } else local pll_dir = c(pwd)+"/" local initialdir = c(pwd) qui cd "`pll_dir'" if length("`by'") != 0 { local sortlist: sortedby local sorting = regexm("`sortlist'","^`by'") } else local sorting = 0 /* Creates a unique ID for the process and secures it */ if ("`setparallelid'"=="") mata: parallel_sandbox(5) else local parallelid = "`setparallelid'" global LAST_PLL_ID = "`parallelid'" global LAST_PLL_N = $PLL_CLUSTERS global LAST_PLL_DIR = "`pll_dir'" /* Generates database clusters */ if (!`nodata') parallel_spliter `by' , parallelid(`parallelid') sorting(`sorting') force(`force') keepusing(`keepusing') /* Starts building the files */ quietly { /* Saves mata objects */ if (`mata') { mata: mata mlib create l__pll`parallelid'_mlib, replace cap mata: mata mlib add l__pll`parallelid'_mlib *() cap mata: mata matsave __pll`parallelid'_mata.mmat *, replace if (`=_rc') local matasave = 0 else local matasave = 1 } else local matasave = 0 } /* Writing the dofile */ mata: st_local("errornum", strofreal(parallel_write_do(strtrim(`"`dofile' `argopt'"'), "`parallelid'", $PLL_CLUSTERS, `prefix', `matasave', !`noglobals', "`seeds'", "`randtype'", `nodata', "`pll_dir'", "`programs'", `processors'))) /* Checking if every thing is ok */ if (`errornum') { if (!`nodata') { qui use __pll`parallelid'_dataset, clear // Restores original S_FN (file name) value global S_FN = "`sfn'" /* Removing the cut variable */ qui drop _`parallelid'cut } /* Removes the sandbox file (unprotect the files) */ mata: parallel_sandbox(2, "`parallelid'") error `errornum' } timer off 88 cap timer list if (r(t88) == .) local pll_t_setu = 0 else local pll_t_setu = r(t88) /* Running the dofiles */ timer on 99 mata: st_local("nerrors", strofreal(parallel_run("`parallelid'",$PLL_CLUSTERS,`"$PLL_STATA_PATH"',`=`timeout'*1000'))) timer off 99 /* If parallel finished with an error it restores the dataset */ if (`nerrors' & !`nodata') { qui use __pll`parallelid'_dataset, clear global S_FN = "`sfn'" cap drop _`parallelid'cut } cap timer list if (r(t99) == .) local pll_t_calc = 0 else local pll_t_calc = r(t99) local pll_t_reps = r(nt99) timer on 97 // Paste the databases if (!`nodata' & !`nerrors') { parallel_fusion `parallelid', clusters($PLL_CLUSTERS) keepusing(`keepusing') // Restores original S_FN (file name) value global S_FN = "`sfn'" cap drop _`parallelid'cut } /* Removes the sandbox file (unprotect the files) */ if ("`setparallelid'" == "") { mata: parallel_sandbox(2, "`parallelid'") if (!`keep' & !`keeplast') parallel_clean, e("`parallelid'") } timer off 97 cap timer list if (r(t97) == .) local pll_t_fini = 0 else local pll_t_fini = r(t97) qui timer list return local pll_seeds="`pllseeds'" local pllseeds "" return scalar pll_errs = `nerrors' return local pll_dir "`pll_dir'" return scalar pll_t_reps = `pll_t_reps' return scalar pll_t_setu = `pll_t_setu' return scalar pll_t_calc = `pll_t_calc' return scalar pll_t_fini = `pll_t_fini' return local pll_id = "`parallelid'" return scalar pll_n = $PLL_CLUSTERS qui cd "`initialdir'" end //////////////////////////////////////////////////////////////////////////////// // Cleans all files generated by parallel *cap program drop parallel_clean program def parallel_clean version 11.0 syntax [, Event(string) All Force Logs] if (length("`event'") != 0 & length("`all'") != 0) { di as error `"invalid syntax: Using -pll_id- and -all- jointly is not allowed."' exit 198 } mata: parallel_clean("`event'", `=length("`all'") > 0', `=length("`force'") > 0', `=length("`logs'") > 0') end //////////////////////////////////////////////////////////////////////////////// // Sets the number of clusters as a global macro *cap program drop parallel_setclusters program parallel_setclusters version 11.0 syntax anything(name=nclusters) [, Force Statapath(string asis) Gateway(string) Includefile(string)] local nclusters = real(`"`nclusters'"') if (`nclusters' == .) { di as error `"Not allow: "#" Should be a number"' exit 109 } local force = length("`force'")>0 mata: parallel_setclusters(`nclusters', `force') mata: st_local("error", strofreal(parallel_setstatapath(`"`statapath'"', `force'))) if (`error'){ di as error `"Can not set Stata directory, try using -statapath()- option"' exit `error' } if "`c(mode)'"=="batch" & "`c(os)'"=="Windows" { if `"`gateway'"'==""{ local gateway "pll_gateway.sh" } cap confirm file `"`gateway'"' if _rc { di "path: `c(pwd)'" di as error `"On Windows in batch-mode, parallel requires a gateway file and no such file found."' exit _rc } global PLL_GATEWAY_FNAME `"`gateway'"' } if `"`includefile'"'!=""{ cap confirm file `"`includefile'"' if _rc { di as error `"Can not find the include file (`includefile')."' exit _rc } global PLL_INCLUDE_FILE `"`includefile'"' } end //////////////////////////////////////////////////////////////////////////////// // Exports // Exports a copy of programs *cap program drop program_export program def program_export version 11.0 syntax using/ [,Programlist(string) Inname(string)] mata: program_export("`using'", "`programlist'", "`inname'") end //////////////////////////////////////////////////////////////////////////////// // Appends the clusterized dataset *cap program drop parallel_fusion program def parallel_fusion version 11.0 syntax anything(name=parallelid) , clusters(integer) [keepusing(string)] capture { cap use "__pll`parallelid'_dta0001.dta", clear if (_rc) di "{error:No dataset for instance 0001.}" forval i = 2/`clusters' { cap append using `"__pll`parallelid'_dta`=string(`i',"%04.0f")'.dta"' if (_rc) di "{error:No dataset for instance `=string(`i',"%04.0f")'.}" } /* If it just used a set of variables */ if (length("`keepusing'")) { merge 1:1 __pllnobs`parallelid' using __pll`parallelid'_dataset, keep(3) nogen drop __pllnobs`parallelid' } } end //////////////////////////////////////////////////////////////////////////////// // Checks whether the user pressed break inside a loop *cap program drop parallel_break program def parallel_break version 11.0 mata: parallel_break() end