#!/usr/bin/ruby

#########################################################################
#
#  The Contents of this file are made available subject to the terms of
#  the Sun Industry Standards Source License Version 1.2
#
#  Sun Microsystems Inc., March, 2006
#
#
#  Sun Industry Standards Source License Version 1.2
#  =================================================
#  The contents of this file are subject to the Sun Industry Standards
#  Source License Version 1.2 (the "License"); You may not use this file
#  except in compliance with the License. You may obtain a copy of the
#  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
#
#  Software provided under this License is provided on an "AS IS" basis,
#  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
#  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
#  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
#  See the License for the specific provisions governing your rights and
#  obligations concerning the Software.
#
#   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
#
#   Copyright: 2006 by Sun Microsystems, Inc.
#
#   All Rights Reserved.
#
#########################################################################
# TODO:
# - provide a means to restart entire flows so that only failed flow jobs are rerun
# - support bulk jobs
# - allow a DRMAA user hold to be used even though the flow itself uses a user hold
#########################################################################

require 'drmaa'


# ------------------------------------------------------------------------------------------
# Exceptions raised during the parsing stage

class ParsingFunction < ArgumentError ; end
class ParsingFormat < ArgumentError ; end


# ------------------------------------------------------------------------------------------
# The FlowFunction classes represent the entities found in the flowfile.
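#
# A hypothetical flowfile sketch (names and paths are made up), illustrating
# the three constructs the parser below understands:
#
#   ALL   = BUILD | TEST                          jobs in sequence  ("|")
#   TEST  = UNIT($arch=sol) & UNIT($arch=lx)      jobs in parallel  ("&")
#   BUILD = { cmd=build.sh, args=-all }           a runnable job    ("{ ... }")
#   UNIT  = { cmd=unit.sh, args=$arch, name=unit_$arch }
#
# Lines starting with "#" are comments. The first function defined is the
# default flow target. Variables such as "$arch" are set in a flowcall and
# substituted into the attribute values of the runnable jobs they reach.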

class FlowFunction
end
class JobsInParallel < FlowFunction
	attr_accessor :par
	def make(key, vars, depend, depth, select)
		do_it = select_func?(key, vars, select)
	
		all_jobs = Array.new
		@par.each { |sub| 
			name = sub[0]
			if do_it
				flowprint(depth, "PARALLEL: " + name)
			end
			new_vars = sub[1]
			sub_vars = vars.dup
			if ! new_vars.nil?
				new_vars.each_pair { |var,val| sub_vars[var] = val }
			end
			j = $flowfunction[name]
			if j.nil?
				raise ParsingFunction.new("#{key}(): flow function \"#{name}\" does not exist")
			end
			if do_it
				jobs = j.make(name, sub_vars, depend, depth+1, nil)
			else
				jobs = j.make(name, sub_vars, depend, depth+1, select)
			end
			if ! jobs.nil?
				all_jobs += jobs
			end
		}
		if all_jobs.size != 0
			return all_jobs
		else
			return nil
		end
	end
end

class JobsInSequence  < FlowFunction
	attr_accessor :seq
	def make(key, vars, depend, depth, select)
		do_it = select_func?(key, vars, select)
		first = true
		@seq.each { |sub| 
			name = sub[0]
			flowprint(depth, "SEQUENTIAL: " + name) if do_it
			new_vars = sub[1]
			sub_vars = vars.dup
			if ! new_vars.nil?
				new_vars.each_pair { |var,val| sub_vars[var] = val }
			end
			j = $flowfunction[name]
			if j.nil?
				raise ParsingFunction.new("#{key}: flow function \"#{name}\" does not exist")
			end
			if do_it
				depend = j.make(name, sub_vars, depend, depth+1, nil)
			else
				depend = j.make(name, sub_vars, depend, depth+1, select)
			end
		}
		return depend
	end
end

class RunnableJob < FlowFunction
	attr_accessor :attrs, :njobs
	def initialize
		@njobs = 0
	end

	def make(key, vars, depend, depth, select)
		@njobs += 1
		job_key = key + "#" + @njobs.to_s

		do_it = select_func?(key, vars, select)

		fj_attrs = Array.new
		@attrs.each_pair { |name,t|
			value = substitute(t, vars)
			fj_attrs.push([ name, value ])
		}
		if depend.nil?
			f = FlowJob.new(nil, fj_attrs)
			flowprint(depth, job_key + "(" + comma_vars(vars) + ")") if do_it
		else 
			f = FlowJob.new(depend.dup, fj_attrs)
			flowprint(depth, job_key + "(" + comma_vars(vars) + ") waiting for " + comma_jobs(f.depend, ", ")) if do_it
		end
		fj_attrs.each { |a| flowprint(depth+1, a[0] + "=\"" + a[1] + "\"") } if do_it
		f.presubproc(job_key)
		f.verify(job_key)

		if ! do_it
			$not_selected += 1
			return [ ]
		end

		$flowjob[job_key] = f
		return [ job_key ]
	end
end

def flowprint(depth, s)
	return if ! $parse_only
	(depth*3).times { putc " " } ; puts s
end

def comma_vars(vars)
	s = ""
	first = true
	vars.each_pair { |var,val|
		if first == false
			s += ", "
		else
			first = false
		end
		s += var + "=" + val
	}
	return s
end

def comma_jobs(jobs, sep = ",")
	s = ""
	first = true
	jobs.each { |job|
		if first == false
			s += sep
		else
			first = false
		end
		s += job
	}
	return s
end

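# replaces flow variables in an attribute template string; a hypothetical
# example: substitute("unit.sh $arch", { "$arch" => "solaris" }) returns
# "unit.sh solaris"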
def substitute(s, vars)
	vars.each_pair { |var,val|
		# replace every occurrence of the variable, not just the first one
		s = s.gsub(var, val)
	}
	return s
end

# parses name1=value1,... into a Hash
# used both for params and attrs
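# e.g. var_list("cmd=build.sh, args=-all") returns
# { "cmd" => "build.sh", "args" => "-all" } (example values only)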
def var_list(str)
	vars = Hash.new
	if ! str.nil?
		str.strip.scan(/[^,][^,]*/) { |vardef|
			n = vardef.strip.scan(/[^=][^=]*/)
			vars[n[0].strip] = n[1].strip
		}
	end
	return vars
end

# decide if a particular flow call was selected as target
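# e.g. with select = [ "TEST", { "$arch" => "solaris" } ] only flow calls of
# TEST whose variables include $arch=solaris are selected (example values only)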
def select_func?(k1, vrs1, select)
	return true if select.nil?

	k2 = select[0]
	vrs2 = select[1]
	if k1 != k2 or vrs1.size < vrs2.size
		return false
	end

	vrs2.each_pair { |k,v|
		if ! vrs1.has_key?(k) or vrs1[k] != v
			return false
		end
	}

	return true
end

# return name of first function
def parse_flow(file)
	all = nil
	begin
		IO::foreach(file) { |line|
			case line
			when /^#/
				next
			else
				# crack line
				function = line.sub(/[ ]*=.*$/, "").strip
				val = line.sub(/^[^=]*=/, "").strip
				if all.nil?
					all = function
				end

				# runnable job
				if ! val.index("{").nil?
					r = RunnableJob.new
					jobdef = val.scan(/[^{}][^{}]*/)[0].strip
					r.attrs = var_list(jobdef)
					$flowfunction[function] = r

				# jobs in parallel
				elsif ! val.index("&").nil?
					p = JobsInParallel.new
					p.par = Array.new
					val.scan(/[^&][^&]*/) { |sub| p.par << parse_flowcall(sub) }
					$flowfunction[function] = p

				# jobs in sequence
				elsif ! val.index("|").nil?
					s = JobsInSequence.new
					s.seq = Array.new
					val.scan(/[^|][^|]*/) { |sub| s.seq << parse_flowcall(sub) }
					$flowfunction[function] = s

				else
					# parsing code possibly is not yet good enough -- sorry
					raise ParsingFormat.new("flow file may not have empty lines")
				end
			end
		}
	end
	return all
end

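# parses a single flowcall such as "TEST" or "TEST($arch=solaris)" into
# [ key, vars ], e.g. [ "TEST", { "$arch" => "solaris" } ]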
def parse_flowcall(s)
	jobdef = s.strip.scan(/[^()][^()]*/)
	key = jobdef[0].strip
	vars = var_list(jobdef[1])
	return [ key, vars ]
end


# ------------------------------------------------------------------------------------------
# At the end of the parsing stage there is one FlowJob for each job to be
# run. Each FlowJob also keeps state information, dependency information
# and job finish information.
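#
# Life cycle as used by submit_jobs()/reap_jobs() below: @jobid is set when
# the job gets submitted; once the job has finished, @info holds its DRMAA
# finish information and @jobid is reset to nil.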

class FlowJob 
	# configuration
	attr_accessor :attrs, :depend
	# state information
	attr_accessor :jobid, :info 
	def initialize(depend, attrs)
		@depend = depend
		@attrs = attrs
	end
	# -- verification
	def verify(key)
		cmd = false
		@attrs.each { |a|
			name = a[0]
			value = a[1]
			if value.index('$')
				raise ParsingFunction.new("#{key}: #{name}=#{value} contains \"$\"")
			end
			case name
			when "cmd"
				if value.index('/') == 0
					if ! File.executable?(value)
						raise ParsingFunction.new("#{key}: cmd=#{value} must be executable")
					end
				else
					if executable_cmd(value).nil?
						raise ParsingFunction.new("#{key}: could't find cmd=#{value} in CMDPATH")
					end
				end
				cmd = true
			when "join", "nomail"	
				true_or_false?(key, name, value)
			when "args", "name", "nat", "cat", "wd", "in", "out", "err", "join", "trans", "mail"
			else
				# bug: must use DRMAA.get_attribute_names() to detect use of invalid attributes
				raise ParsingFunction.new("#{key}: unknown attribute \"#{name}\"")
			end
		}
		if !cmd
			raise ParsingFunction.new("#{key}: missing mandatory attribute \"cmd\"")
		end
	end
	def presubproc(job_key)
		if defined? FlowRC.presubmit_proc
			FlowRC.presubmit_proc(job_key, @attrs)
		end
	end
	def executable_cmd(cmd)
		path = nil
		$CMDPATH.each { |p|
			if File.executable?(p + "/" + cmd)
				path = p + "/" + cmd
				break
			end
		}
		return path
	end
	def true_or_false?(key, name, value)
		case value
		when "0", "false", "no", "n"
			return false
		when "1", "true", "yes", "y"
			return true
		else
			raise ParsingFunction.new("#{key}: \"#{name}=#{value}\" is neither \"true\" nor \"false\"")
		end
	end

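	# Build a DRMAA::JobTemplate from the flow job attributes and submit it.
	# Returns false if the job cannot be submitted right now (job limit
	# reached or the DRM asks to retry later), true otherwise.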
	def submit(key, predecessors)
		if $MAX_JOBS != 0 and $jobs_in_drm == $MAX_JOBS 
			return false
		end
		jt = DRMAA::JobTemplate.new

		# job defaults
		jt.name = key # flow job name
		if $flowdir.nil?
			jt.wd = $wd
			jt.stdout = ":/dev/null"
			jt.join = true
		else
			jt.wd = $flowdir
			jt.stdout = ":#{$flowdir}/#{key}.o"
			jt.stderr = ":#{$flowdir}/#{key}.e"
			jt.join = false
		end

		native = nil

		attrs.each { |a|
			name = a[0]
			value = a[1]
			case name
			when "cmd"
				if value.index("/") == 0 
					jt.command = value
				else
					jt.command = executable_cmd(value)
				end
			when "args"
				jt.arg = value.split(" ")
			when "env"
				jt.env = value.split(",")
			when "name"
				jt.name = value
			when "nat"
				native = value
			when "cat"	
				jt.category = value
			when "hold"	
				# careful! hold is used by flow itself
				# jt.hold = true_or_false?(key, name, value)
			when "wd"
				jt.wd = value
			when "in"	
				jt.stdin = value
			when "out"	
				jt.stdout = value
			when "err"	
				jt.stderr = value
			when "join"	
				jt.join = true_or_false?(key, name, value)
			when "trans"	
				jt.transfer = value

			when "mail"
				jt.mail = value.split(",")
			when "nomail"	
				jt.block_mail = true_or_false?(key, name, value)
			end
		}

		if ! predecessors.nil?
			if $drm_depend 
				if native.nil?
					jt.native = "-hold_jid " + predecessors
				else
					jt.native = native + " -hold_jid " + predecessors
				end
			else
				jt.hold = true
				jt.native = native unless native.nil?
			end
		else
			jt.native = native unless native.nil?
		end

		begin
			jobid = $session.run(jt)
			$already_submitted += 1
			$last_submission = Time.now
			@jobid = jobid
			if ! predecessors.nil?
				puts "#{key} " + jobid + " submitted depending on " + predecessors
			else
				puts "#{key} " + jobid + " submitted"
			end
		rescue DRMAA::DRMAATryLater
			STDERR.puts "... try later (#{key})"
			return false
		end
		$jobs_in_drm += 1
		return true
	end

	# true if all predecessors have finished successfully
	def is_due? 
		return true if @depend.nil?

		self.depend.each { |key|
			info = $flowjob[key].info
			if info.nil? 
				return false # not yet finished
			end
			if ! info.wifexited? or info.wexitstatus != 0
				return false # failed
			end
		}

		return true
	end

	def can_submit 
		# now   --> [0, jobids]
		# later --> [1, nil]
		# never --> [2, nil]
		r = 0
		jobids = nil
		self.depend.each { |key|
			node = $flowjob[key]

			info = node.info
			if ! info.nil?
				if !info.wifexited? or info.wexitstatus != 0
					return [ 2, nil] # failed
				else
					next # done
				end
			end

			jobid = node.jobid
			if jobid.nil?
				r = 1 # predecessor not yet submitted
			else
				# collect jobids
				if jobids.nil?
					jobids = jobid
				else
					jobids += "," + jobid
				end
			end
		}
		if r == 1
			return [1,nil]
		else
			return [0,jobids]
		end
	end
end


# ------------------------------------------------------------------------------------------
# The functions below are used by main to run the workflow and cause
# successor jobs to be submitted/released once they become due.

# Workflow optimization requires jobs to be submitted in order:
# pass (1): jobs without predecessors, or whose predecessors have all run
# pass (2): jobs whose predecessors have all been submitted
# The aim is to submit as broad a part of the flow as possible.
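# Hypothetical example: for a flow A | B run with -dd, pass (1) submits A
# (no predecessors) and pass (2) submits B with "-hold_jid <jobid of A>";
# without -dd, B is submitted with a user hold and released by reap_jobs()
# once A has finished successfully.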
def submit_jobs(flush)

	if $flowjob.size == $already_submitted or $terminate_session
		# STDERR.puts "all jobs submitted"
		return true # all submitted
	end

	if ! flush 
		if $last_submission != 0 and (Time.now - $last_submission) < $STREAMING_RETRY
			# puts "... retry not yet reached"
			return false # retry not yet reached
		end
	end

	# STDERR.puts "1st pass"
	$flowjob.each_pair { |key,fj|
		next if ! fj.jobid.nil? # already submitted
		next if ! fj.info.nil? # already finished

		# all predecessors done 
		next if ! fj.is_due? 

		if ! fj.submit(key, nil)
			return false # try again
		end

		if $terminate_program
			exit 1
		elsif $terminate_session
			terminate()
			return true
		end
	}

	begin
		# STDERR.puts "2nd pass"
		all_submitted = true

		$flowjob.each_pair { |key,fj|
			next if ! fj.jobid.nil? # already submitted
			next if ! fj.info.nil? # already finished

			# analyze predecessors
			status = fj.can_submit()
			if status[0] != 0 
				all_submitted = false if status[0] == 1 
				next
			end
			predecessors = status[1]

			if ! fj.submit(key, predecessors)
				return false # try again
			end

			if $terminate_program
				exit 1
			elsif $terminate_session
				terminate()
				return true
			end
		}
	end until all_submitted

	return true # all submitted
end

def reap_jobs

	$session.wait_each(1) { |info|

		# delete workflow upon user interrupt
		if $terminate_program
			exit 1
		elsif $terminate_session
			terminate()
		end

		# nothing happened
		if info.nil?
			submit_jobs(false)
			next
		end
		$jobs_in_drm -= 1

		# interpret job finish information
		if info.wifaborted? 
			failed = true
			happend = "aborted"
			caused = "terminated"
		elsif info.wifsignaled? 
			failed = true
			happend = "died from " + info.wtermsig 
			happend += " (core dump)" if info.wcoredump?  
			caused = "terminated"
		elsif info.wifexited?
			exit_status = info.wexitstatus
			if  exit_status != 0
				failed = true
				happend = "exited with " + exit_status.to_s
				caused = "terminated"
			else
				failed = false
				happend = "done"
				caused = "released"
			end
		end

		# search flow job
		job_key = nil
		fjob = nil
		$flowjob.each_pair { |k,v|
			if v.jobid.nil?
				next 
			end
			if v.jobid == info.job
				job_key = k
				fjob = v
				break
			end
		}
		if fjob.nil?
			puts "missing flow job for finished job " + info.job
			exit 1
		end

		# mark flow job as done
		fjob.info = info
		fjob.jobid = nil

		trigger = Array.new
		if ! $terminate_session
			# drive conclusions
			$flowjob.each_pair { |k,v|
				# skip jobs that have finished, have no dependencies or are not yet submitted
				next if ! v.info.nil? or v.depend.nil? or v.jobid.nil?
				# skip jobs that do not depend on the job that just finished
				next if ! v.depend.include?(job_key)
				
				if failed
					begin
						$session.terminate(v.jobid)
					rescue DRMAA::DRMAAInvalidJobError
					end
					trigger << v.jobid
				else
					do_rls = true
					v.depend.each { |dep|
						do_rls = false if $flowjob[dep].info.nil?
					}
					if do_rls and ! $drm_depend 
						$session.release(v.jobid)
						trigger << v.jobid
					end
				end
			}
		end

		# report what happened
		if trigger.size == 0
			puts "#{job_key} #{info.job} " + happend
		else
			puts "#{job_key} #{info.job} " + happend + " " + caused + " " + comma_jobs(trigger, ", ")
		end

		submit_jobs(false)
	}
end

# show final statistics
def final_report
	nfailed = 0
	nrun = 0
	nnotrun = 0

	rusage = Hash.new
	$flowjob.each_pair { |k,v|
		if v.info.nil?
			nnotrun += 1
			next 
		end
		if ! v.info.wifexited? or v.info.wexitstatus != 0
			nfailed += 1
		else
			nrun += 1
		end
		usage = v.info.rusage
		next if usage.nil?
		usage.each_pair { |name,value| 
			if $USAGE_REPORT.include?(name)
				if ! rusage.has_key?(name)
					rusage[name] = value.to_f
				else
					rusage[name] += value.to_f
				end
			end
		}
	}
	puts "# ---- final report"
	rusage.each_pair { |name,value|
		printf("usage: #{name} = %-7.2f\n", value)
	}
	puts "run: #{nrun} failed: #{nfailed} notrun: #{nnotrun}"
end

def terminate
	if ! $did_terminate
		STDERR.puts "Terminate!"
		$session.terminate
		$did_terminate = true
	end
end

def handle_signal
	if ! $terminate_session 
		$terminate_session = true
	elsif ! $terminate_program 
		$terminate_program = true
	end
end

def usage(ret)
	if ret == 0
		out =  STDOUT
	else
		out = STDERR
	end
	out.puts "usage: flow.rb [options] workflow.ff [start]"
	out.puts "  options: -verify         only parse and verify the flow"
	out.puts "           -dd             use DRM dependencies"
	out.puts "           -flowdir <path> flowdir is used as defaults"
	out.puts "  start:   <flowcall> -->  TEST or TEST($arch=solaris)"
	exit ret
end

# ------------------------------------------------------------------------------------------
#    main

# use defaults 
# (1) from ./.flowrc.rb 
# (2) from $HOME/.flowrc.rb 
# (3) or built-in ones
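#
# A hypothetical .flowrc.rb might look like this (all values are examples
# only; the presubmit_proc hook is optional):
#
#   module FlowRC
#      CMDPATH         = [ "/usr/local/flow/bin", "." ]  # searched for relative cmd= values
#      STREAMING_RETRY = 5                                # seconds between submission retries
#      USAGE_REPORT    = [ "cpu", "mem" ]                 # rusage fields summed in the final report
#      MAX_JOBS        = 0                                # 0 means no limit on jobs in the DRM
#      def FlowRC.presubmit_proc(job_key, attrs)          # called for each flow job before verify/submit
#      end
#   end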

read_rc_file = false
if FileTest.exist?('.flowrc.rb')
	require '.flowrc'
	read_rc_file = true
elsif FileTest.exist?(ENV["HOME"] + "/.flowrc.rb")
	require ENV["HOME"] + "/.flowrc.rb"
	read_rc_file = true
end

if ! read_rc_file
	$CMDPATH = [ Dir::getwd() ]	# directories searched for relative cmd= values
	$STREAMING_RETRY = 5
	$USAGE_REPORT = [ ]
	$MAX_JOBS = 0
else
	$CMDPATH = FlowRC::CMDPATH
	$STREAMING_RETRY = FlowRC::STREAMING_RETRY
	$USAGE_REPORT = FlowRC::USAGE_REPORT
	$MAX_JOBS = FlowRC::MAX_JOBS
end

# The flowdir is used in a number of cases to provide reasonable
# defaults. Thus it makes a difference whether or not flowdir was
# specified:
#
# wd (drmaa_wd)
#    The flowdir is used as the jobs' default working directory.
#    Without flowdir the current working directory is simply
#    used. Each job's working directory can also be specified
#    within the flowfile, but having to do so would make the
#    flowfile harder for humans to read.
#
# out/err/join (drmaa_output_path/drmaa_error_path/drmaa_join_files)
#    Without flowdir "/dev/null" is used as the default for 'out'
#    and 'join' is true. The reason is that there is no better
#    default place for job output/error files than the current
#    working directory, but using it could incidentally dump
#    masses of job output files into some directory. If flowdir
#    was specified on the command line it is used as the default
#    location, storing job output and error separately in
#    $flowdir/<flowjobname>.o and $flowdir/<flowjobname>.e.
#
# cmd (drmaa_remote_command)
# args (drmaa_argv)
# env (drmaa_env)


$parse_only = false
$drm_depend = false
$flowdir = nil

# command line parsing
while ARGV.length >= 2 do 
	case ARGV[0]
	when "-verify"
		$parse_only = true
		ARGV.shift
	when "-dd"
		$drm_depend = true
		ARGV.shift
	when "-flowdir"
		ARGV.shift
		usage(1) if $flowdir or ARGV.length < 2
		$flowdir = ARGV[0]
		ARGV.shift
	when "-h", "-help"
		usage 0
	else
		break
	end
end
if ARGV.length >= 1
	flowfile = ARGV.shift
	if ! FileTest.readable?(flowfile)
		STDERR.puts flowfile + " does not exist"
		exit 1
	end
else 
	usage(1)
end
if ARGV.length == 1
	target = parse_flowcall(ARGV.shift)
end
usage(1) unless ARGV.length == 0

# flow parsing and verification
begin
	$wd = Dir::getwd

	$flowfunction = Hash.new
	all = parse_flow(flowfile)
	j = $flowfunction[all]

	$flowjob = Hash.new
	$not_selected = 0
	target = parse_flowcall(all) if target.nil?
	j.make(all, Hash.new, nil, 0, target)
	if $flowjob.size == 0
		raise ParsingFormat.new("flow start \"#{target[0]}\" does not exist in #{flowfile}")
	end
	puts "---+ doing #{$flowjob.size} of #{$flowjob.size+$not_selected} jobs with #{target[0]} as flow target"

	STDOUT.flush
	exit 0 if $parse_only
rescue ParsingFunction => msg
	STDERR.puts "Error in " + msg
	exit 1
rescue ParsingFormat => msg
	STDERR.puts "Format error: " + msg
	exit 1
end

# run the workflow
t1 = Time.now
begin
	$terminate_session = $terminate_program = false
	trap("INT") { handle_signal }
	trap("TERM") { handle_signal }

	$session = DRMAA::Session.new
	# puts "# ----- submitting jobs" 
	$already_submitted = $last_submission = 0
	$jobs_in_drm = 0

	# We must not stop reaping before all jobs
	# have been submitted, in case of streaming.
	first = true
	begin
		all_reaped = false
		all_submitted = submit_jobs(true)
		if first
			# puts "# ----- reaping jobs" 
			first = false
		else
			if all_submitted
				all_reaped = true 
			else
				sleep $STREAMING_RETRY
			end
		end
		reap_jobs()
	end until all_reaped

rescue DRMAA::DRMAAException => msg
	puts msg
	exit 1
end

final_report()

t2 = Time.now
printf("total: %7.1f seconds\n", t2-t1)
exit 0