Skip to content

Config

Config #

Source code in amqp_client_python/domain/models/config.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Config:
    def __init__(
        self, options: Options, ssl_options: Optional[SSLOptions] = None
    ) -> None:
        """
        Create an Config object thats hold and mount the connection information.

        Args:
            options: hold information for estabilish connection
            ssl_options: hold information for estabilish SSL connections

        Returns:

        Raises:

        Examples:
            >>> Config(
                    Options("example", "example.rpc", "example.rpc", "amqp://admin:admin@localhost:5672/"),
                    SSLOptions("./.certs/cert.pem", "./.certs/privkey.pem", "./.certs/ca.pem")
                )
        """
        self.url = None
        self.options = options
        self.ssl_options = ssl_options

    def build(self) -> "Config":
        """
        Create an Config object thats hold and mount the connection information.

        Args:

        Returns:
            Config object

        Raises:

        Examples:
            >>> config.build()
        """
        opt = {
            **self.options.kwargs,
            "heartbeat": self.options.heartbeat,
        }
        if bool(self.ssl_options):
            protocol = "amqps"
            if self.options.port is None:
                self.options.port = 5671
            opt["ssl_options"] = {
                "keyfile": self.ssl_options.keyfile_path,
                "certfile": self.ssl_options.certfile_path,
                "ca_certs": self.ssl_options.ca_certs_path,
            }
        else:
            protocol = "amqp"
            if self.options.port is None:
                self.options.port = 5672
        if self.options.uri:
            url = "{}?{} ".format(self.options.uri, urlencode(opt))
        else:
            url = "{}://{}:{}@{}:{}{}?{} ".format(
                protocol,
                self.options.login,
                self.options.passwd,
                self.options.domain,
                self.options.port,
                self.options.vhost,
                urlencode(opt),
            )
        self.url = URLParameters(url)
        return self

__init__(options, ssl_options=None) #

Create an Config object thats hold and mount the connection information.

Parameters:

Name Type Description Default
options Options

hold information for estabilish connection

required
ssl_options Optional[SSLOptions]

hold information for estabilish SSL connections

None

Examples:

>>> Config(
        Options("example", "example.rpc", "example.rpc", "amqp://admin:admin@localhost:5672/"),
        SSLOptions("./.certs/cert.pem", "./.certs/privkey.pem", "./.certs/ca.pem")
    )
Source code in amqp_client_python/domain/models/config.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(
    self, options: Options, ssl_options: Optional[SSLOptions] = None
) -> None:
    """
    Create an Config object thats hold and mount the connection information.

    Args:
        options: hold information for estabilish connection
        ssl_options: hold information for estabilish SSL connections

    Returns:

    Raises:

    Examples:
        >>> Config(
                Options("example", "example.rpc", "example.rpc", "amqp://admin:admin@localhost:5672/"),
                SSLOptions("./.certs/cert.pem", "./.certs/privkey.pem", "./.certs/ca.pem")
            )
    """
    self.url = None
    self.options = options
    self.ssl_options = ssl_options

build() #

Create an Config object thats hold and mount the connection information.

Returns:

Type Description
Config

Config object

Examples:

>>> config.build()
Source code in amqp_client_python/domain/models/config.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def build(self) -> "Config":
    """
    Create an Config object thats hold and mount the connection information.

    Args:

    Returns:
        Config object

    Raises:

    Examples:
        >>> config.build()
    """
    opt = {
        **self.options.kwargs,
        "heartbeat": self.options.heartbeat,
    }
    if bool(self.ssl_options):
        protocol = "amqps"
        if self.options.port is None:
            self.options.port = 5671
        opt["ssl_options"] = {
            "keyfile": self.ssl_options.keyfile_path,
            "certfile": self.ssl_options.certfile_path,
            "ca_certs": self.ssl_options.ca_certs_path,
        }
    else:
        protocol = "amqp"
        if self.options.port is None:
            self.options.port = 5672
    if self.options.uri:
        url = "{}?{} ".format(self.options.uri, urlencode(opt))
    else:
        url = "{}://{}:{}@{}:{}{}?{} ".format(
            protocol,
            self.options.login,
            self.options.passwd,
            self.options.domain,
            self.options.port,
            self.options.vhost,
            urlencode(opt),
        )
    self.url = URLParameters(url)
    return self