Skip to content

Async eventbus

AsyncEventbusRabbitMQ #

Source code in amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class AsyncEventbusRabbitMQ:
    """
    Asynchronous event bus facade built on four dedicated connections.

    One ``AsyncConnection`` is created per role (publish, subscribe,
    rpc client, rpc server). Every public coroutine opens the connection of
    its role with the configured URL and then schedules the real operation
    through that connection's ``add_callback``.
    """

    def __init__(
        self,
        config: Config,
        loop: Optional[AbstractEventLoop] = None,
        pub_publisher_confirms: bool = True,
        rpc_client_publisher_confirms: bool = True,
        rpc_server_publisher_confirms: bool = False,
        sub_prefetch_count: int = 0,
        rpc_client_prefetch_count: int = 0,
        rpc_server_prefetch_count: int = 0,
        sub_auto_ack: bool = False,
        rpc_client_auto_ack: bool = False,
        rpc_server_auto_ack: bool = False,
    ) -> None:
        """
        Create an AsyncEventbusRabbitMQ object that interacts with the bus
        and provides some connection management abstractions.

        Args:
            config: the Config object; ``config.build()`` is stored as ``self.config``
            loop: event loop object handed to every connection
            pub_publisher_confirms: set True to allow publisher confirmations on pub connection
            rpc_client_publisher_confirms: set True to allow publisher confirmations on rpc client connection
            rpc_server_publisher_confirms: set True to allow publisher confirmations on rpc server connection
            sub_prefetch_count: set how many messages to prefetch on sub connection
            rpc_client_prefetch_count: set how many messages to prefetch on rpc client connection
            rpc_server_prefetch_count: set how many messages to prefetch on rpc server connection
            sub_auto_ack: set to True to ack messages before processing on sub connection
            rpc_client_auto_ack: set to True to ack messages before processing on rpc client connection
            rpc_server_auto_ack: set to True to ack messages before processing on rpc server connection

        Examples:
            >>> async_eventbus = AsyncEventbusRabbitMQ(
            ...     config, loop, rpc_client_publisher_confirms=True,
            ...     rpc_server_publisher_confirms=False, rpc_server_auto_ack=False)
            >>> # register subscribe: the handler object must expose ``handle``
            >>> subscribe_event = ExampleEvent("rpc_exchange")
            >>> await async_eventbus.subscribe(subscribe_event, example_handler, "user.find")
            >>> # provide resource
            >>> async def handler2(*body):
            ...     print(f"do something with: {body}")
            ...     return "response"
            >>> await async_eventbus.provide_resource("user.find2", handler2)
        """
        # May be None when the caller does not pass a loop (see dispose()).
        self._loop: Optional[AbstractEventLoop] = loop
        # A single Signal instance is shared by the four connections.
        self._signal = Signal()
        self._pub_connection = AsyncConnection(
            self._loop,
            pub_publisher_confirms,
            connection_type=ConnectionType.PUBLISH,
            signal=self._signal
        )
        # The subscribe connection always receives False as its second
        # positional argument (the publisher-confirms slot of the others).
        self._sub_connection = AsyncConnection(
            self._loop, False,
            sub_prefetch_count,
            sub_auto_ack,
            connection_type=ConnectionType.SUBSCRIBE,
            signal=self._signal
        )
        self._rpc_client_connection = AsyncConnection(
            self._loop,
            rpc_client_publisher_confirms,
            rpc_client_prefetch_count,
            rpc_client_auto_ack,
            connection_type=ConnectionType.RPC_CLIENT,
            signal=self._signal
        )
        self._rpc_server_connection = AsyncConnection(
            self._loop,
            rpc_server_publisher_confirms,
            rpc_server_prefetch_count,
            rpc_server_auto_ack,
            connection_type=ConnectionType.RPC_SERVER,
            signal=self._signal
        )
        # Expose the signal's ``on`` directly on the bus.
        self.on = self._signal.on
        self.config = config.build()
        self._rpc_server_initialized = False

    async def rpc_client(
        self,
        exchange: str,
        routing_key: str,
        body: List[Any],
        content_type: str = "application/json",
        timeout: float = 5,
        connection_timeout: float = 16,
        delivery_mode: DeliveryMode = DeliveryMode.Transient,
        expiration: Optional[Union[str, None]] = None,
        **kwargs
    ) -> bytes:
        """
        Sends a publish message to queue of the bus and waits for a response

        Args:
            exchange: exchange name
            routing_key: routing key name
            body: body that will be sent
            content_type: content type of message
            timeout: timeout in seconds for waiting for response
            connection_timeout: timeout for waiting for connection reestablishment
            delivery_mode: delivery mode
            expiration: maximum lifetime of message to stay on the queue
            **kwargs: extra keyword arguments forwarded to the connection's ``rpc_client``

        Returns:
            bytes: response message

        Raises:
            AutoReconnectException: when it cannot reconnect within the given timeout
            PublishTimeoutException: if publish confirmation is set to True and \
            no confirmation is received within the given timeout
            NackException: if publish confirmation is set to True and a nack is received
            ResponseTimeoutException: if response timeout is reached
            RpcProviderException: if the rpc provider responded with an error

        Examples:
            >>> await eventbus.rpc_client("example.rpc", "user.find", [{"name": "example"}], "application/json")
        """
        async def add_rpc_client():
            return await self._rpc_client_connection.rpc_client(
                exchange,
                routing_key,
                body,
                content_type,
                timeout,
                delivery_mode,
                expiration,
                **kwargs
            )

        self._rpc_client_connection.open(self.config.url)
        return await self._rpc_client_connection.add_callback(add_rpc_client, connection_timeout)

    async def publish(
        self,
        event: IntegrationEvent,
        routing_key: str,
        body: List[Any],
        content_type: str = "application/json",
        timeout: float = 5,
        connection_timeout: float = 16,
        delivery_mode: DeliveryMode = DeliveryMode.Transient,
        expiration: Optional[Union[str, None]] = None,  # example: '60000' -> 60s
        **kwargs
    ) -> Optional[bool]:
        """
        Sends a publish message to the bus following parameters passed

        Args:
            event: integration event; its ``event_type`` is used as the exchange name
            routing_key: routing key name
            body: body that will be sent
            content_type: content type of message
            timeout: timeout in seconds for waiting for response
            connection_timeout: timeout for waiting for connection reestablishment
            delivery_mode: delivery mode
            expiration: maximum lifetime of message to stay on the queue
            **kwargs: extra keyword arguments forwarded to the connection's ``publish``

        Returns:
            None: if publish confirmation is set to False
            True: if successful when publish confirmation is set to True

        Raises:
            AutoReconnectException: when it cannot reconnect within the given timeout
            PublishTimeoutException: if publish confirmation is set to True and \
            no confirmation is received within the given timeout
            NackException: if publish confirmation is set to True and a nack is received

        Examples:
            >>> publish_event = ExampleEvent("example.rpc")
            >>> await eventbus.publish(publish_event, "user.find3", ["content_message"])
        """
        async def add_publish():
            return await self._pub_connection.publish(
                event.event_type,
                routing_key,
                body,
                content_type,
                timeout,
                delivery_mode,
                expiration,
                **kwargs
            )

        self._pub_connection.open(self.config.url)
        return await self._pub_connection.add_callback(add_publish, connection_timeout)

    async def provide_resource(
        self,
        name: str,
        callback: Callable[[List[Any]], Awaitable[Union[bytes, str]]],
        response_timeout: Optional[int] = None,
        connection_timeout: int = 16
    ) -> None:
        """
        Register a provider to listen on queue of bus

        The queue and exchange names come from ``self.config.options``
        (``rpc_queue_name`` and ``rpc_exchange_name``).

        Args:
            name: routing_key name
            callback: message handler
            response_timeout: timeout in seconds for waiting for process the received message
            connection_timeout: timeout for waiting for connection reestablishment

        Returns:
            None: None

        Raises:
            AutoReconnectException: when it cannot reconnect within the given timeout

        Examples:
            >>> async def handle(*body) -> Union[bytes, str]:
            ...     print(f"received message: {body}")
            ...     return b"[]"
            >>> await eventbus.provide_resource("user.find", handle)
        """
        async def add_resource():
            await self._rpc_server_connection.rpc_subscribe(
                self.config.options.rpc_queue_name,
                self.config.options.rpc_exchange_name,
                name,
                callback,
                response_timeout,
            )

        self._rpc_server_connection.open(self.config.url)
        await self._rpc_server_connection.add_callback(add_resource, connection_timeout)

    async def subscribe(
        self,
        event: IntegrationEvent,
        handler: AsyncSubscriberHandler,
        routing_key: str,
        response_timeout: Optional[int] = None,
        connection_timeout: int = 16
    ) -> None:
        """
        Register a subscriber handler for an event on the queue of the bus

        The queue name comes from ``self.config.options.queue_name``.

        Args:
            event: integration event; its ``event_type`` is used as the exchange name
            handler: subscriber handler; its ``handle`` attribute is registered as the message callback
            routing_key: routing key name
            response_timeout: timeout in seconds for waiting for process the received message
            connection_timeout: timeout for waiting for connection reestablishment

        Returns:
            None: None

        Raises:
            AutoReconnectException: when it cannot reconnect within the given timeout

        Examples:
            >>> subscribe_event = ExampleEvent("example.rpc")
            >>> # event_handler: any object exposing an async ``handle(*body)``
            >>> await eventbus.subscribe(subscribe_event, event_handler, "user.find3")
        """
        async def add_subscribe():
            await self._sub_connection.subscribe(
                self.config.options.queue_name,
                event.event_type,
                routing_key,
                handler.handle,
                response_timeout,
            )

        self._sub_connection.open(self.config.url)
        await self._sub_connection.add_callback(add_subscribe, connection_timeout)

    async def dispose(self, stop_event_loop=True) -> None:
        """
        Close the four connections, optionally stop the loop and dispose the signal

        Args:
            stop_event_loop: when True, stop the event loop given to ``__init__`` (if any)

        Returns:
            None: None
        """
        await self._pub_connection.close()
        await self._sub_connection.close()
        await self._rpc_client_connection.close()
        await self._rpc_server_connection.close()
        # ``loop`` defaults to None in __init__; without this guard the default
        # configuration would fail here with AttributeError.
        if stop_event_loop and self._loop is not None:
            self._loop.stop()
        self._signal.dispose()

__init__(config, loop=None, pub_publisher_confirms=True, rpc_client_publisher_confirms=True, rpc_server_publisher_confirms=False, sub_prefetch_count=0, rpc_client_prefetch_count=0, rpc_server_prefetch_count=0, sub_auto_ack=False, rpc_client_auto_ack=False, rpc_server_auto_ack=False) #

Create an AsyncEventbusRabbitMQ object that interacts with the bus and provides some connection management abstractions.

Parameters:

Name Type Description Default
config Config

the Config object

required
loop Optional[AbstractEventLoop]

pass an event loop object

None
pub_publisher_confirms bool

set True to allow publisher confirmations on pub connection

True
rpc_client_publisher_confirms bool

set True to allow publisher confirmations on rpc client connection

True
rpc_server_publisher_confirms bool

set True to allow publisher confirmations on rpc server connection

False
sub_prefetch_count int

set how many messages to prefetch on sub connection

0
rpc_client_prefetch_count int

set how many messages to prefetch on rpc client connection

0
rpc_server_prefetch_count int

set how many messages to prefetch on rpc server connection

0
sub_auto_ack bool

set to True to ack messages before processing on sub connection

False
rpc_client_auto_ack bool

set to True to ack messages before processing on rpc client connection

False
rpc_server_auto_ack bool

set to True to ack messages before processing on rpc server connection

False

Returns:

Type Description
None

AsyncEventbusRabbitMQ object

Examples:

>>> async_eventbus = AsyncEventbusRabbitMQ(
    config, loop, rpc_client_publisher_confirms=True,
    rpc_server_publisher_confirms=False, rpc_server_auto_ack=False)
### register subscribe
>>> def handler(*body):
        print(f"do something with: {body}")
>>> subscribe_event = ExampleEvent("rpc_exchange")
>>> await async_eventbus.subscribe(subscribe_event, handler, "user.find")
### provide resource
>>> def handler2(*body):
        print(f"do something with: {body}")
        return "response"
>>> await async_eventbus.provide_resource("user.find2", handler2)
Source code in amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def __init__(
    self,
    config: Config,
    loop: Optional[AbstractEventLoop] = None,
    pub_publisher_confirms: bool = True,
    rpc_client_publisher_confirms: bool = True,
    rpc_server_publisher_confirms: bool = False,
    sub_prefetch_count: int = 0,
    rpc_client_prefetch_count: int = 0,
    rpc_server_prefetch_count: int = 0,
    sub_auto_ack: bool = False,
    rpc_client_auto_ack: bool = False,
    rpc_server_auto_ack: bool = False,
) -> None:
    """
    Build the event bus: one connection per role, all sharing one signal.

    Args:
        config: the Config object; ``config.build()`` is stored as ``self.config``
        loop: event loop object handed to every connection
        pub_publisher_confirms: set True to allow publisher confirmations on pub connection
        rpc_client_publisher_confirms: set True to allow publisher confirmations on rpc client connection
        rpc_server_publisher_confirms: set True to allow publisher confirmations on rpc server connection
        sub_prefetch_count: set how many messages to prefetch on sub connection
        rpc_client_prefetch_count: set how many messages to prefetch on rpc client connection
        rpc_server_prefetch_count: set how many messages to prefetch on rpc server connection
        sub_auto_ack: set to True to ack messages before processing on sub connection
        rpc_client_auto_ack: set to True to ack messages before processing on rpc client connection
        rpc_server_auto_ack: set to True to ack messages before processing on rpc server connection

    Examples:
        >>> async_eventbus = AsyncEventbusRabbitMQ(
        ...     config, loop, rpc_client_publisher_confirms=True,
        ...     rpc_server_publisher_confirms=False, rpc_server_auto_ack=False)
        >>> # provide resource
        >>> async def handler2(*body):
        ...     print(f"do something with: {body}")
        ...     return "response"
        >>> await async_eventbus.provide_resource("user.find2", handler2)
    """
    self._loop: AbstractEventLoop = loop
    # The same Signal instance is handed to each of the four connections.
    self._signal = shared_signal = Signal()

    # Publish role: no prefetch/auto-ack arguments are passed.
    self._pub_connection = AsyncConnection(
        loop, pub_publisher_confirms,
        connection_type=ConnectionType.PUBLISH, signal=shared_signal,
    )
    # Subscribe role: the second positional argument is always False.
    self._sub_connection = AsyncConnection(
        loop, False, sub_prefetch_count, sub_auto_ack,
        connection_type=ConnectionType.SUBSCRIBE, signal=shared_signal,
    )
    self._rpc_client_connection = AsyncConnection(
        loop, rpc_client_publisher_confirms, rpc_client_prefetch_count, rpc_client_auto_ack,
        connection_type=ConnectionType.RPC_CLIENT, signal=shared_signal,
    )
    self._rpc_server_connection = AsyncConnection(
        loop, rpc_server_publisher_confirms, rpc_server_prefetch_count, rpc_server_auto_ack,
        connection_type=ConnectionType.RPC_SERVER, signal=shared_signal,
    )

    # Expose the signal's ``on`` directly on the bus.
    self.on = shared_signal.on
    self.config = config.build()
    self._rpc_server_initialized = False

provide_resource(name, callback, response_timeout=None, connection_timeout=16) async #

Register a provider to listen on queue of bus

Parameters:

Name Type Description Default
name str

routing_key name

required
callback Callable[[List[Any]], Awaitable[Union[bytes, str]]]

message handler

required
response_timeout int

timeout in seconds for waiting for process the received message

None
connection_timeout int

timeout for waiting for connection reestablishment

16

Returns:

Name Type Description
None None

None

Raises:

Type Description
AutoReconnectException

when it cannot reconnect within the given timeout

Examples:

>>> async def handle(*body) -> Union[bytes, str]:
        print(f"received message: {body}")
        return b"[]"
>>> await eventbus.provide_resource("user.find", handle)
Source code in amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def provide_resource(
    self,
    name: str,
    callback: Callable[[List[Any]], Awaitable[Union[bytes, str]]],
    response_timeout: int = None,
    connection_timeout: int = 16
) -> None:
    """
    Register an RPC provider on the rpc server connection.

    The queue and exchange names are read from ``self.config.options``
    (``rpc_queue_name`` and ``rpc_exchange_name``).

    Args:
        name: routing_key name
        callback: message handler
        response_timeout: timeout in seconds for processing the received message
        connection_timeout: timeout for waiting for connection reestablishment

    Returns:
        None: None

    Raises:
        AutoReconnectException: when it cannot reconnect within the given timeout

    Examples:
        >>> async def handle(*body) -> Union[bytes, str]:
        ...     print(f"received message: {body}")
        ...     return b"[]"
        >>> await eventbus.provide_resource("user.find", handle)
    """
    server_connection = self._rpc_server_connection

    async def register_provider():
        # Options are read when the callback runs, not when it is scheduled.
        options = self.config.options
        await server_connection.rpc_subscribe(
            options.rpc_queue_name, options.rpc_exchange_name, name, callback, response_timeout
        )

    # Request the connection first, then queue the registration on it.
    server_connection.open(self.config.url)
    await server_connection.add_callback(register_provider, connection_timeout)

publish(event, routing_key, body, content_type='application/json', timeout=5, connection_timeout=16, delivery_mode=DeliveryMode.Transient, expiration=None, **kwargs) async #

Sends a publish message to the bus following parameters passed

Parameters:

Name Type Description Default
event IntegrationEvent

integration event; its event_type is used as the exchange name

required
routing_key str

routing key name

required
body List[Any]

body that will be sent

required
content_type str

content type of message

'application/json'
timeout float

timeout in seconds for waiting for response

5
connection_timeout float

timeout for waiting for connection reestablishment

16
delivery_mode DeliveryMode

delivery mode

DeliveryMode.Transient
expiration Optional[Union[str, None]]

maximum lifetime of message to stay on the queue

None

Returns:

Name Type Description
None Optional[bool]

if publish confirmation is set to False

True Optional[bool]

if successful when publish confirmation is set to True

Raises:

Type Description
AutoReconnectException

when it cannot reconnect within the given timeout

PublishTimeoutException

if publish confirmation is set to True and no confirmation is received within the given timeout

NackException

if publish confirmation is set to True and a nack is received

Examples:

>>> publish_event = ExampleEvent("example.rpc")
>>> await eventbus.publish(publish_event, "user.find3", ["content_message"])
Source code in amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
async def publish(
    self,
    event: IntegrationEvent,
    routing_key: str,
    body: List[Any],
    content_type: str = "application/json",
    timeout: float = 5,
    connection_timeout: float = 16,
    delivery_mode: DeliveryMode = DeliveryMode.Transient,
    expiration: Optional[Union[str, None]] = None,  # example: '60000' -> 60s
    **kwargs
) -> Optional[bool]:
    """
    Sends a publish message to the bus following parameters passed

    Args:
        event: integration event; its ``event_type`` is used as the exchange name
        routing_key: routing key name
        body: body that will be sent
        content_type: content type of message
        timeout: timeout in seconds for waiting for response
        connection_timeout: timeout for waiting for connection reestablishment
        delivery_mode: delivery mode
        expiration: maximum lifetime of message to stay on the queue
        **kwargs: extra keyword arguments forwarded to the connection's ``publish``

    Returns:
        None: if publish confirmation is set to False
        True: if successful when publish confirmation is set to True

    Raises:
        AutoReconnectException: when it cannot reconnect within the given timeout
        PublishTimeoutException: if publish confirmation is set to True and \
        no confirmation is received within the given timeout
        NackException: if publish confirmation is set to True and a nack is received

    Examples:
        >>> publish_event = ExampleEvent("example.rpc")
        >>> await eventbus.publish(publish_event, "user.find3", ["content_message"])
    """
    async def add_publish():
        # event.event_type fills the exchange slot of the connection call.
        return await self._pub_connection.publish(
            event.event_type,
            routing_key,
            body,
            content_type,
            timeout,
            delivery_mode,
            expiration,
            **kwargs
        )

    # Request the connection first, then queue the publish on it; the value
    # returned by add_callback is handed back to the caller unchanged.
    self._pub_connection.open(self.config.url)
    return await self._pub_connection.add_callback(add_publish, connection_timeout)

rpc_client(exchange, routing_key, body, content_type='application/json', timeout=5, connection_timeout=16, delivery_mode=DeliveryMode.Transient, expiration=None, **kwargs) async #

Sends a publish message to queue of the bus and waits for a response

Parameters:

Name Type Description Default
exchange str

exchange name

required
routing_key str

routing key name

required
body List[Any]

body that will be sent

required
content_type str

content type of message

'application/json'
timeout float

timeout in seconds for waiting for response

5
connection_timeout float

timeout for waiting for connection reestablishment

16
delivery_mode DeliveryMode

delivery mode

DeliveryMode.Transient
expiration Optional[Union[str, None]]

maximum lifetime of message to stay on the queue

None

Returns:

Name Type Description
bytes bytes

response message

Raises:

Type Description
AutoReconnectException

when it cannot reconnect within the given timeout

PublishTimeoutException

if publish confirmation is set to True and no confirmation is received within the given timeout

NackException

if publish confirmation is set to True and a nack is received

ResponseTimeoutException

if response timeout is reached

RpcProviderException

if the rpc provider responded with an error

Examples:

>>> await eventbus.rpc_client("example.rpc", "user.find", [{"name": "example"}], "application/json")
Source code in amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
async def rpc_client(
    self,
    exchange: str,
    routing_key: str,
    body: List[Any],
    content_type: str = "application/json",
    timeout: float = 5,
    connection_timeout: float = 16,
    delivery_mode: DeliveryMode = DeliveryMode.Transient,
    expiration: Optional[Union[str, None]] = None,
    **kwargs
) -> bytes:
    """
    Send an RPC request through the rpc client connection and await the reply.

    Args:
        exchange: exchange name
        routing_key: routing key name
        body: body that will be sent
        content_type: content type of message
        timeout: timeout in seconds for waiting for response
        connection_timeout: timeout for waiting for connection reestablishment
        delivery_mode: delivery mode
        expiration: maximum lifetime of message to stay on the queue
        **kwargs: extra keyword arguments forwarded to the connection's ``rpc_client``

    Returns:
        bytes: response message

    Raises:
        AutoReconnectException: when it cannot reconnect within the given timeout
        PublishTimeoutException: if publish confirmation is set to True and \
        no confirmation is received within the given timeout
        NackException: if publish confirmation is set to True and a nack is received
        ResponseTimeoutException: if response timeout is reached
        RpcProviderException: if the rpc provider responded with an error

    Examples:
        >>> await eventbus.rpc_client("example.rpc", "user.find", [{"name": "example"}], "application/json")
    """
    client_connection = self._rpc_client_connection

    async def send_request():
        response = await client_connection.rpc_client(
            exchange, routing_key, body, content_type,
            timeout, delivery_mode, expiration, **kwargs
        )
        return response

    # Request the connection first, then queue the request on it.
    client_connection.open(self.config.url)
    reply = await client_connection.add_callback(send_request, connection_timeout)
    return reply

subscribe(event, handler, routing_key, response_timeout=None, connection_timeout=16) async #

Register a subscriber handler for an event on the queue of the bus

Parameters:

Name Type Description Default
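event IntegrationEvent

integration event; its event_type is used as the exchange name

required
handler AsyncSubscriberHandler

subscriber handler; its handle attribute is registered as the message callback

required
routing_key str

routing key name

required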
response_timeout int

timeout in seconds for waiting for process the received message

None
connection_timeout int

timeout for waiting for connection reestablishment

16

Returns:

Name Type Description
None None

None

Raises:

Type Description
AutoReconnectException

when it cannot reconnect within the given timeout

Examples:

>>> async def handle(*body) -> None:
        print(f"received message: {body}")
>>> subscribe_event = ExampleEvent("example.rpc")
>>> await eventbus.subscribe(subscribe_event, event_handle, "user.find3")
Source code in amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
async def subscribe(
    self,
    event: IntegrationEvent,
    handler: AsyncSubscriberHandler,
    routing_key: str,
    response_timeout: Optional[int] = None,
    connection_timeout: int = 16
) -> None:
    """
    Register a subscriber handler for an event on the queue of the bus

    The queue name is read from ``self.config.options.queue_name``.

    Args:
        event: integration event; its ``event_type`` is used as the exchange name
        handler: subscriber handler; its ``handle`` attribute is registered as the message callback
        routing_key: routing key name
        response_timeout: timeout in seconds for processing the received message
        connection_timeout: timeout for waiting for connection reestablishment

    Returns:
        None: None

    Raises:
        AutoReconnectException: when it cannot reconnect within the given timeout

    Examples:
        >>> subscribe_event = ExampleEvent("example.rpc")
        >>> # event_handler: any object exposing an async ``handle(*body)``
        >>> await eventbus.subscribe(subscribe_event, event_handler, "user.find3")
    """
    async def add_subscribe():
        # handler.handle (not the handler object itself) is what gets registered.
        await self._sub_connection.subscribe(
            self.config.options.queue_name,
            event.event_type,
            routing_key,
            handler.handle,
            response_timeout,
        )

    # Request the connection first, then queue the subscription on it.
    self._sub_connection.open(self.config.url)
    await self._sub_connection.add_callback(add_subscribe, connection_timeout)