gridengine/examples/drmaa/ruby/flow/flow.rb

912 lines
20 KiB
Ruby
Raw Permalink Normal View History

2024-08-26 18:03:25 +02:00
#!/usr/bin/ruby
#########################################################################
#
# The Contents of this file are made available subject to the terms of
# the Sun Industry Standards Source License Version 1.2
#
# Sun Microsystems Inc., March, 2006
#
#
# Sun Industry Standards Source License Version 1.2
# =================================================
# The contents of this file are subject to the Sun Industry Standards
# Source License Version 1.2 (the "License"); You may not use this file
# except in compliance with the License. You may obtain a copy of the
# License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
#
# Software provided under this License is provided on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
# See the License for the specific provisions governing your rights and
# obligations concerning the Software.
#
# The Initial Developer of the Original Code is: Sun Microsystems, Inc.
#
# Copyright: 2006 by Sun Microsystems, Inc.
#
# All Rights Reserved.
#
#########################################################################
# TODO:
# - provide means to restart entire flows with failed flowjobs be rerun only
# - support bulk jobs
# - allow DRMAA user hold be used despite user hold be used by flow itself
#########################################################################
require 'drmaa'
# ------------------------------------------------------------------------------------------
# Exceptions thrown during parsing stage
class ParsingFunction < ArgumentError ; end
class ParsingFormat < ArgumentError ; end
# ------------------------------------------------------------------------------------------
# The FlowFunction classes represent the entities found in the flowfile.
class FlowFunction
end
class JobsInParallel < FlowFunction
attr_accessor :par
def make(key, vars, depend, depth, select)
do_it = select_func?(key, vars, select)
all_jobs = Array.new
@par.each { |sub|
name = sub[0]
if do_it
flowprint(depth, "PARALLEL: " + name)
end
new_vars = sub[1]
sub_vars = vars.dup
if ! new_vars.nil?
new_vars.each_pair { |var,val| sub_vars[var] = val }
end
j = $flowfunction[name]
if j.nil?
raise ParsingFunction.new("#{key}(): flow function \"#{name}\" does not exit")
end
if do_it
jobs = j.make(name, sub_vars, depend, depth+1, nil)
else
jobs = j.make(name, sub_vars, depend, depth+1, select)
end
if ! jobs.nil?
all_jobs += jobs
end
}
if all_jobs.size != 0
return all_jobs
else
return nil
end
end
end
class JobsInSequence < FlowFunction
attr_accessor :seq
def make(key, vars, depend, depth, select)
do_it = select_func?(key, vars, select)
first = true
@seq.each { |sub|
name = sub[0]
flowprint(depth, "SEQUENTIAL: " + name) if do_it
new_vars = sub[1]
sub_vars = vars.dup
if ! new_vars.nil?
new_vars.each_pair { |var,val| sub_vars[var] = val }
end
j = $flowfunction[name]
if j.nil?
raise ParsingFunction.new("#{key}: flow function \"#{name}\" does not exit")
end
if do_it
depend = j.make(name, sub_vars, depend, depth+1, nil)
else
depend = j.make(name, sub_vars, depend, depth+1, select)
end
}
return depend
end
end
class RunnableJob < FlowFunction
attr_accessor :attrs, :njobs
def initialize
@njobs = 0
end
def make(key, vars, depend, depth, select)
@njobs += 1
job_key = key + "#" + @njobs.to_s
do_it = select_func?(key, vars, select)
fj_attrs = Array.new
@attrs.each_pair { |name,t|
value = substitute(t, vars)
fj_attrs.push([ name, value ])
}
if depend.nil?
f = FlowJob.new(nil, fj_attrs)
flowprint(depth, job_key + "(" + comma_vars(vars) + ")") if do_it
else
f = FlowJob.new(depend.dup, fj_attrs)
flowprint(depth, job_key + "(" + comma_vars(vars) + ") waiting for " + comma_jobs(f.depend, ", ")) if do_it
end
fj_attrs.each { |a| flowprint(depth+1, a[0] + "=\"" + a[1] + "\"") } if do_it
f.presubproc(job_key)
f.verify(job_key)
if ! do_it
$not_selected += 1
return [ ]
end
$flowjob[job_key] = f
return [ job_key ]
end
end
def flowprint(depth, s)
return if ! $parse_only
(depth*3).times { putc " " } ; puts s
end
def comma_vars(vars)
s = ""
first = true
vars.each_pair { |var,val|
if first == false
s += ", "
else
first = false
end
s += var + "=" + val
}
return s
end
def comma_jobs(jobs, sep = ",")
s = ""
first = true
jobs.each { |job|
if first == false
s += sep
else
first = false
end
s += job
}
return s
end
def substitute(s, vars)
vars.each_pair { |var,val|
s = s.sub(var, val)
}
return s
end
# parses name1=value1,... into a Hash
# used both for params and attrs
def var_list(str)
vars = Hash.new
if ! str.nil?
str.strip.scan(/[^,][^,]*/) { |vardef|
n = vardef.strip.scan(/[^=][^=]*/)
vars[n[0].strip] = n[1].strip
}
end
return vars
end
# decide if a paricular flow call was selected as target
def select_func?(k1, vrs1, select)
return true if select.nil?
k2 = select[0]
vrs2 = select[1]
if k1 != k2 or vrs1.size < vrs2.size
return false
end
vrs2.each_pair { |k,v|
if ! vrs1.has_key?(k) or vrs1[k] != v
return false
end
}
return true
end
# return name of first function
def parse_flow(file)
all = nil
begin
IO::foreach(file) { |line|
case line
when /^#/
next
else
# crack line
function = line.sub(/[ ]*=.*$/, "").strip
val = line.sub(/^[^=]*=/, "").strip
if all.nil?
all = function
end
# runnable job
if ! val.index("{").nil?
r = RunnableJob.new
jobdef = val.scan(/[^{}][^{}]*/)[0].strip
r.attrs = var_list(jobdef)
$flowfunction[function] = r
# jobs in parallel
elsif ! val.index("&").nil?
p = JobsInParallel.new
p.par = Array.new
val.scan(/[^&][^&]*/) { |sub| p.par << parse_flowcall(sub) }
$flowfunction[function] = p
# jobs in sequence
elsif ! val.index("|").nil?
s = JobsInSequence.new
s.seq = Array.new
val.scan(/[^|][^|]*/) { |sub| s.seq << parse_flowcall(sub) }
$flowfunction[function] = s
else
# parsing code possibly is not yet good enoug -- sorryh
raise ParsingFormat.new("flow file may not have empty lines")
end
end
}
end
return all
end
def parse_flowcall(s)
jobdef = s.strip.scan(/[^()][^()]*/)
key = jobdef[0].strip
vars = var_list(jobdef[1])
return [ key, vars ]
end
# ------------------------------------------------------------------------------------------
# At end of parsing stage there is one FlowJob for each job to be run.
# The FlowJob also keeps state information, dependency information and
# job finish information.
class FlowJob
# configuration
attr_accessor :attrs, :depend
# state information
attr_accessor :jobid, :info
def initialize(depend, attrs)
@depend = depend
@attrs = attrs
end
# -- verification
def verify(key)
cmd = false
@attrs.each { |a|
name = a[0]
value = a[1]
if value.index('$')
raise ParsingFunction.new("#{key}: #{name}=#{value} contains \"$\"")
end
case name
when "cmd"
if value.index('/') == 0
if ! File.executable?(value)
raise ParsingFunction.new("#{key}: cmd=#{value} must be executable")
end
else
if executable_cmd(value).nil?
raise ParsingFunction.new("#{key}: could't find cmd=#{value} in CMDPATH")
end
end
cmd = true
when "join", "nomail"
true_or_false?(key, name, value)
when "args", "name", "nat", "cat", "wd", "in", "out", "err", "join", "trans", "mail"
else
# bug: must use DRMAA.get_attribute_names() to detect use of invalid attributes
raise ParsingFunction.new("#{key}: unknown attribute \"#{name}\"")
end
}
if !cmd
raise ParsingFunction.new("#{key}: missing mandatory attribute \"cmd\"")
end
end
def presubproc(job_key)
if defined? FlowRC.presubmit_proc
FlowRC.presubmit_proc(job_key, @attrs)
end
end
def executable_cmd(cmd)
path = nil
$CMDPATH.each { |p|
if File.executable?(p + "/" + cmd)
path = p + "/" + cmd
break
end
}
return path
end
def true_or_false?(key, name, value)
case value
when "0", "false", "no", "n"
return false
when "1", "true", "yes", "y"
return true
else
raise ParsingFunction.new("#{key}: \"#{name}=#{value}\" is neither \"true\" nor \"false\"")
end
end
def submit(key, predecessors)
if $MAX_JOBS != 0 and $jobs_in_drm == $MAX_JOBS
return false
end
jt = DRMAA::JobTemplate.new
# job defaults
jt.name = key # flow job name
if $flowdir.nil?
jt.wd = $wd
jt.stdout = ":/dev/null"
jt.join = true
else
jt.wd = $flowdir
jt.stdout = ":#{$flowdir}/#{key}.o"
jt.stderr = ":#{$flowdir}/#{key}.e"
jt.join = false
end
native = nil
attrs.each { |a|
name = a[0]
value = a[1]
case name
when "cmd"
if value.index("/") == 0
jt.command = value
else
jt.command = executable_cmd(value)
end
when "args"
jt.arg = value.split(" ")
when "env"
jt.env = value.split(",")
when "name"
jt.name = value
when "nat"
native = value
when "cat"
jt.category = value
when "hold"
# careful! hold is used by flow itself
# jt.hold = true_or_false?(key, name, value)
when "wd"
jt.wd = value
when "in"
jt.stdin = value
when "out"
jt.stdout = value
when "err"
jt.stderr = value
when "join"
jt.join = true_or_false?(key, name, value)
when "trans"
jt.transfer = value
when "mail"
jt.mail = value.split(",")
when "nomail"
jt.block_mail = true_or_false?(key, name, value)
end
}
if ! predecessors.nil?
if $drm_depend
if native.nil?
jt.native = "-hold_jid " + predecessors
else
jt.native = native + " -hold_jid " + predecessors
end
else
jt.hold = true
jt.native = native unless native.nil?
end
else
jt.native = native unless native.nil?
end
begin
jobid = $session.run(jt)
$already_submitted += 1
$last_submission = Time.now
@jobid = jobid
if ! predecessors.nil?
puts "#{key} " + jobid + " submitted depending on " + predecessors
else
puts "#{key} " + jobid + " submitted"
end
rescue DRMAA::DRMAATryLater
STDERR.puts "... try later (#{key})"
return false
end
$jobs_in_drm += 1
return true
end
# true, if all predecessors done
def is_due?
return true if @depend.nil?
self.depend.each { |key|
info = $flowjob[key].info
if info.nil?
return false # not yet finished
end
if ! info.wifexited? or info.wexitstatus != 0
return false # failed
end
}
return true
end
def can_submit
# now --> [0, jobids]
# later --> [1, nil]
# never --> [2, nil]
r = 0
jobids = nil
self.depend.each { |key|
node = $flowjob[key]
info = node.info
if ! info.nil?
if !info.wifexited? or info.wexitstatus != 0
return [ 2, nil] # failed
else
next # done
end
end
jobid = node.jobid
if jobid.nil?
r = 1 # predecessor not yet submitted
else
# collect jobids
if jobids.nil?
jobids = jobid
else
jobids += "," + jobid
end
end
}
if r == 1
return [1,nil]
else
return [0,jobids]
end
end
end
# ------------------------------------------------------------------------------------------
# The functions below are used by main to run the workflow and cause
# successor jobs be submitted/released once they are due.
# Workflow optimization requires job be submitted in order
# pass (1): jobs without predecessors or with all predecessors run
# pass (2): jobs whose predecessors are submitted
# aims is as broad as possible flow submission.
def submit_jobs(flush)
if $flowjob.size == $already_submitted or $terminate_session
# STDERR.puts "all jobs submitted"
return true # all submitted
end
if ! flush
if $last_submission != 0 and (Time.now - $last_submission) < $STREAMING_RETRY
# puts "... retry not yet reached"
return false # retry not yet reached
end
end
# STDERR.puts "1st pass"
$flowjob.each_pair { |key,fj|
next if ! fj.jobid.nil? # already submitted
next if ! fj.info.nil? # already finished
# all predecessors done
next if ! fj.is_due?
if ! fj.submit(key, nil)
return false # try again
end
if $terminate_program
exit 1
elsif $terminate_session
terminate()
return true
end
}
begin
# STDERR.puts "2nd pass"
all_submitted = true
$flowjob.each_pair { |key,fj|
next if ! fj.jobid.nil? # already submitted
next if ! fj.info.nil? # already finished
# analyze predecessors
status = fj.can_submit()
if status[0] != 0
all_submitted = false if status[0] == 1
next
end
predecessors = status[1]
if ! fj.submit(key, predecessors)
return false # try again
end
if $terminate_program
exit 1
elsif $terminate_session
terminate()
return true
end
}
end until all_submitted
return true # all submitted
end
def reap_jobs
$session.wait_each(1) { |info|
# delete workflow upon user interrupt
if $terminate_program
exit 1
elsif $terminate_session
terminate()
end
# nothing happend
if info.nil?
submit_jobs(false)
next
end
$jobs_in_drm -= 1
# interpret job finish information
if info.wifaborted?
failed = true
happend = "aborted"
caused = "terminated"
elsif info.wifsignaled?
failed = true
happend = "died from " + info.wtermsig
happend += " (core dump)" if info.wcoredump?
caused = "terminated"
elsif info.wifexited?
exit_status = info.wexitstatus
if exit_status != 0
failed = true
happend = "exited with " + exit_status.to_s
caused = "terminated"
else
failed = false
happend = "done"
caused = "released"
end
end
# search flow job
job_key = nil
fjob = nil
$flowjob.each_pair { |k,v|
if v.jobid.nil?
next
end
if v.jobid == info.job
job_key = k
fjob = v
break
end
}
if fjob.nil?
puts "missing flow job for finished job " + info.job
exit 1
end
# mark flow job as done
fjob.info = info
fjob.jobid = nil
trigger = Array.new
if ! $terminate_session
# drive conclusions
$flowjob.each_pair { |k,v|
# finished and non-blocked ones: skip
next if ! v.info.nil? or v.depend.nil? or v.jobid.nil?
# dependend to others: skip
next if ! v.depend.include?(job_key)
if failed
begin
$session.terminate(v.jobid)
rescue DRMAA::DRMAAInvalidJobError
end
trigger << v.jobid
else
do_rls = true
v.depend.each { |k|
do_rls = false if $flowjob[k].info.nil?
}
if do_rls and ! $drm_depend
$session.release(v.jobid)
trigger << v.jobid
end
end
}
end
# report what happend
if trigger.size == 0
puts "#{job_key} #{info.job} " + happend
else
puts "#{job_key} #{info.job} " + happend + " " + caused + " " + comma_jobs(trigger, ", ")
end
submit_jobs(false)
}
end
# show final statistics
def final_report
nfailed = 0
nrun = 0
nnotrun = 0
rusage = Hash.new
$flowjob.each_pair { |k,v|
if v.info.nil?
nnotrun += 1
next
end
if ! v.info.wifexited? or v.info.wexitstatus != 0
nfailed += 1
else
nrun += 1
end
usage = v.info.rusage
next if usage.nil?
usage.each_pair { |name,value|
if $USAGE_REPORT.include?(name)
if ! rusage.has_key?(name)
rusage[name] = value.to_f
else
rusage[name] += value.to_f
end
end
}
}
puts "# ---- final report"
rusage.each_pair { |name,value|
printf("usage: #{name} = %-7.2f\n", value)
}
puts "run: #{nrun} failed: #{nfailed} notrun: #{nnotrun}"
end
def terminate
if ! $did_terminate
STDERR.puts "Terminate!"
$session.terminate
$did_terminate = true
end
end
def handle_signal
if ! $terminate_session
$terminate_session = true
elsif ! $terminate_program
$terminate_program = true
end
end
def usage(ret)
if ret == 0
out = STDOUT
else
out = STDERR
end
out.puts "usage: flow.rb [options] workflow.ff [start]"
out.puts " options: -verify only parse and verify the flow"
out.puts " -dd use DRM dependencies"
out.puts " -flowdir <path> flowdir is used as defaults"
out.puts " start: <flowcall> --> TEST or TEST($arch=solaris)"
exit ret
end
# ------------------------------------------------------------------------------------------
# main
# use defaults
# (1) from ./.flowrc.rb
# (2) from $HOME/.flowrc.rb
# (3) or built-in ones
read_rc_file = false
if FileTest.exist?('.flowrc.rb')
require '.flowrc'
read_rc_file = true
elsif FileTest.exist?(ENV["HOME"] + "/.flowrc.rb")
require ENV["HOME"] + "/.flowrc.rb"
read_rc_file = true
end
if ! read_rc_file
$CMDPATH = Dir::getwd()
$STREAMING_RETRY = 5
$USAGE_REPORT = [ ]
$MAX_JOBS = 0
else
$CMDPATH = FlowRC::CMDPATH
$STREAMING_RETRY = FlowRC::STREAMING_RETRY
$USAGE_REPORT = FlowRC::USAGE_REPORT
$MAX_JOBS = FlowRC::MAX_JOBS
end
# The flowdir is used in a number of cases to have reasonable
# defaults. Thus it makes some difference if flowdir was
# specified or not:
#
# wd (drmaa_wd)
# The flowdir is used as jobs' default working directory.
# Without flowdir the current working directory is simply
# used. Though each jobs' working directory can also be
# specified within the flowfile, but if they have to that
# would make them harder to read by humans.
#
# out/err/join (drmaa- stdout_path/stderr_path/join)
# Without flowdir "/dev/null" is used as default for 'out'
# and 'join' is true. Reason is there were no better
# default to store job output/error files than the
# current working directory, but if that were used
# it might incidentally happen that masses of job
# output files are dumped in some directory. If flowdir
# was specified at command line it is used as default
# for storing job output and error separately in
# $flowdir/<flowjobname>.o and $flowdir/<flowjobname>.o.
#
# cmd (drmaa_remote_command)
# args (drmaa_argv)
# env (drmaa_env)
$parse_only = false
$drm_depend = false
$flowdir = nil
# command line parsing
while ARGV.length >= 2 do
case ARGV[0]
when "-verify"
$parse_only = true
ARGV.shift
when "-dd"
$drm_depend = true
ARGV.shift
when "-flowdir"
ARGV.shift
usage(1) if $flowdir or ARGV.length < 2
$flowdir = ARGV[0]
ARGV.shift
when "-h", "-help"
usage 0
else
break
end
end
if ARGV.length >= 1
flowfile=ARGV.shift
if ! FileTest.readable?(flowfile)
STDERR.puts flowfile + " does not exit"
exit 1
end
else
usage(1)
end
if ARGV.length == 1
target = parse_flowcall(ARGV.shift)
end
usage(1) unless ARGV.length == 0
# flow parsing and verification
begin
$wd = Dir::getwd
$flowfunction = Hash.new
all = parse_flow(flowfile)
j = $flowfunction[all]
$flowjob = Hash.new
$not_selected = 0
target = parse_flowcall(all) if target.nil?
j.make(all, vars = Hash.new, nil, 0, target)
if $flowjob.size == 0
raise ParsingFormat.new("flow start \"#{target[0]}\" does not exist in #{flowfile}")
end
puts "---+ doing #{$flowjob.size} of #{$flowjob.size+$not_selected} jobs with #{target[0]} as flow target"
STDOUT.flush
exit 0 if $parse_only
rescue ParsingFunction => msg
STDERR.puts "Error in " + msg
exit 1
rescue ParsingFormat => msg
STDERR.puts "Format error: " + msg
exit 1
end
# run the workflow
t1 = Time.now
begin
$terminate_session = $terminate_program = false
trap("INT") { handle_signal }
trap("TERM") { handle_signal }
$session = DRMAA::Session.new
# puts "# ----- submitting jobs"
$already_submitted = $last_submission = 0
$jobs_in_drm = 0
# May not stop reaping before all jobs
# are submitted in case of streaming.
first = true
begin
all_reaped = false
all_submitted = submit_jobs(true)
if first
# puts "# ----- reaping jobs"
first = false
else
if all_submitted
all_reaped = true
else
sleep $STREAMING_RETRY
end
end
reap_jobs()
end until all_reaped
rescue DRMAA::DRMAAException => msg
puts msg
exit 1
end
final_report()
t2 = Time.now
printf("total: %7.1f seconds\n", t2-t1)
exit 0