
pw.io

In Pathway, data is accessed using connectors. This page provides their API documentation. See the connector articles for an overview of their architecture.

class CsvParserSettings(delimiter=',', quote='"', escape=None, enable_double_quote_escapes=True, enable_quoting=True, comment_character=None)

Class representing settings for the CSV parser.
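For illustration only, a minimal sketch of how such settings could be built and handed to the CSV connector; the SalesSchema class, the ./sales_data/ path, and the csv_settings parameter name are assumptions rather than part of this page:

import pathway as pw

class SalesSchema(pw.Schema):
    product: str
    amount: int

settings = pw.io.CsvParserSettings(
    delimiter=";",              # fields separated by semicolons instead of commas
    quote='"',                  # values may be wrapped in double quotes
    comment_character="#",      # lines starting with '#' are skipped
)

sales = pw.io.csv.read(
    "./sales_data/",            # hypothetical input directory
    schema=SalesSchema,
    csv_settings=settings,      # assumed parameter name for passing the settings
)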

class OnChangeCallback(*args, **kwargs)

The callback to be called on every change in the table. It must be callable and accept four parameters: the key, the changed row, the time of the change in milliseconds, and a flag stating whether the change was an addition of the row.

class OnFinishCallback(*args, **kwargs)

The callback function to be called when the stream of changes ends. It will be called on each engine worker separately.
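As a small sketch of what such callables can look like (these are the kinds of callbacks passed to pw.io.subscribe, documented below; the bodies here are illustrative only):

import pathway as pw

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    # key: pointer of the affected row; row: its values as a dict;
    # time: time of the change in milliseconds; is_addition: True for an
    # insertion, False for a deletion.
    action = "added" if is_addition else "removed"
    print(f"[{time}] row {action}: {row}")

def on_end():
    # Called when the stream of changes ends, once per engine worker.
    print("stream finished")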

register_input_synchronization_group(*columns, max_difference, name='default')

Creates a synchronization group for a specified set of columns. The set must consist of at least two columns, each belonging to a different table. These tables must be read using one of the input connectors (they have to be input tables). Transformed tables cannot be used.

The synchronization group ensures that the engine reads data into the specified tables in such a way that the difference between the maximum read values from each column does not exceed max_difference.

All columns must have the same data type to allow for proper comparison, and max_difference must be of the type produced by subtracting two values of that column type.

The logic of the synchronization group is the following: the engine tracks the maximum value read so far from each of the specified columns and postpones reading from a source whenever advancing it would make the difference between these maxima exceed max_difference.

Limitations:

Please note that all columns within the synchronization group must have the same type.

Example:

Suppose you have two data sources: a Kafka topic with login events and a Kafka topic with transactions.

Each table contains a timestamp field that represents the number of seconds since the UNIX Epoch. You want to ensure that these tables are read simultaneously, with no more than a 10-minute (600-second) difference between their maximum timestamp values.

First, you need to define the table schema:

import pathway as pw
class InputSchema(pw.Schema):
    event_id: str
    unix_timestamp: int
    data: pw.Json
    # Other relevant fields can be added here

Next, you read both tables from Kafka. Assuming the Kafka server runs on host "kafka" and port 8082:

login_events = pw.io.kafka.simple_read("kafka:8082", "logins", format="json", schema=InputSchema)
transactions = pw.io.kafka.simple_read("kafka:8082", "transactions", format="json", schema=InputSchema)

Finally, you can synchronize these two tables by creating a synchronization group:

pw.io.register_input_synchronization_group(
    login_events.unix_timestamp,
    transactions.unix_timestamp,
    max_difference=600,
)

This ensures that both topics are read in such a way that the difference between the maximum timestamp values at any moment does not exceed 600 seconds (10 minutes).
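Note that the synchronization group only takes effect once the dataflow is executed. A minimal sketch of running the example above, reusing the subscribe connector documented below as a stand-in output (the print callback is illustrative only):

pw.io.subscribe(
    login_events,
    on_change=lambda key, row, time, is_addition: print(row),
)
pw.run()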

Note:

If all data sources have a gap larger than max_difference, the synchronization group will wait until data from all sources arrives. Once all sources move past the gap, the synchronization group will allow reading to proceed further.

Example scenario: consider a synchronization group with two data sources, both tracking a timestamp column, with max_difference set to 600 seconds (10 minutes). If both sources stop delivering data for an hour, reading pauses at the edge of the gap; once every source delivers data again on the other side of the gap, the group lets reading advance past it.

This behavior ensures that data gaps do not cause deadlocks but are properly detected and handled.

subscribe(table, on_change, on_end=lambda: ..., on_time_end=lambda time: ..., *, name=None, sort_by=None)

Calls a callback function on_change on every change happening in table.

Example:

import pathway as pw

table = pw.debug.table_from_markdown('''
     | pet  | owner   | age | __time__ | __diff__
   1 | dog  | Alice   | 10  | 0        | 1
   2 | cat  | Alice   | 8   | 2        | 1
   3 | dog  | Bob     | 7   | 4        | 1
   2 | cat  | Alice   | 8   | 6        | -1
''')

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    print(f"{row}, {time}, {is_addition}")

def on_end():
    print("End of stream.")

pw.io.subscribe(table, on_change, on_end)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)

{'pet': 'dog', 'owner': 'Alice', 'age': 10}, 0, True
{'pet': 'cat', 'owner': 'Alice', 'age': 8}, 2, True
{'pet': 'dog', 'owner': 'Bob', 'age': 7}, 4, True
{'pet': 'cat', 'owner': 'Alice', 'age': 8}, 6, False
End of stream.
