Skip to content
Snippets Groups Projects
Commit e773befe authored by Per Cederqvist's avatar Per Cederqvist
Browse files

New file, extracted from pcl_expect.py.

(__all__): New constant.
(controller): New name for former class impl.
parent ccf6fde9
Branches
Tags
No related merge requests found
"""An extensible expect module with more expect-like feeling.
FIXME: more doc needed.
"""
import sets
import os
import re
import errno
import select
__all__ = [
"timeout",
"timeout_raises_exception",
"RE",
"EOF",
"TIMEOUT",
"CONT",
"Timeout",
"BadArgs",
"expectable",
"controller",
"expect_after",
"expect_before",
"expect",
]
# Default timeout, in seconds, as a floating point number.
# The user is supposed to change this as needed.
timeout = 10.0
# If a timeout occurs and no timeout handler is specified, should an
# exception be raised, or should the code just continue (TCL-style)?
# Default is to raise an exception. The user is supposed to change
# this as needed.
timeout_raises_exception = True
# Constants that identify actions for expect_after and expect_before.
RE = 0
EOF = 1
TIMEOUT = 2
# Return values from callbacks.
CONT = 3
class Timeout(Exception): pass
class BadArgs(Exception): pass
_expect_before = []
_expect_after = []
def debug(s):
if 0:
sys.stderr.write("pcl-expect: %s\n" % s)
class expectable:
def __init__(self, fileno):
self.__fileno = fileno
self.__buffer = ""
self.__eof_seen = False
def fileno(self):
return self.__fileno
def _read(self):
try:
s = os.read(self.fileno(), 8192)
except OSError, e:
if e.errno == errno.EIO:
debug("got EIO from fd %d" % self.fileno())
s = ""
else:
raise
if s == "":
debug("got eof from fd %d" % self.fileno())
return "", True
else:
return s, False
def fill_buffer(self):
if not self.__eof_seen:
s, eof = self._read()
if eof:
self.__eof_seen = True
else:
debug("got %d bytes from fd %d" % (len(s), self.fileno()))
self.__buffer = self.__buffer + s
else:
debug("eof already seen on fd %d; not reading" % self.fileno())
return self.__eof_seen
def eof(self):
"""Return True if end-of-file has been seen.
There might still be pending data in the buffer, though.
"""
return self.__eof_seen
def buffer(self):
return self.__buffer
def remove_first(self, n):
self.__buffer = self.__buffer[n:]
class controller:
def __init__(self, timeout = None):
self.__first = True
self.__inputs = sets.Set()
self.__readable = sets.Set()
self.__done = False
self.__timeout = timeout
self.__timeout_active = False
self.__acted = False
self.__all_inputs_seen = False
debug("START")
def loop(self):
debug("at top of loop")
if self.__done:
debug("done -- exiting expect")
return False
if self.__first:
self.__first = False
debug("first time round")
self.__acted = False
if self.__expect_before():
return False
return True
elif not self.__acted:
debug("no action taken during previous loop")
self.__all_inputs_seen = True
if self.__expect_after():
return False
self.__acted = False
# Nothing handled the timeout event.
if self.__timeout_active:
debug("Unhandled timeout")
if timeout_raises_exception:
raise Timeout()
else:
return False
r = ", ".join([str(x.fileno()) for x in self.__inputs])
if not self.__all_inputs_seen:
t = 0
elif self.__timeout == None:
t = timeout
else:
t = self.__timeout
if t > 0:
debug("Waiting for input on %s" % r)
else:
debug("Polling for input on %s" % r)
(r, w, e) = select.select([x for x in self.__inputs], [], [], t)
self.__readable = sets.Set(r)
if len(self.__readable) == 0:
if self.__all_inputs_seen:
debug("Processing timeout event")
self.__timeout_active = True
else:
debug("Input available")
if self.__expect_before():
return False
return True
def __fill_buffer(self, exp):
if exp in self.__readable:
debug("reading from fd %d" % exp.fileno())
self.__readable.remove(exp)
if exp.fill_buffer():
debug("got eof on fd %d" % exp.fileno())
self.__inputs.discard(exp)
def re(self, exp, regexp):
"""Implement "when re foo, foo_re".
EXP is an event source that adheres to the expectable API.
REGEXP is a regular expression to match. This can be a string
(that will be compiled using re.compile) or a compiled regexp
object.
Return true if a match was found. The match attribute of EXP
will be set to the result of the match. The matching string
will have been removed from the buffer of EXP upon return.
"""
if self.__acted:
return False
if self.__done:
raise "foo! me bad!"
self.__fill_buffer(exp)
debug("does %s match %s (fd %d)?" % (
repr(exp.buffer()), repr(regexp), exp.fileno()))
# Doing this compilation again and again could be a problem.
# I rely on the cache in module re. I hope it exists...
if isinstance(regexp, basestring):
regexp = re.compile(regexp)
match = regexp.search(exp.buffer())
if match != None:
debug("yes")
exp.match = match
exp.remove_first(match.end())
self.__done = True
self.__acted = True
return True
else:
debug("no")
if not exp.eof():
debug("adding fd %d to rd-set" % exp.fileno())
self.__inputs.add(exp)
return False
def eof(self, exp):
"""Implement "when eof foo".
"""
if self.__acted:
return False
if self.__done:
raise "foo! me bad!"
self.__fill_buffer(exp)
if exp.eof():
debug("eof seen on fd %d" % exp.fileno())
exp.match = exp.buffer()
exp.remove_first(len(exp.match))
self.__done = True
self.__acted = True
return True
else:
debug("no eof on fd %d" % exp.fileno())
debug("adding fd %d to rd-set" % exp.fileno())
self.__inputs.add(exp)
return False
def timeout(self):
"""Implement "when timeout".
Return true if a timeout has occured.
"""
if self.__acted:
return False
if self.__done:
raise "foo! me bad!"
if self.__timeout_active:
debug("eof pending")
self.__timeout_active = False
self.__done = True
self.__acted = True
return True
else:
debug("no eof pending")
return False
def cont(self):
debug("cont called")
self.__done = False
self.__first = True
# Don't touch self.__acted
def __expect_before(self):
debug("running expect_before patterns...")
ret = self.__run_expectations(_expect_before)
debug("done running expect_before patterns: %s." % ret)
return ret
def __expect_after(self):
debug("running expect_after patterns...")
ret = self.__run_expectations(_expect_after)
debug("done running expect_after patterns: %s." % ret)
return ret
def __run_expectations(self, expectations):
for pattern in expectations:
debug("running pattern %s" % (pattern,))
if pattern[0] == RE:
(cmd, exp, regexp, callback) = pattern
if self.re(exp, regexp):
if callback(exp) == CONT:
self.cont()
return False
else:
return True
elif pattern[0] == EOF:
(cmd, exp, callback) = pattern
if self.eof(exp):
if callback(exp) == CONT:
self.cont()
return False
else:
return True
elif pattern[0] == TIMEOUT:
(cmd, callback) = pattern
if self.timeout():
if callback() == CONT:
self.cont()
return False
else:
return True
else:
raise BadArgs
debug("no match")
return False
def expect_after(expectations):
"""Set a list of expect patterns to run if no other matches are found.
The argument is a list of tuples. These tuples are understood:
(RE, exp, regexp, callback): If output from EXP matches REGEXP,
call CALLBACK.
(EOF, exp, callback): If end-of-file is reached on EXP, call CALLBACK.
(TIMEOUT, callback): If a timeout occurs, call CALLBACK.
The callback function is called with EXP as argument (or, for
timeout, with no argument). It should return None or CONT.
Returning CONT from a callback means that the expect statement
should start again.
"""
global _expect_after
__validate_expectations(expectations)
_expect_after = expectations
debug("Added expect_after patterns")
def expect_before(expectations):
"""Set a list of expect patterns to run before looking at other matches.
See expect_after for more info.
"""
global _expect_before
__validate_expectations(expectations)
_expect_before = expectations
debug("Added expect_before patterns")
def __validate_expectations(expectations):
for pattern in expectations:
if pattern[0] == RE:
(cmd, exp, regexp, callback) = pattern
elif pattern[0] == EOF:
(cmd, exp, callback) = pattern
elif pattern[0] == TIMEOUT:
(cmd, callback) = pattern
else:
raise BadArgs
def expect(expectations):
x = controller()
while x.loop():
for ix in range(len(expectations)):
pattern = expectations[ix]
if pattern[0] == RE:
if len(pattern) == 3:
(cmd, exp, regexp) = pattern
callback = None
elif len(pattern) == 4:
(cmd, exp, regexp, callback) = pattern
else:
raise BadArgs
if x.re(exp, regexp):
if callback is not None and callback(exp) == CONT:
x.cont()
break
else:
return ix, exp
elif pattern[0] == EOF:
if len(pattern) == 2:
(cmd, exp) = pattern
callback = None
elif len(pattern) == 3:
(cmd, exp, callback) = pattern
else:
raise BadArgs
if x.eof(exp):
if callback is not None and callback(exp) == CONT:
x.cont()
break
else:
return ix, exp
elif pattern[0] == TIMEOUT:
if len(pattern) == 1:
cmd = pattern[0]
elif len(pattern) == 2:
(cmd, callback) = pattern
else:
raise BadArgs
if x.timeout():
if callback is not None and callback() == CONT:
x.cont()
break
else:
return ix, None
return None, None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment