Skip to content
Snippets Groups Projects
Select Git revision
  • d4847b52087253446ccf9a15d3e6e40e77c5950f
  • master default
  • wip-slh-dsa-sha2-128s
  • master-updates
  • release-3.10-fixes
  • getopt-prototype
  • fix-bcrypt-warning
  • refactor-hmac
  • wip-use-alignas
  • trim-sha3-context
  • fix-gitlab-ci
  • check-fat-emulate
  • delete-digest_func-size
  • slh-dsa-shake-128f-nettle
  • slh-dsa-shake-128s-nettle
  • slh-dsa-shake-128s
  • delete-openpgp
  • ppc64-sha512
  • delete-md5-compat
  • cleanup-hmac-tests
  • ppc64-sha256
  • nettle_3.10.2_release_20250626
  • nettle_3.10.1_release_20241230
  • nettle_3.10_release_20240616
  • nettle_3.10rc2
  • nettle_3.10rc1
  • nettle_3.9.1_release_20230601
  • nettle_3.9_release_20230514
  • nettle_3.8.1_release_20220727
  • nettle_3.8_release_20220602
  • nettle_3.7.3_release_20210606
  • nettle_3.7.2_release_20210321
  • nettle_3.7.1_release_20210217
  • nettle_3.7_release_20210104
  • nettle_3.7rc1
  • nettle_3.6_release_20200429
  • nettle_3.6rc3
  • nettle_3.6rc2
  • nettle_3.6rc1
  • nettle_3.5.1_release_20190627
  • nettle_3.5_release_20190626
41 results

sha256-meta.c

