def __init__(
    self,
    *args: Any,
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    log_fmt: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Configure logging and delegate the rest to the parent initialiser.

    Args:
        *args: Positional arguments forwarded unchanged to the parent.
        logger: Logger forwarded to the parent; defaults to the ``EMPTY``
            sentinel.
        log_level: Level forwarded to the parent (``logging.INFO`` by
            default).
        log_fmt: Optional log format string forwarded to the parent.
        **kwargs: Keyword arguments forwarded unchanged to the parent.
    """
    # TODO: generate unique logger names to not share between brokers
    fallback_logger = get_broker_logger(
        name="redis",
        default_context={"channel": ""},
        message_id_ln=self.__max_msg_id_ln,
    )

    super().__init__(
        *args,
        logger=logger,
        default_logger=fallback_logger,
        log_level=log_level,
        log_fmt=log_fmt,
        **kwargs,
    )

    # NOTE(review): presumably the initial width reserved for channel names
    # in log output -- confirm against the log formatting code.
    self._max_channel_name = 4
defsetup(self)->None:"""Prepare all Broker entities to startup."""forhinself._subscribers.values():self.setup_subscriber(h)forpinself._publishers.values():self.setup_publisher(p)
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register ``middleware`` after all middlewares already present.

    The stored middleware collection is replaced by a new tuple ending with
    ``middleware``, and the same middleware is then passed to
    ``add_middleware`` of every registered subscriber and publisher.
    """
    self._middlewares = tuple(self._middlewares) + (middleware,)

    for subscriber in self._subscribers.values():
        subscriber.add_middleware(middleware)

    for publisher in self._publishers.values():
        publisher.add_middleware(middleware)
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Register the subscribers and publishers of ``router`` on this object.

    Every handler of ``router`` gets a prefix applied. Handlers whose hash is
    not yet registered here additionally receive the schema flag and the
    combined middlewares (and, for subscribers, dependencies), and are added
    to this object's registry.

    Args:
        router: Object whose ``_subscribers`` and ``_publishers`` are taken.
        prefix: Extra prefix appended to ``self.prefix`` for subscribers.
        dependencies: Dependencies inserted between this object's own and
            each subscriber's existing ones. Any iterable is accepted,
            including one-shot iterators.
        middlewares: Middlewares inserted between this object's own and each
            handler's existing ones. Any iterable is accepted, including
            one-shot iterators.
        include_in_schema: Explicit schema flag for the included handlers;
            when ``None`` the flag is resolved through
            ``self._solve_include_in_schema``.
    """
    # Both arguments are only promised to be iterables, yet they are unpacked
    # once per handler below. Snapshot them so that a one-shot iterator (for
    # example a generator) is not exhausted after the first handler.
    extra_dependencies = tuple(dependencies)
    extra_middlewares = tuple(middlewares)

    for h in router._subscribers.values():
        # The prefix is applied before the registry key is computed.
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *extra_middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *extra_dependencies,
                *h._broker_dependencies,
            )
            # Rebind to a fresh dict instead of mutating the existing one.
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        # NOTE(review): publishers receive only ``self.prefix``; the
        # ``prefix`` argument is applied to subscribers alone -- confirm
        # that this asymmetry is intended.
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *extra_middlewares,
                *p._broker_middlewares,
            )
            # Rebind to a fresh dict instead of mutating the existing one.
            self._publishers = {**self._publishers, key: p}
async def connect(self, **kwargs: Any) -> ConnectionType:
    """Return the connection, establishing it first when none exists yet.

    When no connection is stored, the saved connection options are merged
    with ``kwargs`` (``kwargs`` win on conflicts) and passed to
    ``self._connect``. ``self.setup()`` runs on every call, including calls
    that reuse an existing connection.

    Args:
        **kwargs: Overrides for the stored connection options.

    Returns:
        The stored connection object.
    """
    if self._connection is None:
        merged_options = {**self._connection_kwargs, **kwargs}
        self._connection = await self._connect(**merged_options)

    self.setup()
    return self._connection
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Call ``subscriber.setup`` with the default extras plus overrides.

    Args:
        subscriber: Subscriber whose ``setup`` method is invoked.
        **kwargs: Values that override or extend
            ``self._subscriber_setup_extra`` for this call only; the stored
            mapping itself is not modified.
    """
    setup_options = {**self._subscriber_setup_extra, **kwargs}
    subscriber.setup(**setup_options)
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Call ``publisher.setup`` with the default extras plus overrides.

    Args:
        publisher: Publisher whose ``setup`` method is invoked.
        **kwargs: Values that override or extend
            ``self._publisher_setup_extra`` for this call only; the stored
            mapping itself is not modified.
    """
    setup_options = {**self._publisher_setup_extra, **kwargs}
    publisher.setup(**setup_options)
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Mark the object as stopped, close subscribers, then the connection.

    Subscribers are closed one after another. ``self._close`` is awaited
    afterwards only when a connection is currently stored.

    Args:
        exc_type: Exception type forwarded to ``self._close``.
        exc_val: Exception instance forwarded to ``self._close``.
        exc_tb: Traceback forwarded to ``self._close``.
    """
    self.running = False

    for subscriber in self._subscribers.values():
        await subscriber.close()

    # Nothing more to release when no connection was ever established.
    if self._connection is None:
        return

    await self._close(exc_type, exc_val, exc_tb)