Source code for fasjson.web.utils.ipa
from flask import current_app, g, request
from flask_restx import abort, fields, Mask
from python_freeipa import ClientMeta
from fasjson.lib.ldap import converters, get_client
[docs]
def ldap_client():
if g.gss_creds is None or g.username is None:
abort(401)
return get_client(
current_app.config["FASJSON_LDAP_URI"],
basedn=current_app.config["FASJSON_IPA_BASEDN"],
login=g.username,
timeout=current_app.config.get("FASJSON_LDAP_TIMEOUT", 30),
)
[docs]
def rpc_client():
if g.gss_creds is None:
abort(401)
client = ClientMeta(
current_app.config["FASJSON_IPA_SERVER"],
verify_ssl=current_app.config["FASJSON_IPA_CA_CERT_PATH"],
)
client.login_kerberos()
return client
[docs]
def get_fields_from_ldap_model(ldap_model, endpoint, field_args=None):
field_args = field_args or {}
result = {}
for attr, ldap_converter in ldap_model.fields.items():
if attr in ldap_model.hidden_fields:
continue
if isinstance(ldap_converter, converters.BoolConverter):
field = fields.Boolean
elif isinstance(ldap_converter, converters.GeneralTimeConverter):
field = fields.DateTime
else:
field = fields.String
field = field(**field_args.get(attr, {}))
if ldap_converter.multivalued:
field = fields.List(field)
result[attr] = field
result["uri"] = fields.Url(endpoint, absolute=True)
return result
[docs]
def get_attrs_from_mask(model):
mask_header = current_app.config["RESTX_MASK_HEADER"]
mask = request.headers.get(mask_header)
if mask is None:
return None
return list(Mask(mask).keys())