In: |
lib/slave.rb
|
Parent: | Object |
the Slave class encapsulates the work of setting up a drb server in another process running on localhost. the slave process is attached to its parent via a Heartbeat which is designed such that the slave cannot out-live its parent and become a zombie, even if the parent dies an early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to be able to set up any server object in another process so easily that using a multi-process, drb/ipc, based design is as easy, or easier, than a multi-threaded one. eg
class Server def add_two n n + 2 end end slave = Slave.new 'object' => Server.new server = slave.object p server.add_two(40) #=> 42
two other methods of providing server objects exist:
a) server = Server.new "this is called the parent"
Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}
b) Slave.new{ Server.new "this is called only in the child" }
of the two ‘b’ is preferred.
VERSION | = | '1.0.0' |
DEFAULT_SOCKET_CREATION_ATTEMPTS | = | Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42) |
config | ||
DEFAULT_PULSE_RATE | = | Float(ENV['SLAVE_PULSE_RATE'] || 8) |
DEFAULT_DEBUG | = | (ENV['SLAVE_DEBUG'] ? true : false) |
at_exit | [R] | |
debug | [RW] | if this is true and you are running from a terminal information is printed on STDERR |
debug | [R] | |
obj | [R] | |
object | [R] | |
pid | [R] | |
ppid | [R] | |
psname | [R] | |
pulse_rate | [RW] | defines the rate of pinging in the Heartbeat object |
pulse_rate | [R] | |
shutdown | [R] | |
socket | [R] | |
socket_creation_attempts | [RW] | |
socket_creation_attempts | [R] | |
status | [R] | |
uri | [R] |
get a default value
# File lib/slave.rb, line 73
#
# Fetch a default value by name: the method called +key+ is invoked on the
# receiver via +send+ and whatever it returns is handed back to the caller.
def default(key)
#--{{{
  send(key)
#--}}}
end
just fork without silly warnings
# File lib/slave.rb, line 94
#
# Fork through Process::fork with $VERBOSE temporarily set to nil. The
# previous $VERBOSE value is always restored, even if the fork raises.
# Returns whatever Process::fork returns.
def fork(&block)
#--{{{
  saved_verbosity = $VERBOSE
  $VERBOSE = nil
  begin
    Process.fork(&block)
  ensure
    # put the caller's verbosity back no matter how the fork went
    $VERBOSE = saved_verbosity
  end
#--}}}
end
# File lib/slave.rb, line 79
#
# Build an option-lookup lambda around +opts+.
#
# +opts+ must respond to both 'has_key?' and '[]', otherwise ArgumentError is
# raised (with the class of +opts+ as its message).
#
# The returned lambda takes a key and an optional default. The key is tried
# as given, as a string and as a symbol, in that order; the value stored
# under the first form present in +opts+ is returned. When none is present
# the default (nil if omitted) is returned.
def getopts(opts)
#--{{{
  hash_like = opts.respond_to?('has_key?') && opts.respond_to?('[]')
  raise ArgumentError, opts.class unless hash_like

  lambda do |key, *defval|
    fallback = defval.first
    candidates = [key, key.to_s, key.to_s.intern]
    found = candidates.detect { |candidate| opts.has_key?(candidate) }
    found ? opts[found] : fallback
  end
