# Copyright (c) 2013, Citrix Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os.path
import subprocess
import re
import six
from .compat import open_with_codec_handling
# Verbose-mode (re.X) pattern fragment for a PCI address "[SSSS:]BB:DD.F" where
# every field is hexadecimal.  Whitespace inside the fragment is ignored by
# re.X and is only there for readability.
# The segment class previously read [\da-dA-F], which rejected segments
# containing a lower-case 'e' or 'f'; it now accepts all hex digits like the
# other fields do.
_SBDF = (r"(?:(?P<segment> [\da-fA-F]{4}):)?"  # Segment (optional)
         r" (?P<bus> [\da-fA-F]{2}):"          # Bus
         r" (?P<device> [\da-fA-F]{2})\."      # Device
         r" (?P<function>[\da-fA-F])"          # Function
         )

# Don't change the meaning of VALID_SBDF as some parties may be using it
VALID_SBDF = re.compile(r"^%s$" % _SBDF, re.X)

# Same as VALID_SBDF, but the whole address is captured as group "sbdf" and an
# optional one- or two-digit decimal "[index]" suffix is captured as "index".
VALID_SBDFI = re.compile(
    r"^(?P<sbdf>%s)"
    r" (?:[\[](?P<index>[\d]{1,2})[\]])?$"  # Index (optional)
    % _SBDF,
    re.X)
class PCI(object):
    """PCI address object for manipulation and comparison

    Parses an address of the form "[SSSS:]BB:DD.F" (hexadecimal fields) with
    an optional decimal "[index]" suffix.  After construction the instance
    exposes ``segment``, ``bus``, ``device``, ``function`` and ``index`` as
    integers (segment and index default to 0 when omitted), plus ``integer``,
    a single packed value used for equality and ordering.
    """

    @classmethod
    def is_valid(cls, addr):
        """
        Class method to ascertain whether addr is a recognised PCI address
        or not

        @param addr: Candidate address (any type)
        @returns: True if PCI(addr) can be constructed, False otherwise
        """
        try:
            PCI(addr)
        except (TypeError, ValueError):
            # These are the only exceptions the constructor raises
            return False
        return True

    def __init__(self, addr):
        """
        Parse addr into its numeric components

        @param addr: String such as "0000:00:1f.3", "00:1f.3" or "00:1f.3[1]"
        @raises TypeError: if addr is not a string
        @raises ValueError: if addr is not a recognised PCI address or one of
                            its fields is out of range
        """
        # Placeholders so a partially-constructed object has every attribute
        self.integer = -1
        self.segment = -1
        self.bus = -1
        self.device = -1
        self.function = -1
        self.index = -1

        if not isinstance(addr, six.string_types):
            raise TypeError("String expected")

        res = VALID_SBDFI.match(addr)
        if not res:
            raise ValueError("Unrecognised PCI address '%s'" % addr)

        segment = res.group("segment")
        self.segment = int(segment, 16) if segment is not None else 0

        self.bus = int(res.group("bus"), 16)
        if not 0 <= self.bus < 2**8:
            raise ValueError("Bus '%d' out of range 0 <= bus < 256"
                             % (self.bus,))

        # The pattern allows two hex digits, but a device number has 5 bits
        self.device = int(res.group("device"), 16)
        if not 0 <= self.device < 2**5:
            raise ValueError("Device '%d' out of range 0 <= device < 32"
                             % (self.device,))

        # The pattern allows one hex digit, but a function number has 3 bits
        self.function = int(res.group("function"), 16)
        if not 0 <= self.function < 2**3:
            raise ValueError("Function '%d' out of range 0 <= function "
                             "< 8" % (self.function,))

        index = res.group("index")
        self.index = int(index) if index is not None else 0

        # Pack as segment | bus (8 bits) | device (5 bits) | function (3 bits),
        # then append the index in the low 8 bits.
        sbdf = (self.segment << 16 |
                self.bus << 8 |
                self.device << 3 |
                self.function)
        self.integer = sbdf << 8 | self.index

    def __str__(self):
        """Return the canonical "ssss:bb:dd.f[index]" form (lower-case hex)"""
        pci_sbdf = "%04x:%02x:%02x.%1x" % (self.segment, self.bus,
                                           self.device, self.function)
        return "%s[%d]" % (pci_sbdf, self.index)

    def __repr__(self):
        return "<PCI %s>" % (self,)

    @staticmethod
    def _rhs_integer(rhs):
        """
        Return the packed integer for the right-hand side of a comparison

        rhs may be any object with an ``integer`` attribute, or anything the
        PCI constructor accepts.

        @raises TypeError, ValueError: if rhs cannot be parsed as an address
        """
        if hasattr(rhs, "integer"):
            return rhs.integer
        return PCI(rhs).integer

    # Each comparison returns NotImplemented when rhs cannot be interpreted as
    # a PCI address, so Python can fall back to the reflected operation.

    def __eq__(self, rhs):
        try:
            other = self._rhs_integer(rhs)
        except (TypeError, ValueError):
            return NotImplemented
        return self.integer == other

    def __ne__(self, rhs):
        # Defined explicitly: Python 2 does not derive __ne__ from __eq__
        try:
            other = self._rhs_integer(rhs)
        except (TypeError, ValueError):
            return NotImplemented
        return self.integer != other

    def __hash__(self):
        return hash(str(self))

    def __lt__(self, rhs):
        try:
            other = self._rhs_integer(rhs)
        except (TypeError, ValueError):
            return NotImplemented
        return self.integer < other

    def __le__(self, rhs):
        try:
            other = self._rhs_integer(rhs)
        except (TypeError, ValueError):
            return NotImplemented
        return self.integer <= other

    def __gt__(self, rhs):
        try:
            other = self._rhs_integer(rhs)
        except (TypeError, ValueError):
            return NotImplemented
        return self.integer > other

    def __ge__(self, rhs):
        try:
            other = self._rhs_integer(rhs)
        except (TypeError, ValueError):
            return NotImplemented
        return self.integer >= other
