Showing posts with label python. Show all posts

Saturday, March 27, 2010

Fetching all keys for a column using Cassandra's Thrift API

I figured out how to get all of the Cassandra key/value pairs for a column using the Thrift API. The documentation, on top of the horrible naming scheme, definitely doesn't make it easy to figure things out quickly. Below is an example in Python:

#!/usr/bin/env python
"""Insert two values for one user and read them back with get_range_slices.

Uses the old keyspace-per-call Cassandra Thrift API. Each user is a column
named after the user's UUID. That column is stored under the row keys
"email" and "username". A range slice over all row keys, restricted to that
one column name, then returns every key/value pair for the user.
"""

from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
# Import the module (not only the class): it is used below as
# TBinaryProtocol.TBinaryProtocolAccelerated.
from thrift.protocol import TBinaryProtocol
from cassandra import Cassandra
from cassandra.ttypes import *
import time

socket = TSocket.TSocket("localhost", 9160)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
client = Cassandra.Client(protocol)

keyspace = "MyUberSite"
user_uuid = "0ad503dd-2642-4a1e-9113-a75bfd183c34"


def timestamp():
    """Return the current time as an integer number of microseconds.

    Cassandra's Thrift timestamp is an i64, so the float from time.time()
    must not be passed directly. Microseconds is the unit Cassandra
    clients conventionally use, which keeps our writes comparable with
    theirs.
    """
    return int(time.time() * 1000000)


try:
    transport.open()

    print("UUID: %s\n" % user_uuid)

    # Create a column (the user record) whose name is the user's UUID.
    # Store two values in it: the email address under row key "email" and
    # the account name under row key "username".
    column_path = ColumnPath(column_family="Users", column=user_uuid)

    # Use ConsistencyLevel.ONE rather than ZERO. ZERO writes asynchronously
    # in the background, so the read below could miss the new values.
    client.insert(
        keyspace,
        "email",
        column_path,
        "email@example.com",
        timestamp(),
        ConsistencyLevel.ONE)

    client.insert(
        keyspace,
        "username",
        column_path,
        "user_account_name",
        timestamp(),
        ConsistencyLevel.ONE)

    # The column family we want to query.
    column_parent = ColumnParent(column_family="Users")

    # A slice range bounds which column names are returned for each row.
    # We only want the one column belonging to our user, so start and
    # finish are both that user's UUID. Column names are ordered by the
    # comparator declared for the column family in storage-conf.xml.
    slice_range = SliceRange(start=user_uuid, finish=user_uuid)

    # Build the predicate from the range above.
    predicate = SlicePredicate(slice_range=slice_range)

    # An empty start and end key selects every row key. Row keys come back
    # in the partitioner's order, not the column comparator's order. At
    # most `count` rows are returned, so page with start_key if there are
    # more.
    key_range = KeyRange(start_key="", end_key="", count=100)

    # Perform the query and print the results.
    print("\n\n".join(
        map(
            repr,
            client.get_range_slices(
                keyspace,
                column_parent,
                predicate,
                key_range,
                ConsistencyLevel.ONE))))

except Thrift.TException as tx:
    print('Thrift: %s' % tx.message)

finally:
    transport.close()

Thursday, April 3, 2008

Calling asynchronous Twisted Python code from a blocking function

So I had to write a VAPI-XP test for Mercury/HP Quality Center. The way this particular test type works is that you write a script in Python, VBScript, or Perl. This script must have a TestMain function; as soon as that function exits, the test is over and everything exits. This test had to SSH into one of the machines in our virtualized test bed and tell the machine to netboot in order to pick up the newest build of our server product.

I could have just used a call to os.popen() to some SSH client on our intermediary testing host, but I had had bad experiences with that in the past, mostly weird behavior with OpenSSH and plink. I had written some SSH code with the Twisted module Conch before, so I decided to go ahead and use that. I got the SSH code up and running with no problem, but how was I to block execution until the event was triggered letting me know that the session was complete? Furthermore, how was I going to get my data back without doing something dirty like using a global variable?