#--}}}
end
opts may contain the keys ‘object’, ‘socket_creation_attempts’, ‘pulse_rate’, ‘psname’, ‘at_exit’, ‘dumped’, or ‘debug’
# File lib/slave.rb, line 128
#
# Fork a child process that serves an object over a drbunix socket and, in
# the parent, connect to it.
#
# +opts+ is read through getopts, so keys may be strings or symbols:
#
#   'object'                   - the object to serve; if absent the block's
#                                return value (evaluated in the child) is served
#   'socket_creation_attempts' - number of socket names to try
#   'pulse_rate'               - passed to Heartbeat::new
#   'debug'                    - enables trace output, also passed to Heartbeat::new
#   'psname'                   - child's $0; defaults to gen_psname(object)
#   'at_exit'                  - callable, or method name, invoked with self in
#                                the parent once the read on the second pipe
#                                returns (presumably when the child has exited)
#   'dumped'                   - when truthy the object is NOT extended with
#                                DRbUndumped
#
# When both 'object' and a block are given, the block is called in the child
# with the object.
#
# Raises ArgumentError when neither 'object' nor a block is given. In the
# parent, raises RuntimeError when the child's socket cannot be found, and
# re-raises (with the original class, message and backtrace) any exception
# the child hit while building the object or running the block.
def initialize opts = {}, &block
#--{{{
  getopt = getopts opts

  @obj = getopt['object']
  @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
  @pulse_rate = getopt['pulse_rate'] || default('pulse_rate')
  @debug = getopt['debug'] || default('debug')
  @psname = getopt['psname']
  @at_exit = getopt['at_exit']
  @dumped = getopt['dumped']

  raise ArgumentError, 'no slave object!' if
    @obj.nil? and block.nil?

  @shutdown = false
  @waiter = @status = nil

  @heartbeat = Heartbeat::new @pulse_rate, @debug
  # @r/@w carries the socket path from child to parent. @r2/@w2 is never
  # written to; the parent only reads @r2 to block until it gets EOF.
  @r, @w = IO::pipe
  @r2, @w2 = IO::pipe

  # weird syntax because dot/rdoc chokes on this!?!?
  #
  # on failure the child serves a stand-in object carrying the marshaled
  # exception so the parent can detect it and re-raise
  init_failure = lambda do |e|
    o = Object.new
    class << o
      attr_accessor '__slave_object_failure__'
    end
    o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
    @object = o
  end

  #
  # child
  #
  unless((@pid = Slave::fork))
    e = nil
    begin
      # skip any at_exit handlers registered before the fork
      Kernel.at_exit{ Kernel.exit! }

      if @obj
        @object = @obj
      else
        begin
          @object = block.call
        rescue Exception => e
          init_failure[e]
        end
      end

      if block and @obj
        begin
          block[@obj]
        rescue Exception => e
          init_failure[e]
        end
      end

      $0 = (@psname ||= gen_psname(@object))
      unless @dumped or @object.respond_to?('__slave_object_failure__')
        @object.extend DRbUndumped
      end

      @ppid, @pid = Process::ppid, Process::pid

      @r.close
      @r2.close
      @socket = nil
      @uri = nil

      tmpdir, basename = Dir::tmpdir, File::basename(@psname)

      # try successive socket names until one is not already in use
      @socket_creation_attempts.times do |attempt|
        begin
          s = File::join(tmpdir, "#{ basename }_#{ attempt }")
          u = "drbunix://#{ s }"
          DRb::start_service u, @object
          @socket = s
          @uri = u
          trace{ "child - socket <#{ @socket }>" }
          trace{ "child - uri <#{ @uri }>" }
          break
        rescue Errno::EADDRINUSE
          nil
        end
      end

      if @socket and @uri
        @heartbeat.start

        # the parent's shutdown sends SIGUSR2
        trap('SIGUSR2') do
          # @heartbeat.stop rescue nil
          DRb::thread.kill rescue nil
          FileUtils::rm_f @socket rescue nil
          exit
        end

        @w.write @socket
        @w.close
        DRb::thread.join
      else
        # closing without writing lets the parent read an empty socket name
        @w.close
      end
    # deliberately broad: the child must never fall through into the
    # parent's code path, whatever went wrong (including SystemExit)
    rescue Exception => e
      trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    ensure
      status = e.respond_to?('status') ? e.status : 1
      exit(status)
    end
  #
  # parent
  #
  else
    detach
    @w.close
    @w2.close
    @socket = @r.read
    @r.close

    trace{ "parent - socket <#{ @socket }>" }

    if @at_exit
      @at_exit_thread = Thread.new{
        Thread.current.abort_on_exception = true

        # blocks until the read returns; errors are ignored
        @r2.read rescue 42

        if @at_exit.respond_to? 'call'
          @at_exit.call self
        else
          send @at_exit.to_s, self
        end
      }
    end

    if @socket and File::exist? @socket
      Kernel.at_exit{ FileUtils::rm_f @socket }
      @uri = "drbunix://#{ socket }"
      trace{ "parent - uri <#{ @uri }>" }
      @heartbeat.start
      #
      # starting drb on localhost avoids dns lookups!
      #
      DRb::start_service('druby://localhost:0', nil) unless DRb::thread
      @object = DRbObject::new nil, @uri
      if @object.respond_to? '__slave_object_failure__'
        c, m, bt = Marshal.load @object.__slave_object_failure__
        (e = c.new(m)).set_backtrace bt
        raise e
      end
      @psname ||= gen_psname(@object)
    else
      raise "failed to find slave socket <#{ @socket }>"
    end
  end