class PCIIds(object):
    """
    In-memory lookup tables built from a pci.ids style text database

    The tables are plain dictionaries keyed by the ID strings exactly as they
    appear in the file:
      vendor_dict: "vendor"              -> vendor name
      main_dict:   "vendor:device"       -> device name
      sub_dict:    "subvendor:subdevice" -> subsystem name
      class_dict:  "class"               -> (class name, None)
                   "class:subclass"      -> (class name, subclass name)
    The first occurrence of a key wins; later duplicates are ignored.
    """

    def __init__(self, fn):
        """
        Load the database from the file fn

        @param fn: Path of the database file, read as UTF-8
        """
        self.vendor_dict = {}
        self.sub_dict = {}
        self.main_dict = {}
        self.class_dict = {}

        fh = open_with_codec_handling(fn, encoding="utf-8")
        try:
            self._load(fh)
        finally:
            # Close the file even when a malformed line aborts parsing
            fh.close()

    def _load(self, lines):
        """Parse an iterable of database lines into the lookup tables"""
        vendor = None    # Vendor ID of the vendor section being read, if any
        cls = None       # Class ID of the class section being read, if any
        cls_text = None  # Name of that class

        for raw in lines:
            line = raw.rstrip()
            if line == '' or line.startswith('#'):
                continue

            if line.startswith('C'):
                # Class
                vendor = None
                _, cls, cls_text = line.split(None, 2)
                self.class_dict.setdefault(cls, (cls_text, None))
            elif line.startswith('\t\t'):
                # Doubly-indented lines inside a class section are skipped
                if vendor:
                    # subvendor, subdevice
                    subvendor, subdevice, text = line.split(None, 2)
                    key = "%s:%s" % (subvendor, subdevice)
                    self.sub_dict.setdefault(key, text)
            elif line.startswith('\t'):
                if vendor:
                    # device
                    device, text = line.split(None, 1)
                    key = "%s:%s" % (vendor, device)
                    self.main_dict.setdefault(key, text)
                else:
                    # subclass
                    sub_cls, sub_text = line.split(None, 1)
                    key = "%s:%s" % (cls, sub_cls)
                    self.class_dict.setdefault(key, (cls_text, sub_text))
            else:
                # vendor
                cls = None
                vendor, text = line.split(None, 1)
                self.vendor_dict.setdefault(vendor, text)

    @classmethod
    def read(cls):
        """
        Load the database from the first known location that exists

        @returns: A populated instance
        @raises Exception: if none of the known locations exists
        """
        for f in ['/usr/share/hwdata/pci.ids']:
            if os.path.exists(f):
                return cls(f)
        # pylint: disable-next=broad-exception-raised
        raise Exception('Failed to open PCI database')

    def findVendor(self, vendor):
        """Return the name for the vendor ID, or None if it is unknown"""
        return self.vendor_dict.get(vendor) or None

    def findDevice(self, vendor, device):
        """Return the name for the vendor:device ID pair, or None if unknown"""
        return self.main_dict.get("%s:%s" % (vendor, device)) or None

    def findSubdevice(self, subvendor, subdevice):
        """Return the name for the subvendor:subdevice pair, or None if unknown"""
        return self.sub_dict.get("%s:%s" % (subvendor, subdevice)) or None

    def lookupClass(self, cls_str):
        """
        Return the IDs of all top-level classes whose name contains cls_str

        Subclass entries are not considered.
        """
        return [key for key, (text, sub_text) in self.class_dict.items()
                if not sub_text and cls_str in text]