Blame
  • check_ping_multiaddr.py 7.88 KiB
    #!/usr/bin/env python2
    # -*- coding: utf-8; indent-tabs-mode: nil -*-
    
    # Copyright © 2018   Thomas Bellman, Linköping, Sweden
    # Licensed under the GNU LGPL v3+; see the README file for more information.
    
    
    # Explicitly assign to __doc__ to avoid doc string being optimized out.
    __doc__ = """\
    Check that one or more IP addresses responds to ping.
    
    Both IPv4 and IPv6 can be checked at the same time.
    """
    
    __version__ = '<#VERSION#>'
    
    
    import sys
    import re
    import os
    import optparse
    import ipaddr
    import socket
    import subprocess
    
    import trh_nagioslib
    
    
    
    class ProgramFailure(Exception):
        def __init__(self, status, msg):
            Exception.__init__(self)
            self.status = status
            self.msg = msg
    
    
    class Options(optparse.OptionParser):
    
        def __init__(self):
            global __doc__, __version__
            optparse.OptionParser.__init__(
                self,
                usage="%prog {-4|-6} [options] -- address ...",
                version=__version__,
                description=__doc__)
            self.add_option(
                '-4', '--ipv4', action='store_true', default=False,
                help=("Use IPv4 [default: %default]."
                      " At least one of --ipv4 and --ipv6 must be given."))
            self.add_option(
                '-6', '--ipv6', action='store_true', default=False,
                help=("Use IPv6 [default: %default]."
                      " At least one of --ipv4 and --ipv6 must be given."))
            self.add_option(
                '-A', '--all-addresses', action='store_true', default=False,
                help=("Check all addresses each hostname resolves to."
                      " By default, only the first address for each host is"
                      " checked."))
            self.add_option(
                '-r', '--retries', action='store', type='int', default=9,
                help=("Number of ICMP ECHO retries to send [default: %default]."
                      " Must be in the range 1 <= RETRIES <= 20."))
            self.add_option(
                '-d', '--debug', action='count', default=0,
                help=("Increase debug level [default: %default]."))
    
        def get_version(self):
            progname = self.get_prog_name()
            pkgname = "<#PKGNAME#>"
            version = self.version
            vinfo = "%s (%s) version %s" % (progname, pkgname, version)
            return vinfo
    
        def check_values(self, values, args):
            if len(args) < 1:
                self.error("At least one IP address is required")
            if not values.ipv4 and not values.ipv6:
                self.error("At least one IP version must be specified (-4, -6)")
            if values.retries < 1 or values.retries > 20:
                self.error(
                    "Retries (-r) must be in range 1 <= RETRIES <= 20")
            return values,args
    
        def exit(self, status=0, msg=None):
            if msg:
                sys.stderr.write(msg)
            # Exit with EX_USAGE, unless status==0 (which happens for --help)
            raise ProgramFailure(status=(status and os.EX_USAGE), msg=msg)
    
    
    OPTIONS,_ = Options().parse_args(['-A', '-6', '-4', '--', 'localhost'])
    
    
    def fail(status, fmt, *args):
        progname = os.path.basename(sys.argv[0] or "check_ospf_nbr")
        msg = progname + ": " + fmt % args + "\n"
        sys.stderr.write(msg)
        raise ProgramFailure(status=status, msg=msg)
    
    
    def chatter(level, fmt, *args, **kwargs):
        if level <= OPTIONS.debug:
            msg = fmt % (kwargs or args)
            sys.stderr.write("#" + "  " * level + msg + "\n")
    
    
    def eai_errno_to_symbol(errno):
        for symbol in dir(socket):
            if symbol.startswith('EAI_') and getattr(socket, symbol) == errno:
                return symbol
        return
    
    
    
    def collect_addresses(hosts, all_addresses, do_v4, do_v6):
        if do_v4 and do_v6:
            ipfamily = socket.AF_UNSPEC ;  ipversion = None
        elif do_v4:
            ipfamily = socket.AF_INET ;    ipversion = 4
        elif do_v6:
            ipfamily = socket.AF_INET6 ;   ipversion = 6
        else:
            raise ValueError("Neither IPv4 nor IPv6 selected")
    
        lookupflags = 0
        lookupflags |= getattr(socket, 'AI_IDN', 0)
    
        addresses = { 4: set(), 6: set() }
        for host in hosts:
            # Try it as a numerical IP address first
            try:
                addr = ipaddr.IPAddress(host, ipversion)
            except ValueError:
                pass
            else:
                addresses[addr.version].add(addr)
                continue
    
            # And if that failed, try resolving the name
            try:
                ipres = socket.getaddrinfo(host, None, ipfamily, 0, 0, lookupflags)
            except socket.gaierror as e:
                fail(os.EX_NOHOST, "%s, %s", e.strerror, host)
            for ai in ipres:
                ipfam = ai[0] ;  ip = ai[4][0]
                addr = ipaddr.IPAddress(ip)
                addresses[addr.version].add(addr)
                if not all_addresses:
                    break
    
        return addresses
    
    
    
    __fping_parser_re = re.compile(r"^([0-9a-f.:]+) is ([a-z]+)$")
    
    def parse_fping_output(output, expected_addrs):
        alive = set()
        unreachable = set()
        for line in filter(bool, output):
            match = __fping_parser_re.match(line)
            if not match:
                continue
            ip = ipaddr.IPAddress(match.group(1))
            status = match.group(2)
            if status == 'alive':
                alive.add(ip)
            elif status == 'unreachable':
                unreachable.add(ip)
            else:
                raise RuntimeError(
                    "Unexpected status line from fping, " + repr(line))
        not_reported = expected_addrs - alive - unreachable
        unexpected = (alive | unreachable) - expected_addrs
    
        return (alive, unreachable, not_reported, unexpected)
    
    
    def ping_addresses(addresses):
        fpingcmds = {
            4: ['fping'],
            6: ['fping6'],
        }
        fpingflags = [
            # These settings, with default 9 retries, gives ca 5 seconds timeout
            # for unreachable addresses
            '-i10',         # -i10 is the fastest fping allows without being root
            '-t250',
            '-B1.125',
            '-r%d' % (OPTIONS.retries,),
        ]
        all_output = []
        for ipver,addrs in addresses.items():
            if not addrs:
                continue
            cmd = fpingcmds[ipver] + fpingflags + map(str, addrs)
            chatter(1, "Running %r", cmd)
            p = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output,errors = p.communicate()
            output = output.split("\n"); errors = errors.split("\n")
            chatter(3, "Received output %r", output)
            chatter(3, "Received errors %r", errors)
            all_output += output
    
        alive, unreachable, not_reported, unexpected = parse_fping_output(
            all_output, set().union(*addresses.values()))
    
        return (alive, unreachable, not_reported, unexpected)
    
    
    
    def main(argv):
        global OPTIONS
        OPTIONS, arg_addresses = Options().parse_args(argv[1:])
    
        addresses = collect_addresses(
            arg_addresses, OPTIONS.all_addresses, OPTIONS.ipv4, OPTIONS.ipv6)
    
        for ipver,addrs in addresses.items():
            chatter(2, "IPv%d addresses:   %s", ipver, "  ".join(map(str, addrs)))
    
        (alive, unreachable, not_reported, unexpected) = \
            ping_addresses(addresses)
    
        ping_statuses = {
            'OK':       [ '%s is alive' % (ip,) for ip in alive ],
            'WARNING':  [ '%s was reported despite not being pinged' % (ip,)
                          for ip in unexpected ],
            'CRITICAL': [ '%s is unreachable' % (ip,) for ip in unreachable ],
            'UNKNOWN':  [ '%s has no information' % (ip,) for ip in not_reported ],
        }
        lvl,message = trh_nagioslib.nagios_report(ping_statuses)
        sys.stdout.write(message)
        return lvl
    
    
    
    if __name__ == '__main__':
        try:
            code = main(sys.argv)
            sys.exit(code)
        except ProgramFailure as failure:
            sys.exit(failure.status)
        except Exception:
            # An exception would normally cause Python to exit with code == 1,
            # but that would be a WARNING for Nagios.  Avoid that.
            (exc_type, exc_value, exc_traceback) = sys.exc_info()
            import traceback
            traceback.print_exception(exc_type, exc_value, exc_traceback)
            sys.exit(os.EX_SOFTWARE)