
Rabbit Publishing

RabbitBroker also uses the unified publish method to send messages. In addition to str, bytes, dict, and pydantic.BaseModel, it accepts an aio_pika.Message object as the message when you need one.

import asyncio
from propan import RabbitBroker

async def pub():
    async with RabbitBroker() as broker:
        await broker.publish("Hi!", queue="test", exchange="test")

asyncio.run(pub())

Basic arguments

The publish method takes the following arguments:

  • message: bytes | str | dict | Sequence[Any] | pydantic.BaseModel | aio_pika.Message = "" - message to send
  • exchange: str | RabbitExchange | None = None - the exchange to publish the message to. If not specified, the default exchange is used
  • queue: str | RabbitQueue = "" - the queue the message is sent to (since most queues use their name as the routing key, this is a human-readable version of routing_key)
  • routing_key: str = "" - the message routing key. If not specified, the queue argument is used
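
The sketch below shows how the routing arguments above might combine. The helper name publish_examples and the exchange, queue, and routing key names are placeholders rather than part of Propan. It assumes those exchanges and queues already exist or are declared by the broker.

```python
import asyncio
from typing import Any


async def publish_examples(broker: Any) -> None:
    """Publish three messages; `broker` is assumed to be a connected RabbitBroker."""
    # Default exchange: the queue name doubles as the routing key.
    await broker.publish("Hi!", queue="test")
    # Named exchange: routing_key is omitted, so the queue argument is used.
    await broker.publish({"id": 1}, queue="test", exchange="test")
    # Named exchange with an explicit routing key.
    await broker.publish(b"raw bytes", exchange="test", routing_key="test.info")


async def main() -> None:
    # Imported here so the helper above can be exercised without Propan installed.
    from propan import RabbitBroker

    async with RabbitBroker() as broker:  # needs a reachable RabbitMQ instance
        await publish_examples(broker)


# To run against a real broker: asyncio.run(main())
```

Taking the broker as a parameter keeps the publishing logic separate from connection handling, so it can be checked with a stand-in object.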

Message parameters

You can read more about all of these flags in the RabbitMQ documentation.

  • headers: dict[str, Any] | None = None - message headers (used by consumers)
  • content_type: str | None = None - the content_type of the message being sent. Propan sets it automatically in most cases (used by consumers)
  • content_encoding: str | None = None - encoding of the message (used by consumers)
  • persist: bool = False - whether the message should be restored after a RabbitMQ restart
  • priority: int | None = None - the priority of the message
  • correlation_id: str | None = None - message id, which helps to match the original message with the reply to it (Propan sets it automatically)
  • reply_to: str | None = None - the name of the queue where the response to the message should be sent (when using a blocking RPC, it is set automatically)
  • message_id: str | None = None - message ID (generated by RabbitMQ automatically)
  • timestamp: int | float | timedelta | datetime | None - message sending time (set by RabbitMQ automatically)
  • expiration: int | float | timedelta | datetime | None - message lifetime (in seconds)
  • type: str | None = None - the type of message (used by consumers)
  • user_id: str | None - ID of the RabbitMQ user who sent the message
  • app_id: str | None - ID of the application that sent the message (used by consumers)
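
As a rough illustration of the message parameters above, the sketch below publishes one message with several properties set explicitly. The helper publish_order, the queue name, the header key, and the type value are all hypothetical. Only keyword names from the list above are passed to publish.

```python
from typing import Any


async def publish_order(broker: Any, order_id: int) -> None:
    """Publish an order event; `broker` is assumed to be a connected RabbitBroker."""
    await broker.publish(
        {"order_id": order_id},
        queue="orders",                    # placeholder queue name
        headers={"x-source": "checkout"},  # custom headers for consumers
        persist=True,                      # restore the message after a RabbitMQ restart
        priority=5,                        # only meaningful if the queue supports priorities
        expiration=60,                     # message lifetime in seconds
        type="order.created",              # free-form label read by consumers
    )
```

Parameters that Propan or RabbitMQ set automatically, such as content_type, correlation_id, and message_id, are left out here on purpose.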

Send flags

Arguments for sending a message:

  • mandatory: bool = True - the client waits for confirmation that the message has been placed in some queue (if there is no such queue, the message is returned to the sender)
  • immediate: bool = False - the client expects a consumer to be ready to process the message right now (if there is no consumer, the message is returned to the sender)
  • timeout: int | float | None = None - how long to wait for the send confirmation from RabbitMQ
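
A minimal sketch of passing the send flags, assuming the signature described above. The helper publish_confirmed and its confirm_timeout parameter are illustrative names. The text above does not say which error is raised for a returned message or an expired timeout, so the sketch adds no error handling.

```python
from typing import Any


async def publish_confirmed(
    broker: Any,
    message: Any,
    queue: str,
    confirm_timeout: float = 5.0,
) -> Any:
    """Publish `message`, asking for routing confirmation within `confirm_timeout`."""
    return await broker.publish(
        message,
        queue=queue,
        mandatory=True,           # ask for the message to be returned if no queue takes it
        timeout=confirm_timeout,  # limit on waiting for the send confirmation
    )
```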

RPC arguments

publish also supports the common arguments for making RPC requests:

  • callback: bool = False - whether to wait for a response to the message
  • callback_timeout: float | None = 30.0 - how long to wait for a response. If None, waits indefinitely
  • raise_timeout: bool = False - what happens when the response timeout expires
    • False - return None on timeout
    • True - raise TimeoutError on timeout
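
The two timeout behaviours can be sketched as follows. The helper names call_rpc and call_rpc_strict are hypothetical. The exception caught is the built-in TimeoutError, following the description above, which is an assumption about the exact exception class Propan raises.

```python
from typing import Any, Optional


async def call_rpc(broker: Any, payload: Any, queue: str) -> Optional[Any]:
    """Send a request and wait up to 3 seconds; returns None if no reply arrives."""
    return await broker.publish(
        payload,
        queue=queue,
        callback=True,         # wait for the response
        callback_timeout=3.0,  # give up after 3 seconds
    )


async def call_rpc_strict(broker: Any, payload: Any, queue: str) -> Any:
    """Same request, but a missing reply becomes an explicit error."""
    try:
        return await broker.publish(
            payload,
            queue=queue,
            callback=True,
            callback_timeout=3.0,
            raise_timeout=True,  # raise instead of returning None
        )
    except TimeoutError:  # assumed exception class, per the description above
        raise RuntimeError(f"no reply from {queue!r} within 3 seconds") from None
```

With raise_timeout=False, the caller cannot tell a timeout apart from a handler that replied with None, so the strict variant may be the safer choice when None is a valid reply.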