I talked to some guys on IRC and came up with the solution below. The SSH code itself isn't that interesting, and I've only really included it for completeness. The important bits to note are the call to "threads.blockingCallFromThread" in RunSSHCommand, and that the deferred I create in "_runSSHCommand" is passed through each of the SSH classes (usually as 'd' or 'self._d') until it is finally fired in the method "CommandChannel.closed".

#!/usr/bin/python

class ClientTransport(transport.SSHClientTransport):
   """SSH client transport that carries a single command for one user.

   The user name, command string and result Deferred are kept on the
   instance. Once the transport is secure, they are handed to the
   authentication and connection services.
   """

   def __init__(self, user, command, d):
      self._d = d
      self._command = command
      self._user = user

   def verifyHostKey(self, pubKey, fingerprint):
      """Accept every host key without checking it.

      NOTE(review): no host-key verification is done, so this is only
      suitable on a trusted test network.
      """
      return defer.succeed(1)

   def connectionSecure(self):
      """Request user authentication once the encrypted link is up.

      The ClientConnection that will run the command is built first. It is
      handed to ClientUserAuth, which is requested as this transport's
      service.
      """
      nextService = ClientConnection(self._command, self._d)
      authService = ClientUserAuth(self._user, nextService)
      self.requestService(authService)


class ClientUserAuth(userauth.SSHUserAuthClient):
   """Authenticate with the single key pair embedded in the script.

   Only public-key authentication is attempted; password authentication is
   declined. SSH_PUB_KEY and SSH_PRIV_KEY are module-level key strings that
   are not shown in this script (presumably OpenSSH format -- TODO confirm).
   """

   def getPassword(self, prompt=None):
      # Returning None declines password authentication.
      return None

   def getPublicKey(self):
      """Return the public key as a keys.Key on the first call, else None.

      The key is offered only once. If the server rejects it, returning
      None signals that no more keys are available, so authentication ends
      instead of re-offering the same rejected key.

      Twisted's SSHUserAuthClient expects a keys.Key here. The old
      keys.getPublicKeyString helper is deprecated.
      """
      if getattr(self, "_offeredKey", False):
         return None
      self._offeredKey = True
      return keys.Key.fromString(data=SSH_PUB_KEY)

   def getPrivateKey(self):
      """Return a Deferred that fires with the private key as a keys.Key."""
      # Key.fromString replaces the deprecated keys.getPrivateKeyObject.
      return defer.succeed(keys.Key.fromString(data=SSH_PRIV_KEY))


class ClientConnection(connection.SSHConnection):
   """SSH connection service that opens one command channel when started."""

   def __init__(self, cmd, d, *args, **kwargs):
      # Extra positional/keyword arguments are accepted but ignored. The
      # base service is always initialised with no arguments.
      connection.SSHConnection.__init__(self)
      self._d = d
      self._command = cmd

   def serviceStarted(self):
      """Open a CommandChannel bound to this connection."""
      cmdChannel = CommandChannel(self._command, self._d, conn=self)
      self.openChannel(cmdChannel)


class CommandChannel(channel.SSHChannel):
   """Session channel that runs one command and collects its output.

   The result is delivered through the Deferred ``d``. It fires with the
   collected output when the channel closes, or errbacks if the server
   refuses the exec request. It is fired at most once.
   """
   name = 'session'

   def __init__(self, command, d, *args, **kwargs):
      channel.SSHChannel.__init__(self, *args, **kwargs)
      self.command = command
      self.d = d
      self.data = b""
      # Output chunks are joined once on close, avoiding repeated string
      # concatenation for large outputs.
      self._chunks = []

   def channelOpen(self, data):
      """Ask the server to execute the user-supplied command string."""
      reply = self.conn.sendRequest(self,
                                    'exec',
                                    common.NS(self.command),
                                    wantReply=True)
      reply.addCallbacks(self._gotResponse, self._execFailed)

   def _gotResponse(self, _):
      # The server accepted the exec request: the command has started, not
      # necessarily finished. Send EOF so the remote command sees the end
      # of its stdin. Output keeps arriving in dataReceived until the
      # channel is closed.
      self.conn.sendEOF(self)

   def _execFailed(self, reason):
      # The exec request was refused. Report the failure to the waiting
      # caller instead of leaving the Deferred unfired, then close the
      # channel.
      if not self.d.called:
         self.d.errback(reason)
      self.loseConnection()

   def dataReceived(self, data):
      # Collect the command's output.
      self._chunks.append(data)

   def closed(self):
      # The channel closed: fire the Deferred that was passed down from
      # _runSSHCommand with everything received. It is guarded in case it
      # already fired, because firing a Deferred twice raises.
      self.data = b"".join(self._chunks)
      if not self.d.called:
         self.d.callback(self.data)


