Bases: ABC
, BasePublisher [MsgType ]
, BaseRMQInformation
A class representing an ABCPublisher.
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
calls class-attribute
instance-attribute
calls : List [ Callable [ ... , Any ]] = field (
init = False , default_factory = list , repr = False
)
description property
description : Optional [ str ]
exchange class-attribute
instance-attribute
exchange : Optional [ RabbitExchange ] = field ( default = None )
mandatory class-attribute
instance-attribute
message_kwargs class-attribute
instance-attribute
message_kwargs : AnyDict = field ( default_factory = dict )
mock class-attribute
instance-attribute
mock : Optional [ MagicMock ] = field (
init = False , default = None , repr = False
)
persist class-attribute
instance-attribute
priority class-attribute
instance-attribute
priority : Optional [ int ] = None
queue class-attribute
instance-attribute
queue : RabbitQueue = field ( default = RabbitQueue ( '' ))
reply_to class-attribute
instance-attribute
reply_to : Optional [ str ] = None
routing_key class-attribute
instance-attribute
timeout class-attribute
instance-attribute
timeout : TimeoutType = None
title class-attribute
instance-attribute
title : Optional [ str ] = field ( default = None )
virtual_host class-attribute
instance-attribute
get_payloads get_payloads () -> List [ Tuple [ AnyDict , str ]]
Source code in faststream/broker/publisher.py
def get_payloads ( self ) -> List [ Tuple [ AnyDict , str ]]:
payloads : List [ Tuple [ AnyDict , str ]] = []
if self . _schema :
call_model : CallModel [ Any , Any ] = CallModel (
call = lambda : None ,
model = create_model ( "Fake" ),
response_model = create_model (
"" ,
__base__ = ( CreateBaseModel ,), # type: ignore[arg-type]
response__ = ( self . _schema , ... ),
),
)
body = get_response_schema (
call_model ,
prefix = f " { self . name } :Message" ,
)
if body : # pragma: no branch
payloads . append (( body , "" ))
else :
for call in self . calls :
call_model = build_call_model ( call )
body = get_response_schema (
call_model ,
prefix = f " { self . name } :Message" ,
)
if body :
payloads . append (( body , to_camelcase ( unwrap ( call ) . __name__ )))
return payloads
name Source code in faststream/asyncapi/base.py
@abstractproperty
def name ( self ) -> str :
raise NotImplementedError ()
publish abstractmethod
async
publish (
message : SendableMessage ,
correlation_id : Optional [ str ] = None ,
** kwargs : Any
) -> Optional [ SendableMessage ]
Publish a message.
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/broker/publisher.py
@abstractmethod
async def publish (
self ,
message : SendableMessage ,
correlation_id : Optional [ str ] = None ,
** kwargs : Any ,
) -> Optional [ SendableMessage ]:
"""Publish a message.
Args:
message: The message to be published.
correlation_id: Optional correlation ID for the message.
**kwargs: Additional keyword arguments.
Returns:
The published message.
Raises:
NotImplementedError: If the method is not implemented.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
raise NotImplementedError ()
reset_test Source code in faststream/broker/publisher.py
def reset_test ( self ) -> None :
self . _fake_handler = False
self . mock = None
schema schema () -> Dict [ str , Channel ]
Source code in faststream/asyncapi/base.py
def schema ( self ) -> Dict [ str , Channel ]: # pragma: no cover
return {}
set_test set_test ( mock : MagicMock , with_fake : bool ) -> None
Source code in faststream/broker/publisher.py
def set_test (
self ,
mock : MagicMock ,
with_fake : bool ,
) -> None :
self . mock = mock
self . _fake_handler = with_fake