A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://learn.microsoft.com/en-us/python/api/overview/azure/servicebus-readme below:

Azure Service Bus client library for Python

Azure Service Bus is a high performance cloud-managed messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers.

Service Bus provides multiple mechanisms for asynchronous highly reliable communication, such as structured first-in-first-out messaging, publish/subscribe capabilities, and the ability to easily scale as your needs grow.

Use the Service Bus client library for Python to communicate between applications and services and implement asynchronous messaging patterns.

Source code | Package (PyPi) | Package (Conda) | API reference documentation | Product documentation | Samples | Changelog

NOTE: If you are using version 0.50 or lower and want to migrate to the latest version of this package please look at our migration guide to move from Service Bus V0.50 to Service Bus V7.

Getting started Install the package

Install the Azure Service Bus client library for Python with pip:

pip install azure-servicebus
Prerequisites:

To use this package, you must have:

If you need an Azure service bus namespace, you can create it via the Azure Portal. If you do not wish to use the graphical portal UI, you can use the Azure CLI via Cloud Shell, or Azure CLI run locally, to create one with this Azure CLI command:

az servicebus namespace create --resource-group <resource-group-name> --name <servicebus-namespace-name> --location <servicebus-namespace-location>
Authenticate the client

Interaction with Service Bus starts with an instance of the ServiceBusClient class. You either need a connection string with SAS key, or a namespace and one of its account keys to instantiate the client object. Please find the samples linked below for demonstration as to how to authenticate via either approach.

Create client from connection string Create client using the azure-identity library:

Note: client can be initialized without a context manager, but must be manually closed via client.close() to not leak resources.

Key concepts

Once you've initialized a ServiceBusClient, you can interact with the primary resource types within a Service Bus Namespace, of which multiple can exist and on which actual message transmission takes place, the namespace often serving as an application container:

For more information about these resources, see What is Azure Service Bus?.

To interact with these resources, one should be familiar with the following SDK concepts:

Thread safety

We do not guarantee that the ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are thread-safe. We do not recommend reusing these instances across threads. It is up to the running application to use these classes in a thread-safe manner.

Examples

The following sections provide several code snippets covering some of the most common Service Bus tasks, including:

To perform management tasks such as creating and deleting queues/topics/subscriptions, please utilize the azure-mgmt-servicebus library, available here.

Please find further examples in the samples directory demonstrating common Service Bus scenarios such as sending, receiving, session management and message handling.

Send messages to a queue

NOTE: see reference documentation here.

This example sends single message and array of messages to a queue that is assumed to already exist, created via the Azure portal or az commands.

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential

import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_sender(queue_name) as sender:
        # Sending a single message
        single_message = ServiceBusMessage("Single message")
        sender.send_messages(single_message)

        # Sending a list of messages
        messages = [ServiceBusMessage("First message"), ServiceBusMessage("Second message")]
        sender.send_messages(messages)

NOTE: A message may be scheduled for delayed delivery using the ServiceBusSender.schedule_messages() method, or by specifying ServiceBusMessage.scheduled_enqueue_time_utc before calling ServiceBusSender.send_messages()

For more detail on scheduling and schedule cancellation please see a sample here.

Receive messages from a queue

To receive from a queue, you can either perform an ad-hoc receive via receiver.receive_messages() or receive persistently through the receiver itself.

Receive messages from a queue through iterating over ServiceBusReceiver
from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        for msg in receiver:  # ServiceBusReceiver instance is a generator.
            print(str(msg))
            # If it is desired to halt receiving early, one can break out of the loop here safely.

NOTE: Any message received with receive_mode=PEEK_LOCK (this is the default, with the alternative RECEIVE_AND_DELETE removing the message from the queue immediately on receipt) has a lock that must be renewed via receiver.renew_message_lock before it expires if processing would take longer than the lock duration. See AutoLockRenewer for a helper to perform this in the background automatically. Lock duration is set in Azure on the queue or topic itself.

Receive messages from a queue through ServiceBusReceiver.receive_messages()

NOTE: ServiceBusReceiver.receive_messages() receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list.

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        received_message_array = receiver.receive_messages(max_wait_time=10)  # try to receive a single message within 10 seconds
        if received_message_array:
            print(str(received_message_array[0]))

    with client.get_queue_receiver(queue_name) as receiver:
        received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10)  # try to receive maximum 5 messages in a batch within 10 seconds
        for message in received_message_array:
            print(str(message))

In this example, max_message_count declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.

NOTE: It should also be noted that ServiceBusReceiver.peek_messages() is subtly different than receiving, as it does not lock the messages being peeked, and thus they cannot be settled.

Send and receive a message from a session enabled queue

NOTE: see reference documentation for session send and receive.

Sessions provide first-in-first-out and single-receiver semantics on top of a queue or subscription. While the actual receive syntax is the same, initialization differs slightly.

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential

import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_sender(queue_name) as sender:
        sender.send_messages(ServiceBusMessage("Session Enabled Message", session_id=session_id))

    # If session_id is null here, will receive from the first available session.
    with client.get_queue_receiver(queue_name, session_id=session_id) as receiver:
        for msg in receiver:
            print(str(msg))

NOTE: Messages received from a session do not need their locks renewed like a non-session receiver; instead the lock management occurs at the session level with a session lock that may be renewed with receiver.session.renew_lock()

Working with topics and subscriptions

NOTE: see reference documentation for topics and subscriptions.

Topics and subscriptions give an alternative to queues for sending and receiving messages. See documents here for more overarching detail, and of how these differ from queues.

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_topic_sender(topic_name) as sender:
        sender.send_messages(ServiceBusMessage("Data"))

    # If session_id is null here, will receive from the first available session.
    with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
        for msg in receiver:
            print(str(msg))
Settle a message after receipt

When receiving from a queue, you have multiple actions you can take on the messages you receive.

NOTE: You can only settle ServiceBusReceivedMessage objects which are received in ServiceBusReceiveMode.PEEK_LOCK mode (this is the default). ServiceBusReceiveMode.RECEIVE_AND_DELETE mode removes the message from the queue on receipt. ServiceBusReceivedMessage messages returned from peek_messages() cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods.

If the message has a lock as mentioned above, settlement will fail if the message lock has expired. If processing would take longer than the lock duration, it must be maintained via receiver.renew_message_lock before it expires. Lock duration is set in Azure on the queue or topic itself. See AutoLockRenewer for a helper to perform this in the background automatically.

Complete

Declares the message processing to be successfully completed, removing the message from the queue.

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)
Abandon

Abandon processing of the message for the time being, returning the message immediately back to the queue to be picked up by another (or the same) receiver.

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.abandon_message(msg)
DeadLetter

Transfer the message from the primary queue into a special "dead-letter sub-queue" where it can be accessed using the ServiceBusClient.get_<queue|subscription>_receiver function with parameter sub_queue=ServiceBusSubQueue.DEAD_LETTER and consumed from like any other receiver. (see sample here)

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.dead_letter_message(msg)
Defer

Defer is subtly different from the prior settlement methods. It prevents the message from being directly received from the queue by setting it aside such that it must be received by sequence number in a call to ServiceBusReceiver.receive_deferred_messages (see sample here)

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.defer_message(msg)
Automatically renew Message or Session locks

NOTE: see reference documentation for auto-lock-renewal.

AutoLockRenewer is a simple method for ensuring your message or session remains locked even over long periods of time, if calling receiver.renew_message_lock/receiver.session.renew_lock is impractical or undesired. Internally, it is not much more than shorthand for creating a concurrent watchdog to do lock renewal if the object is nearing expiry. It should be used as follows:

from azure.servicebus import ServiceBusClient, AutoLockRenewer
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver.receive_messages():
            renewer.register(receiver, msg, max_lock_renewal_duration=60)
            # Do your application logic here
            receiver.complete_message(msg)
renewer.close()
from azure.servicebus import ServiceBusClient, AutoLockRenewer
from azure.identity import DefaultAzureCredential


import os
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
session_queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
credential = DefaultAzureCredential()
with ServiceBusClient(fully_qualified_namespace, credential) as client:
    with client.get_queue_receiver(session_queue_name, session_id=session_id) as receiver:
        renewer.register(receiver, receiver.session, max_lock_renewal_duration=300) # Duration for how long to maintain the lock for, in seconds.

        for msg in receiver.receive_messages():
            # Do your application logic here
            receiver.complete_message(msg)
renewer.close()

If for any reason auto-renewal has been interrupted or failed, this can be observed via the auto_renew_error property on the object being renewed, or by having passed a callback to the on_lock_renew_failure parameter on renewer initialization. It would also manifest when trying to take action (such as completing a message) on the specified object.

Troubleshooting Logging
import logging
import sys

handler = logging.StreamHandler(stream=sys.stdout)
log_fmt = logging.Formatter(fmt="%(asctime)s | %(threadName)s | %(levelname)s | %(name)s | %(message)s")
handler.setFormatter(log_fmt)
logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

...

from azure.servicebus import ServiceBusClient

client = ServiceBusClient(..., logging_enable=True)
Timeouts

There are various timeouts a user should be aware of within the library.

NOTE: If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling receiver.renew_message_lock/receiver.session.renew_lock manually, one can leverage the AutoLockRenewer functionality detailed above.

Common Exceptions

The Service Bus APIs generate the following exceptions in azure.servicebus.exceptions:

Please view the exceptions reference docs for detailed descriptions of our common Exception types.

Next steps More sample code

Please find further examples in the samples directory demonstrating common Service Bus scenarios such as sending, receiving, session management and message handling.

Additional documentation

For more extensive documentation on the Service Bus service, see the Service Bus documentation on learn.microsoft.com.

Management capabilities and documentation

For users seeking to perform management operations against ServiceBus (Creating a queue/topic/etc, altering filter rules, enumerating entities) please see the azure-mgmt-servicebus documentation for API documentation. Terse usage examples can be found here as well.

Pure Python AMQP Transport and Backward Compatibility Support

The Azure Service Bus client library is now based on a pure Python AMQP implementation. uAMQP has been removed as required dependency.

To use uAMQP as the underlying transport:

  1. Install uamqp with pip.
$ pip install uamqp
  1. Pass uamqp_transport=True during client construction.
from azure.servicebus import ServiceBusClient
connection_str = '<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>'
queue_name = '<< NAME OF THE QUEUE >>'
client = ServiceBusClient.from_connection_string(
    connection_str, uamqp_transport=True
)

Note: The message attribute on ServiceBusMessage/ServiceBusMessageBatch/ServiceBusReceivedMessage, which previously exposed the uamqp.Message, has been deprecated. The "Legacy" objects returned by message attribute have been introduced to help facilitate the transition.

To enable the uamqp logger to collect traces from the underlying uAMQP library:

import logging

uamqp_logger = logging.getLogger('uamqp')
uamqp_logger.setLevel(logging.DEBUG)
uamqp_logger.addHandler(handler)

...

from azure.servicebus import ServiceBusClient

client = ServiceBusClient(..., logging_enable=True)

There may be cases where you consider the uamqp logging to be too verbose. To suppress unnecessary logging, add the following snippet to the top of your code:

import logging

# The logging levels below may need to be changed based on the logging that you want to suppress.
uamqp_logger = logging.getLogger('uamqp')
uamqp_logger.setLevel(logging.ERROR)

# or even further fine-grained control, suppressing the warnings in uamqp.connection module
uamqp_connection_logger = logging.getLogger('uamqp.connection')
uamqp_connection_logger.setLevel(logging.ERROR)
Building uAMQP wheel from source

azure-servicebus depends on the uAMQP for the AMQP protocol implementation. uAMQP wheels are provided for most major operating systems and will be installed automatically when installing azure-servicebus. If uAMQP is intended to be used as the underlying AMQP protocol implementation for azure-servicebus, uAMQP wheels can be found for most major operating systems.

If you're running on a platform for which uAMQP wheels are not provided, please follow If you intend to use uAMQP and you're running on a platform for which uAMQP wheels are not provided, please follow the uAMQP Installation guidance to install from source.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4