
pw.io.csv

All internal Pathway types can be serialized into CSV. The table below explains how the conversion is done. The values of the corresponding types can also be deserialized from CSV back into Pathway values.

Pathway types serialization into CSV

| Pathway type | Serialization |
| --- | --- |
| bool | Either True or False |
| int | Serialized as is |
| float | Serialized as is, with a dot (.) as the decimal separator |
| pointer | A string that can be deserialized back if the pw.Pointer type is specified in the Pathway table schema |
| str | Serialized as is |
| bytes | Base64-encoded data |
| Naive DateTime | A datetime in ISO 8601 format |
| UTC DateTime | A datetime in ISO 8601 format |
| Duration | An integer representing the number of nanoseconds |
| JSON | A string containing the serialized JSON value |
| np.ndarray | A string containing a serialized JSON object with two top-level fields: an integer array "shape" holding the shape of the array, and an array "elements" holding the flattened elements of the array |
| tuple | A string containing a serialized JSON array whose elements appear in tuple order |
| list | A string containing a serialized JSON array whose elements appear in list order |
| pw.PyObjectWrapper | A string that can be deserialized back if the pw.PyObjectWrapper type is specified in the Pathway table schema |

read(path, *, schema=None, csv_settings=None, mode='streaming', object_pattern='*', with_metadata=False, autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None, **kwargs)

Reads a table from one or several files with delimiter-separated values.

If a directory is passed to the engine, the files in that directory are processed in ascending order of their modification time.
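To make that ordering concrete, here is a small stdlib-only sketch (not Pathway internals) that sorts a directory's files by modification time; the file names and timestamps are made up for illustration:

```python
import os
import pathlib
import tempfile

# Illustration only: when a directory is passed, files are picked up in
# ascending order of modification time, not in name order.
directory = tempfile.mkdtemp()
for i, name in enumerate(["part_3.csv", "part_1.csv", "part_2.csv"]):
    path = pathlib.Path(directory, name)
    path.write_text("id,value\n1,a")
    os.utime(path, (1_000_000 + i, 1_000_000 + i))  # set explicit mtimes

ordered = sorted(os.listdir(directory),
                 key=lambda f: os.path.getmtime(os.path.join(directory, f)))
print(ordered)  # ['part_3.csv', 'part_1.csv', 'part_2.csv']
```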

Example:

Suppose you want to read a dataset stored in the filesystem in standard CSV format. The dataset contains data about pets and their owners.

For the sake of demonstration, you can prepare a small dataset by creating a CSV file via a unix command line tool:

printf "id,owner,pet\n1,Alice,dog\n2,Bob,dog\n3,Alice,cat\n4,Bob,dog" > dataset.csv

To read it into a Pathway table, first import Pathway and then use the pw.io.csv.read method:

import pathway as pw
class InputSchema(pw.Schema):
  owner: str
  pet: str
t = pw.io.csv.read("dataset.csv", schema=InputSchema, mode="static")

Then, you can print the table to check that the read was correct:

pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice dog
  Bob dog
Alice cat
  Bob dog
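As a sanity check independent of Pathway, you can parse the same file with the standard library's csv module (the file is recreated below so the snippet is self-contained):

```python
import csv
import pathlib

# Recreate the dataset from the printf command above, then parse it
# with the standard library to cross-check the rows being read.
pathlib.Path("dataset.csv").write_text(
    "id,owner,pet\n1,Alice,dog\n2,Bob,dog\n3,Alice,cat\n4,Bob,dog"
)
with open("dataset.csv") as f:
    rows = list(csv.DictReader(f))
pairs = [(row["owner"], row["pet"]) for row in rows]
print(pairs)
# [('Alice', 'dog'), ('Bob', 'dog'), ('Alice', 'cat'), ('Bob', 'dog')]
```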

Now let’s try something different. Suppose you have site access logs stored in several files in a separate folder. For the sake of simplicity, a log entry contains an access ID, an IP address, and the login of the user.

A dataset in this format can be generated with the following unix commands:

mkdir logs
printf "id,ip,login\n1,127.0.0.1,alice\n2,8.8.8.8,alice" > logs/part_1.csv
printf "id,ip,login\n3,8.8.8.8,bob\n4,127.0.0.1,alice" > logs/part_2.csv

Now, let’s see how to use the connector to read the contents of this directory into a table:

class InputSchema(pw.Schema):
  ip: str
  login: str
t = pw.io.csv.read("logs/", schema=InputSchema, mode="static")

The only difference from the previous example is that you specified the name of the directory instead of a file name. It’s that simple!
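Conceptually, reading a directory amounts to reading every matching file in it and combining their rows. A stdlib-only sketch of that idea (not how Pathway is implemented internally; the log files from the printf commands above are recreated so the snippet is self-contained):

```python
import csv
import glob
import pathlib

# Recreate the two log files, then show that reading the directory
# yields the union of rows from all files in it.
pathlib.Path("logs").mkdir(exist_ok=True)
pathlib.Path("logs/part_1.csv").write_text(
    "id,ip,login\n1,127.0.0.1,alice\n2,8.8.8.8,alice")
pathlib.Path("logs/part_2.csv").write_text(
    "id,ip,login\n3,8.8.8.8,bob\n4,127.0.0.1,alice")

rows = []
for path in sorted(glob.glob("logs/*.csv")):
    with open(path) as f:
        rows.extend(csv.DictReader(f))
print([(row["ip"], row["login"]) for row in rows])
```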

But what if you are working with a real-time system that generates logs continuously? The logs are written and, after a while, land in the log directory (a process also called “log rotation”). Suppose you need to keep fetching new files from this directory as they appear. Would Pathway handle that? Sure!

The only difference is the value of the mode parameter. The code snippet looks as follows:

t = pw.io.csv.read("logs/", schema=InputSchema, mode="streaming")

With this mode, you obtain a dynamically updated table. Changes in the logs are reflected in the Business-Intelligence (BI)-ready data, namely, in the tables you would like to output.

write(table, filename, *, name=None, sort_by=None)

Writes the table’s stream of updates to a file in delimiter-separated values format.

Example:

In this simple example you can see how table output works. First, import Pathway and create a table:

import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice dog \n 2 9 Bob cat \n 3 8 Alice cat")

Suppose you want to output this table’s stream of changes. To do that, you simply write:

pw.io.csv.write(t, "table.csv")

Now, let’s see what you have on the output:

age,owner,pet,time,diff
10,"Alice","dog",0,1
9,"Bob","cat",0,1
8,"Alice","cat",0,1

The first three columns represent the data columns. The time column holds the number of the minibatch of operations in which each row was read; since the data here is static, it is 0 for every row. The diff column indicates whether a row was added (1) or removed (-1) in that minibatch; here it is 1 for all three rows because they were all read from the input. All in all, the extra time and diff columns show that in the initial minibatch (time = 0), three rows were read and all of them were added to the collection (diff = 1).
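To see why time and diff are enough to reconstruct the table at any point, here is a plain-Python sketch that folds an update stream into a final state; a row's multiplicity is the sum of its diffs. The first three updates mirror the CSV output above, and the last two are a hypothetical later change (Bob's cat turning 10) added for illustration:

```python
from collections import Counter

# Fold a stream of (row, time, diff) updates into the current table
# state. An update to a row appears as a retraction (diff = -1) of the
# old version plus an insertion (diff = +1) of the new one.
updates = [
    (("10", "Alice", "dog"), 0, 1),
    (("9", "Bob", "cat"), 0, 1),
    (("8", "Alice", "cat"), 0, 1),
    (("9", "Bob", "cat"), 2, -1),   # old row retracted
    (("10", "Bob", "cat"), 2, 1),   # new row inserted
]
state = Counter()
for row, time, diff in updates:
    state[row] += diff

final = [row for row, count in state.items() if count > 0]
print(final)
# [('10', 'Alice', 'dog'), ('8', 'Alice', 'cat'), ('10', 'Bob', 'cat')]
```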

