AsyncConfluentProducer
faststream.confluent.client.AsyncConfluentProducer
AsyncConfluentProducer(*, loop: Optional[AbstractEventLoop] = None, bootstrap_servers: Union[str, List[str]] = 'localhost', client_id: Optional[str] = None, metadata_max_age_ms: int = 300000, request_timeout_ms: int = 40000, api_version: str = 'auto', acks: Any = _missing, key_serializer: Optional[Callable[[bytes], bytes]] = None, value_serializer: Optional[Callable[[bytes], bytes]] = None, compression_type: Optional[str] = None, max_batch_size: int = 16384, partitioner: str = 'consistent_random', max_request_size: int = 1048576, linger_ms: int = 0, send_backoff_ms: int = 100, retry_backoff_ms: int = 100, security_protocol: str = 'PLAINTEXT', ssl_context: Optional[SSLContext] = None, connections_max_idle_ms: int = 540000, enable_idempotence: bool = False, transactional_id: Optional[Union[str, int]] = None, transaction_timeout_ms: int = 60000, sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, sasl_kerberos_service_name: str = 'kafka', sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None)
An asynchronous Python Kafka client using the "confluent-kafka" package.
Initializes the AsyncConfluentProducer with the given configuration.
PARAMETER | DESCRIPTION |
---|---|
loop | The event loop to use for asynchronous operations. TYPE: Optional[AbstractEventLoop] DEFAULT: None |
bootstrap_servers | A bootstrap server address, or a list of addresses, for the Kafka cluster. TYPE: Union[str, List[str]] DEFAULT: 'localhost' |
client_id | A unique identifier for the client. TYPE: Optional[str] DEFAULT: None |
metadata_max_age_ms | The maximum age of metadata, in milliseconds, before a refresh is forced. TYPE: int DEFAULT: 300000 |
request_timeout_ms | The maximum time, in milliseconds, to wait for a request to complete. TYPE: int DEFAULT: 40000 |
api_version | The Kafka API version to use. TYPE: str DEFAULT: 'auto' |
acks | The number of acknowledgments the producer requires before considering a request complete. TYPE: Any DEFAULT: _missing |
key_serializer | A callable to serialize the key. TYPE: Optional[Callable[[bytes], bytes]] DEFAULT: None |
value_serializer | A callable to serialize the value. TYPE: Optional[Callable[[bytes], bytes]] DEFAULT: None |
compression_type | The compression type for message batches. TYPE: Optional[str] DEFAULT: None |
max_batch_size | The maximum size of a message batch. TYPE: int DEFAULT: 16384 |
partitioner | The partitioning strategy to use when sending messages. TYPE: str DEFAULT: 'consistent_random' |
max_request_size | The maximum size of a request in bytes. TYPE: int DEFAULT: 1048576 |
linger_ms | The time, in milliseconds, to wait before sending a batch. TYPE: int DEFAULT: 0 |
send_backoff_ms | The time, in milliseconds, to back off when sending fails. TYPE: int DEFAULT: 100 |
retry_backoff_ms | The time, in milliseconds, to back off before a retry. TYPE: int DEFAULT: 100 |
security_protocol | The security protocol to use. TYPE: str DEFAULT: 'PLAINTEXT' |
ssl_context | The SSL context for secure connections. TYPE: Optional[SSLContext] DEFAULT: None |
connections_max_idle_ms | The maximum time, in milliseconds, a connection can be idle. TYPE: int DEFAULT: 540000 |
enable_idempotence | Whether to enable idempotent producer capabilities. TYPE: bool DEFAULT: False |
transactional_id | The transactional ID for transactional delivery. TYPE: Optional[Union[str, int]] DEFAULT: None |
transaction_timeout_ms | The maximum time, in milliseconds, allowed for transactions. TYPE: int DEFAULT: 60000 |
sasl_mechanism | The SASL mechanism to use for authentication. TYPE: Optional[str] DEFAULT: None |
sasl_plain_password | The password for SASL/PLAIN authentication. TYPE: Optional[str] DEFAULT: None |
sasl_plain_username | The username for SASL/PLAIN authentication. TYPE: Optional[str] DEFAULT: None |
sasl_kerberos_service_name | The Kerberos service name for SASL/GSSAPI. TYPE: str DEFAULT: 'kafka' |
sasl_kerberos_domain_name | The Kerberos domain name for SASL/GSSAPI. TYPE: Optional[str] DEFAULT: None |
sasl_oauth_token_provider | The OAuth token provider for SASL/OAUTHBEARER. TYPE: Optional[str] DEFAULT: None |
RAISES | DESCRIPTION |
---|---|
ValueError | If the provided bootstrap_servers is not a string or list of strings. |
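The `bootstrap_servers` check described above can be pictured with a small helper. This is only a sketch under assumptions: `normalize_bootstrap_servers` is a hypothetical name, not a FastStream function, and the actual validation in `client.py` may differ. It joins a list into the comma-separated form that librdkafka's `bootstrap.servers` setting accepts.

```python
from typing import List, Union


def normalize_bootstrap_servers(servers: Union[str, List[str]]) -> str:
    """Hypothetical helper: return a comma-separated server string.

    Not part of FastStream; it only illustrates the documented rule that
    anything other than a str or a list of str raises ValueError.
    """
    if isinstance(servers, str):
        return servers
    if isinstance(servers, list) and all(isinstance(s, str) for s in servers):
        return ",".join(servers)
    raise ValueError("bootstrap_servers must be a str or a list of str")
```

A single address passes through unchanged, while a list such as `["a:9092", "b:9092"]` becomes `"a:9092,b:9092"`.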
Source code in faststream/confluent/client.py
config instance-attribute
config = {'bootstrap.servers': bootstrap_servers, 'client.id': client_id, 'metadata.max.age.ms': metadata_max_age_ms, 'request.timeout.ms': request_timeout_ms, 'acks': acks, 'compression.type': compression_type, 'partitioner': partitioner, 'message.max.bytes': max_request_size, 'linger.ms': linger_ms, 'enable.idempotence': enable_idempotence, 'transactional.id': transactional_id, 'transaction.timeout.ms': transaction_timeout_ms, 'retry.backoff.ms': retry_backoff_ms, 'security.protocol': security_protocol.lower(), 'connections.max.idle.ms': connections_max_idle_ms, 'sasl.kerberos.service.name': sasl_kerberos_service_name}
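To make the key translation in the listing easier to follow, here is a minimal sketch covering a subset of the entries. `build_config` is a hypothetical function written for illustration, not FastStream code. It maps the snake_case constructor arguments to dotted configuration keys and, on the assumption that the `lower()` call in the listing applies to `security_protocol`, lowercases that value.

```python
from typing import Any, Dict, Optional


def build_config(
    bootstrap_servers: str = "localhost",
    client_id: Optional[str] = None,
    max_request_size: int = 1048576,
    linger_ms: int = 0,
    enable_idempotence: bool = False,
    security_protocol: str = "PLAINTEXT",
) -> Dict[str, Any]:
    """Hypothetical subset of the mapping shown in the `config` listing."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "client.id": client_id,
        # max_request_size is stored under a differently named key.
        "message.max.bytes": max_request_size,
        "linger.ms": linger_ms,
        "enable.idempotence": enable_idempotence,
        # Assumption: the protocol name is lowercased before being stored.
        "security.protocol": security_protocol.lower(),
    }
```

Most keys are the argument name with underscores replaced by dots. `max_request_size` is the exception in this subset, since the listing stores it as `message.max.bytes`.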
create_batch
create_batch() -> BatchBuilder
Creates a batch for sending multiple messages.
RETURNS | DESCRIPTION |
---|---|
BatchBuilder | An instance of BatchBuilder for building message batches. |
send async
send(topic: str, value: Optional[Union[str, bytes]] = None, key: Optional[Union[str, bytes]] = None, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None) -> None
Sends a single message to a Kafka topic.
PARAMETER | DESCRIPTION |
---|---|
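topic | The topic to send the message to. TYPE: str |
value | The message value. TYPE: Optional[Union[str, bytes]] DEFAULT: None |
key | The message key. TYPE: Optional[Union[str, bytes]] DEFAULT: None |
partition | The partition to send the message to. TYPE: Optional[int] DEFAULT: None |
timestamp_ms | The timestamp of the message in milliseconds. TYPE: Optional[int] DEFAULT: None |
headers | A list of (name, value) header pairs for the message. TYPE: Optional[List[Tuple[str, Union[str, bytes]]]] DEFAULT: None |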
Source code in faststream/confluent/client.py
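A call to `send` might look like the sketch below. So that it runs without a broker, it uses `RecordingProducer`, a hypothetical stand-in that copies only the `send` signature documented above and records its arguments. The topic name, payload, and `publish_order` helper are likewise made up for illustration. With a real `AsyncConfluentProducer`, the `await producer.send(...)` line would be the same.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple, Union


class RecordingProducer:
    """Hypothetical stand-in mirroring the documented send() signature."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []

    async def send(
        self,
        topic: str,
        value: Optional[Union[str, bytes]] = None,
        key: Optional[Union[str, bytes]] = None,
        partition: Optional[int] = None,
        timestamp_ms: Optional[int] = None,
        headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None,
    ) -> None:
        # Record the call instead of talking to Kafka.
        self.sent.append(
            {
                "topic": topic,
                "value": value,
                "key": key,
                "partition": partition,
                "timestamp_ms": timestamp_ms,
                "headers": headers,
            }
        )


async def publish_order(producer: Any) -> None:
    # Only topic is required; every other argument defaults to None.
    await producer.send(
        "orders",
        value=b'{"id": 1}',
        key=b"order-1",
        headers=[("content-type", b"application/json")],
    )


producer = RecordingProducer()
asyncio.run(publish_order(producer))
```

Leaving `partition` and `timestamp_ms` unset passes `None` for both, matching the defaults in the signature.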
send_batch async
send_batch(batch: BatchBuilder, topic: str, *, partition: Optional[int]) -> None
Sends a batch of messages to a Kafka topic.
PARAMETER | DESCRIPTION |
---|---|
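batch | The batch of messages to send. TYPE: BatchBuilder |
topic | The topic to send the messages to. TYPE: str |
partition | The partition to send the messages to. Keyword-only and without a default, so it must be passed explicitly. TYPE: Optional[int] |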
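The sketch below shows how `create_batch` and `send_batch` fit together, and that `partition` has to be named in the call. `StubProducer` and `StubBatch` are hypothetical stand-ins that copy only the signatures documented above. Filling the batch is left out because the `BatchBuilder` interface is not described in this section.

```python
import asyncio
from typing import Any, List, Optional, Tuple


class StubBatch:
    """Hypothetical placeholder for BatchBuilder (its methods are not shown here)."""


class StubProducer:
    """Hypothetical stand-in mirroring create_batch() and send_batch()."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[int]]] = []

    def create_batch(self) -> StubBatch:
        return StubBatch()

    async def send_batch(
        self, batch: StubBatch, topic: str, *, partition: Optional[int]
    ) -> None:
        # Record the call instead of talking to Kafka.
        self.calls.append((topic, partition))


async def flush_batch(producer: Any, topic: str) -> None:
    batch = producer.create_batch()
    # Messages would be added to the batch here.
    # partition is keyword-only with no default: pass None explicitly
    # when no specific partition is wanted.
    await producer.send_batch(batch, topic, partition=None)


producer = StubProducer()
asyncio.run(flush_batch(producer, "orders"))
```

Omitting `partition` raises a `TypeError` at call time, because the signature declares it after `*` without a default.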