#!/usr/bin/ruby
#########################################################################
#
# The Contents of this file are made available subject to the terms of
# the Sun Industry Standards Source License Version 1.2
#
# Sun Microsystems Inc., March, 2006
#
#
# Sun Industry Standards Source License Version 1.2
# =================================================
# The contents of this file are subject to the Sun Industry Standards
# Source License Version 1.2 (the "License"); You may not use this file
# except in compliance with the License. You may obtain a copy of the
# License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
#
# Software provided under this License is provided on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
# See the License for the specific provisions governing your rights and
# obligations concerning the Software.
#
# The Initial Developer of the Original Code is: Sun Microsystems, Inc.
#
# Copyright: 2006 by Sun Microsystems, Inc.
#
# All Rights Reserved.
#
#########################################################################
# TODO:
# - provide a means to restart entire flows, rerunning only the failed flow jobs
# - support bulk jobs
# - allow a DRMAA user hold to be used even though the flow itself uses user holds
#########################################################################

require 'drmaa'

# ------------------------------------------------------------------------------------------
# Exceptions thrown during parsing stage

class ParsingFunction < ArgumentError ; end
class ParsingFormat < ArgumentError ; end

# ------------------------------------------------------------------------------------------
# The FlowFunction classes represent the entities found in the flowfile.

class FlowFunction
end

class JobsInParallel < FlowFunction
   attr_accessor :par

   def make(key, vars, depend, depth, select)
      do_it = select_func?(key, vars, select)
      all_jobs = Array.new
      @par.each { |sub|
         name = sub[0]
         if do_it
            flowprint(depth, "PARALLEL: " + name)
         end
         new_vars = sub[1]
         sub_vars = vars.dup
         if ! new_vars.nil?
            new_vars.each_pair { |var,val| sub_vars[var] = val }
         end
         j = $flowfunction[name]
         if j.nil?
            raise ParsingFunction.new("#{key}(): flow function \"#{name}\" does not exist")
         end
         if do_it
            jobs = j.make(name, sub_vars, depend, depth+1, nil)
         else
            jobs = j.make(name, sub_vars, depend, depth+1, select)
         end
         if ! jobs.nil?
            all_jobs += jobs
         end
      }
      if all_jobs.size != 0
         return all_jobs
      else
         return nil
      end
   end
end

class JobsInSequence < FlowFunction
   attr_accessor :seq

   def make(key, vars, depend, depth, select)
      do_it = select_func?(key, vars, select)
      first = true
      @seq.each { |sub|
         name = sub[0]
         flowprint(depth, "SEQUENTIAL: " + name) if do_it
         new_vars = sub[1]
         sub_vars = vars.dup
         if ! new_vars.nil?
            new_vars.each_pair { |var,val| sub_vars[var] = val }
         end
         j = $flowfunction[name]
         if j.nil?
            raise ParsingFunction.new("#{key}: flow function \"#{name}\" does not exist")
         end
         if do_it
            depend = j.make(name, sub_vars, depend, depth+1, nil)
         else
            depend = j.make(name, sub_vars, depend, depth+1, select)
         end
      }
      return depend
   end
end

class RunnableJob < FlowFunction
   attr_accessor :attrs, :njobs

   def initialize
      @njobs = 0
   end

   def make(key, vars, depend, depth, select)
      @njobs += 1
      job_key = key + "#" + @njobs.to_s
      do_it = select_func?(key, vars, select)
      fj_attrs = Array.new
      @attrs.each_pair { |name,t|
         value = substitute(t, vars)
         fj_attrs.push([ name, value ])
      }
      if depend.nil?
         f = FlowJob.new(nil, fj_attrs)
         flowprint(depth, job_key + "(" + comma_vars(vars) + ")") if do_it
      else
         f = FlowJob.new(depend.dup, fj_attrs)
         flowprint(depth, job_key + "(" + comma_vars(vars) + ") waiting for " + comma_jobs(f.depend, ", ")) if do_it
      end
      fj_attrs.each { |a| flowprint(depth+1, a[0] + "=\"" + a[1] + "\"") } if do_it
      f.presubproc(job_key)
      f.verify(job_key)
      if ! do_it
         $not_selected += 1
         return [ ]
      end
      $flowjob[job_key] = f
      return [ job_key ]
   end
end

def flowprint(depth, s)
   return if ! $parse_only
   (depth*3).times { putc " " } ; puts s
end

def comma_vars(vars)
   s = ""
   first = true
   vars.each_pair { |var,val|
      if first == false
         s += ", "
      else
         first = false
      end
      s += var + "=" + val
   }
   return s
end

def comma_jobs(jobs, sep = ",")
   s = ""
   first = true
   jobs.each { |job|
      if first == false
         s += sep
      else
         first = false
      end
      s += job
   }
   return s
end

def substitute(s, vars)
   vars.each_pair { |var,val| s = s.sub(var, val) }
   return s
end

# parses name1=value1,... into a Hash
# used both for params and attrs
def var_list(str)
   vars = Hash.new
   if ! str.nil?
      str.strip.scan(/[^,][^,]*/) { |vardef|
         n = vardef.strip.scan(/[^=][^=]*/)
         vars[n[0].strip] = n[1].strip
      }
   end
   return vars
end

# decide if a particular flow call was selected as target
def select_func?(k1, vrs1, select)
   return true if select.nil?
   k2 = select[0]
   vrs2 = select[1]
   if k1 != k2 or vrs1.size < vrs2.size
      return false
   end
   vrs2.each_pair { |k,v|
      if ! vrs1.has_key?(k) or vrs1[k] != v
         return false
      end
   }
   return true
end

# returns the name of the first function
def parse_flow(file)
   all = nil
   begin
      IO::foreach(file) { |line|
         case line
         when /^#/
            next
         else
            # crack line
            function = line.sub(/[ ]*=.*$/, "").strip
            val = line.sub(/^[^=]*=/, "").strip
            if all.nil?
               all = function
            end
            # runnable job
            if ! val.index("{").nil?
               r = RunnableJob.new
               jobdef = val.scan(/[^{}][^{}]*/)[0].strip
               r.attrs = var_list(jobdef)
               $flowfunction[function] = r
            # jobs in parallel
            elsif ! val.index("&").nil?
               p = JobsInParallel.new
               p.par = Array.new
               val.scan(/[^&][^&]*/) { |sub| p.par << parse_flowcall(sub) }
               $flowfunction[function] = p
            # jobs in sequence
            elsif ! val.index("|").nil?
               s = JobsInSequence.new
               s.seq = Array.new
               val.scan(/[^|][^|]*/) { |sub| s.seq << parse_flowcall(sub) }
               $flowfunction[function] = s
            else
               # parsing code possibly is not yet good enough -- sorry
               raise ParsingFormat.new("flow file may not have empty lines")
            end
         end
      }
   end
   return all
end

def parse_flowcall(s)
   jobdef = s.strip.scan(/[^()][^()]*/)
   key = jobdef[0].strip
   vars = var_list(jobdef[1])
   return [ key, vars ]
end

# ------------------------------------------------------------------------------------------
# At the end of the parsing stage there is one FlowJob for each job to be run.
# The FlowJob also keeps state information, dependency information and
# job finish information.
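
# ------------------------------------------------------------------------------------------
# For reference, a hypothetical flowfile in the format accepted by parse_flow() above
# (all names and the $arch variable are made up). The first function is the default flow
# start, '|' builds sequences, '&' parallel branches, and '{ ... }' defines a runnable job
# whose attribute values get the call's variables substituted. Comment lines starting with
# '#' are allowed, empty lines are not. This example yields four FlowJobs (PREP#1, STEP#1,
# STEP#2 and POST#1), with both STEP jobs waiting for PREP#1 and POST#1 waiting for both
# STEP jobs:
#
#    ALL  = PREP | WORK | POST
#    WORK = STEP($arch=sol-sparc64) & STEP($arch=lx24-amd64)
#    PREP = { cmd=prepare.sh, name=prep }
#    STEP = { cmd=step.sh, args=$arch, name=step_$arch }
#    POST = { cmd=cleanup.sh, name=post }
#
# Such a flow could be checked with "flow.rb -verify example.ff" or restricted to a single
# call by giving a start target, e.g. "flow.rb example.ff 'STEP($arch=lx24-amd64)'".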
class FlowJob
   # configuration
   attr_accessor :attrs, :depend
   # state information
   attr_accessor :jobid, :info

   def initialize(depend, attrs)
      @depend = depend
      @attrs = attrs
   end

   # -- verification
   def verify(key)
      cmd = false
      @attrs.each { |a|
         name = a[0]
         value = a[1]
         if value.index('$')
            raise ParsingFunction.new("#{key}: #{name}=#{value} contains \"$\"")
         end
         case name
         when "cmd"
            if value.index('/') == 0
               if ! File.executable?(value)
                  raise ParsingFunction.new("#{key}: cmd=#{value} must be executable")
               end
            else
               if executable_cmd(value).nil?
                  raise ParsingFunction.new("#{key}: couldn't find cmd=#{value} in CMDPATH")
               end
            end
            cmd = true
         when "join", "nomail"
            true_or_false?(key, name, value)
         when "args", "env", "name", "nat", "cat", "wd", "in", "out", "err", "trans", "mail"
         else
            # bug: must use DRMAA.get_attribute_names() to detect use of invalid attributes
            raise ParsingFunction.new("#{key}: unknown attribute \"#{name}\"")
         end
      }
      if !cmd
         raise ParsingFunction.new("#{key}: missing mandatory attribute \"cmd\"")
      end
   end

   def presubproc(job_key)
      if defined? FlowRC.presubmit_proc
         FlowRC.presubmit_proc(job_key, @attrs)
      end
   end

   def executable_cmd(cmd)
      path = nil
      $CMDPATH.each { |p|
         if File.executable?(p + "/" + cmd)
            path = p + "/" + cmd
            break
         end
      }
      return path
   end

   def true_or_false?(key, name, value)
      case value
      when "0", "false", "no", "n"
         return false
      when "1", "true", "yes", "y"
         return true
      else
         raise ParsingFunction.new("#{key}: \"#{name}=#{value}\" is neither \"true\" nor \"false\"")
      end
   end

   def submit(key, predecessors)
      if $MAX_JOBS != 0 and $jobs_in_drm == $MAX_JOBS
         return false
      end

      jt = DRMAA::JobTemplate.new

      # job defaults
      jt.name = key           # flow job name
      if $flowdir.nil?
         jt.wd = $wd
         jt.stdout = ":/dev/null"
         jt.join = true
      else
         jt.wd = $flowdir
         jt.stdout = ":#{$flowdir}/#{key}.o"
         jt.stderr = ":#{$flowdir}/#{key}.e"
         jt.join = false
      end

      native = nil
      attrs.each { |a|
         name = a[0]
         value = a[1]
         case name
         when "cmd"
            if value.index("/") == 0
               jt.command = value
            else
               jt.command = executable_cmd(value)
            end
         when "args"
            jt.arg = value.split(" ")
         when "env"
            jt.env = value.split(",")
         when "name"
            jt.name = value
         when "nat"
            native = value
         when "cat"
            jt.category = value
         when "hold"
            # careful! hold is used by flow itself
            # jt.hold = true_or_false?(key, name, value)
         when "wd"
            jt.wd = value
         when "in"
            jt.stdin = value
         when "out"
            jt.stdout = value
         when "err"
            jt.stderr = value
         when "join"
            jt.join = true_or_false?(key, name, value)
         when "trans"
            jt.transfer = value
         when "mail"
            jt.mail = value.split(",")
         when "nomail"
            jt.block_mail = true_or_false?(key, name, value)
         end
      }

      if ! predecessors.nil?
         if $drm_depend
            if native.nil?
               jt.native = "-hold_jid " + predecessors
            else
               jt.native = native + " -hold_jid " + predecessors
            end
         else
            jt.hold = true
            jt.native = native unless native.nil?
         end
      else
         jt.native = native unless native.nil?
      end

      begin
         jobid = $session.run(jt)
         $already_submitted += 1
         $last_submission = Time.now
         @jobid = jobid
         if ! predecessors.nil?
            puts "#{key} " + jobid + " submitted depending on " + predecessors
         else
            puts "#{key} " + jobid + " submitted"
         end
      rescue DRMAA::DRMAATryLater
         STDERR.puts "... try later (#{key})"
         return false
      end
      $jobs_in_drm += 1
      return true
   end

   # true, if all predecessors are done
   def is_due?
      return true if @depend.nil?
      self.depend.each { |key|
         info = $flowjob[key].info
         if info.nil?
            return false      # not yet finished
         end
         if ! info.wifexited? or info.wexitstatus != 0
            return false      # failed
         end
      }
      return true
   end

   def can_submit
      # now   --> [0, jobids]
      # later --> [1, nil]
      # never --> [2, nil]
      r = 0
      jobids = nil
      self.depend.each { |key|
         node = $flowjob[key]
         info = node.info
         if ! info.nil?
            if !info.wifexited? or info.wexitstatus != 0
               return [ 2, nil ]    # failed
            else
               next                 # done
            end
         end
         jobid = node.jobid
         if jobid.nil?
            r = 1                   # predecessor not yet submitted
         else
            # collect jobids
            if jobids.nil?
               jobids = jobid
            else
               jobids += "," + jobid
            end
         end
      }
      if r == 1
         return [ 1, nil ]
      else
         return [ 0, jobids ]
      end
   end
end

# ------------------------------------------------------------------------------------------
# The functions below are used by main to run the workflow and cause
# successor jobs to be submitted/released once they are due.
# Workflow optimization requires jobs to be submitted in this order:
#    pass (1): jobs without predecessors or with all predecessors run
#    pass (2): jobs whose predecessors are submitted
# The aim is to submit as much of the flow as possible.

def submit_jobs(flush)
   if $flowjob.size == $already_submitted or $terminate_session
      # STDERR.puts "all jobs submitted"
      return true          # all submitted
   end

   if ! flush
      if $last_submission != 0 and (Time.now - $last_submission) < $STREAMING_RETRY
         # puts "... retry not yet reached"
         return false      # retry not yet reached
      end
   end

   # STDERR.puts "1st pass"
   $flowjob.each_pair { |key,fj|
      next if ! fj.jobid.nil?      # already submitted
      next if ! fj.info.nil?       # already finished

      # all predecessors done
      next if ! fj.is_due?

      if ! fj.submit(key, nil)
         return false              # try again
      end
      if $terminate_program
         exit 1
      elsif $terminate_session
         terminate()
         return true
      end
   }

   begin
      # STDERR.puts "2nd pass"
      all_submitted = true
      $flowjob.each_pair { |key,fj|
         next if ! fj.jobid.nil?   # already submitted
         next if ! fj.info.nil?    # already finished

         # analyze predecessors
         status = fj.can_submit()
         if status[0] != 0
            all_submitted = false if status[0] == 1
            next
         end
         predecessors = status[1]

         if ! fj.submit(key, predecessors)
            return false           # try again
         end
         if $terminate_program
            exit 1
         elsif $terminate_session
            terminate()
            return true
         end
      }
   end until all_submitted

   return true                     # all submitted
end

def reap_jobs
   $session.wait_each(1) { |info|
      # delete workflow upon user interrupt
      if $terminate_program
         exit 1
      elsif $terminate_session
         terminate()
      end

      # nothing happened
      if info.nil?
         submit_jobs(false)
         next
      end

      $jobs_in_drm -= 1

      # interpret job finish information
      if info.wifaborted?
         failed = true
         happened = "aborted"
         caused = "terminated"
      elsif info.wifsignaled?
         failed = true
         happened = "died from " + info.wtermsig
         happened += " (core dump)" if info.wcoredump?
         caused = "terminated"
      elsif info.wifexited?
         exit_status = info.wexitstatus
         if exit_status != 0
            failed = true
            happened = "exited with " + exit_status.to_s
            caused = "terminated"
         else
            failed = false
            happened = "done"
            caused = "released"
         end
      end

      # search flow job
      job_key = nil
      fjob = nil
      $flowjob.each_pair { |k,v|
         if v.jobid.nil?
            next
         end
         if v.jobid == info.job
            job_key = k
            fjob = v
            break
         end
      }
      if fjob.nil?
         puts "missing flow job for finished job " + info.job
         exit 1
      end

      # mark flow job as done
      fjob.info = info
      fjob.jobid = nil

      trigger = Array.new
      if ! $terminate_session
         # draw conclusions
         $flowjob.each_pair { |k,v|
            # skip jobs that are finished, have no dependencies, or are not yet submitted
            next if ! v.info.nil? or v.depend.nil? or v.jobid.nil?
            # skip jobs that do not depend on the finished job
            next if ! v.depend.include?(job_key)
            if failed
               begin
                  $session.terminate(v.jobid)
               rescue DRMAA::DRMAAInvalidJobError
               end
               trigger << v.jobid
            else
               do_rls = true
               v.depend.each { |k| do_rls = false if $flowjob[k].info.nil? }
               if do_rls and ! $drm_depend
                  $session.release(v.jobid)
                  trigger << v.jobid
               end
            end
         }
      end

      # report what happened
      if trigger.size == 0
         puts "#{job_key} #{info.job} " + happened
      else
         puts "#{job_key} #{info.job} " + happened + " " + caused + " " + comma_jobs(trigger, ", ")
      end

      submit_jobs(false)
   }
end

# show final statistics
def final_report
   nfailed = 0
   nrun = 0
   nnotrun = 0
   rusage = Hash.new

   $flowjob.each_pair { |k,v|
      if v.info.nil?
         nnotrun += 1
         next
      end
      if ! v.info.wifexited? or v.info.wexitstatus != 0
         nfailed += 1
      else
         nrun += 1
      end
      usage = v.info.rusage
      next if usage.nil?
      usage.each_pair { |name,value|
         if $USAGE_REPORT.include?(name)
            if ! rusage.has_key?(name)
               rusage[name] = value.to_f
            else
               rusage[name] += value.to_f
            end
         end
      }
   }

   puts "# ---- final report"
   rusage.each_pair { |name,value| printf("usage: #{name} = %-7.2f\n", value) }
   puts "run: #{nrun} failed: #{nfailed} notrun: #{nnotrun}"
end

def terminate
   if ! $did_terminate
      STDERR.puts "Terminate!"
      $session.terminate
      $did_terminate = true
   end
end

def handle_signal
   if ! $terminate_session
      $terminate_session = true
   elsif ! $terminate_program
      $terminate_program = true
   end
end

def usage(ret)
   if ret == 0
      out = STDOUT
   else
      out = STDERR
   end
   out.puts "usage: flow.rb [options] workflow.ff [start]"
   out.puts "   options: -verify         only parse and verify the flow"
   out.puts "            -dd             use DRM dependencies"
   out.puts "            -flowdir <dir>  <dir> is used for job defaults and output files"
   out.puts "   start:   --> TEST or TEST($arch=solaris)"
   exit ret
end

# ------------------------------------------------------------------------------------------
# main

# use defaults
#  (1) from ./.flowrc.rb
#  (2) from $HOME/.flowrc.rb
#  (3) or built-in ones
read_rc_file = false
if FileTest.exist?('.flowrc.rb')
   require '.flowrc'
   read_rc_file = true
elsif FileTest.exist?(ENV["HOME"] + "/.flowrc.rb")
   require ENV["HOME"] + "/.flowrc.rb"
   read_rc_file = true
end
if ! read_rc_file
   $CMDPATH = [ Dir::getwd() ]
   $STREAMING_RETRY = 5
   $USAGE_REPORT = [ ]
   $MAX_JOBS = 0
else
   $CMDPATH = FlowRC::CMDPATH
   $STREAMING_RETRY = FlowRC::STREAMING_RETRY
   $USAGE_REPORT = FlowRC::USAGE_REPORT
   $MAX_JOBS = FlowRC::MAX_JOBS
end

# The flowdir is used in a number of cases to provide reasonable
# defaults. Thus it makes some difference whether flowdir was
# specified or not:
#
# wd (drmaa_wd)
#    The flowdir is used as the jobs' default working directory.
#    Without flowdir the current working directory is simply
#    used. Each job's working directory can also be specified
#    within the flowfile, but having to do so would make
#    flowfiles harder for humans to read.
#
# out/err/join (drmaa stdout_path/stderr_path/join)
#    Without flowdir, "/dev/null" is used as the default for 'out'
#    and 'join' is true. The reason is that there is no better
#    default place for job output/error files than the current
#    working directory, and using it could incidentally dump
#    masses of job output files into some directory. If flowdir
#    was specified at the command line, it is used as the default
#    location for storing job output and error separately in
#    $flowdir/<name>.o and $flowdir/<name>.e.
#
# cmd  (drmaa_remote_command)
# args (drmaa_argv)
# env  (drmaa_env)

$parse_only = false
$drm_depend = false
$flowdir = nil

# command line parsing
while ARGV.length >= 2 do
   case ARGV[0]
   when "-verify"
      $parse_only = true
      ARGV.shift
   when "-dd"
      $drm_depend = true
      ARGV.shift
   when "-flowdir"
      ARGV.shift
      usage(1) if $flowdir or ARGV.length < 2
      $flowdir = ARGV[0]
      ARGV.shift
   when "-h", "-help"
      usage 0
   else
      break
   end
end

if ARGV.length >= 1
   flowfile = ARGV.shift
   if ! FileTest.readable?(flowfile)
      STDERR.puts flowfile + " does not exist"
      exit 1
   end
else
   usage(1)
end

if ARGV.length == 1
   target = parse_flowcall(ARGV.shift)
end

usage(1) unless ARGV.length == 0

# flow parsing and verification
begin
   $wd = Dir::getwd
   $flowfunction = Hash.new
   all = parse_flow(flowfile)
   j = $flowfunction[all]
   $flowjob = Hash.new
   $not_selected = 0
   target = parse_flowcall(all) if target.nil?
   j.make(all, vars = Hash.new, nil, 0, target)
   if $flowjob.size == 0
      raise ParsingFormat.new("flow start \"#{target[0]}\" does not exist in #{flowfile}")
   end
   puts "---+ doing #{$flowjob.size} of #{$flowjob.size+$not_selected} jobs with #{target[0]} as flow target"
   STDOUT.flush
   exit 0 if $parse_only
rescue ParsingFunction => msg
   STDERR.puts "Error in " + msg.to_s
   exit 1
rescue ParsingFormat => msg
   STDERR.puts "Format error: " + msg.to_s
   exit 1
end

# run the workflow
t1 = Time.now
begin
   $terminate_session = $terminate_program = false
   trap("INT")  { handle_signal }
   trap("TERM") { handle_signal }

   $session = DRMAA::Session.new

   # puts "# ----- submitting jobs"
   $already_submitted = $last_submission = 0
   $jobs_in_drm = 0

   # Must not stop reaping before all jobs
   # have been submitted in case of streaming.
   first = true
   begin
      all_reaped = false
      all_submitted = submit_jobs(true)
      if first
         # puts "# ----- reaping jobs"
         first = false
      else
         if all_submitted
            all_reaped = true
         else
            sleep $STREAMING_RETRY
         end
      end
      reap_jobs()
   end until all_reaped
rescue DRMAA::DRMAAException => msg
   puts msg
   exit 1
end

final_report()

t2 = Time.now
printf("total: %7.1f seconds\n", t2-t1)

exit 0
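
# ------------------------------------------------------------------------------------------
# A minimal sketch of a .flowrc.rb (all values below are hypothetical; only the names are
# what this script reads). It must provide a FlowRC module defining CMDPATH,
# STREAMING_RETRY, USAGE_REPORT and MAX_JOBS, and may define a presubmit_proc(job_key, attrs)
# hook that is called for each job definition before it is verified:
#
#    module FlowRC
#       CMDPATH         = [ "/usr/local/bin", Dir::getwd() ]  # directories searched for relative 'cmd' values
#       STREAMING_RETRY = 10                                  # seconds between submission retries
#       USAGE_REPORT    = [ "cpu", "mem" ]                    # rusage fields (DRM-specific names) summed in final_report()
#       MAX_JOBS        = 100                                 # upper bound on jobs in the DRM (0 = no limit)
#       def FlowRC.presubmit_proc(job_key, attrs)
#          # inspect or adjust the [name, value] attribute pairs here
#       end
#    end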