Source code for fasjson.tests.unit.test_web_extension_ipacfg

import os
from types import SimpleNamespace
from unittest import mock

import dns
import dns.rdtypes.IN.SRV
import pytest

from fasjson.web.extensions.flask_ipacfg import (
    _mix_weight,
    IPAConfig,
    query_srv,
    sort_prio_weight,
)


TEST_IPACFG = """
[global]
basedn = dc=testing
realm = TESTING
domain = testing
server = ipa.testing
host = fasjson.testing
xmlrpc_uri = https://ipa.testing/ipa/xml
enable_ra = True
"""


[docs] @pytest.fixture def app_with_filtered_config(app, mocker): old_config = dict(app.config) mocker.patch.dict(app.config, clear=True) for key, value in old_config.items(): if key.startswith("FASJSON_"): continue app.config[key] = value yield app app.config = old_config
def _make_dns_answer(records): qname = "_ldap._tcp.example.com" mocked_records = dns.rrset.from_text_list( name=qname, rdclass="IN", rdtype="SRV", ttl=3600, text_rdatas=[ " ".join( [ str(record.get("priority", 10)), str(record.get("weight", 10)), str(record.get("port", 389)), record["name"], ] ) for record in records ], ) mocked_answer = dns.resolver.Answer( qname, dns.rdatatype.SRV, rdclass=dns.rdataclass.IN, response=SimpleNamespace( answer=dns.message.ANSWER, resolve_chaining=lambda: SimpleNamespace( canonical_name=qname, answer=mocked_records, minimum_ttl=3600 ), ), ) return mocked_answer
[docs] def test_ipacfg_delayed_init(mocker): init_app = mocker.patch.object(IPAConfig, "init_app") IPAConfig(None) init_app.assert_not_called()
[docs] def test_ipacfg_default_paths(app_with_filtered_config): app = app_with_filtered_config IPAConfig(app) with app.test_request_context("/v1/"): try: app.preprocess_request() except FileNotFoundError: # We may be running the testsuite on a host that does not have the IPA # config files. It's fine, ignore it. if os.path.exists("/etc/ipa/default.conf"): raise assert app.config["FASJSON_IPA_CONFIG_PATH"] == "/etc/ipa/default.conf" assert app.config["FASJSON_IPA_CA_CERT_PATH"] == "/etc/ipa/ca.crt"
[docs] def test_ipacfg_delayed_load(tmpdir, app_with_filtered_config): app = app_with_filtered_config config_path = os.path.join(tmpdir, "ipa.cfg") app.config["FASJSON_IPA_CONFIG_PATH"] = config_path IPAConfig(app) assert "FASJSON_IPA_CONFIG_LOADED" not in app.config with app.test_request_context("/v1/"): with open(config_path, "w") as ipacfg_file: ipacfg_file.write(TEST_IPACFG) app.preprocess_request() assert app.config["FASJSON_IPA_CONFIG_LOADED"] is True assert app.config["FASJSON_IPA_BASEDN"] == "dc=testing" assert app.config["FASJSON_IPA_DOMAIN"] == "testing"
[docs] def test_default_app(app): with app.test_request_context("/v1/"): IPAConfig(app)._load_config()
# This should not crash
[docs] def test_already_loaded(mocker, app): with app.test_request_context("/v1/"): app.preprocess_request() assert app.config["FASJSON_IPA_CONFIG_LOADED"] is True configparser = mocker.patch("fasjson.web.extensions.flask_ipacfg.configparser") IPAConfig(app)._load_config() configparser.ConfigParser.assert_not_called()
[docs] def test_detect_dns(mocker, app): ext = IPAConfig(app) mocker.patch( "fasjson.web.extensions.flask_ipacfg.query_srv", return_value=[ SimpleNamespace(target="ldap1", port=389), SimpleNamespace(target="ldap2", port=389), ], ) with app.test_request_context("/v1/"): ext._detect_ldap() expected = "ldap://ldap1:389 ldap://ldap2:389" assert app.config["FASJSON_LDAP_URI"] == expected
[docs] def test_dns_query(): resolver = mock.Mock() resolver.resolve.return_value = _make_dns_answer( [ dict(name="ldap1", priority=30), dict(name="ldap2", priority=20, weight=2), dict(name="ldap3", priority=20, weight=1), dict(name="ldap4", priority=10), ] ) result = query_srv("_ldap._tcp.example.com", resolver) result_names = [str(r.target) for r in result] assert result_names[0] == "ldap4" assert result_names[3] == "ldap1" assert result_names[1:3] in [["ldap2", "ldap3"], ["ldap3", "ldap2"]]
[docs] def test_dns_query_same_prio_same_weight(): names = ["ldap1", "ldap2", "ldap3"] resolver = mock.Mock() resolver.resolve.return_value = _make_dns_answer([{"name": name} for name in names]) result = query_srv("_ldap._tcp.example.com", resolver) result_names = [str(r.target) for r in result] assert set(result_names) == set(names)
[docs] def test_dns_query_no_record(): resolver = mock.Mock() resolver.resolve.return_value = _make_dns_answer([]) result = query_srv("_ldap._tcp.example.com", resolver) assert len(result) == 0
[docs] def test_dns_query_duplicates(): result = sort_prio_weight( [ SimpleNamespace(priority=1, target="ldap1"), SimpleNamespace(priority=1, target="ldap1"), ] ) assert len(result) == 1
[docs] def test_mix_weight(): total = 100 records = _make_dns_answer([{"name": f"ldap-{idx}", "weight": idx} for idx in range(total)]) result = _mix_weight(records) result_names = [str(r.target) for r in result] # Uh, I don't really know how to check for weighted randomness. # Let's check it has been shuffled assert result_names != [f"ldap-{idx}" for idx in range(total)] # And we didn't drop any item assert len(result_names) == total