Source code for fasjson.web.resources.certs

from flask import current_app
from flask_restx import fields, reqparse, Resource
from python_freeipa.exceptions import BadRequest

from fasjson.web.utils.ipa import rpc_client

from .base import Namespace


# Namespace on which the certificate resources of this module are registered
# (see the ``@api_v1.route`` decorators further down).
api_v1 = Namespace("certs", description="Certificates related operations")


class Base64Dict(fields.String):
    """String field for the wrapped entries IPA uses in certificate chains.

    Instead of looking up the key it is asked for, this field always reads
    the ``"__base64__"`` key of the object it is given.
    """

    def output(self, key, obj, ordered=False, **kwargs):
        """Return the value stored under ``"__base64__"`` in ``obj``.

        :param key: ignored; the lookup key is always ``"__base64__"``.
        :param obj: the object to read from (presumably a mapping such as
            ``{"__base64__": "..."}`` -- confirm against the IPA response).
        :param ordered: forwarded unchanged to the parent implementation.
        :param kwargs: forwarded unchanged to the parent implementation.
        """
        wrapped_key = "__base64__"
        return super().output(wrapped_key, obj, ordered=ordered, **kwargs)
# Response model used to marshal a certificate entry returned by IPA.
CertModel = api_v1.model(
    "Cert",
    {
        # Issuing CA and the certificate material itself.
        "cacn": fields.String(),
        "certificate": fields.String(),
        "certificate_chain": fields.List(Base64Dict()),
        "issuer": fields.String(),
        "revoked": fields.Boolean(),
        # Subject alternative names.
        "san_other": fields.List(fields.String()),
        "san_other_kpn": fields.List(fields.String()),
        "san_other_upn": fields.List(fields.String()),
        # Identification.
        "serial_number": fields.Integer(),
        "serial_number_hex": fields.String(),
        "sha1_fingerprint": fields.String(),
        "sha256_fingerprint": fields.String(),
        "subject": fields.String(),
        # Validity window, rendered as RFC 822 dates.
        "valid_not_after": fields.DateTime(dt_format="rfc822"),
        "valid_not_before": fields.DateTime(dt_format="rfc822"),
        # Absolute link built from the "v1.certs_cert" endpoint.
        "uri": fields.Url("v1.certs_cert", absolute=True),
    },
)

# Parser for the body of a certificate signing request: "user" and "csr" are
# mandatory, "profile" is optional.
create_request_parser = reqparse.RequestParser()
create_request_parser.add_argument(
    "user",
    help="User name.",
    required=True,
)
create_request_parser.add_argument(
    "csr",
    help="Certificate Signing Request.",
    required=True,
)
create_request_parser.add_argument(
    "profile",
    help="Certificate Profile.",
    required=False,
)
# Collection endpoint: accepts a CSR and returns the certificate signed by IPA.
@api_v1.route("/")
@api_v1.response(400, "The CSR could not be signed")
class Certs(Resource):
    # NOTE: the method docstring below is kept verbatim; flask-restx may
    # surface it in the generated API documentation.
    @api_v1.doc("sign_csr")
    @api_v1.expect(create_request_parser)
    @api_v1.marshal_with(CertModel)
    def post(self):
        """Send a CSR and get a signed certificate in return"""
        params = create_request_parser.parse_args()
        ipa = rpc_client()

        # Use the application-wide default profile when the request gives none.
        profile_id = params["profile"]
        if not profile_id:
            profile_id = current_app.config["CERTIFICATE_PROFILE"]

        response = ipa.cert_request(
            params["csr"],
            o_principal=params["user"],
            o_profile_id=profile_id,
        )
        return response["result"]
# Item endpoint: looks up a single certificate by its serial number.
@api_v1.route("/<int:serial_number>/")
@api_v1.param("serial_number", "The certificate's serial number")
@api_v1.response(404, "Certificate not found")
class Cert(Resource):
    # NOTE: the method docstring below is kept verbatim; flask-restx may
    # surface it in the generated API documentation.
    @api_v1.doc("get_cert")
    @api_v1.marshal_with(CertModel)
    def get(self, serial_number):
        """Fetch a certificate given its serial number

        Certificates are also present on users' results, but this method
        gives more details.
        """
        ipa = rpc_client()
        try:
            response = ipa.cert_show(serial_number)
        except BadRequest as err:
            # Only IPA error code 4301 is translated into a 404; every other
            # BadRequest propagates unchanged to the caller.
            if err.code != 4301:
                raise
            api_v1.abort(
                404,
                "Certificate not found",
                serial_number=serial_number,
                server_message=err.message,
            )
        return response["result"]