Source code for fasjson.web.extensions.flask_ipacfg
import configparser
import operator
import random
import dns.rdatatype
import dns.resolver
from dns.exception import DNSException
from flask import current_app
[docs]
class IPAConfig:
prefix = "FASJSON_IPA"
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
[docs]
def init_app(self, app):
if "FASJSON_IPA_CONFIG_PATH" not in app.config:
app.config.setdefault("FASJSON_IPA_CONFIG_PATH", "/etc/ipa/default.conf")
if "FASJSON_IPA_CA_CERT_PATH" not in app.config:
app.config.setdefault("FASJSON_IPA_CA_CERT_PATH", "/etc/ipa/ca.crt")
try:
self._load_config(app)
except FileNotFoundError:
pass # The config will be loaded on request by _detect_ldap
app.before_request(self._detect_ldap)
def _load_config(self, app=None):
_app = app
if _app is None:
_app = current_app
if _app.config.get("FASJSON_IPA_CONFIG_LOADED", False):
return
p = configparser.ConfigParser()
with open(_app.config["FASJSON_IPA_CONFIG_PATH"]) as f:
p.read_file(f)
_app.config.setdefault("FASJSON_IPA_BASEDN", p.get("global", "basedn"))
_app.config.setdefault("FASJSON_IPA_DOMAIN", p.get("global", "domain"))
_app.config.setdefault("FASJSON_IPA_REALM", p.get("global", "realm"))
_app.config.setdefault("FASJSON_IPA_SERVER", p.get("global", "server", fallback=None))
_app.config.setdefault("FASJSON_IPA_CONFIG_LOADED", True)
def _detect_ldap(self) -> None:
# Load the config if it wasn't loaded before
self._load_config()
domain = current_app.config["FASJSON_IPA_DOMAIN"]
servers = []
try:
answers = query_srv(f"_ldap._tcp.{domain}")
except DNSException:
servers.append("ldap://" + current_app.config["FASJSON_IPA_SERVER"])
else:
for answer in answers:
server = str(answer.target).rstrip(".")
servers.append(f"ldap://{server}:{answer.port}")
current_app.config["FASJSON_LDAP_URI"] = " ".join(servers)
def _mix_weight(records):
"""Weighted population sorting for records with same priority"""
# trivial case
if len(records) <= 1:
return records
# Optimization for common case: If all weights are the same (e.g. 0),
# just shuffle the records, which is about four times faster.
if all(rr.weight == records[0].weight for rr in records):
random.shuffle(records)
return records
noweight = 0.01 # give records with 0 weight a small chance
result = []
records = set(records)
while len(records) > 1:
# Compute the sum of the weights of those RRs. Then choose a
# uniform random number between 0 and the sum computed (inclusive).
urn = random.uniform(0, sum(rr.weight or noweight for rr in records)) # noqa: S311
# Select the RR whose running sum value is the first in the selected
# order which is greater than or equal to the random number selected.
acc = 0.0
for rr in records.copy():
acc += rr.weight or noweight
if acc >= urn:
records.remove(rr)
result.append(rr)
# randomness makes it hard to check for coverage in these next 2 lines
if records: # pragma: no cover
result.append(records.pop()) # pragma: no cover
return result
[docs]
def sort_prio_weight(records):
"""RFC 2782 sorting algorithm for SRV and URI records
RFC 2782 defines a sorting algorithms for SRV records, that is also used
for sorting URI records. Records are sorted by priority and than randomly
shuffled according to weight.
This implementation also removes duplicate entries.
"""
# order records by priority
records = sorted(records, key=operator.attrgetter("priority"))
# remove duplicate entries
uniquerecords = []
seen = set()
for rr in records:
# A SRV record has target and port, URI just has target.
target = (rr.target, getattr(rr, "port", None))
if target not in seen:
uniquerecords.append(rr)
seen.add(target)
# weighted randomization of entries with same priority
result = []
sameprio = []
for rr in uniquerecords:
# add all items with same priority in a bucket
if not sameprio or sameprio[0].priority == rr.priority:
sameprio.append(rr)
else:
# got different priority, shuffle bucket
result.extend(_mix_weight(sameprio))
# start a new priority list
sameprio = [rr]
# add last batch of records with same priority
if sameprio:
result.extend(_mix_weight(sameprio))
return result
[docs]
def query_srv(qname, resolver=None, **kwargs):
"""Query SRV records and sort reply according to RFC 2782
:param qname: query name, _service._proto.domain.
:return: list of dns.rdtypes.IN.SRV.SRV instances
"""
if resolver is None:
resolver = dns.resolver
answer = resolver.resolve(qname, rdtype=dns.rdatatype.SRV, **kwargs)
return sort_prio_weight(answer)