Create functions that transfer files using several of the available transfer manager settings. Use a callback class to report progress during file transfers.
import sys
import threading
import boto3
from boto3.s3.transfer import TransferConfig
# One megabyte, used to convert user-supplied sizes in MB to bytes.
MB = 1024 * 1024
# Module-level Amazon S3 resource shared by every transfer function below.
s3 = boto3.resource("s3")
class TransferCallback:
    """
    Handle callbacks from the transfer manager.

    The transfer manager periodically calls the __call__ method throughout
    the upload and download process so that it can take action, such as
    displaying progress to the user and collecting data about the transfer.
    """

    def __init__(self, target_size):
        """
        :param target_size: The expected total size of the transfer, in MB.
        """
        self._target_size = target_size
        self._total_transferred = 0
        self._lock = threading.Lock()
        # Maps each worker thread's ident to the byte count that thread moved.
        self.thread_info = {}

    def __call__(self, bytes_transferred):
        """
        The callback method that is called by the transfer manager.

        Display progress during file transfer and collect per-thread transfer
        data. This method can be called by multiple threads, so shared instance
        data is protected by a thread lock.

        :param bytes_transferred: Bytes moved since the previous callback.
        """
        thread = threading.current_thread()
        with self._lock:
            self._total_transferred += bytes_transferred
            # dict.get avoids the test-membership-then-index anti-pattern
            # (the original checked `ident not in self.thread_info.keys()`).
            self.thread_info[thread.ident] = (
                self.thread_info.get(thread.ident, 0) + bytes_transferred
            )
            target = self._target_size * MB
            sys.stdout.write(
                f"\r{self._total_transferred} of {target} transferred "
                f"({(self._total_transferred / target) * 100:.2f}%)."
            )
            sys.stdout.flush()
def upload_with_default_configuration(
    local_file_path, bucket_name, object_key, file_size_mb
):
    """
    Upload a local file to an Amazon S3 bucket using the transfer manager's
    default configuration.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    bucket = s3.Bucket(bucket_name)
    bucket.upload_file(local_file_path, object_key, Callback=progress)
    return progress.thread_info
def upload_with_chunksize_and_meta(
    local_file_path, bucket_name, object_key, file_size_mb, metadata=None
):
    """
    Upload a local file to an Amazon S3 bucket, setting a multipart chunk size
    and attaching optional metadata to the Amazon S3 object.

    A smaller multipart chunk size shrinks each request, which typically lets
    the transfer manager spread the upload across more threads. Metadata, when
    given, is stored with the object as key-value pairs.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    chunk_config = TransferConfig(multipart_chunksize=1 * MB)
    if metadata:
        extra_args = {"Metadata": metadata}
    else:
        extra_args = None
    s3.Bucket(bucket_name).upload_file(
        local_file_path,
        object_key,
        Config=chunk_config,
        ExtraArgs=extra_args,
        Callback=progress,
    )
    return progress.thread_info
def upload_with_high_threshold(local_file_path, bucket_name, object_key, file_size_mb):
    """
    Upload a local file to an Amazon S3 bucket with a multipart threshold set
    above the file's size.

    Because the file never reaches the threshold, the transfer manager sends
    it as a standard upload instead of a multipart upload.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    threshold_config = TransferConfig(multipart_threshold=file_size_mb * 2 * MB)
    bucket = s3.Bucket(bucket_name)
    bucket.upload_file(
        local_file_path, object_key, Config=threshold_config, Callback=progress
    )
    return progress.thread_info
def upload_with_sse(
    local_file_path, bucket_name, object_key, file_size_mb, sse_key=None
):
    """
    Upload a local file to an Amazon S3 bucket, applying server-side
    encryption with a customer-provided encryption key to the object.

    When this kind of encryption is specified, Amazon S3 encrypts the object
    at rest and allows downloads only when the expected encryption key is
    provided in the download request.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    extra_args = None
    if sse_key:
        extra_args = {"SSECustomerAlgorithm": "AES256", "SSECustomerKey": sse_key}
    s3.Bucket(bucket_name).upload_file(
        local_file_path, object_key, ExtraArgs=extra_args, Callback=progress
    )
    return progress.thread_info
def download_with_default_configuration(
    bucket_name, object_key, download_file_path, file_size_mb
):
    """
    Download an object from an Amazon S3 bucket to a local folder using the
    transfer manager's default configuration.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    obj = s3.Bucket(bucket_name).Object(object_key)
    obj.download_file(download_file_path, Callback=progress)
    return progress.thread_info
def download_with_single_thread(
    bucket_name, object_key, download_file_path, file_size_mb
):
    """
    Download an object from an Amazon S3 bucket to a local folder, forcing
    the transfer manager to run on a single thread.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    single_thread_config = TransferConfig(use_threads=False)
    obj = s3.Bucket(bucket_name).Object(object_key)
    obj.download_file(
        download_file_path, Config=single_thread_config, Callback=progress
    )
    return progress.thread_info