class ClientCommandFactory(protocol.ClientFactory):
   """Client factory that builds a ClientTransport for the SSH command.

   It also makes sure the shared Deferred ``d`` eventually fires. If the
   TCP connection cannot be made, or is lost before a result was
   delivered, ``d`` is errbacked. A caller blocked on it (for example via
   blockingCallFromThread) therefore does not wait forever.
   """

   def __init__(self, user, command, d):
      self._user = user
      self._command = command
      self._d = d

   def buildProtocol(self, addr):
      # Named `proto` so the `protocol` module is not shadowed.
      proto = ClientTransport(self._user, self._command, self._d)
      return proto

   def clientConnectionFailed(self, connector, reason):
      # The TCP connection could not be established at all.
      self._fail(reason)

   def clientConnectionLost(self, connector, reason):
      # The connection dropped. This is a no-op if a result was already
      # delivered.
      self._fail(reason)

   def _fail(self, reason):
      """Errback the shared Deferred unless it has already fired."""
      if not self._d.called:
         self._d.errback(reason)


def RunSSHCommand(hostname, port, user, cmd):
   """Run ``cmd`` as ``user`` on ``hostname`` over SSH and return its output.

   Only key authentication is used. The Twisted reactor runs in a
   background thread, started with installSignalHandlers=False because
   signal handlers can only be installed from the main thread. This call
   blocks until the command's Deferred fires.

   Returns the collected output, or None if anything failed. The error is
   printed to stderr.

   The reactor thread is started on the first call only and is left
   running, so repeated calls reuse it. A Twisted reactor cannot be
   restarted once stopped.
   """
   import traceback

   if not getattr(RunSSHCommand, "_reactorStarted", False):
      RunSSHCommand._reactorStarted = True
      thread.start_new_thread(reactor.run, (False,))
   try:
      # Schedule _runSSHCommand on the reactor thread and wait for the
      # Deferred it returns.
      return threads.blockingCallFromThread(
         reactor, _runSSHCommand, hostname, port, user, cmd)
   except Exception:
      # Keep the best-effort contract (return None), but report what
      # failed instead of swallowing it silently.
      traceback.print_exc()
      return None

def _runSSHCommand(hostname, port, user, cmd):
   """Start the SSH connection and return the Deferred for its result.

   Meant to run on the reactor thread; RunSSHCommand calls it through
   threads.blockingCallFromThread. The returned Deferred is passed down
   through the factory and SSH classes and is fired with the command
   output in CommandChannel.closed. ``port`` defaults to 22 when None.
   """
   d = defer.Deferred()
   factory = ClientCommandFactory(user, cmd, d)
   # Honour the caller's port. It used to be ignored in favour of 22.
   reactor.connectTCP(hostname, 22 if port is None else port, factory)
   return d
  

if __name__ == "__main__":
   import sys

   # hostname, port and user are not defined anywhere in this script, so
   # take them from the command line: HOSTNAME USER [PORT]
   if len(sys.argv) < 3:
      sys.exit("usage: %s HOSTNAME USER [PORT]" % sys.argv[0])
   hostname = sys.argv[1]
   user = sys.argv[2]
   port = int(sys.argv[3]) if len(sys.argv) > 3 else 22
   print(RunSSHCommand(hostname, port, user, "register_netboot"))