class PCIDevices(object):
    """
    Snapshot of the PCI devices reported by "lspci -mn"

    ``devs`` maps each device's address (the first field of its lspci line)
    to a dictionary with the keys 'id', 'class', 'subclass', 'vendor' and
    'device', plus 'subvendor' and 'subdevice' when lspci reports them.
    """

    def __init__(self):
        """Run "lspci -mn" and record every device it lists"""
        self.devs = {}
        cmd = subprocess.Popen(
            ["lspci", "-mn"],
            bufsize=1,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        assert cmd.stdout
        try:
            for raw in cmd.stdout:
                dev = self._parse_line(raw)
                if dev is not None:
                    self.devs[dev['id']] = dev
        finally:
            # with subprocess.Popen() is not supported in Python 2, so close
            # the pipe and reap the child by hand, even if parsing failed
            cmd.stdout.close()
            cmd.wait()

    @staticmethod
    def _parse_line(raw):
        """
        Convert one line of "lspci -mn" output into a device dictionary

        @returns: The dictionary, or None for a line with fewer than the four
                  mandatory fields (e.g. a blank line)
        """
        # Drop the quoting and any field starting with '-'
        # (presumably the -rNN / -pNN markers lspci inserts; verify against lspci(8))
        fields = [x for x in raw.replace('"', '').split()
                  if not x.startswith('-')]
        if len(fields) < 4:
            return None

        dev = {'id': fields[0],
               'class': fields[1][:2],
               'subclass': fields[1][2:],
               'vendor': fields[2],
               'device': fields[3]}
        if len(fields) == 6:
            dev['subvendor'] = fields[4]
            dev['subdevice'] = fields[5]
        return dev

    def findByClass(self, cls, subcls = None):
        # type: (str|list[str], str|None) -> list[dict[str, str]]
        """ return all devices that match either of:
        class, subclass
        [class1, class2, ... classN]"""
        if subcls:
            assert isinstance(cls, str)
            return [x for x in self.devs.values()
                    if x['class'] == cls and x['subclass'] == subcls]
        # Without a subclass, cls must be a list: a bare string would turn
        # the "in" test below into a substring match
        assert isinstance(cls, list)
        return [x for x in self.devs.values() if x['class'] in cls]
def pci_sbdfi_to_nic(sbdfi, nics):  # sourcery skip: raise-specific-error
    """
    Return NIC for a given PCI SBDF identifier (optionally with a MAC address index)

    Explanation of the PCI SBDF MAC address index implemented here:

    Support the use of an index after PCI bus location

    Some NICs expose multiple ethernet devices for a single PCI location, meaning
    there is confusion about the physical names and no way to distinguish between
    them other than MAC address. Auditing a large environment to obtain MAC
    addresses to feed into the host installer (e.g. using map_netdev) is not
    viable, therefore this implements an index (ordered by MAC) to the PCI
    bus location to uniquely identify a NIC:

    - Match SBDF[index] input against a regular expression pattern to extract the
      SBDF identifier and the optional MAC index (the default value of the index is 0)
    - Filter the list of NICs to find those with a matching PCI SBDF identifier.
    - Sort matching NICs by MAC address (in case the passed MAC index is > 0)

    The test tests/test_sbdfi_to_nic.py gives an easy overview how it is used.

    @param sbdfi: PCI SBDF identifier of the NIC, optionally with an index
    @param nics: List of NIC objects (each is read for its .pci and .mac attributes)
    @returns: NIC object at the given MAC index
    @raises AttributeError: if sbdfi is not a valid SBDF[index] string
    @raises Exception: if fewer than index + 1 NICs have the given SBDF
    """
    match = VALID_SBDFI.match(sbdfi)
    if not match:
        # The exception type and message are kept exactly as they were;
        # presumably existing callers rely on them - confirm before changing.
        raise AttributeError("'NoneType' object has no attribute 'groupdict'")

    index_str = match.group("index")
    index = int(index_str) if index_str is not None else 0
    value = match.group("sbdf")

    # Order the NICs sharing this SBDF by MAC so that the index is stable
    matching_nics = sorted((nic for nic in nics if nic.pci == value),
                           key=lambda nic: nic.mac)

    if index >= len(matching_nics):
        # Index N refers to the (N + 1)th NIC, so that many are required
        # pylint: disable-next=broad-exception-raised
        raise Exception(
            "Insufficient NICs with PCI SBDF %s (Found %d, wanted at least %d)"
            % (value, len(matching_nics), index + 1)
        )
    return matching_nics[index]