Thursday, April 3, 2008

Calling asynchronous Twisted Python code from a blocking function

So I had to write a VAPI-XP test for Mercury/HP Quality Center, and the way that this particular test type works is that you write a script in Python, VBScript, or Perl. This script must have a TestMain function that is called and as soon as that function exits the test is over and everything exits. This test had to SSH into one of the machines in our virtualized test bed, and tell the machine to netboot in order to pick up the newest build of our server product.

I could have just used a call to os.popen() to some SSH client on our intermediary testing host, but I have had bad experiences with that in the past, mostly weird behavior with OpenSSH and plink. I had written some SSH code with the Twisted module Conch before, so I decided to just go ahead and use that. I got the SSH code up and running with no problem, but how was I to block execution until the event was triggered letting me know that the session was complete? Furthermore, how was I going to get my data back without doing something dirty like a global variable?

I talked to some guys on IRC and came up with the solution below. The SSH code itself isn't that interesting, and I've only really included it for completeness. The important bits to note are the call to "threads.blockingCallFromThread" in RunSSHCommand, and the fact that I'm passing the deferred that I create in "_runSSHCommand" through each of the SSH classes (usually 'd' or 'self._d') until it is finally called in the method "CommandChannel.closed".

#!/usr/bin/python

class ClientTransport(transport.SSHClientTransport):
   """SSH client transport that carries the user, command and result deferred.

   Once the transport is secure it requests user authentication and hands
   the command and the deferred on to the connection service.
   """

   def __init__(self, user, command, d):
       # d is the deferred that is passed on to ClientConnection below.
       self._d = d
       self._command = command
       self._user = user

   def verifyHostKey(self, pubKey, fingerprint):
       # Every host key is accepted: neither argument is inspected.
       accepted = defer.succeed(1)
       return accepted

   def connectionSecure(self):
       # Build the object that embodies the user's behaviour first, then
       # wrap it in the authentication service requested on this transport.
       ssh_connection = ClientConnection(self._command, self._d)
       auth_service = ClientUserAuth(self._user, ssh_connection)
       self.requestService(auth_service)


class ClientUserAuth(userauth.SSHUserAuthClient):
   """User authentication that supplies keys only, never a password."""

   def getPassword(self, prompt=None):
       # No password is ever supplied, so password authentication is skipped.
       return None

   def getPublicKey(self):
       # The public key is read from the SSH_PUB_KEY string.
       public_key = keys.getPublicKeyString(data=SSH_PUB_KEY)
       return public_key

   def getPrivateKey(self):
       # The private key is read from the SSH_PRIV_KEY string and handed
       # back wrapped in an already-fired deferred.
       private_key = keys.getPrivateKeyObject(data=SSH_PRIV_KEY)
       return defer.succeed(private_key)


class ClientConnection(connection.SSHConnection):
   """SSH connection service that opens one CommandChannel when it starts."""

   def __init__(self, cmd, d, *args, **kwargs):
       # Extra positional/keyword arguments are accepted but not forwarded
       # to the base class.
       connection.SSHConnection.__init__(self)
       self._d = d
       self._command = cmd

   def serviceStarted(self):
       # Open the channel that will run the command; it receives the
       # deferred so it can deliver the output later.
       command_channel = CommandChannel(self._command, self._d, conn=self)
       self.openChannel(command_channel)


class CommandChannel(channel.SSHChannel):
   """Session channel that execs one command and collects its output.

   Everything received on the channel is accumulated in ``self.data``; when
   the channel closes, that text is delivered through the deferred ``d``.
   """

   name = 'session'

   def __init__(self, command, d, *args, **kwargs):
       channel.SSHChannel.__init__(self, *args, **kwargs)
       self.data = ""
       self.d = d
       self.command = command

   def channelOpen(self, data):
       # Send an 'exec' request carrying the user-supplied command and ask
       # for a reply, so that _gotResponse runs once it has been answered.
       exec_payload = common.NS(self.command)
       request = self.conn.sendRequest(self, 'exec', exec_payload,
                                       wantReply=True)
       request.addCallback(self._gotResponse)

   def _gotResponse(self, _):
       # The exec request has been answered; send EOF on this channel.
       self.conn.sendEOF(self)

   def dataReceived(self, data):
       # Accumulate the output received so far.
       self.data += data

   def closed(self):
       # Channel closed: fire the deferred that was passed all the way
       # down, handing over everything that was collected.
       self.d.callback(self.data)


class ClientCommandFactory(protocol.ClientFactory):
   """Factory that builds the ClientTransport for a single SSH command.

   user    -- user name passed on to the transport
   command -- command string passed on to the transport
   d       -- deferred that is fired with the command output when the
              command channel closes; this factory fires it with a failure
              instead if the connection fails or is lost before that
   """

   def __init__(self, user, command, d):
       self._user = user
       self._command = command
       self._d = d

   def buildProtocol(self, addr):
       # A local name other than 'protocol' keeps the module of the same
       # name from being shadowed inside this method.
       return ClientTransport(self._user, self._command, self._d)

   def clientConnectionFailed(self, connector, reason):
       # The TCP connection could not be established, so no channel will
       # ever fire the deferred; report the failure to whoever is waiting.
       self._failIfPending(reason)

   def clientConnectionLost(self, connector, reason):
       # The connection may go away before a channel was opened (e.g.
       # during authentication), in which case nothing has fired the
       # deferred yet.
       self._failIfPending(reason)

   def _failIfPending(self, reason):
       # Errback the deferred unless it already has a result.
       if not self._d.called:
           self._d.errback(reason)


def RunSSHCommand(hostname, port, user, cmd):
   """Run cmd over SSH as user on hostname and block until it completes.

   The reactor is run in a background thread (without signal handlers) and
   this function blocks on the deferred returned by _runSSHCommand.  Only
   key authentication is used.

   Returns the value the deferred fires with, or None if the call raised
   an exception.
   """
   # Only start the reactor once; a second reactor.run() in another thread
   # would just raise inside that thread.
   if not reactor.running:
       thread.start_new_thread(reactor.run, (False,))
   try:
       return threads.blockingCallFromThread(reactor, _runSSHCommand, hostname, port, user, cmd)
   except Exception:
       # Failures are deliberately reported as None, but KeyboardInterrupt
       # and SystemExit are no longer swallowed as they were by the
       # previous bare "except:".
       return None
   # NOTE: the reactor.stop() that used to follow was unreachable (both
   # paths above return), so the reactor has always been left running.  It
   # is not stopped here because a stopped reactor cannot be started again,
   # which would make this function usable only once per process.

def _runSSHCommand(hostname, port, user, cmd):
   """Start the SSH connection and return the deferred for its result.

   Must be called in the reactor thread (RunSSHCommand does this through
   threads.blockingCallFromThread).

   hostname -- host to connect to
   port     -- TCP port to connect to; None falls back to 22
   user     -- user name handed to the factory
   cmd      -- command string handed to the factory

   Returns the deferred that is passed down through the SSH classes.
   """
   # The port argument used to be ignored in favour of a hard-coded 22.
   if port is None:
       port = 22
   d = defer.Deferred()
   factory = ClientCommandFactory(user, cmd, d)
   reactor.connectTCP(hostname, port, factory)
   return d
  

if __name__ == "__main__":
   # Script entry point: run the "register_netboot" command on the target host.
   # NOTE(review): hostname, port and user are not defined anywhere in the
   # visible listing -- presumably they are set next to the omitted imports;
   # confirm before running.  The return value of RunSSHCommand is discarded.
   RunSSHCommand(hostname, port, user, "register_netboot")

Print this post

No comments: