Bases: ABC, BasePublisher[MsgType], BaseRMQInformation
A class representing an ABCPublisher.
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
calls class-attribute
instance-attribute
calls: List[Callable[..., Any]] = field(
init=False, default_factory=list, repr=False
)
description property
description: Optional[str]
exchange class-attribute
instance-attribute
exchange: Optional[RabbitExchange] = field(default=None)
mandatory class-attribute
instance-attribute
message_kwargs class-attribute
instance-attribute
message_kwargs: AnyDict = field(default_factory=dict)
mock class-attribute
instance-attribute
mock: Optional[MagicMock] = field(
init=False, default=None, repr=False
)
persist class-attribute
instance-attribute
priority class-attribute
instance-attribute
priority: Optional[int] = None
queue class-attribute
instance-attribute
queue: RabbitQueue = field(default=RabbitQueue(''))
reply_to class-attribute
instance-attribute
reply_to: Optional[str] = None
routing_key class-attribute
instance-attribute
timeout class-attribute
instance-attribute
timeout: TimeoutType = None
title class-attribute
instance-attribute
title: Optional[str] = field(default=None)
virtual_host class-attribute
instance-attribute
get_payloads
get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect the payload schemas for the messages this publisher emits.

    Two sources are considered, in this order:

    * If ``self._schema`` is truthy, a placeholder ``CallModel`` is built
      whose response model exposes that schema as a ``response__`` field.
      At most one payload is produced, paired with an empty name.
    * Otherwise every callable in ``self.calls`` is converted with
      ``build_call_model`` and contributes one payload, paired with the
      ``to_camelcase`` form of the unwrapped callable's ``__name__``.

    In both cases the schema comes from ``get_response_schema`` with the
    prefix ``"<self.name>:Message"``; falsy results are skipped.

    Returns:
        A list of ``(schema, name)`` tuples; empty when nothing produced a
        schema.
    """
    payloads: List[Tuple[AnyDict, str]] = []

    if self._schema:
        # No real handler is involved here, so a no-op callable and an empty
        # "Fake" input model stand in; only the response model matters.
        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(
                "",
                __base__=(CreateBaseModel,),  # type: ignore[arg-type]
                response__=(self._schema, ...),
            ),
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads
|
name
Source code in faststream/asyncapi/base.py
# NOTE: ``abc.abstractproperty`` is deprecated since Python 3.3 in favour of
# stacking ``@property`` on ``@abstractmethod``. It is kept here so the block
# does not require an import that this module may not have.
@abstractproperty
def name(self) -> str:
    """Return the name of this object.

    This is an abstract property: the base implementation only raises, so
    concrete subclasses have to override it.

    Raises:
        NotImplementedError: Always, when the base implementation is reached.
    """
    raise NotImplementedError()
|
publish abstractmethod
async
publish(
message: SendableMessage,
correlation_id: Optional[str] = None,
**kwargs: Any
) -> Optional[SendableMessage]
Publish a message.
| PARAMETER | DESCRIPTION |
| --- | --- |
| message | The message to be published. TYPE: SendableMessage |
| correlation_id | Optional correlation ID for the message. TYPE: Optional[str] DEFAULT: None |
| **kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |

| RETURNS | DESCRIPTION |
| --- | --- |
| Optional[SendableMessage] | |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/broker/publisher.py
@abstractmethod
async def publish(
    self,
    message: SendableMessage,
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional[SendableMessage]:
    """Publish a message.

    Args:
        message: The message to be published.
        correlation_id: Optional correlation ID for the message.
        **kwargs: Additional keyword arguments.

    Returns:
        Implementation-defined; the annotation permits a message or
        ``None``. This abstract stub itself never returns.

    Raises:
        NotImplementedError: If the method is not implemented.

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    raise NotImplementedError()
|
reset_test
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    """Clear the test state stored on this object.

    Sets ``_fake_handler`` back to ``False`` and drops the reference held in
    ``mock`` by setting it to ``None``.
    """
    self._fake_handler = False
    self.mock = None
|
schema
schema() -> Dict[str, Channel]
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return the channel mapping for this object.

    The base implementation returns a new empty dict on every call.
    """
    return {}
|
set_test
set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(
    self,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Store the test state on this object.

    Args:
        mock: Object stored as ``self.mock``.
        with_fake: Value stored as ``self._fake_handler``.
    """
    self.mock = mock
    self._fake_handler = with_fake
|