def download_with_high_threshold(
    bucket_name, object_key, download_file_path, file_size_mb
):
    """
    Download an object from an Amazon S3 bucket to a local folder with a
    multipart threshold set above the file's size.

    Because the file never reaches the threshold, the transfer manager fetches
    it as a standard download instead of a multipart download.

    :return: Per-thread transfer data collected by the progress callback.
    """
    progress = TransferCallback(file_size_mb)
    threshold_config = TransferConfig(multipart_threshold=file_size_mb * 2 * MB)
    obj = s3.Bucket(bucket_name).Object(object_key)
    obj.download_file(
        download_file_path, Config=threshold_config, Callback=progress
    )
    return progress.thread_info
def download_with_sse(
    bucket_name, object_key, download_file_path, file_size_mb, sse_key=None
):
    """
    Download a file from an Amazon S3 bucket to a local folder, adding a
    customer-provided encryption key to the request.

    When this kind of encryption is specified, Amazon S3 encrypts the object
    at rest and allows downloads only when the expected encryption key is
    provided in the download request.

    :param sse_key: The customer-provided encryption key. Defaults to None
        (consistent with upload_with_sse); when falsy, no encryption info is
        sent and Amazon S3 rejects the request for an encrypted object.
    :return: Per-thread transfer data collected by the progress callback.
    """
    transfer_callback = TransferCallback(file_size_mb)
    if sse_key:
        extra_args = {"SSECustomerAlgorithm": "AES256", "SSECustomerKey": sse_key}
    else:
        extra_args = None
    s3.Bucket(bucket_name).Object(object_key).download_file(
        download_file_path, ExtraArgs=extra_args, Callback=transfer_callback
    )
    return transfer_callback.thread_info
