*! vers 0.14.6.24 24jun2014
*! auth George G. Vega

program def parallel_append

	vers 11.0

	#delimit ;
	syntax [anything(name=files)] , Do(string asis) [
		in(string asis) 
		if(string asis)
		Expression(string) *];
	#delimit cr
			
	if ("`in'" != "") local in in `in'
	if ("`if'" != "") local if if `if'

	/* Checking arguments */
	if (`"`files'"' == "" & `"`expression'"' == "") {
		di as error "One of -files- or -expr()- must be specified."
		error 1
	}
	else if (`"`anything'"' != "" & `"`expression'"' != "") {
		di as error "-files- and -expr()- cannot be specified at the same time"
		error 1
	}

	/* Expanding the expression */
	if (`"`expression'"' != "") {
		mata: st_local("files",parallel_expand_expr(`"`expression'"'))
	}
	else if (regexm(`"`files'"', "[*]")) local files : dir . files "`files'"
	
	/* Checking cmd/dofile */
	cap confirm file `"`do'"'
	if (_rc) cap confirm file `"`do'.do"'
	if (!_rc) local do = `"`do'.do"'
	else {
		cap `do'
		if (_rc == 199) {
			di as error `"Error: No file or cmd nammed -`do'-"'
			exit 199
		}
	}

	/* Checking out the number of files */
	tokenize `"`files'"'
	local n = 0
	local i = 0
	local nerr = 0
	while ("``++n''" != "") {
		
		/* Checking whether it exists*/
		local fn = `"``n''"'
		if (!regexm(`"`fn'"',"\.dta$")) cap confirm file `"``n''.dta"'
		else cap confirm file `"``n''"'

		if (!_rc) {
			if (!regexm(`"`fn'"',"\.dta$")) local ext = ".dta"
			else local ext = ""
			local file`++i' = "``n''`ext'"
		}
		else {
			di "{result:Warning:}{text:The file -``n''.dta- couldn't be found.}"
		}
	}
	local n = `i'

	/* If no files had been found */
	if (!`n') {
		di as error "No files found"
		error 1
	}

	/* Showing the files that will be used */
	di "{result:The following files will be processed:}"
	forval i=1/`n' {
		di " " as result %03.0f `i' as text " `file`i''"
	}
	
	/* Checking the groups clusters */
	local size = `n'/$PLL_CLUSTERS

	local oldclusters = $PLL_CLUSTERS
	local olddir = $PLL_STATA_PATH
	if (`size' < 1) {
		qui parallel setclusters `n', statapath(`olddir') f
		local g = 1
		forval i=1/`n' {
			local group`g' `group`g'' `file`i''
		}
	}
	else {
		/* Grouping files */
		local g = 1
		forval i=1/`n' {
			local group`g' `group`g'' `file`i''
			if (!mod(`i',$PLL_CLUSTERS)) local ++g
		}
	}
	
	local ng = ceil(`size')
	
	di "{result:The files will be processed in the following order:}"
	forval i=1/`ng' {
		di " " as result %03.0f `i' as text " `group`i''"
	}
	
	/* Getting a common id for the files */
	mata: parallel_sandbox(5)
	local parallelid0 = "`parallelid'"
	local tmpid = "__pll`parallelid'_append"
	mkdir `tmpid'
		
	local nsave = 0
	forval i=1/`ng' {
		
		/* Writing the file */	
		local f `tmpid'.do
		qui file open fh using `f', w replace
		
		tokenize `group`i''
		local j = 0
		local k = 0
		file write fh "cd `c(pwd)'" _newline
		while (`"``++j''"' != "") {
			file write fh `"if (\`pll_instance' == `++k') {"' _newline
			file write fh `"    use ``j'' `if' `in'"' _newline 
			file write fh `"    local filename = `"``j''"'"' _newline
			file write fh `"    local tmpn = string(`++nsave',"%04.0f")"'_newline _newline "}" _newline
		}
		
		cap findfile `do'
		if (_rc) {
			file write fh `"`do'"' _newline
		}
		
		file write fh `"gen dta_source = "\`filename'""' _newline
		file write fh "compress" _newline
		file write fh "save `tmpid'/`tmpid'\`tmpn', replace" _newline
		
		file close fh

		qui parallel setclusters `--j', s(`olddir') f

		mata: parallel_sandbox(5)
		local parallelid`i' = "`parallelid'"
		cap noi parallel do `f', `options' nodata setparallelid(`parallelid')

		/* Checking if an error has occurred */
		if (_rc) {
			
			mata: parallel_sandbox(2, "$LAST_PLL_ID")
			qui parallel clean, e($LAST_PLL_ID)
			forval j=0/`i' {
				mata: parallel_sandbox(2, "`parallelid`j''")
				qui parallel clean, e(`parallelid`j'')
			}
			qui parallel setclusters `oldclusters', s(`olddir') f
			di as error "An error -`=_rc'- has occured while running parallel"
			exit 1
			
		}
		else if (r(pll_errs)) {
			/*
			forval j=0/`i' {
				mata: parallel_sandbox(2, "`parallelid`j''")
				qui parallel clean, e(`parallelid`j'')
			}
			qui parallel setclusters `oldclusters', s(`olddir') f
			exit 1
			*/
			di "{result:Warning:}{text: Some datasets in group -`g'- couldn't be processed}"
		}

		rm `f'
	}

	qui clear
	
	/* Appending all the results */
	forval i=1/`nsave' {
		local tmpn = "`tmpid'"+string(`i',"%04.0f")+".dta"
		cap {
			append using `tmpid'/`tmpn'
			rm `tmpid'/`tmpn'
		}
		
		if (!c(N)) cap use `tmpid'/`tmpn'
		if (_rc) local err `err' `file`i''
	}
	
	/* Labeling */
	quietly {
		if (c(N)) {
			encode dta_source, gen(dta_source2)
			drop dta_source
			ren dta_source2 dta_source 
			lab var dta_source "Original dataset of the observation"
		}
	}
	
	/* Removing the tmp dir and free id */
	forval i = 0/`ng' {
		mata: parallel_sandbox(2, "`parallelid`i''")
		qui parallel clean, e(`parallelid`i'')
	}

	if (`"`err'"'!="") di "{result:Warning:}{text:The following files could't be found}" _newline as text `"`=regexr(`"`err'"',"^[0]","")'"'

	qui parallel setclusters `oldclusters', s(`olddir') f
	
end