#--}}}
end
see docs for Slave.default
# File lib/slave.rb, line 364
#
# Instance-level convenience: hands +key+ to the class-level +default+ and
# returns its result.
def default(key)
#--{{{
  klass = self.class
  klass.default(key)
#--}}}
end
starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status
# File lib/slave.rb, line 291
#
# Arrange for the child (@pid) to be reaped.
#
# A background thread, stored in @waiter and returned, waits on the child
# and records its Process::Status in @status. An at_exit hook is also
# registered; it calls shutdown and then tries to reap the child itself.
# Once the waiter thread is done it swaps the reaper for a no-op, so the
# at_exit hook no longer waits on the child.
def detach
#--{{{
  reaper = lambda do |child_pid|
    begin
      @status = Process.waitpid2(child_pid).last
    rescue Exception => err
      message, klass, trace_lines = err.message, err.class, err.backtrace.join("\n")
      # a child that is already gone is expected - stay silent about it
      unless err.is_a?(Errno::ECHILD)
        warn "#{ message } (#{ klass })\n#{ trace_lines }"
      end
    end
  end

  Kernel.at_exit do
    shutdown rescue nil
    reaper[@pid] rescue nil
  end

  @waiter = Thread.new do
    begin
      @status = Process.waitpid2(@pid).last
    ensure
      # rebinding the closure variable neutralises the at_exit reaper
      reaper = lambda { |child_pid| 'no-op' }
    end
  end
#--}}}
end
generate a default name to appear in ps/top
# File lib/slave.rb, line 356
#
# Build the default process name shown in ps/top for the slave serving +obj+.
#
# The name is "<class>_<object_id>_<ppid>_<pid>", downcased, with every run
# of whitespace collapsed to a single underscore.
def gen_psname obj
#--{{{
  # %r/.../ is required here: a bare %/\s+/ is a *string* literal equal to
  # " +", which gsub would match literally and so never replace whitespace
  "#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/, '_')
#--}}}
end
see docs for Slave.getopts
# File lib/slave.rb, line 372
#
# Instance-level convenience: hands +opts+ to the class-level +getopts+ and
# returns its result.
def getopts(opts)
#--{{{
  klass = self.class
  klass.getopts(opts)
#--}}}
end
stops the heartbeat thread and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shutdown
# File lib/slave.rb, line 335
#
# Stop the heartbeat and send SIGUSR2 to the child process, then mark this
# slave as shut down. Returns true.
#
# +opts+ may carry the key 'quiet'. Without it, calling shutdown a second
# time raises "already shutdown", and any StandardError raised while stopping
# the heartbeat or signalling the child is re-raised. With 'quiet' all of
# these are ignored.
def shutdown(opts = {})
#--{{{
  quiet = getopts(opts)['quiet']
  raise "already shutdown" if !quiet && @shutdown

  begin
    @heartbeat.stop
  rescue
    raise $! unless quiet
  end

  begin
    Process.kill('SIGUSR2', @pid)
  rescue
    raise $! unless quiet
  end

  @shutdown = true
#--}}}
end
true if shutdown has been called on this slave, false otherwise
# File lib/slave.rb, line 348
#
# Reports the current value of the @shutdown flag.
def shutdown?
#--{{{
  @shutdown
#--}}}
end
debugging output - ENV[‘SLAVE_DEBUG’]=1 to enable
# File lib/slave.rb, line 380
#
# Debug output helper. The block is evaluated and its result written to
# STDERR only when @debug is set and STDERR is a terminal; otherwise the
# block is never called.
def trace
#--{{{
  return unless @debug
  return unless STDERR.tty?

  STDERR.puts(yield)
#--}}}
end
wait for slave to finish. if the keyword ‘non_block’=>true is given a thread is returned to do the waiting in an async fashion. eg
thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
# File lib/slave.rb, line 323
#
# Wait for the waiter thread (@waiter) to finish and pass its value to the
# given block, returning the block's result. Without a block the value is
# discarded and nil is returned.
#
# With 'non_block' => true in +opts+ the waiting happens in a new thread,
# which is returned immediately; the block's result is that thread's value.
def wait(opts = {}, &b)
#--{{{
  callback = b || lambda { |exit_status| }

  if getopts(opts)['non_block']
    Thread.new { callback[@waiter.value] }
  else
    callback[@waiter.value]
  end
#--}}}
end