Demonstrate the transfer manager functions and report results.
import hashlib
import os
import platform
import shutil
import time
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
from botocore.exceptions import ParamValidationError
from botocore.exceptions import NoCredentialsError
import file_transfer
# One megabyte, used to convert MB-sized demo values to bytes.
MB = 1024 * 1024
# These configuration attributes affect both uploads and downloads.
CONFIG_ATTRS = (
    "multipart_threshold",
    "multipart_chunksize",
    "max_concurrency",
    "use_threads",
)
# These configuration attributes affect only downloads.
DOWNLOAD_CONFIG_ATTRS = ("max_io_queue", "io_chunksize", "num_download_attempts")
class TransferDemoManager:
    """
    Manages the demonstration. Collects user input from a command line, reports
    transfer results, maintains a list of artifacts created during the
    demonstration, and cleans them up after the demonstration is completed.
    """

    def __init__(self):
        self._s3 = boto3.resource("s3")
        # List of (local_file_path, s3_object_key, downloaded_file_path)
        # tuples, one per demo run; consumed later by cleanup().
        self._chore_list = []
        # Platform-specific shell command template used to create a demo file;
        # filled in by _setup_platform_specific().
        self._create_file_cmd = None
        # Converts file_size_mb to the unit the create-file command expects.
        self._size_multiplier = 0
        self.file_size_mb = 30
        self.demo_folder = None
        self.demo_bucket = None
        self._setup_platform_specific()
        self._terminal_width = shutil.get_terminal_size(fallback=(80, 80))[0]

    def collect_user_info(self):
        """
        Collect local folder and Amazon S3 bucket name from the user. These
        locations are used to store files during the demonstration.
        """
        # Loop until the user names a folder that actually exists.
        while not self.demo_folder:
            self.demo_folder = input(
                "Which file folder do you want to use to store " "demonstration files? "
            )
            if not os.path.isdir(self.demo_folder):
                print(f"{self.demo_folder} isn't a folder!")
                self.demo_folder = None
        # Loop until the user names a bucket that exists and is accessible.
        while not self.demo_bucket:
            self.demo_bucket = input(
                "Which Amazon S3 bucket do you want to use to store "
                "demonstration files? "
            )
            try:
                # head_bucket is a cheap way to verify existence and access.
                self._s3.meta.client.head_bucket(Bucket=self.demo_bucket)
            except ParamValidationError as err:
                print(err)
                self.demo_bucket = None
            except ClientError as err:
                print(err)
                print(
                    f"Either {self.demo_bucket} doesn't exist or you don't "
                    f"have access to it."
                )
                self.demo_bucket = None

    def demo(
        self, question, upload_func, download_func, upload_args=None, download_args=None
    ):
        """Run a demonstration.

        Ask the user if they want to run this specific demonstration.
        If they say yes, create a file on the local path, upload it
        using the specified upload function, then download it using the
        specified download function.

        :param question: Yes/no prompt shown to the user; may contain a
            '{}' placeholder for the file size in MB.
        :param upload_func: Called as (local_path, bucket, key, size_mb,
            **upload_args); returns per-thread transfer data.
        :param download_func: Called as (bucket, key, local_path, size_mb,
            **download_args); returns per-thread transfer data.
        """
        if download_args is None:
            download_args = {}
        if upload_args is None:
            upload_args = {}
        question = question.format(self.file_size_mb)
        answer = input(f"{question} (y/n)")
        if answer.lower() == "y":
            local_file_path, object_key, download_file_path = self._create_demo_file()
            # Monkeypatch the TransferConfig used by file_transfer so each
            # configuration built by the demo functions is printed before use.
            file_transfer.TransferConfig = self._config_wrapper(
                TransferConfig, CONFIG_ATTRS
            )
            self._report_transfer_params(
                "Uploading", local_file_path, object_key, **upload_args
            )
            start_time = time.perf_counter()
            thread_info = upload_func(
                local_file_path,
                self.demo_bucket,
                object_key,
                self.file_size_mb,
                **upload_args,
            )
            end_time = time.perf_counter()
            self._report_transfer_result(thread_info, end_time - start_time)
            # Downloads also report download-only configuration attributes.
            file_transfer.TransferConfig = self._config_wrapper(
                TransferConfig, CONFIG_ATTRS + DOWNLOAD_CONFIG_ATTRS
            )
            self._report_transfer_params(
                "Downloading", object_key, download_file_path, **download_args
            )
            start_time = time.perf_counter()
            thread_info = download_func(
                self.demo_bucket,
                object_key,
                download_file_path,
                self.file_size_mb,
                **download_args,
            )
            end_time = time.perf_counter()
            self._report_transfer_result(thread_info, end_time - start_time)

    def last_name_set(self):
        """Get the name set used for the last demo."""
        return self._chore_list[-1]

    def cleanup(self):
        """
        Remove files from the demo folder, and uploaded objects from the
        Amazon S3 bucket.
        """
        print("-" * self._terminal_width)
        for local_file_path, s3_object_key, downloaded_file_path in self._chore_list:
            print(f"Removing {local_file_path}")
            try:
                os.remove(local_file_path)
            except FileNotFoundError as err:
                # Best effort: report and keep cleaning the remaining chores.
                print(err)
            print(f"Removing {downloaded_file_path}")
            try:
                os.remove(downloaded_file_path)
            except FileNotFoundError as err:
                print(err)
            if self.demo_bucket:
                print(f"Removing {self.demo_bucket}:{s3_object_key}")
                try:
                    self._s3.Bucket(self.demo_bucket).Object(s3_object_key).delete()
                except ClientError as err:
                    print(err)

    def _setup_platform_specific(self):
        """Set up platform-specific command used to create a large file."""
        if platform.system() == "Windows":
            # fsutil takes the size in bytes, so scale MB up by MB.
            self._create_file_cmd = "fsutil file createnew {} {}"
            self._size_multiplier = MB
        elif platform.system() == "Linux" or platform.system() == "Darwin":
            # dd writes `count` blocks of MB bytes, so no scaling is needed.
            self._create_file_cmd = f"dd if=/dev/urandom of={{}} " f"bs={MB} count={{}}"
            self._size_multiplier = 1
        else:
            raise EnvironmentError(
                f"Demo of platform {platform.system()} isn't supported."
            )

    def _create_demo_file(self):
        """
        Create a file in the demo folder specified by the user. Store the local
        path, object name, and download path for later cleanup.

        Only the local file is created by this method. The Amazon S3 object and
        download file are created later during the demonstration.

        Returns:
            A tuple that contains the local file path, object name, and download
            file path.
        """
        file_name_template = "TestFile{}-{}.demo"
        local_suffix = "local"
        object_suffix = "s3object"
        download_suffix = "downloaded"
        # Unique tag per demo run, so files from earlier runs aren't clobbered.
        file_tag = len(self._chore_list) + 1
        local_file_path = os.path.join(
            self.demo_folder, file_name_template.format(file_tag, local_suffix)
        )
        s3_object_key = file_name_template.format(file_tag, object_suffix)
        downloaded_file_path = os.path.join(
            self.demo_folder, file_name_template.format(file_tag, download_suffix)
        )
        filled_cmd = self._create_file_cmd.format(
            local_file_path, self.file_size_mb * self._size_multiplier
        )
        print(
            f"Creating file of size {self.file_size_mb} MB "
            f"in {self.demo_folder} by running:"
        )
        print(f"{'':4}{filled_cmd}")
        # NOTE(review): os.system with a formatted path is shell injection-prone
        # if demo_folder is hostile; acceptable for an interactive demo.
        os.system(filled_cmd)
        chore = (local_file_path, s3_object_key, downloaded_file_path)
        self._chore_list.append(chore)
        return chore

    def _report_transfer_params(self, verb, source_name, dest_name, **kwargs):
        """Report configuration and extra arguments used for a file transfer."""
        print("-" * self._terminal_width)
        print(f"{verb} {source_name} ({self.file_size_mb} MB) to {dest_name}")
        if kwargs:
            print("With extra args:")
            for arg, value in kwargs.items():
                print(f'{"":4}{arg:<20}: {value}')

    @staticmethod
    def ask_user(question):
        """
        Ask the user a yes or no question.

        Returns:
            True when the user answers 'y' or 'Y'; otherwise, False.
        """
        answer = input(f"{question} (y/n) ")
        return answer.lower() == "y"

    @staticmethod
    def _config_wrapper(func, config_attrs):
        """Wrap a TransferConfig factory so each config it builds is printed."""

        def wrapper(*args, **kwargs):
            config = func(*args, **kwargs)
            print("With configuration:")
            for attr in config_attrs:
                print(f'{"":4}{attr:<20}: {getattr(config, attr)}')
            return config

        return wrapper

    @staticmethod
    def _report_transfer_result(thread_info, elapsed):
        """Report the result of a transfer, including per-thread data."""
        print(f"\nUsed {len(thread_info)} threads.")
        for ident, byte_count in thread_info.items():
            print(f"{'':4}Thread {ident} copied {byte_count} bytes.")
        print(f"Your transfer took {elapsed:.2f} seconds.")
