Slave (Class)

In: lib/slave.rb
Parent: Object

the Slave class encapsulates the work of setting up a drb server in another process running on localhost. the slave process is attached to its parent via a Heartbeat, which is designed such that the slave cannot outlive its parent and become a zombie, even if the parent dies an early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to make setting up any server object in another process so easy that a multi-process, drb/ipc-based design is as easy as, or easier than, a multi-threaded one. eg

  class Server
    def add_two n
      n + 2
    end
  end

  slave = Slave.new 'object' => Server.new
  server = slave.object

  p server.add_two(40) #=> 42

two other methods of providing server objects exist:

a) server = Server.new "this is called in the parent"

   Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}

b) Slave.new{ Server.new "this is called only in the child" }

of the two, ‘b’ is preferred.
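the Heartbeat class itself is not shown on this page, so the following is only a sketch of one way a child can notice that its parent has died: the child blocks on a pipe whose write end is held by the parent alone and treats end-of-file as the cue to exit. the pipe names and the intermediate process are assumptions made for illustration; Slave::Heartbeat may well work differently.

```ruby
# sketch only: not Slave::Heartbeat, just the "child cannot outlive parent" idea

report_r, report_w = IO.pipe          # lets the orphaned child report back to this script

parent = fork do
  report_r.close
  life_r, life_w = IO.pipe            # the write end stays open in the parent only

  fork do
    life_w.close                      # child keeps just the read end
    life_r.read                       # blocks until every write end is closed
    report_w.write 'child saw parent die'
    report_w.close
    exit! 0                           # skip at_exit handlers inherited via fork
  end

  life_r.close
  Process.kill 'KILL', Process.pid    # die abruptly, as with `kill -9`
end

report_w.close
Process.wait parent
message = report_r.read               # returns once the orphaned child has exited
puts message
```

because the kernel closes the parent's descriptors however the parent dies, the child's read returns even after a ‘kill -9’.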

Methods

default   default   detach   fork   gen_psname   getopts   getopts   new   shutdown   shutdown?   trace   version   wait   wait2  

Constants

VERSION = '1.0.0'
DEFAULT_SOCKET_CREATION_ATTEMPTS = Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)
  config
DEFAULT_PULSE_RATE = Float(ENV['SLAVE_PULSE_RATE'] || 8)
DEFAULT_DEBUG = (ENV['SLAVE_DEBUG'] ? true : false)

Attributes

at_exit  [R] 
debug  [RW]  if this is true and you are running from a terminal, information is printed on STDERR
debug  [R] 
obj  [R] 
object  [R] 
pid  [R] 
ppid  [R] 
psname  [R] 
pulse_rate  [RW]  defines the rate of pinging in the Heartbeat object
pulse_rate  [R] 
shutdown  [R] 
socket  [R] 
socket_creation_attempts  [RW] 
socket_creation_attempts  [R] 
status  [R] 
uri  [R] 

Classes and Modules

Class Slave::Heartbeat

Public Class methods

default(key)

get a default value

[Source]

    # File lib/slave.rb, line 73
    def default key
      send key
    end

fork(&block)

just fork without silly warnings

[Source]

    # File lib/slave.rb, line 94
    def fork &block
      v = $VERBOSE
      begin
        $VERBOSE = nil
        Process::fork &block
      ensure
        $VERBOSE = v
      end
    end
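the same save-and-restore pattern can be tried on its own. this is a minimal sketch; the name quiet_fork is hypothetical and is not part of Slave.

```ruby
# standalone version of the pattern: silence warnings around Process.fork only
def quiet_fork(&block)
  verbose = $VERBOSE
  begin
    $VERBOSE = nil
    Process.fork(&block)
  ensure
    $VERBOSE = verbose          # always restored in the parent
  end
end

before = $VERBOSE
pid = quiet_fork { exit! 7 }    # exit! skips at_exit handlers inherited via fork
_, status = Process.waitpid2(pid)

puts status.exitstatus
puts $VERBOSE == before
```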

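getopts(opts)

returns a lambda that looks a key up in opts as given, as a string, or as a symbol, and falls back to an optional default value

[Source]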

    # File lib/slave.rb, line 79
    def getopts opts
      raise ArgumentError, opts.class unless
        opts.respond_to?('has_key?') and opts.respond_to?('[]')

      lambda do |key, *defval|
        defval = defval.shift
        keys = [key, key.to_s, key.to_s.intern]
        key = keys.detect{|k| opts.has_key? k } and break opts[key]
        defval
      end
    end
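to see what the returned lambda does, here is a free-standing copy of the lookup logic, rewritten without the early break. it is a sketch for experimenting outside of Slave, and the option values are made up.

```ruby
# standalone copy of the lookup logic from getopts
def getopts(opts)
  unless opts.respond_to?(:has_key?) && opts.respond_to?(:[])
    raise ArgumentError, opts.class.to_s
  end

  lambda do |key, *defval|
    keys  = [key, key.to_s, key.to_s.to_sym]      # as given, as string, as symbol
    found = keys.detect { |k| opts.has_key?(k) }
    found ? opts[found] : defval.first            # nil when no default is given
  end
end

getopt = getopts('pulse_rate' => 4, :debug => true)

values = [
  getopt['pulse_rate'],          # stored as a string, asked for as a string
  getopt[:pulse_rate],           # stored as a string, asked for as a symbol
  getopt['debug'],               # stored as a symbol, asked for as a string
  getopt['psname'],              # missing, no default given
  getopt['psname', 'fallback']   # missing, default supplied
]

p values
```

this is why callers may pass either :object=>server or 'object'=>server to Slave.new.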

new(opts = {}, &block)

opts may contain the keys ‘object’, ‘socket_creation_attempts’, ‘pulse_rate’, ‘psname’, ‘at_exit’, ‘dumped’, or ‘debug’

[Source]

    # File lib/slave.rb, line 128
    def initialize opts = {}, &block
      getopt = getopts opts

      @obj = getopt['object']
      @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
      @pulse_rate = getopt['pulse_rate'] || default('pulse_rate')
      @debug = getopt['debug'] || default('debug')
      @psname = getopt['psname']
      @at_exit = getopt['at_exit']
      @dumped = getopt['dumped']

      raise ArgumentError, 'no slave object!' if
        @obj.nil? and block.nil?

      @shutdown = false
      @waiter = @status = nil

      @heartbeat = Heartbeat::new @pulse_rate, @debug
      @r, @w = IO::pipe
      @r2, @w2 = IO::pipe

      # weird syntax because dot/rdoc chokes on this!?!?
      init_failure = lambda do |e|
        o = Object.new
        class << o
          attr_accessor '__slave_object_failure__'
        end
        o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
        @object = o
      end

    #
    # child
    #
      unless((@pid = Slave::fork))
        e = nil
        begin
          Kernel.at_exit{ Kernel.exit! }

          if @obj
            @object = @obj
          else
            begin
              @object = block.call
            rescue Exception => e
              init_failure[e]
            end
          end

          if block and @obj
            begin
              block[@obj]
            rescue Exception => e
              init_failure[e]
            end
          end

          $0 = (@psname ||= gen_psname(@object))
          unless @dumped or @object.respond_to?('__slave_object_failure__')
            @object.extend DRbUndumped
          end

          @ppid, @pid = Process::ppid, Process::pid

          @r.close
          @r2.close
          @socket = nil
          @uri = nil

          tmpdir, basename = Dir::tmpdir, File::basename(@psname)

          @socket_creation_attempts.times do |attempt|
            se = nil
            begin
              s = File::join(tmpdir, "#{ basename }_#{ attempt }")
              u = "drbunix://#{ s }"
              DRb::start_service u, @object
              @socket = s
              @uri = u
              trace{ "child - socket <#{ @socket }>" }
              trace{ "child - uri <#{ @uri }>" }
              break
            rescue Errno::EADDRINUSE => se
              nil
            end
          end

          if @socket and @uri
            @heartbeat.start

            trap('SIGUSR2') do
              # @heartbeat.stop rescue nil
              DRb::thread.kill rescue nil
              FileUtils::rm_f @socket rescue nil
              exit
            end

            @w.write @socket
            @w.close
            DRb::thread.join
          else
            @w.close
          end
        rescue Exception => e
          trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        ensure
          status = e.respond_to?('status') ? e.status : 1
          exit(status)
        end
    #
    # parent
    #
      else
        detach
        @w.close
        @w2.close
        @socket = @r.read
        @r.close

        trace{ "parent - socket <#{ @socket }>" }

        if @at_exit
          @at_exit_thread = Thread.new{
            Thread.current.abort_on_exception = true

            @r2.read rescue 42

            if @at_exit.respond_to? 'call'
              @at_exit.call self
            else
              send @at_exit.to_s, self
            end
          }
        end

        if @socket and File::exist? @socket
          Kernel.at_exit{ FileUtils::rm_f @socket }
          @uri = "drbunix://#{ socket }"
          trace{ "parent - uri <#{ @uri }>" }
          @heartbeat.start
        #
        # starting drb on localhost avoids dns lookups!
        #
          DRb::start_service('druby://localhost:0', nil) unless DRb::thread
          @object = DRbObject::new nil, @uri
          if @object.respond_to? '__slave_object_failure__'
            c, m, bt = Marshal.load @object.__slave_object_failure__
            (e = c.new(m)).set_backtrace bt
            raise e
          end
          @psname ||= gen_psname(@object)
        else
          raise "failed to find slave socket <#{ @socket }>"
        end
      end
    end
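stripped of the heartbeat, retries, and error handling, the constructor above boils down to: fork, start a drb service on a unix socket in the child, pass the socket path back through a pipe, and connect from the parent. the following is a minimal sketch of that flow using only the standard drb library. the helper name spawn_server and the socket naming are hypothetical; this is not how Slave names or cleans up its sockets.

```ruby
require 'drb'
require 'drb/unix'
require 'tmpdir'

class Server
  def add_two(n)
    n + 2
  end
end

# hypothetical helper, not part of Slave: fork, serve the block's result over drb
def spawn_server
  socket = File.join(Dir.tmpdir, "slave_sketch_#{ Process.pid }_#{ rand(1_000_000) }")
  uri    = "drbunix://#{ socket }"
  r, w   = IO.pipe

  pid = fork do
    r.close
    trap('TERM') { exit! 0 }          # exit at once, skipping inherited at_exit handlers
    object = yield                    # the server object is built in the child only
    DRb.start_service(uri, object)
    w.write socket                    # tell the parent the socket is ready
    w.close
    DRb.thread.join
  end

  w.close
  ready = r.read                      # empty if the child died before serving
  r.close
  raise 'child failed to start' if ready.empty?

  [pid, DRbObject.new_with_uri(uri), socket]
end

pid, server, socket = spawn_server { Server.new }
result = server.add_two(40)           # runs in the child process

Process.kill 'TERM', pid
Process.wait pid
File.unlink socket if File.exist? socket

p result
```

the pipe doubles as a readiness signal: the parent does not try to connect until the child has written the socket path, which is the same role @r and @w play in the constructor.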

version()

[Source]

    # File lib/slave.rb, line 40
    def self.version() VERSION end

Public Instance methods

default(key)

see docs for Slave.default

[Source]

    # File lib/slave.rb, line 364
    def default key
      self.class.default key
    end

detach()

starts a thread to collect the child status and sets up an at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status

[Source]

    # File lib/slave.rb, line 291
    def detach
      reap = lambda do |cid|
        begin
          @status = Process::waitpid2(cid).last
        rescue Exception => e
          m, c, b = e.message, e.class, e.backtrace.join("\n")
          warn "#{ m } (#{ c })\n#{ b }"  unless e.is_a? Errno::ECHILD
        end
      end

      Kernel.at_exit do
        shutdown rescue nil
        reap[@pid] rescue nil
      end

      @waiter =
        Thread.new do
          begin
            @status = Process::waitpid2(@pid).last
          ensure
            reap = lambda{|cid| 'no-op' }
          end
        end
    end
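the waiter-thread idea can be tried in isolation. this sketch forks a throwaway child, collects its status in a thread, and shows why a later attempt to reap the same pid raises Errno::ECHILD, the one error the reap lambda above stays silent about. variable names are illustrative only.

```ruby
pid = fork { exit! 3 }                 # child exits at once with status 3

# waiter thread: blocks in waitpid2 so the exited child does not linger as a zombie
waiter = Thread.new { Process.waitpid2(pid).last }

status = waiter.value                  # the Process::Status collected by the thread

# the child has been reaped, so a second wait has nothing left to collect
already_reaped =
  begin
    Process.waitpid2(pid)
    false
  rescue Errno::ECHILD
    true
  end

p status.exitstatus
p already_reaped
```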

gen_psname(obj)

generate a default name to appear in ps/top

[Source]

    # File lib/slave.rb, line 356
    def gen_psname obj
      "#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
    end

getopts(opts)

see docs for Slave.getopts

[Source]

    # File lib/slave.rb, line 372
    def getopts opts
      self.class.getopts opts
    end

shutdown(opts = {})

stops the heartbeat thread and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shut down

[Source]

    # File lib/slave.rb, line 335
    def shutdown opts = {}
      quiet = getopts(opts)['quiet']
      raise "already shutdown" if @shutdown unless quiet
      failure = lambda{ raise $! unless quiet }
      @heartbeat.stop rescue failure.call
      Process::kill('SIGUSR2', @pid) rescue failure.call
      @shutdown = true
    end
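the signalling half of shutdown can be sketched without Slave: a child installs a SIGUSR2 handler, and the parent sends the signal, optionally swallowing the error raised when the process is already gone. the helper name stop and its quiet keyword are hypothetical stand-ins modelled on the ‘quiet’ option above.

```ruby
# hypothetical helper modelled on shutdown's 'quiet' handling
def stop(pid, quiet: false)
  Process.kill 'USR2', pid
  true
rescue SystemCallError               # e.g. Errno::ESRCH when the process is gone
  raise unless quiet
  false
end

ready_r, ready_w = IO.pipe

pid = fork do
  ready_r.close
  trap('USR2') { exit! 0 }           # stand-in for the child's SIGUSR2 handler
  ready_w.write 'ready'              # handler is installed, parent may signal now
  ready_w.close
  sleep                              # stand-in for DRb::thread.join
end

ready_w.close
ready_r.read                         # wait until the handler is in place
ready_r.close

first = stop(pid)                    # delivers the signal
_, status = Process.wait2(pid)
second = stop(pid, quiet: true)      # child already reaped, error is swallowed

p [first, status.exitstatus, second]
```

without quiet: true the second call would re-raise, which mirrors how shutdown raises when it has nothing left to shut down unless ‘quiet’ is given.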

shutdown?()

true once shutdown has been called

[Source]

    # File lib/slave.rb, line 348
    def shutdown?
      @shutdown
    end

trace()

debugging output - set ENV[‘SLAVE_DEBUG’]=1 to enable. output is only printed when STDERR is a tty

[Source]

    # File lib/slave.rb, line 380
    def trace
      STDERR.puts(yield) if @debug and STDERR.tty?
    end

wait(opts = {}, &b)

wait for slave to finish. if the keyword ‘non_block’=>true is given, a thread is returned to do the waiting in an async fashion. eg

  thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}

[Source]

    # File lib/slave.rb, line 323
    def wait opts = {}, &b
      b ||= lambda{|exit_status|}
      non_block = getopts(opts)['non_block']
      non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
    end

wait2(opts = {}, &b)

Alias for wait
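the difference between the blocking and ‘non_block’ forms is easier to see with the waiter thread passed in explicitly. this is a sketch: wait_on is a hypothetical free-standing version of wait, and the waiter here is a plain thread standing in for the one that detach creates.

```ruby
# stand-in for @waiter: a thread whose value plays the role of the exit status
waiter = Thread.new { sleep 0.05; 0 }

# hypothetical free-standing version of wait, taking the waiter explicitly
def wait_on(waiter, opts = {}, &b)
  b ||= lambda { |exit_status| }
  non_block = opts[:non_block] || opts['non_block']
  non_block ? Thread.new { b[waiter.value] } : b[waiter.value]
end

thread   = wait_on(waiter, :non_block => true) { |value| "background <#{ value }>" }
blocking = wait_on(waiter) { |value| "foreground <#{ value }>" }

p thread.class     # a Thread is handed back without waiting for the result
p thread.value     # the block's result, once the waiter has finished
p blocking         # the block's result, returned directly
```

in both forms the return value comes from the block, so calling wait without a block waits for the slave but yields nil.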
