def__init__(self,*,no_ack:bool,no_reply:bool,retry:Union[bool,int],broker_dependencies:Iterable["Depends"],broker_middlewares:Sequence["BrokerMiddleware[MsgType]"],default_parser:"AsyncCallable",default_decoder:"AsyncCallable",# AsyncAPI informationtitle_:Optional[str],description_:Optional[str],include_in_schema:bool,)->None:"""Initialize a new instance of the class."""self.calls=[]self._parser=default_parserself._decoder=default_decoderself._no_reply=no_reply# Watcher argsself._no_ack=no_ackself._retry=retryself._call_options=Noneself._call_decorators=()self.running=Falseself.lock=sync_fake_context()# Setup in includeself._broker_dependencies=broker_dependenciesself._broker_middlewares=broker_middlewares# register in setup laterself._producer=Noneself.graceful_timeout=Noneself.extra_context={}self.extra_watcher_options={}# AsyncAPIself.title_=title_self.description_=description_self.include_in_schema=include_in_schema
@abstractmethodasyncdefclose(self)->None:"""Close the handler. Blocks event loop up to graceful_timeout seconds. """self.running=Falseifisinstance(self.lock,MultiLock):awaitself.lock.wait_release(self.graceful_timeout)
asyncdefconsume(self,msg:MsgType)->Any:"""Consume a message asynchronously."""ifnotself.running:returnNonetry:returnawaitself.process_message(msg)exceptStopConsume:# Stop handler at StopConsume exceptionawaitself.close()exceptSystemExit:# Stop handler at `exit()` callawaitself.close()ifapp:=context.get("app"):app.exit()exceptException:# nosec B110# All other exceptions were logged by CriticalLogMiddlewarepass
asyncdefprocess_message(self,msg:MsgType)->"Response":"""Execute all message processing stages."""asyncwithAsyncExitStack()asstack:stack.enter_context(self.lock)# Enter context before middlewaresfork,vinself.extra_context.items():stack.enter_context(context.scope(k,v))stack.enter_context(context.scope("handler_",self))# enter all middlewaresmiddlewares:List[BaseMiddleware]=[]forbase_minself._broker_middlewares:middleware=base_m(msg)middlewares.append(middleware)awaitmiddleware.__aenter__()cache:Dict[Any,Any]={}parsing_error:Optional[Exception]=Noneforhinself.calls:try:message=awaith.is_suitable(msg,cache)exceptExceptionase:parsing_error=ebreakifmessageisnotNone:# Acknowledgement scope# TODO: move it to scope enter at `retry` option deprecationawaitstack.enter_async_context(self.watcher(message,**self.extra_watcher_options,))stack.enter_context(context.scope("log_context",self.get_log_context(message)))stack.enter_context(context.scope("message",message))# Middlewares should be exited before scope releaseforminmiddlewares:stack.push_async_exit(m.__aexit__)result_msg=ensure_response(awaith.call(message=message,# consumer middlewares_extra_middlewares=(m.consume_scopeforminmiddlewares[::-1]),))ifnotresult_msg.correlation_id:result_msg.correlation_id=message.correlation_idforpinchain(self.__get_response_publisher(message),h.handler._publishers,):awaitp.publish(result_msg.body,**result_msg.as_publish_kwargs(),# publisher middlewares_extra_middlewares=[m.publish_scopeforminmiddlewares[::-1]],)# Return data for testsreturnresult_msg# Suitable handler was not found or# parsing/decoding exception occurredforminmiddlewares:stack.push_async_exit(m.__aexit__)ifparsing_error:raiseparsing_errorelse:raiseSubscriberNotFound(f"There is no suitable handler for {msg=}")# An error was raised and processed by some middlewarereturnensure_response(None)
defget_description(self)->Optional[str]:"""Returns the description of the handler."""ifnotself.calls:# pragma: no coverreturnNoneiflen(self.calls)==1:returnself.calls[0].descriptionreturn"\n".join(f"{to_camelcase(h.call_name)}: {h.description}"forhinself.calls)
defget_payloads(self)->List[Tuple["AnyDict",str]]:"""Get the payloads of the handler."""payloads:List[Tuple[AnyDict,str]]=[]forhinself.calls:ifh.dependantisNone:raiseSetupError("You should setup `Handler` at first.")body=parse_handler_params(h.dependant,prefix=f"{self.title_orself.call_name}:Message",)payloads.append((body,to_camelcase(h.call_name)))ifnotself.calls:payloads.append(({"title":f"{self.title_orself.call_name}:Message:Payload",},to_camelcase(self.call_name),))returnpayloads
defschema(self)->Dict[str,Channel]:"""Returns the schema of the API operation as a dictionary of channel names and channel objects."""ifself.include_in_schema:returnself.get_schema()else:return{}