require 'open3' require 'tmpdir' require 'thread' require 'yaml' module Session #--{{{ VERSION = '2.2.0' @use_spawn = ENV['SESSION_USE_SPAWN'] @use_open3 = ENV['SESSION_USE_OPEN3'] @debug = ENV['SESSION_DEBUG'] class << self #--{{{ attr :use_spawn, true attr :use_open3, true attr :debug, true def new(*a, &b) #--{{{ Sh::new(*a, &b) #--}}} end alias [] new #--}}} end class PipeError < StandardError; end class ExecutionError < StandardError; end class History #--{{{ def initialize; @a = []; end def method_missing(m,*a,&b); @a.send(m,*a,&b); end def to_yaml(*a,&b); @a.to_yaml(*a,&b); end alias to_s to_yaml alias to_str to_yaml #--}}} end # class History class Command #--{{{ class << self #--{{{ def cmdno; @cmdno ||= 0; end def cmdno= n; @cmdno = n; end #--}}} end # attributes #--{{{ attr :cmd attr :cmdno attr :out,true attr :err,true attr :cid attr :begin_out attr :end_out attr :begin_out_pat attr :end_out_pat attr :begin_err attr :end_err attr :begin_err_pat attr :end_err_pat #--}}} def initialize(command) #--{{{ @cmd = command.to_s @cmdno = self.class.cmdno self.class.cmdno += 1 @err = '' @out = '' @cid = "%d_%d_%d" % [$$, cmdno, rand(Time.now.usec)] @begin_out = "__CMD_OUT_%s_BEGIN__" % cid @end_out = "__CMD_OUT_%s_END__" % cid @begin_out_pat = %r/#{ Regexp.escape(@begin_out) }/ @end_out_pat = %r/#{ Regexp.escape(@end_out) }/ @begin_err = "__CMD_ERR_%s_BEGIN__" % cid @end_err = "__CMD_ERR_%s_END__" % cid @begin_err_pat = %r/#{ Regexp.escape(@begin_err) }/ @end_err_pat = %r/#{ Regexp.escape(@end_err) }/ #--}}} end def to_hash #--{{{ %w(cmdno cmd out err cid).inject({}){|h,k| h.update k => send(k) } #--}}} end def to_yaml(*a,&b) #--{{{ to_hash.to_yaml(*a,&b) #--}}} end alias to_s to_yaml alias to_str to_yaml #--}}} end # class Command class AbstractSession #--{{{ # class methods class << self #--{{{ def default_prog #--{{{ return @default_prog if defined? @default_prog and @default_prog if defined? self::DEFAULT_PROG return @default_prog = self::DEFAULT_PROG else @default_prog = ENV["SESSION_#{ self }_PROG"] end nil #--}}} end def default_prog= prog #--{{{ @default_prog = prog #--}}} end attr :use_spawn, true attr :use_open3, true attr :debug, true def init #--{{{ @use_spawn = nil @use_open3 = nil @debug = nil #--}}} end alias [] new #--}}} end # class init init # attributes #--{{{ attr :opts attr :prog attr :stdin alias i stdin attr :stdout alias o stdout attr :stderr alias e stderr attr :history attr :track_history attr :outproc, true attr :errproc, true attr :use_spawn attr :use_open3 attr :debug, true alias debug? debug attr :threads #--}}} # instance methods def initialize(*args) #--{{{ @opts = hashify(*args) @prog = opts[:prog] || self.class.default_prog raise(ArgumentError, "no program specified") unless @prog @track_history = nil @track_history = opts[:history] if opts.has_key? :history @track_history = opts[:track_history] if opts.has_key? :track_history @history = History::new if @track_history @debug = Session.debug unless Session.debug.nil? @debug = self.class.debug unless self.class.debug.nil? @debug = opts[:debug] if opts.has_key? :debug @debug = false if @debug.nil? @use_spawn = nil @use_spawn = Session.use_spawn unless Session.use_spawn.nil? @use_spawn = self.class.use_spawn unless self.class.use_spawn.nil? @use_spawn = opts[:use_spawn] if opts.has_key? :use_spawn @use_open3 = nil @use_open3 = Session.use_open3 unless Session.use_open3.nil? @use_open3 = self.class.use_open3 unless self.class.use_open3.nil? @use_open3 = opts[:use_open3] if opts.has_key? :use_open3 @outproc = nil @errproc = nil @stdin, @stdout, @stderr = if @use_spawn Spawn::spawn @prog elsif @use_open3 Open3::popen3 @prog else __popen3 @prog end @threads = [] clear if block_given? ret = nil begin ret = yield self ensure self.close! end return ret end return self #--}}} end def __popen3(*cmd) #--{{{ pw = IO::pipe # pipe[0] for read, pipe[1] for write pr = IO::pipe pe = IO::pipe pid = fork{ # child pw[1].close STDIN.reopen(pw[0]) pw[0].close pr[0].close STDOUT.reopen(pr[1]) pr[1].close pe[0].close STDERR.reopen(pe[1]) pe[1].close exec(*cmd) } Process::detach pid # avoid zombies pw[0].close pr[1].close pe[1].close pi = [pw[1], pr[0], pe[0]] pw[1].sync = true if defined? yield begin return yield(*pi) ensure pi.each{|p| p.close unless p.closed?} end end pi #--}}} end # abstract methods def clear #--{{{ raise NotImplementedError #--}}} end alias flush clear def path #--{{{ raise NotImplementedError #--}}} end def path= #--{{{ raise NotImplementedError #--}}} end def send_command cmd #--{{{ raise NotImplementedError #--}}} end # concrete methods def track_history= bool #--{{{ @history ||= History::new @track_history = bool #--}}} end def ready? #--{{{ (stdin and stdout and stderr) and (IO === stdin and IO === stdout and IO === stderr) and (not (stdin.closed? or stdout.closed? or stderr.closed?)) #--}}} end def close! #--{{{ [stdin, stdout, stderr].each{|pipe| pipe.close} stdin, stdout, stderr = nil, nil, nil true #--}}} end alias close close! def hashify(*a) #--{{{ a.inject({}){|o,h| o.update(h)} #--}}} end private :hashify def execute(command, redirects = {}) #--{{{ $session_command = command if @debug raise(PipeError, command) unless ready? # clear buffers clear # setup redirects rerr = redirects[:e] || redirects[:err] || redirects[2] || redirects[:stderr] || redirects['stderr'] rout = redirects[:o] || redirects[:out] || redirects[1] || redirects[:stdout] || redirects['stdout'] # create cmd object and add to history cmd = Command::new command.to_s # store cmd if tracking history history << cmd if track_history # mutex for accessing shared data mutex = Mutex::new # io data for stderr and stdout err = { :io => stderr, :cmd => cmd.err, :name => 'stderr', :begin => false, :end => false, :begin_pat => cmd.begin_err_pat, :end_pat => cmd.end_err_pat, :redirect => rerr, :proc => errproc, :yield => lambda{|buf| yield(nil, buf)}, :mutex => mutex, } out = { :io => stdout, :cmd => cmd.out, :name => 'stdout', :begin => false, :end => false, :begin_pat => cmd.begin_out_pat, :end_pat => cmd.end_out_pat, :redirect => rout, :proc => outproc, :yield => lambda{|buf| yield(buf, nil)}, :mutex => mutex, } begin # send command in the background so we can begin processing output # immediately - thanks to tanaka akira for this suggestion threads << Thread::new { send_command cmd } # init main = Thread::current exceptions = [] # fire off reader threads [err, out].each do |iodat| threads << Thread::new(iodat, main) do |iodat, main| loop do main.raise(PipeError, command) unless ready? main.raise ExecutionError, iodat[:name] if iodat[:end] and not iodat[:begin] break if iodat[:end] or iodat[:io].eof? line = iodat[:io].gets buf = nil case line when iodat[:end_pat] iodat[:end] = true # handle the special case of non-newline terminated output if((m = %r/(.+)__CMD/o.match(line)) and (pre = m[1])) buf = pre end when iodat[:begin_pat] iodat[:begin] = true else next unless iodat[:begin] and not iodat[:end] # ignore chaff buf = line end if buf iodat[:mutex].synchronize do iodat[:cmd] << buf iodat[:redirect] << buf if iodat[:redirect] iodat[:proc].call buf if iodat[:proc] iodat[:yield].call buf if block_given? end end end true end end ensure # reap all threads - accumulating and rethrowing any exceptions begin while((t = threads.shift)) t.join raise ExecutionError, 'iodat thread failure' unless t.value end rescue => e exceptions << e retry unless threads.empty? ensure unless exceptions.empty? meta_message = '<' << exceptions.map{|e| "#{ e.message } - (#{ e.class })"}.join('|') << '>' meta_backtrace = exceptions.map{|e| e.backtrace}.flatten raise ExecutionError, meta_message, meta_backtrace end end end # this should only happen if eof was reached before end pat [err, out].each do |iodat| raise ExecutionError, iodat[:name] unless iodat[:begin] and iodat[:end] end # get the exit status get_status if respond_to? :get_status out = err = iodat = nil return [cmd.out, cmd.err] #--}}} end #--}}} end # class AbstractSession class Sh < AbstractSession #--{{{ DEFAULT_PROG = 'sh' ECHO = 'echo' attr :status alias exit_status status alias exitstatus status def clear #--{{{ stdin.puts "#{ ECHO } __clear__ 1>&2" stdin.puts "#{ ECHO } __clear__" stdin.flush while((line = stderr.gets) and line !~ %r/__clear__/o); end while((line = stdout.gets) and line !~ %r/__clear__/o); end self #--}}} end def send_command cmd #--{{{ stdin.printf "%s '%s' 1>&2\n", ECHO, cmd.begin_err stdin.printf "%s '%s' \n", ECHO, cmd.begin_out stdin.printf "%s\n", cmd.cmd stdin.printf "export __exit_status__=$?\n" stdin.printf "%s '%s' 1>&2\n", ECHO, cmd.end_err stdin.printf "%s '%s' \n", ECHO, cmd.end_out stdin.flush #--}}} end def get_status #--{{{ @status = get_var '__exit_status__' unless @status =~ /^\s*\d+\s*$/o raise ExecutionError, "could not determine exit status from <#{ @status.inspect }>" end @status = Integer @status #--}}} end def set_var name, value #--{{{ stdin.puts "export #{ name }=#{ value }" stdin.flush #--}}} end def get_var name #--{{{ stdin.puts "#{ ECHO } \"#{ name }=${#{ name }}\"" stdin.flush var = nil while((line = stdout.gets)) m = %r/#{ name }\s*=\s*(.*)/.match line if m var = m[1] raise ExecutionError, "could not determine <#{ name }> from <#{ line.inspect }>" unless var break end end var #--}}} end def path #--{{{ var = get_var 'PATH' var.strip.split %r/:/o #--}}} end def path= arg #--{{{ case arg when Array arg = arg.join ':' else arg = arg.to_s.strip end set_var 'PATH', "'#{ arg }'" self.path #--}}} end #--}}} end # class Sh class Bash < Sh #--{{{ DEFAULT_PROG = 'bash' class Login < Bash DEFAULT_PROG = 'bash --login' end #--}}} end # class Bash class Shell < Bash; end # IDL => interactive data language - see http://www.rsinc.com/ class IDL < AbstractSession #--{{{ class LicenseManagerError < StandardError; end DEFAULT_PROG = 'idl' MAX_TRIES = 32 def initialize(*args) #--{{{ tries = 0 ret = nil begin ret = super rescue LicenseManagerError => e tries += 1 if tries < MAX_TRIES sleep 1 retry else raise LicenseManagerError, "<#{ MAX_TRIES }> attempts <#{ e.message }>" end end ret #--}}} end def clear #--{{{ stdin.puts "retall" stdin.puts "printf, -2, '__clear__'" stdin.puts "printf, -1, '__clear__'" stdin.flush while((line = stderr.gets) and line !~ %r/__clear__/o) raise LicenseManagerError, line if line =~ %r/license\s*manager/io end while((line = stdout.gets) and line !~ %r/__clear__/o) raise LicenseManagerError, line if line =~ %r/license\s*manager/io end self #--}}} end def send_command cmd #--{{{ stdin.printf "printf, -2, '%s'\n", cmd.begin_err stdin.printf "printf, -1, '%s'\n", cmd.begin_out stdin.printf "%s\n", cmd.cmd stdin.printf "retall\n" stdin.printf "printf, -2, '%s'\n", cmd.end_err stdin.printf "printf, -1, '%s'\n", cmd.end_out stdin.flush #--}}} end def path #--{{{ stdout, stderr = execute "print, !path" stdout.strip.split %r/:/o #--}}} end def path= arg #--{{{ case arg when Array arg = arg.join ':' else arg = arg.to_s.strip end stdout, stderr = execute "!path='#{ arg }'" self.path #--}}} end #--}}} end # class IDL module Spawn #--{{{ class << self def spawn command #--{{{ ipath = tmpfifo opath = tmpfifo epath = tmpfifo cmd = "#{ command } < #{ ipath } 1> #{ opath } 2> #{ epath } &" system cmd i = open ipath, 'w' o = open opath, 'r' e = open epath, 'r' [i,o,e] #--}}} end def tmpfifo #--{{{ path = nil 42.times do |i| tpath = File.join(Dir.tmpdir, "#{ $$ }.#{ rand }.#{ i }") system "mkfifo #{ tpath }" next unless $? == 0 path = tpath at_exit{ File.unlink(path) rescue STDERR.puts("rm <#{ path }> failed") } break end raise "could not generate tmpfifo" unless path path #--}}} end end #--}}} end # module Spawn #--}}} end # module Session