Select Git revision
fat-setup.h
check_ping_multiaddr.py 7.88 KiB
#!/usr/bin/env python2
# -*- coding: utf-8; indent-tabs-mode: nil -*-
# Copyright © 2018 Thomas Bellman, Linköping, Sweden
# Licensed under the GNU LGPL v3+; see the README file for more information.
# Explicitly assign to __doc__ to avoid doc string being optimized out.
__doc__ = """\
Check that one or more IP addresses responds to ping.
Both IPv4 and IPv6 can be checked at the same time.
"""
__version__ = '<#VERSION#>'
import sys
import re
import os
import optparse
import ipaddr
import socket
import subprocess
import trh_nagioslib
class ProgramFailure(Exception):
def __init__(self, status, msg):
Exception.__init__(self)
self.status = status
self.msg = msg
class Options(optparse.OptionParser):
def __init__(self):
global __doc__, __version__
optparse.OptionParser.__init__(
self,
usage="%prog {-4|-6} [options] -- address ...",
version=__version__,
description=__doc__)
self.add_option(
'-4', '--ipv4', action='store_true', default=False,
help=("Use IPv4 [default: %default]."
" At least one of --ipv4 and --ipv6 must be given."))
self.add_option(
'-6', '--ipv6', action='store_true', default=False,
help=("Use IPv6 [default: %default]."
" At least one of --ipv4 and --ipv6 must be given."))
self.add_option(
'-A', '--all-addresses', action='store_true', default=False,
help=("Check all addresses each hostname resolves to."
" By default, only the first address for each host is"
" checked."))
self.add_option(
'-r', '--retries', action='store', type='int', default=9,
help=("Number of ICMP ECHO retries to send [default: %default]."
" Must be in the range 1 <= RETRIES <= 20."))
self.add_option(
'-d', '--debug', action='count', default=0,
help=("Increase debug level [default: %default]."))
def get_version(self):
progname = self.get_prog_name()
pkgname = "<#PKGNAME#>"
version = self.version
vinfo = "%s (%s) version %s" % (progname, pkgname, version)
return vinfo
def check_values(self, values, args):
if len(args) < 1:
self.error("At least one IP address is required")
if not values.ipv4 and not values.ipv6:
self.error("At least one IP version must be specified (-4, -6)")
if values.retries < 1 or values.retries > 20:
self.error(
"Retries (-r) must be in range 1 <= RETRIES <= 20")
return values,args
def exit(self, status=0, msg=None):
if msg:
sys.stderr.write(msg)
# Exit with EX_USAGE, unless status==0 (which happens for --help)
raise ProgramFailure(status=(status and os.EX_USAGE), msg=msg)
OPTIONS,_ = Options().parse_args(['-A', '-6', '-4', '--', 'localhost'])
def fail(status, fmt, *args):
progname = os.path.basename(sys.argv[0] or "check_ospf_nbr")
msg = progname + ": " + fmt % args + "\n"
sys.stderr.write(msg)
raise ProgramFailure(status=status, msg=msg)
def chatter(level, fmt, *args, **kwargs):
if level <= OPTIONS.debug:
msg = fmt % (kwargs or args)
sys.stderr.write("#" + " " * level + msg + "\n")
def eai_errno_to_symbol(errno):
for symbol in dir(socket):
if symbol.startswith('EAI_') and getattr(socket, symbol) == errno:
return symbol
return
def collect_addresses(hosts, all_addresses, do_v4, do_v6):
if do_v4 and do_v6:
ipfamily = socket.AF_UNSPEC ; ipversion = None
elif do_v4:
ipfamily = socket.AF_INET ; ipversion = 4
elif do_v6:
ipfamily = socket.AF_INET6 ; ipversion = 6
else:
raise ValueError("Neither IPv4 nor IPv6 selected")
lookupflags = 0
lookupflags |= getattr(socket, 'AI_IDN', 0)
addresses = { 4: set(), 6: set() }
for host in hosts:
# Try it as a numerical IP address first
try:
addr = ipaddr.IPAddress(host, ipversion)
except ValueError:
pass
else:
addresses[addr.version].add(addr)
continue
# And if that failed, try resolving the name
try:
ipres = socket.getaddrinfo(host, None, ipfamily, 0, 0, lookupflags)
except socket.gaierror as e:
fail(os.EX_NOHOST, "%s, %s", e.strerror, host)
for ai in ipres:
ipfam = ai[0] ; ip = ai[4][0]
addr = ipaddr.IPAddress(ip)
addresses[addr.version].add(addr)
if not all_addresses:
break
return addresses
__fping_parser_re = re.compile(r"^([0-9a-f.:]+) is ([a-z]+)$")
def parse_fping_output(output, expected_addrs):
alive = set()
unreachable = set()
for line in filter(bool, output):
match = __fping_parser_re.match(line)
if not match:
continue
ip = ipaddr.IPAddress(match.group(1))
status = match.group(2)
if status == 'alive':
alive.add(ip)
elif status == 'unreachable':
unreachable.add(ip)
else:
raise RuntimeError(
"Unexpected status line from fping, " + repr(line))
not_reported = expected_addrs - alive - unreachable
unexpected = (alive | unreachable) - expected_addrs
return (alive, unreachable, not_reported, unexpected)
def ping_addresses(addresses):
fpingcmds = {
4: ['fping'],
6: ['fping6'],
}
fpingflags = [
# These settings, with default 9 retries, gives ca 5 seconds timeout
# for unreachable addresses
'-i10', # -i10 is the fastest fping allows without being root
'-t250',
'-B1.125',
'-r%d' % (OPTIONS.retries,),
]
all_output = []
for ipver,addrs in addresses.items():
if not addrs:
continue
cmd = fpingcmds[ipver] + fpingflags + map(str, addrs)
chatter(1, "Running %r", cmd)
p = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output,errors = p.communicate()
output = output.split("\n"); errors = errors.split("\n")
chatter(3, "Received output %r", output)
chatter(3, "Received errors %r", errors)
all_output += output
alive, unreachable, not_reported, unexpected = parse_fping_output(
all_output, set().union(*addresses.values()))
return (alive, unreachable, not_reported, unexpected)
def main(argv):
global OPTIONS
OPTIONS, arg_addresses = Options().parse_args(argv[1:])
addresses = collect_addresses(
arg_addresses, OPTIONS.all_addresses, OPTIONS.ipv4, OPTIONS.ipv6)
for ipver,addrs in addresses.items():
chatter(2, "IPv%d addresses: %s", ipver, " ".join(map(str, addrs)))
(alive, unreachable, not_reported, unexpected) = \
ping_addresses(addresses)
ping_statuses = {
'OK': [ '%s is alive' % (ip,) for ip in alive ],
'WARNING': [ '%s was reported despite not being pinged' % (ip,)
for ip in unexpected ],
'CRITICAL': [ '%s is unreachable' % (ip,) for ip in unreachable ],
'UNKNOWN': [ '%s has no information' % (ip,) for ip in not_reported ],
}
lvl,message = trh_nagioslib.nagios_report(ping_statuses)
sys.stdout.write(message)
return lvl
if __name__ == '__main__':
try:
code = main(sys.argv)
sys.exit(code)
except ProgramFailure as failure:
sys.exit(failure.status)
except Exception:
# An exception would normally cause Python to exit with code == 1,
# but that would be a WARNING for Nagios. Avoid that.
(exc_type, exc_value, exc_traceback) = sys.exc_info()
import traceback
traceback.print_exception(exc_type, exc_value, exc_traceback)
sys.exit(os.EX_SOFTWARE)