def main():
    """
    Run the demonstration script for s3_file_transfer.

    Walks through four transfer scenarios, optionally shows the failure mode
    of downloading an encrypted object without its key, then cleans up.
    """
    demo_manager = TransferDemoManager()
    demo_manager.collect_user_info()

    # Upload and download with default configuration. Because the file is 30 MB
    # and the default multipart_threshold is 8 MB, both upload and download are
    # multipart transfers.
    demo_manager.demo(
        "Do you want to upload and download a {} MB file "
        "using the default configuration?",
        file_transfer.upload_with_default_configuration,
        file_transfer.download_with_default_configuration,
    )

    # Upload and download with multipart_threshold set higher than the size of
    # the file. This causes the transfer manager to use standard transfers
    # instead of multipart transfers.
    demo_manager.demo(
        "Do you want to upload and download a {} MB file "
        "as a standard (not multipart) transfer?",
        file_transfer.upload_with_high_threshold,
        file_transfer.download_with_high_threshold,
    )

    # Upload with specific chunk size and additional metadata.
    # Download with a single thread.
    demo_manager.demo(
        "Do you want to upload a {} MB file with a smaller chunk size and "
        "then download the same file using a single thread?",
        file_transfer.upload_with_chunksize_and_meta,
        file_transfer.download_with_single_thread,
        upload_args={
            "metadata": {
                "upload_type": "chunky",
                "favorite_color": "aqua",
                "size": "medium",
            }
        },
    )

    # Upload using server-side encryption with customer-provided
    # encryption keys.
    # Generate a 256-bit key from a passphrase.
    sse_key = hashlib.sha256("demo_passphrase".encode("utf-8")).digest()
    demo_manager.demo(
        "Do you want to upload and download a {} MB file using "
        "server-side encryption?",
        file_transfer.upload_with_sse,
        file_transfer.download_with_sse,
        upload_args={"sse_key": sse_key},
        download_args={"sse_key": sse_key},
    )

    # Download without specifying an encryption key to show that the
    # encryption key must be included to download an encrypted object.
    if demo_manager.ask_user(
        "Do you want to try to download the encrypted "
        "object without sending the required key?"
    ):
        try:
            _, object_key, download_file_path = demo_manager.last_name_set()
            file_transfer.download_with_default_configuration(
                demo_manager.demo_bucket,
                object_key,
                download_file_path,
                demo_manager.file_size_mb,
            )
        except ClientError as err:
            # The failure is the point of this step, so report it as expected.
            print(
                "Got expected error when trying to download an encrypted "
                "object without specifying encryption info:"
            )
            print(f"{'':4}{err}")

    # Remove all created and downloaded files, remove all objects from
    # S3 storage.
    if demo_manager.ask_user(
        "Demonstration complete. Do you want to remove local files " "and S3 objects?"
    ):
        demo_manager.cleanup()
# Script entry point: surface missing-credential errors with a helpful hint.
if __name__ == "__main__":
    try:
        main()
    except NoCredentialsError as error:
        print(error)
        print(
            "To run this example, you must have valid credentials in "
            "a shared credential file or set in environment variables."
        )
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4