Skip to content
Snippets Groups Projects
Select Git revision
  • 291582ea1b71c03bd0a1ab1a08800a03348751fb
  • master default
  • pcl_expect_0_1_0
3 results

pcl_expect.py

Blame
  • pcl_expect.py 7.41 KiB
    import os
    import re
    import select
    import sets
    import sys
    import types
    
    #
    # Global variables.  The user are supposed to change these.
    #
    
    # Default timeout, in seconds, as a floating point number.
    timeout = 10.0
    
    # If a timeout occurs and no timeout handler is specified, should an
    # exception be raised, or should the code just continue (TCL-style)?
    # Default is to raise an exception.
    timeout_raises_exception = True
    
    #
    # Internal code
    #
    
    class Timeout(Exception): pass
    
    class expectable:
        def __init__(self, fileno):
            self.__fileno = fileno
            self.__buffer = ""
            self.__eof_seen = False
    
        def fileno(self):
            return self.__fileno
    
        def fill_buffer(self):
            if not self.__eof_seen:
                s = os.read(self.fileno(), 8192)
                if s == "":
                    debug("got eof from fd %d" % self.fileno())
                    self.__eof_seen = True
                else:
                    debug("got %d bytes from fd %d" % (len(s), self.fileno()))
                    self.__buffer = self.__buffer + s
            else:
                debug("eof already seen on fd %d; not reading" % self.fileno())
            return self.__eof_seen
    
        def eof(self):
            """Return True if end-of-file has been seen.
    
            There might still be pending data in the buffer, though.
            """
            return self.__eof_seen
    
        def buffer(self):
            return self.__buffer
    
        def remove_first(self, n):
            self.__buffer = self.__buffer[n:]
    
    class popen(expectable):
        def __init__(self, cmd):
            self.__cmd = os.popen(cmd, "r")
            expectable.__init__(self, self.__cmd.fileno())
    
        def close(self):
            return self.__cmd.close()
    
    class impl:
        def __init__(self, timeout = None):
            self.__first = True
            self.__inputs = sets.Set()
            self.__readable = sets.Set()
            self.__done = False
            self.__timeout = timeout
            self.__timeout_active = False
            self.__acted = False
            self.__all_inputs_seen = False
            debug("START")
    
        def loop(self):
            debug("at top of loop")
            if self.__done:
                debug("done -- exiting expect")
                return False
    
            if self.__first:
                self.__first = False
                debug("first time round")
                self.__acted = False
                if _expect_before():
                    return False
                return True
            elif not self.__acted:
                debug("no action taken during previous loop")
                self.__all_inputs_seen = True
                if _expect_after():
                    return False
    
            self.__acted = False
    
            # Nothing handled the timeout event.
            if self.__timeout_active:
                debug("Unhandled timeout")
                if timeout_raises_exception:
                    raise Timeout()
                else:
                    return False
    
            r = ", ".join([str(x.fileno()) for x in self.__inputs])
            if not self.__all_inputs_seen:
                t = 0
            elif self.__timeout == None:
                t = timeout
            else:
                t = self.__timeout
            if t > 0:
                debug("Waiting for input on %s" % r)
            else:
                debug("Polling for input on %s" % r)
            (r, w, e) = select.select([x for x in self.__inputs], [], [], t)
            self.__readable = sets.Set(r)
            if len(self.__readable) == 0:
                if self.__all_inputs_seen:
                    debug("Processing timeout event")
                    self.__timeout_active = True
            else:
                debug("Input available")
            if _expect_before():
                return False
            return True
    
    
        def __fill_buffer(self, exp):
            if exp in self.__readable:
                debug("reading from fd %d" % exp.fileno())
                self.__readable.remove(exp)
                if exp.fill_buffer():
                    debug("got eof on fd %d" % exp.fileno())
                    self.__inputs.discard(exp)
    
        def re(self, exp, regexp):
            """Implement "when re foo, foo_re".
    
            EXP is an event source that adheres to the expectable API.
            REGEXP is a regular expression to match.  This can be a string
            (that will be compiled using re.compile) or a compiled regexp
            object.
    
            Return true if a match was found.  The match attribute of EXP
            will be set to the result of the match.  The matching string
            will have been removed from the buffer of EXP upon return.
            """
    
            if self.__acted:
                return False
    
            if self.__done:
                raise "foo! me bad!"
    
            self.__fill_buffer(exp)
    
            debug("does \"%s\" match \"%s\" (fd %d)?" % (
                exp.buffer(), regexp, exp.fileno()))
    
            # Doing this compilation again and again could be a problem.
            # I rely on the cache in module re.  I hope it exists...
            if isinstance(regexp, types.StringTypes):
                regexp = re.compile(regexp)
    
            match = regexp.search(exp.buffer())
            if match != None:
                debug("yes")
                exp.match = match
                exp.remove_first(match.end())
                self.__done = True
                self.__acted = True
                return True
            else:
                debug("no")
                if not exp.eof():
                    debug("adding fd %d to rd-set" % exp.fileno())
                    self.__inputs.add(exp)
                return False
    
        def eof(self, exp):
            """Implement "when eof foo".
            """
            if self.__acted:
                return False
            
            if self.__done:
                raise "foo! me bad!"
    
            self.__fill_buffer(exp)
    
            if exp.eof():
                debug("eof seen on fd %d" % exp.fileno())
                exp.match = exp.buffer()
                exp.remove_first(len(exp.match))
                self.__done = True
                self.__acted = True
                return True
            else:
                debug("no eof on fd %d" % exp.fileno())
                debug("adding fd %d to rd-set" % exp.fileno())
                self.__inputs.add(exp)
                return False
    
        def timeout(self):
            """Implement "when timeout".
    
            Return true if a timeout has occured.
            """
    
            if self.__acted:
                return False
            
            if self.__done:
                raise "foo! me bad!"
    
            if self.__timeout_active:
                debug("eof pending")
                self.__timeout_active = False
                self.__done = True
                self.__acted = True
                return True
            else:
                debug("no eof pending")
                return False
    
        def cont(self):
            debug("cont called")
            self.__done = False
            self.__first = True
            # Don't touch self.__acted
    
    def _expect_before():
        return False
    
    def _expect_after():
        return False
    
    def debug(s):
        if 0:
            sys.stderr.write("pcl-expect: %s\n" % s)
    
    if __name__ == '__main__':
        df = popen("df -k")
    
        x = impl()
        while x.loop():
            if x.re(df, "Filesystem.*\n"):
                print "Header:", df.match.group()
    
        x = impl()
        while x.loop():
            if x.re(df, "^/.* .*\n"):
                print "Normal line:", df.match.group()
                x.cont()
                continue
            elif x.re(df, "^/.*\n"):
                print "Mount point only:", df.match.group()
                x.cont()
                continue
            elif x.re(df, "^ .*\n"):
                print "Info only:", df.match.group()
                x.cont()
                continue
            elif x.re(df, "^.*\n"):
                print "Unexpected line", df.match.group()
                x.cont()
                continue
            elif x.eof(df):
                print "And that's all, folks!"