# Source code for pyhpecw7.features.ping

"""Ping another device from HPCOM7 devices.
"""
from lxml import etree
from pyhpecw7.features.errors import InvalidIPAddress
from pyhpecw7.utils.validate import valid_ip_network
from pyhpecw7.utils.xml.lib import *


class Ping(object):
    """Ping another device from an HPCOM7 device.

    The ping is executed as soon as the object is created; the parsed
    result is stored in ``response``.

    Args:
        device (HPCOM7): connected instance of a
            ``pyhpecw7.comware.HPCOM7`` object.
        host (str): IP address or name to ping from the switch
        vrf (vrf): source VRF on the switch the ping will come from
        v6 (bool): set to true if dest is v6 target
        detail (bool): set to true if you want to see
            per ping (ICMP echo request) response details

    Note:
        If an IPv6 address is provided for ``host``, there is
        no need to set ``v6`` to ``True``, but it doesn't hurt either way.
        If a name is used for ``host`` and that resolves to a v6 address,
        then ``v6`` must be set to ``True``.

    Attributes:
        device (HPCOM7): connected instance of a
            ``pyhpecw7.comware.HPCOM7`` object.
        host (str): IP address or name to ping from the switch
        vrf (vrf): source VRF on the switch the ping will come from
        v6 (bool): set to true if dest is v6 target
        detail (bool): set to true if you want to see
            per ping (ICMP echo request) response details
        response (dict): see ``_ping``
    """

    def __init__(self, device, host, vrf='', v6=False, detail=False):
        self.device = device
        self.host = host
        # Normalise a ``None`` VRF to the empty string.
        self.vrf = vrf or ''
        self.v6 = v6
        self.detail = detail

        # list of XML tags used to build proper objects
        self.v4tags = ['Ping', 'IPv4Ping', 'PingTest']
        self.v6tags = ['Ping', 'IPv6Ping', 'PingTest']

        self.response = self._ping()

    def _ping(self):
        """Build the ping action XML, send it to the device and parse
        the reply.

        Returns:
            It returns a dictionary with the following k/v pairs
            (a key is omitted when the device reply does not carry
            the corresponding element):

                :payload_length (str): bytes of payload
                :max (str): represents max response time in milliseconds
                :min (str): represents min response time in milliseconds
                :avg (str): represents avg response time in milliseconds
                :packets_tx (str): number of packets sent
                :packets_rx (str): number of packets received
                :host (str): target host being "pinged"
                :source (str): text of the ``SrcAddr`` element
                :loss_rate (str): percent of which pings are dropped
                :detailed_response (list): list of dicts that provides
                    detail for each icmp request including
                    icmp seq # and reply time. Only present when
                    ``detail`` is set.

        Raises:
            InvalidIPAddress: if ``host`` looks like an address but
                fails validation in ``param_check``.
        """
        # NOTE(review): a DNS name containing '.' is also routed through
        # param_check; whether such a name passes depends on
        # valid_ip_network -- confirm against the documented "name" support.
        if '.' in self.host or ':' in self.host:
            self.param_check(host=self.host)

        E = action_element_maker()

        # A literal IPv6 address (contains ':') forces the v6 action even
        # when the caller left ``v6`` at its default.
        use_v6 = ':' in self.host or self.v6 is True
        ping_family = E.IPv6Ping if use_v6 else E.IPv4Ping

        top = E.top(
            E.Ping(
                ping_family(
                    E.PingTest(
                        E.Host(self.host),
                        E.VRF(self.vrf)
                    )
                )
            )
        )

        rsp = self.device.action(top)

        return self._build_response(rsp)

    @staticmethod
    def _to_ms(text):
        """Scale a numeric time string down by 1000 (floor division).

        The result is documented as milliseconds, so the device value is
        presumably in microseconds -- TODO confirm against the device API.

        Args:
            text (str): integer value as text.

        Returns:
            str: ``int(text) // 1000`` rendered as a string.
        """
        return str(int(text) // 1000)

    def _build_response(self, response):
        """Builds dictionary from XML response coming from device

        Args:
            response: object whose ``xml`` attribute holds the reply
                as a string.

        Returns:
            dict: see ``_ping`` for the keys.
        """
        as_string = response.xml
        # UTF-8 yields the same bytes as ASCII for ASCII-only replies but
        # does not fail on non-ASCII characters.
        as_xml = etree.fromstring(as_string.encode('utf-8'))

        key_map = {
            'payload_length': 'PayloadLength',
            'packets_tx': 'TotalTransmitPacket',
            'packets_rx': 'TotalReceivePacket',
            'loss_rate': 'LossRate',
            'min': 'MinReplyTime',
            'max': 'MaxReplyTime',
            'avg': 'AvgReplyTime',
            'host': 'Host',
            'source': 'SrcAddr'
        }
        time_keys = ('min', 'avg', 'max')

        ping_response = {}
        for new_key, xml_tag in key_map.items():
            # Look up the tag mapped to this key. The summary statistics
            # live in their own Min/Max/Avg elements, not in the per-echo
            # ``ReplyTime`` element.
            node = find_in_action(xml_tag, as_xml)
            if node is None or not node.text:
                continue
            if new_key in time_keys:
                ping_response[new_key] = self._to_ms(node.text)
            else:
                ping_response[new_key] = node.text

        if self.detail:
            replies = []
            for reply in findall_in_action('EchoReply', as_xml):
                icmp_seq = find_in_action('IcmpSequence', reply).text
                reply_time = self._to_ms(
                    find_in_action('ReplyTime', reply).text)
                replies.append(
                    dict(icmp_seq=icmp_seq, reply_time=reply_time))
            ping_response['detailed_response'] = replies

        return ping_response

    def param_check(self, **kvargs):
        """Basic param validation for v4 and v6 addresses

        Args:
            host (str): keyword argument; value checked with
                ``valid_ip_network``. Nothing is checked when it is
                missing or empty.

        Raises:
            InvalidIPAddress: if ``host`` is given and fails validation.
        """
        host = kvargs.get('host')
        if host and not valid_ip_network(host):
            raise InvalidIPAddress(host)