diff --git a/test/test_environment.py b/test/test_environment.py new file mode 100644 index 0000000000000000000000000000000000000000..6eb4acd505b040e46ec120b3119b30c13ee16957 --- /dev/null +++ b/test/test_environment.py @@ -0,0 +1,674 @@ +# Test the interaction between the various ways to set timeouts et c. + +import os +import time +import signal + +import pcl_expect +import pcl_expect.spawn + +import test.base + +class TestEnvironment(test.base.TestCase): + def terminate_sleeper(self, env, sleeper): + sleeper.kill(signal.SIGINT) + x = env.controller() + while x.loop(): + if sleeper.re(x, '.'): + self.fail('got output from sleep') + elif x.timeout(): + self.fail('timeout waiting for sleep to die') + elif sleeper.eof(x): + break + pid, status = sleeper.close() + self.assertEqual(os.WIFSIGNALED(status), True) + self.assertEqual(os.WTERMSIG(status), signal.SIGINT) + + def test_spawn_timeout_1(self): + t0 = time.time() + env = pcl_expect.Environment() + env.set_timeout(1) + sleeper = pcl_expect.spawn.Spawn(['sleep', '5']) + x = env.controller() + while x.loop(): + if sleeper.re(x, '.'): + self.fail('got output from sleep') + elif x.timeout(): + break + elif sleeper.eof(x): + self.fail('eof waiting for timeout') + t1 = time.time() + self.assertTimeDiff(t0, t1, 1.0) + self.terminate_sleeper(env, sleeper) + + def test_spawn_timeout_5_1(self): + t0 = time.time() + env = pcl_expect.Environment() + env.set_timeout(.5) + sleeper = pcl_expect.spawn.Spawn(['sleep', '7']) + x = env.controller() + while x.loop(): + if sleeper.re(x, '.'): + self.fail('got output from sleep') + elif x.timeout(): + break + elif sleeper.eof(x): + self.fail('eof waiting for timeout') + t1 = time.time() + self.assertTimeDiff(t0, t1, 0.5) + env.set_timeout(1) + x = env.controller() + while x.loop(): + if sleeper.re(x, '.'): + self.fail('got output from sleep') + elif x.timeout(): + break + elif sleeper.eof(x): + self.fail('eof waiting for timeout') + t2 = time.time() + self.assertTimeDiff(t0, t2, 1.5) + self.terminate_sleeper(env, sleeper) + + def test_spawn_timeout_1_environment_overrides_global(self): + old = pcl_expect.timeout + pcl_expect.timeout = 2 + env = pcl_expect.Environment() + pcl_expect.timeout = 1 + t0 = time.time() + sleeper = pcl_expect.spawn.Spawn(['sleep', '5']) + x = env.controller() + while x.loop(): + if sleeper.re(x, '.'): + self.fail('got output from sleep') + elif x.timeout(): + break + elif sleeper.eof(x): + self.fail('eof waiting for timeout') + t1 = time.time() + self.assertTimeDiff(t0, t1, 1.0) + self.terminate_sleeper(env, sleeper) + + def expect_exception(self, controller): + sleeper = pcl_expect.spawn.Spawn(['sleep', '5']) + t0 = time.time() + try: + while controller.loop(): + #t1 = time.time() + #if t1 - t0 > 2: + # self.fail('no exception raised') + if sleeper.re(controller, '.'): + self.fail('got output from sleep') + elif sleeper.eof(controller): + self.fail('eof waiting for timeout') + self.fail('got no exception') + except pcl_expect.Timeout: + pass + t1 = time.time() + self.assertTimeDiff(t0, t1, .5) + + def expect_no_exception(self, controller): + sleeper = pcl_expect.spawn.Spawn(['sleep', '5']) + t0 = time.time() + try: + while controller.loop(): + #t1 = time.time() + #if t1 - t0 > 2: + # self.fail('still in loop') + if sleeper.re(controller, '.'): + self.fail('got output from sleep') + elif sleeper.eof(controller): + self.fail('eof waiting for timeout') + except pcl_expect.Timeout: + self.fail('got exception') + t1 = time.time() + self.assertTimeDiff(t0, t1, .5) + + # Helper methods. See below. + + def vrfy_tre_1110(self, a, b, c): + pcl_expect.timeout_raises_exception = a + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + if c: + self.expect_exception(env.controller()) + else: + self.expect_no_exception(env.controller()) + + def vrfy_tre_0110(self, b, c): + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + if c: + self.expect_exception(env.controller()) + else: + self.expect_no_exception(env.controller()) + + def vrfy_tre_1111(self, a, b, c, d): + pcl_expect.timeout_raises_exception = a + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_0111(self, b, c, d): + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_1011(self, a, c, d): + pcl_expect.timeout_raises_exception = a + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_0011(self, c, d): + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_1001(self, a, d): + pcl_expect.timeout_raises_exception = a + + env = pcl_expect.Environment() + env.set_timeout(.5) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_10n1(self, a, d): + pcl_expect.timeout_raises_exception = a + + ctrl = pcl_expect.Controller(0.5, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + + def vrfy_tre_1010(self, a, c): + pcl_expect.timeout_raises_exception = a + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(c) + ctrl = env.controller() + if c: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_1101(self, a, b, d): + pcl_expect.timeout_raises_exception = a + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_0101(self, b, d): + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_1100(self, a, b): + pcl_expect.timeout_raises_exception = a + pcl_expect.default_environment.set_timeout_raises_exception(b) + + env = pcl_expect.Environment() + env.set_timeout(.5) + ctrl = env.controller() + if a: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_11n1(self, a, b, d): + pcl_expect.timeout_raises_exception = a + pcl_expect.default_environment.set_timeout_raises_exception(b) + ctrl = pcl_expect.Controller(0.5, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_01n1(self, b, d): + pcl_expect.default_environment.set_timeout_raises_exception(b) + ctrl = pcl_expect.Controller(0.5, timeout_raises_exception=d) + if d: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + def vrfy_tre_11n0(self, a, b): + pcl_expect.timeout_raises_exception = a + pcl_expect.default_environment.set_timeout_raises_exception(b) + ctrl = pcl_expect.Controller(0.5) + if b: + self.expect_exception(ctrl) + else: + self.expect_no_exception(ctrl) + + # Test all combinations of setting timeout_raises_exception, to + # make sure the proper value is listened to. These methods have + # name that follow this pattern: + # + # test_tre_abcd + # + # where abcd have the following meaning: + # + # a - TFx: setting of pcl_expect.timeout_raises_exception + # b - TFx: setting of default_environment.timeout_raises_exception + # c - TFxn: setting of Environment.timeout_raises_exception + # d - TFx: setting of Controller.timeout_raises_exception + # + # and where TFxn indicates the possible values: + # + # T: True + # F: False + # x: not set + # n: no explicit Environment object used/not applicable + + def test_tre_TTTT(self): + self.vrfy_tre_1111(True, True, True, True) + + def test_tre_TTTF(self): + self.vrfy_tre_1111(True, True, True, False) + + def test_tre_TTTx(self): + pcl_expect.timeout_raises_exception = True + pcl_expect.default_environment.set_timeout_raises_exception(True) + + env = pcl_expect.Environment() + env.set_timeout(.5) + env.set_timeout_raises_exception(True) + self.expect_exception(env.controller()) + + def test_tre_TTFT(self): + self.vrfy_tre_1111(True, True, False, True) + + def test_tre_TTFF(self): + self.vrfy_tre_1111(True, True, False, False) + + def test_tre_TTFx(self): + self.vrfy_tre_1110(True, True, False) + + def test_tre_TTnT(self): + self.vrfy_tre_11n1(True, True, True) + + def test_tre_TTnF(self): + self.vrfy_tre_11n1(True, True, False) + + def test_tre_TTnx(self): + self.vrfy_tre_11n0(True, True) + + def test_tre_TTxT(self): + self.vrfy_tre_1101(True, True, True) + + def test_tre_TTxF(self): + self.vrfy_tre_1101(True, True, False) + + def test_tre_TTxx(self): + self.vrfy_tre_1100(True, True) + + def test_tre_TFTT(self): + self.vrfy_tre_1111(True, False, True, True) + + def test_tre_TFTF(self): + self.vrfy_tre_1111(True, False, True, False) + + def test_tre_TFTx(self): + self.vrfy_tre_1110(True, False, True) + + def test_tre_TFFT(self): + self.vrfy_tre_1111(True, False, False, True) + + def test_tre_TFFF(self): + self.vrfy_tre_1111(True, False, False, False) + + def test_tre_TFFx(self): + self.vrfy_tre_1110(True, False, False) + + def test_tre_TFnT(self): + self.vrfy_tre_11n1(True, False, True) + + def test_tre_TFnF(self): + self.vrfy_tre_11n1(True, False, False) + + def test_tre_TFnx(self): + self.vrfy_tre_11n0(True, False) + + def test_tre_TFxT(self): + self.vrfy_tre_1101(True, False, True) + + def test_tre_TFxF(self): + self.vrfy_tre_1101(True, False, False) + + def test_tre_TFxx(self): + self.vrfy_tre_1100(True, False) + + def test_tre_TxTT(self): + self.vrfy_tre_1011(True, True, True) + + def test_tre_TxTF(self): + self.vrfy_tre_1011(True, True, False) + + def test_tre_TxTx(self): + self.vrfy_tre_1010(True, True) + + def test_tre_TxFT(self): + self.vrfy_tre_1011(True, False, True) + + def test_tre_TxFF(self): + self.vrfy_tre_1011(True, False, False) + + def test_tre_TxFx(self): + self.vrfy_tre_1010(True, False) + + def test_tre_TxnT(self): + self.vrfy_tre_10n1(True, True) + + def test_tre_TxnF(self): + self.vrfy_tre_10n1(True, False) + + def test_tre_Txnx(self): + pcl_expect.timeout_raises_exception = True + self.expect_exception(pcl_expect.Controller(0.5)) + + def test_tre_TxxT(self): + self.vrfy_tre_1001(True, True) + + def test_tre_TxxF(self): + self.vrfy_tre_1001(True, False) + + def test_tre_Txxx(self): + pcl_expect.timeout_raises_exception = True + env = pcl_expect.Environment() + env.set_timeout(.5) + self.expect_exception(env.controller()) + + def test_tre_FTTT(self): + self.vrfy_tre_1111(False, True, True, True) + + def test_tre_FTTF(self): + self.vrfy_tre_1111(False, True, True, False) + + def test_tre_FTTx(self): + self.vrfy_tre_1110(False, True, True) + + def test_tre_FTFT(self): + self.vrfy_tre_1111(False, True, False, True) + + def test_tre_FTFF(self): + self.vrfy_tre_1111(False, True, False, False) + + def test_tre_FTFx(self): + self.vrfy_tre_1110(False, True, False) + + def test_tre_FTnT(self): + self.vrfy_tre_11n1(False, True, True) + + def test_tre_FTnF(self): + self.vrfy_tre_11n1(False, True, False) + + def test_tre_FTnx(self): + self.vrfy_tre_11n0(False, True) + + def test_tre_FTxT(self): + self.vrfy_tre_1101(False, True, True) + + def test_tre_FTxF(self): + self.vrfy_tre_1101(False, True, False) + + def test_tre_FTxx(self): + self.vrfy_tre_1100(False, True) + + def test_tre_FFTT(self): + self.vrfy_tre_1111(False, False, True, True) + + def test_tre_FFTF(self): + self.vrfy_tre_1111(False, False, True, False) + + def test_tre_FFTx(self): + self.vrfy_tre_1110(False, False, True) + + def test_tre_FFFT(self): + self.vrfy_tre_1111(False, False, False, True) + + def test_tre_FFFF(self): + self.vrfy_tre_1111(False, False, False, False) + + def test_tre_FFFx(self): + self.vrfy_tre_1110(False, False, False) + + def test_tre_FFnT(self): + self.vrfy_tre_11n1(False, False, True) + + def test_tre_FFnF(self): + self.vrfy_tre_11n1(False, False, False) + + def test_tre_FFnx(self): + self.vrfy_tre_11n0(False, False) + + def test_tre_FFxT(self): + self.vrfy_tre_1101(False, False, True) + + def test_tre_FFxF(self): + self.vrfy_tre_1101(False, False, False) + + def test_tre_FFxx(self): + self.vrfy_tre_1100(False, False) + + def test_tre_FxTT(self): + self.vrfy_tre_1011(False, True, True) + + def test_tre_FxTF(self): + self.vrfy_tre_1011(False, True, False) + + def test_tre_FxTx(self): + self.vrfy_tre_1010(False, True) + + def test_tre_FxFT(self): + self.vrfy_tre_1011(False, False, True) + + def test_tre_FxFF(self): + self.vrfy_tre_1011(False, False, False) + + def test_tre_FxFx(self): + self.vrfy_tre_1010(False, False) + + def test_tre_FxnT(self): + self.vrfy_tre_10n1(False, True) + + def test_tre_FxnF(self): + self.vrfy_tre_10n1(False, False) + + def test_tre_Fxnx(self): + pcl_expect.timeout_raises_exception = False + self.expect_no_exception(pcl_expect.Controller(0.5)) + + def test_tre_FxxT(self): + self.vrfy_tre_1001(False, True) + + def test_tre_FxxF(self): + self.vrfy_tre_1001(False, False) + + def test_tre_Fxxx(self): + pcl_expect.timeout_raises_exception = False + env = pcl_expect.Environment() + env.set_timeout(.5) + self.expect_no_exception(env.controller()) + + def test_tre_xTTT(self): + self.vrfy_tre_0111(True, True, True) + + def test_tre_xTTF(self): + self.vrfy_tre_0111(True, True, False) + + def test_tre_xTTx(self): + self.vrfy_tre_0110(True, True) + + def test_tre_xTFT(self): + self.vrfy_tre_0111(True, False, True) + + def test_tre_xTFF(self): + self.vrfy_tre_0111(True, False, False) + + def test_tre_xTFx(self): + self.vrfy_tre_0110(True, False) + + def test_tre_xTnT(self): + self.vrfy_tre_01n1(True, True) + + def test_tre_xTnF(self): + self.vrfy_tre_01n1(True, False) + + def test_tre_xTnx(self): + pcl_expect.default_environment.set_timeout_raises_exception(True) + self.expect_exception(pcl_expect.Controller(0.5)) + + def test_tre_xTxT(self): + self.vrfy_tre_0101(True, True) + + def test_tre_xTxF(self): + self.vrfy_tre_0101(True, False) + + def test_tre_xTxx(self): + pcl_expect.default_environment.set_timeout_raises_exception(True) + env = pcl_expect.Environment() + env.set_timeout(0.5) + self.expect_exception(env.controller()) + + def test_tre_xFTT(self): + self.vrfy_tre_0111(False, True, True) + + def test_tre_xFTF(self): + self.vrfy_tre_0111(False, True, False) + + def test_tre_xFTx(self): + self.vrfy_tre_0110(False, True) + + def test_tre_xFFT(self): + self.vrfy_tre_0111(False, False, True) + + def test_tre_xFFF(self): + self.vrfy_tre_0111(False, False, False) + + def test_tre_xFFx(self): + self.vrfy_tre_0110(False, False) + + def test_tre_xFnT(self): + self.vrfy_tre_01n1(False, True) + + def test_tre_xFnF(self): + self.vrfy_tre_01n1(False, False) + + def test_tre_xFnx(self): + pcl_expect.default_environment.set_timeout_raises_exception(False) + self.expect_no_exception(pcl_expect.Controller(0.5)) + + def test_tre_xFxT(self): + self.vrfy_tre_0101(False, True) + + def test_tre_xFxF(self): + self.vrfy_tre_0101(False, False) + + def test_tre_xFxx(self): + pcl_expect.default_environment.set_timeout_raises_exception(False) + env = pcl_expect.Environment() + env.set_timeout(0.5) + self.expect_exception(env.controller()) + + def test_tre_xxTT(self): + self.vrfy_tre_0011(True, True) + + def test_tre_xxTF(self): + self.vrfy_tre_0011(True, False) + + def test_tre_xxTx(self): + env = pcl_expect.Environment() + env.set_timeout(0.5) + env.set_timeout_raises_exception(True) + self.expect_exception(env.controller()) + + def test_tre_xxFT(self): + self.vrfy_tre_0011(False, True) + + def test_tre_xxFF(self): + self.vrfy_tre_0011(False, False) + + def test_tre_xxFx(self): + env = pcl_expect.Environment() + env.set_timeout(0.5) + env.set_timeout_raises_exception(False) + self.expect_no_exception(env.controller()) + + def test_tre_xxnT(self): + ctrl = pcl_expect.Controller(timeout=0.5, + timeout_raises_exception=True) + self.expect_exception(ctrl) + + def test_tre_xxnF(self): + ctrl = pcl_expect.Controller(timeout=0.5, + timeout_raises_exception=False) + self.expect_no_exception(ctrl) + + def test_tre_xxnx(self): + self.expect_exception(pcl_expect.Controller(.5)) + + def test_tre_xxxT(self): + env = pcl_expect.Environment() + env.set_timeout(.5) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=True) + self.expect_exception(ctrl) + + def test_tre_xxxF(self): + env = pcl_expect.Environment() + env.set_timeout(.5) + ctrl = pcl_expect.Controller(env=env, timeout_raises_exception=False) + self.expect_no_exception(ctrl) + + def test_tre_xxxx(self): + env = pcl_expect.Environment() + env.set_timeout(.5) + self.expect_exception(env.controller())