Skip to content
Snippets Groups Projects
Commit ccf6fde9 authored by Per Cederqvist's avatar Per Cederqvist
Browse files

New file, extracted from pcl_expect.py.

(__all__): New constant.
parent ea80b084
No related branches found
No related tags found
No related merge requests found
import os
import pcl_expect
__all__ = [
"popen",
]
class popen(pcl_expect.expectable):
def __init__(self, cmd):
self.__cmd = os.popen(cmd, "r")
pcl_expect.expectable.__init__(self, self.__cmd.fileno())
def close(self):
return self.__cmd.close()
import os
import pty
import fcntl
import pcl_expect
__all__ = [
"inherit_stty",
"stty_init",
"spawn",
]
# When a new pty is created by spawn(), the terminal settings will be
# inherited from the settings of stdin of the current process.
# Setting inherit_stty to False will disable this inheritance. After
# the inheritance (if any), if stty_init is set to a string, stty will
# be run with that string as its argument.
inherit_stty = True
stty_init = None
def set_cloexec_flag(fd):
old = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
class spawn(pcl_expect.expectable):
def __init__(self, cmd):
if inherit_stty:
f = os.popen("stty -g")
settings = f.read()
f.close()
if isinstance(cmd, basestring):
cmd = cmd.split(" ")
(r, w) = os.pipe()
set_cloexec_flag(w)
self.__child, self.__pty = pty.fork()
if self.__child == 0:
try:
os.close(r)
if inherit_stty:
os.system("stty %s" % settings)
if stty_init is not None:
os.system("stty %s" % stty_init)
try:
os.execvp(cmd[0], cmd)
except OSError:
os.write(w, "exec failed\n")
finally:
os._exit(1)
else:
os.close(w)
res = os.read(r, 100)
os.close(r)
if res != "":
os.close(self.__pty)
os.waitpid(self.__child, 0)
raise OSError("exec failed")
pcl_expect.expectable.__init__(self, self.__pty)
def send(self, s):
pcl_expect.debug("sending \"%s\" to fd %d" % (s, self.fileno()))
os.write(self.__pty, s)
def close(self):
os.close(self.__pty)
return os.waitpid(self.__child, 0)
import telnetlib
import pcl_expect
__all__ = [
"telnet",
]
class telnet(pcl_expect.expectable):
def __init__(self, host, port):
self.telnet = telnetlib.Telnet(host, port)
pcl_expect.expectable.__init__(self, self.telnet.fileno())
def _read(self):
try:
s = self.telnet.read_eager()
except EOFError:
return "", True
return s, False
def send(self, s):
self.telnet.write(s)
def close(self):
self.telnet.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment