You are viewing documentation for version 2 of the AWS SDK for Ruby. Version 3 documentation can be found here.
Class: Aws::SQS::QueuePollerA utility class for long polling messages in a loop. Messages are automatically deleted from the queue at the end of the given block.
poller = Aws::SQS::QueuePoller.new(queue_url)
poller.poll do |msg|
puts msg.body
end
Long Polling
By default, messages are received using long polling. This method will force a default :wait_time_seconds
of 20 seconds. If you prefer to use the queue default wait time, then pass a nil
value for :wait_time_seconds
.
poller.poll(wait_time_seconds:nil) do |msg|
end
When disabling :wait_time_seconds
by passing nil
, you must ensure the queue ReceiveMessageWaitTimeSeconds
attribute is set to a non-zero value, or you will be short-polling. This will trigger significantly more API calls.
You can specify a maximum number of messages to receive with each polling attempt via :max_number_of_messages
. When this is set to a positive value, greater than 1, the block will receive an array of messages, instead of a single message.
poller.poll do |msg|
end
poller.poll(max_number_of_messages:10) do |messages|
messages.each do |msg|
end
end
The maximum value for :max_number_of_messages
is enforced by Amazon SQS.
When receiving messages, you have a fixed amount of time to process and delete the message before it is added back into the queue. This is the visibility timeout. By default, the queue's VisibilityTimeout
attribute is used. You can provide an alternative visibility timeout when polling.
poller.poll do |msg|
end
poller.poll(visibility_timeout:10) do |msg|
end
You can reset the visibility timeout of a single message by calling #change_message_visibility_timeout. This is useful when you need more time to finish processing the message.
poller.poll do |msg|
poller.change_message_visibility_timeout(msg, 60)
end
If you change the visibility timeout of a message to zero, it will return to the queue immediately.
Deleting MessagesMessages are deleted from the queue when the block returns normally.
poller.poll do |msg|
end
You can skip message deletion by passing skip_delete: true
. This allows you to manually delete the messages using #delete_message, or #delete_messages.
poller.poll(skip_delete: true) do |msg|
poller.delete_message(msg) end
poller.poll(skip_delete: true, max_number_of_messages:10) do |messages|
poller.delete_messages(messages)
end
Another way to manage message deletion is to throw :skip_delete
from the poll block. You can use this to choose when a message, or message batch is deleted on an individual basis. This can be very useful when you are capturing temporal errors and wish for the message to timeout.
poller.poll do |msg|
begin
rescue
throw :skip_delete
end
end
Terminating the Polling Loop
By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing :stop_polling
from the #before_request callback.
:idle_timeout
Option
This is a configurable, maximum number of seconds to wait for a new message before the polling loop exists. By default, there is no idle timeout.
poller.poll(idle_timeout: 60) do |msg|
end
Throw :stop_polling
If you want more fine grained control, you can configure a before request callback to trigger before each long poll. Throwing :stop_polling
from this callback will cause the poller to exit normally without making the next request.
poller.before_request do |stats|
throw :stop_polling if stats.received_message_count >= 100
end
poller.poll do |msg|
end
Tracking Progress
The poller will automatically track a few statistics client-side in a PollerStats object. You can access the poller stats three ways:
Here are examples of accessing the statistics.
Configure a #before_request callback.
poller.before_request do |stats|
logger.info("requests: #{stats.request_count}")
logger.info("messages: #{stats.received_message_count}")
logger.info("last-timestamp: #{stats.last_message_received_at}")
end
Accept a 2nd argument in the poll block, for example:
poller.poll do |msg, stats|
logger.info("requests: #{stats.request_count}")
logger.info("messages: #{stats.received_message_count}")
logger.info("last-timestamp: #{stats.last_message_received_at}")
end
Return value:
stats = poller.poll(idle_timeout:10) do |msg|
end
logger.info("requests: #{stats.request_count}")
logger.info("messages: #{stats.received_message_count}")
logger.info("last-timestamp: #{stats.last_message_received_at}")
Classes: PollerConfig, PollerStats
Instance Attribute Summary collapseRegisters a callback that is invoked once before every polling attempt.
A new instance of QueuePoller.
Polls the queue, yielded a message, or an array of messages.
Returns a new instance of QueuePoller.
208 209 210 211 212
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 208 def initialize(queue_url, options = {}) @queue_url = queue_url @client = options.delete(:client) || Client.new @default_config = PollerConfig.new(options) endInstance Attribute Details #client ⇒ Client
218 219 220
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 218 def client @client end#default_config ⇒ PollerConfig
221 222 223
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 221 def default_config @default_config end#queue_url ⇒ String
215 216 217
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 215 def queue_url @queue_url endInstance Method Details #before_request {|stats| ... } ⇒ void
This method returns an undefined value.
Registers a callback that is invoked once before every polling attempt.
poller.before_request do |stats|
logger.info("requests: #{stats.request_count}")
logger.info("messages: #{stats.received_message_count}")
logger.info("last-timestamp: #{stats.last_message_received_at}")
end
poller.poll do |msg|
end
:stop_polling
If you throw :stop_polling
from the #before_request callback, then the poller will exit normally before making the next long poll request.
poller.before_request do |stats|
throw :stop_polling if stats.received_messages >= 100
end
poller.poll do |msg|
end
255 256 257
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 255 def before_request(&block) @default_config = @default_config.with(before_request: block) if block_given? end#change_message_visibility_timeout(message, seconds) ⇒ Object Note:
This method should be called from inside a #poll block.
348 349 350 351 352 353 354
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 348 def change_message_visibility_timeout(message, seconds) @client.change_message_visibility({ queue_url: @queue_url, receipt_handle: message.receipt_handle, visibility_timeout: seconds, }) end#delete_message(message) ⇒ Object Note:
This method should be called from inside a #poll block.
359 360 361 362 363 364
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 359 def delete_message(message) @client.delete_message({ queue_url: @queue_url, receipt_handle: message.receipt_handle, }) end#delete_messages(messages) ⇒ Object Note:
This method should be called from inside a #poll block.
370 371 372 373 374 375 376 377
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 370 def delete_messages(messages) @client.delete_message_batch( queue_url: @queue_url, entries: messages.map { |msg| { id: msg.message_id, receipt_handle: msg.receipt_handle } } ) end#poll(options = {}, &block) ⇒ PollerStats
Polls the queue, yielded a message, or an array of messages. Messages are automatically deleted from the queue at the end of the given block. See the class documentation on Aws::SQS::QueuePoller for more examples.
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
# File 'aws-sdk-resources/lib/aws-sdk-resources/services/sqs/queue_poller.rb', line 327 def poll(options = {}, &block) config = @default_config.with(options) stats = PollerStats.new catch(:stop_polling) do loop do messages = get_messages(config, stats) if messages.empty? check_idle_timeout(config, stats, messages) else process_messages(config, stats, messages, &block) end end end stats.polling_stopped_at = Time.now stats end
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4