AWS SDK for pandas on Ray - Runbook

The following runbook is a collection of tips, common errors and performance issues related to using AWS SDK for pandas on Ray.

Tips

Overriding the Ray runtime initialization

In distributed mode with Ray, AWS SDK for pandas attempts to detect an existing Ray cluster or to start a runtime by executing ray.init. Advanced users can override this process, however, by running the same command before importing awswrangler.

import ray

# The responsibility for starting the Ray runtime is delegated to the user
ray.init("auto")

# As a result, the library detects that a Ray runtime already exists and connects to it
import awswrangler as wr

This approach gives more flexibility to the user when it comes to configuring the Ray runtime. For instance, they can decide whether to include a dashboard or to turn logging on/off:

import ray

# User can override the default init arguments used in awswrangler
ray.init("auto", include_dashboard=True, configure_logging=True, log_to_driver=False)

Please note that other libraries might also start the Ray runtime on import. Chief among them is Modin:

import modin.pandas as pd # Ray runtime is started here
import awswrangler as wr

In that case, AWS SDK for pandas does not attempt to start the runtime and just connects to the existing one instead.

Tune partitioning in Modin

Modin data frames rely on the concept of partitioning. Advanced users can achieve better performance by understanding how partitions are created and how to tune them. More details are available in the Modin documentation.
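
As a quick starting point, you can inspect how a Modin frame is currently partitioned using Modin's unwrap_partitions helper (a minimal sketch; the example frame and the resulting layout depend on your data and your Modin configuration):

import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions

df = pd.DataFrame({"a": range(1_000_000), "b": range(1_000_000)})  # example frame

# With axis=None, partition references are returned row-major: one inner list
# per row partition, one entry per column partition
parts = unwrap_partitions(df, axis=None)
print(f"{len(parts)} row partition(s) x {len(parts[0])} column partition(s)")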

Common Errors

API Layer Objects

ValueError: Only API Layer objects may be passed in here, got <class 'pandas.core.frame.DataFrame'> instead.

.venv/lib/python3.9/site-packages/modin/distributed/dataframe/pandas/partitions.py:49: ValueError

Solution

When AWS SDK for pandas is running in distributed mode with Ray, APIs require Modin Data Frames instead of pandas Data Frames as inputs.

Rather than working with pandas Data Frames, you can change your import statement so that your script works with Modin Frames by default:

import modin.pandas as pd

df = pd.DataFrame(...) # Modin Data Frame

Alternatively, you can transform your existing Data Frame into a Modin Data Frame:

import modin.pandas as modin_pd
import pandas as pd


df = pd.DataFrame(...)
modin_df = modin_pd.DataFrame(df)

Unsupported arguments

Most AWS SDK for pandas calls support passing the boto3_session argument. While this is acceptable for an application running in a single process, distributed applications require the session to be serialized and passed to the worker nodes in the cluster. This constitutes a security risk. As a result, passing boto3_session when using the Ray runtime is not supported.
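
In practice, configure credentials and region globally instead of passing a session object, for example through environment variables, instance profiles, or the default boto3 session. A minimal sketch (the region name below is a placeholder):

import boto3
import awswrangler as wr

# Configure the default session once; worker nodes resolve credentials through
# their own environment (e.g. instance profiles) rather than a serialized session
boto3.setup_default_session(region_name="us-east-1")

df = wr.s3.read_parquet("s3://my-bucket/my-data-path/", dataset=True)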

Performance

Avoid writing data to a single S3 object

When using AWS SDK for pandas without the distributed mode, it’s common to tell a function such as to_parquet to write to a path pointing to a single file. However, when using the distributed/Ray mode, the data in a Modin Data Frame may be stored across multiple nodes in your cluster. Telling to_parquet to write to a single S3 object means that the data from the Data Frame may need to be transferred onto a single node so that it can be written into a single file. Instead, it’s more efficient to pass to_parquet an S3 prefix together with the parameter dataset=True.

# Data is moved onto a single node, and written into a single file.
# This can also cause memory issues, as an individual node may not be able to store
# the entire Data Frame
wr.s3.to_parquet(df, "s3://my-bucket/my-data-file.parquet")

# Each node writes the data which it's currently storing.
# This is faster as the data does not need to be moved onto a single node.
wr.s3.to_parquet(df, "s3://my-bucket/my-data-path/", dataset=True)

When trying to write a distributed Modin Data Frame into a single file, the following warning is surfaced:

Repartitioning frame to single partition as a strict path was defined: s3://my-bucket/my-path.parquet. This operation is inefficient for large datasets.

Writing bucketed data or using max_rows_by_file causes repartitioning

Setting bucketing_info or max_rows_by_file as input arguments to write functions such as to_parquet or to_csv might lead to data movement within the cluster. This is because a repartitioning of the Data Frame is required to honor the desired bucketing or max rows by file configuration.

# Repartitioning is needed to output files with no more than 100,000 rows
# This leads to a performance hit caused by a readjustment in the number of blocks in the cluster
wr.s3.to_parquet(df, "s3://my-bucket/my-data-path/", max_rows_by_file=100000)

# Likewise, creating buckets based on a list of columns slows things down due to blocks being shuffled 
wr.s3.to_csv(df, "s3://my-bucket/my-data-path/", dataset=True, bucketing_info=(["col2"], 2))

Using pandas flags leads to slower performance when reading or writing CSV

PyArrow method unavailable, defaulting to Pandas I/O functions. This will result in slower performance of the write operations.

The read_csv and to_csv functions both have an argument named pandas_kwargs which allows the user to pass any pandas argument for reading/writing data frames, without AWS SDK for pandas needing to support it explicitly. When it comes to distributing code using Ray, PyArrow’s I/O functions offer significantly better performance than the equivalent functions in pandas.

In order to preserve the ability to use any parameter supported by pandas while also optimizing performance, the implementations of s3.read_csv and s3.to_csv attempt to use the underlying PyArrow I/O functions wherever possible. When this is not possible, the functions fall back to the slower pandas I/O functions and output the warning above.
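
For illustration, pandas-style arguments are simply added to the call (a hedged sketch; which arguments PyArrow can honor, and therefore whether the fallback is triggered, depends on the awswrangler version):

import awswrangler as wr

# df is a Modin data frame, as in the earlier examples.
# Arguments such as sep and na_rep are forwarded to the CSV writer; if PyArrow
# cannot honor them, the slower pandas code path is used and the warning above
# is emitted
wr.s3.to_csv(df, "s3://my-bucket/my-data-path/", dataset=True, sep="|", na_rep="NULL")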

Writing bucketed data leads to slower performance

Writing bucketed data (i.e. splitting the data into the user-specified number of buckets based on a hash) requires a groupby operation. As a result, data blocks must be shuffled across the entire cluster, which is a highly inefficient operation, especially when the data set cannot fit into memory.

The size of input files is too large for the target block size

WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 
9.0x larger than the target block size of 512 MiB. This may lead to out-of-memory
errors during processing. Consider reducing the size of input files

Input files passed to read methods may trigger the above warning when their size is larger than the target block size (512 MiB by default). It’s important to note that the block size refers to the size in memory, not on disk: an S3 object is often compressed and can grow significantly in size when represented in memory as a block.

The above can be mitigated by reducing the size of input files or by modifying the DEFAULT_TARGET_MAX_BLOCK_SIZE Ray environment variable.
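
One possible way to adjust the target block size at runtime is through the Ray Data context (a hedged sketch; the class was named DatasetContext in older Ray releases and attribute names may vary across versions):

from ray.data.context import DataContext  # DatasetContext in older Ray releases

# Raise the target block size to 1 GiB before reading; pick a value that
# comfortably fits within the memory of your instances
DataContext.get_current().target_max_block_size = 1024 * 1024 * 1024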

WARNING: Partitions of this data frame are detected to be split along column axis...

Modin data frames can be split across both column and row axes based on the provided configuration and number of columns in the data set. Some operations in AWS SDK for pandas require the data frame to be split across the row axis only to ensure blocks contain the complete set of columns. For those cases, when processing a Modin data frame, AWS SDK for pandas will automatically repartition the data frame along the row axis and display the warning below:

Partitions of this data frame are detected to be split along column axis. 
The dataframe will be automatically repartitioned along row axis to ensure 
each partition can be processed independently.

This might be an expensive operation. To avoid repartitioning, it is recommended to configure Modin parameters to fit your data. See an example below:

import os

# Changing value of `NPartitions` and `MinPartitionSize` using env variables
os.environ["MODIN_NPARTITIONS"] = "12"
os.environ["MODIN_MIN_PARTITION_SIZE"] = "32"

import modin.config

print(modin.config.NPartitions.get()) # prints '12'
print(modin.config.MinPartitionSize.get()) # prints '32'

# Changing value of `NPartitions` and `MinPartitionSize` using modin config API
modin.config.NPartitions.put(16)
modin.config.MinPartitionSize.put(64)
print(modin.config.NPartitions.get()) # prints '16'
print(modin.config.MinPartitionSize.get()) # prints '64'

For a complete list of Modin configuration options refer to the Modin documentation.

DataFrame columns with undefined type impact performance as data type must be inferred

When writing data frames using s3.to_parquet, if the data frame contains columns with undefined types, the library attempts to infer the dtype of those columns, which can have a negative performance impact. This latency increases as the data size increases.

To mitigate this, s3.to_parquet allows passing the dtype argument, which enables type casting of columns before writing to S3. See an example below:

# Dictionary containing column names and Athena types to cast to
# See https://docs.aws.amazon.com/athena/latest/ug/data-types.html 
column_dtypes = {
  "A": "int",
  "B": "bigint",
  "C": "float",
  "D": "char"
}

wr.s3.to_parquet(df, path=path, dtype=column_dtypes)

A RayOutOfMemoryError exception is raised

The error is raised when 95% of the memory on the machine/cluster is reached. The ray memory CLI command can help debug and identify the source of the exception.

Ray attempts to mitigate memory errors by spilling objects to disk. A warning is raised to inform the user that this process will impact performance. If you know that spilling to disk is likely, it’s recommended to use SSD rather than HDD storage for the spill directory.
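
As a hedged sketch (the spill directory below is a placeholder; point it at fast local SSD storage), the spilling location can be set when initializing Ray:

import json
import ray

# Direct Ray object spilling to a fast local SSD path
ray.init(
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/mnt/local-ssd/ray-spill"}}
        )
    }
)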

Bucketing and partitioning

Bucketing requires a hash value to be calculated for the column that you wish to bucket by, which is then used to shuffle the data across the requested number of buckets. This means that bucketing requires a full data reshuffle, thus forgoing the parallelization benefit of distributed computing.

On the other hand, partitioning is implemented in such a way that each node is in charge of its own partitioning. If node A contains a block which is to be divided into two partitions, then node A will write two files to S3. Other nodes in the cluster will do the same, without needing to reshuffle any of the data.

Please note that AWS SDK for pandas supports both bucketing and partitioning in a single write API call, allowing you to bucket by one column and partition by another. In this scenario, the data will need to be reshuffled due to the bucketing.
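
A minimal sketch of such a combined call (the bucket, path and column names are placeholders):

import awswrangler as wr

# Partitioning by col1 is handled locally by each node, while bucketing col2
# into two buckets forces a cluster-wide shuffle
wr.s3.to_parquet(
    df,
    path="s3://my-bucket/my-data-path/",
    dataset=True,
    partition_cols=["col1"],
    bucketing_info=(["col2"], 2),
)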

S3FS

In distributed mode, S3Fs is used instead of boto3 for certain API calls, such as listing a large number of S3 objects. This choice was made for performance reasons, as a boto3 implementation can be much slower in some cases.

As a side effect, users won't be able to use the s3_additional_kwargs input parameter as it's currently not supported by S3Fs.

