In Pathway, data is accessed using connectors. This page provides their API documentation. See the connector articles for an overview of their architecture.
class CsvParserSettings(delimiter=',', quote='"', escape=None, enable_double_quote_escapes=True, enable_quoting=True, comment_character=None)[source]
Class representing settings for the CSV parser.
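As a hedged illustration, the sketch below builds a parser configuration for semicolon-delimited files in which lines starting with # are treated as comments; it assumes the class is exposed as pw.io.CsvParserSettings and is passed through the csv_settings parameter of pw.io.csv.read:

import pathway as pw

class InputSchema(pw.Schema):
    name: str
    age: int

# Semicolon-delimited input; lines starting with '#' are skipped.
settings = pw.io.CsvParserSettings(delimiter=";", comment_character="#")

table = pw.io.csv.read("./data/", schema=InputSchema, csv_settings=settings)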
class OnChangeCallback(*args, **kwargs)[source]
The callback to be called on every change in the table. It is required to be callable and to accept four parameters: the key, the row changed, the time of the change in milliseconds and the flag stating whether the change was an addition of the row.
class OnFinishCallback(*args, **kwargs)[source]
The callback function to be called when the stream of changes ends. It will be called on each engine worker separately.
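For illustration, here is a minimal sketch of callbacks conforming to these two protocols, mirroring additions and deletions into a plain Python dict; the table they would be attached to is hypothetical:

import pathway as pw

mirror: dict[pw.Pointer, dict] = {}

# OnChangeCallback: receives the key, the changed row, the change time in
# milliseconds, and a flag telling whether the change was an addition.
def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    if is_addition:
        mirror[key] = row
    else:
        mirror.pop(key, None)

# OnFinishCallback: takes no arguments and is called once per engine worker
# when the stream of changes ends.
def on_end():
    print(f"stream ended, {len(mirror)} rows retained")

# Both callbacks would be registered with pw.io.subscribe, documented below:
# pw.io.subscribe(some_table, on_change, on_end)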
register_input_synchronization_group(*columns, max_difference, name='default')[source]
Creates a synchronization group for a specified set of columns. The set must consist of at least two columns, each belonging to a different table. These tables must be read using one of the input connectors (they have to be input tables). Transformed tables cannot be used.
The synchronization group ensures that the engine reads data into the specified tables in such a way that the difference between the maximum values already read from each column never exceeds max_difference. All columns must have the same data type to allow for proper comparison, and max_difference must have the type that results from subtracting two values of that column type (for example, an int for int columns, or a Duration for DateTimeUtc columns).
How the group behaves when one of the sources has a gap in its data is described in the Note below.

Limitations: only the int, DateTimeNaive, DateTimeUtc and Duration field types are supported. Please note that all columns within the synchronization group must have the same type.
Parameters:
- columns (ColumnReference) – A list of columns that will be monitored and synchronized. Each column must belong to a different table read from an input connector.
- max_difference (None | int | float | str | bytes | bool | Pointer | datetime | timedelta | ndarray | Json | dict[str, Any] | tuple[Any, ...] | Error | Pending) – The maximum allowed difference between the highest values in the tracked columns at any given time. It must be the result of subtracting values of the columns specified before.
- name (str) – The name of the synchronization group, used for logging and debugging purposes.

Example:
Suppose you have two data sources:
- login_events, a table read from the Kafka topic "logins".
- transactions, a table read from the Kafka topic "transactions".

Each table contains a timestamp field that represents the number of seconds since the UNIX Epoch. You want to ensure that these tables are read simultaneously, with no more than a 10-minute (600-second) difference between their maximum timestamp values.
First, you need to define the table schema:
import pathway as pw

class InputSchema(pw.Schema):
    event_id: str
    unix_timestamp: int
    data: pw.Json
    # Other relevant fields can be added here
Next, you read both tables from Kafka. Assuming the Kafka server runs on host "kafka" and port 8082:
login_events = pw.io.kafka.simple_read("kafka:8082", "logins", format="json", schema=InputSchema)
transactions = pw.io.kafka.simple_read("kafka:8082", "transactions", format="json", schema=InputSchema)
Finally, you can synchronize these two tables by creating a synchronization group:
pw.io.register_input_synchronization_group(
    login_events.unix_timestamp,
    transactions.unix_timestamp,
    max_difference=600,
)
This ensures that both topics are read in such a way that the difference between the maximum timestamp values at any moment does not exceed 600 seconds (10 minutes).
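The same mechanism works for datetime columns. As a hedged sketch (the topic names, the event_time field and its parsing into DateTimeUtc are assumptions), columns of type DateTimeUtc can be synchronized with a Duration-valued max_difference, for example a datetime.timedelta:

import datetime
import pathway as pw

class EventSchema(pw.Schema):
    event_id: str
    event_time: pw.DateTimeUtc  # hypothetical timestamp field
    data: pw.Json

clicks = pw.io.kafka.simple_read("kafka:8082", "clicks", format="json", schema=EventSchema)
orders = pw.io.kafka.simple_read("kafka:8082", "orders", format="json", schema=EventSchema)

# DateTimeUtc - DateTimeUtc yields a Duration, so max_difference is a timedelta here.
pw.io.register_input_synchronization_group(
    clicks.event_time,
    orders.event_time,
    max_difference=datetime.timedelta(minutes=10),
    name="clicks_orders_sync",
)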
Note:
If all data sources have a gap larger than max_difference, the synchronization group will wait until data from all sources arrives. Once all sources move past the gap, the synchronization group will allow reading to proceed further.
Example scenario: Consider a synchronization group with two data sources, both tracking a timestamp column, and max_difference set to 600 seconds (10 minutes).
- Both sources have delivered data up to time T.
- One source then sends a record with timestamp T + 1h. This record is not yet forwarded for processing because it exceeds max_difference.
- Once the other source also reaches T + 1h, the system detects a 1-hour gap. Since both sources have moved beyond T, the synchronization group accepts T + 1h as the new baseline and continues processing from there.
- If instead the other source sends a record with timestamp T + 5m, this record is processed normally. The system will continue waiting for the first source to catch up before advancing further.

This behavior ensures that data gaps do not cause deadlocks but are properly detected and handled.
subscribe(table, on_change, on_end=lambda: ..., on_time_end=lambda time: ..., *, name=None, sort_by=None)[source]
Calls a callback function on_change on every change happening in table.
Parameters:
- on_change (OnChangeCallback) – the callback to be called on every change in the table. The function is required to accept four parameters: the key, the row changed, the time of the change in milliseconds and the flag stating whether the change was an addition of the row. These parameters of the callback are expected to be named key, row, time and is_addition respectively.
- on_end (OnFinishCallback) – the callback to be called when the stream of changes ends.
- on_time_end (OnTimeEndCallback) – the callback function to be called on each closed time of computation.
- name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
- sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.

Example:
import pathway as pw

table = pw.debug.table_from_markdown('''
    | pet | owner | age | __time__ | __diff__
  1 | dog | Alice | 10  | 0        | 1
  2 | cat | Alice | 8   | 2        | 1
  3 | dog | Bob   | 7   | 4        | 1
  2 | cat | Alice | 8   | 6        | -1
''')

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    print(f"{row}, {time}, {is_addition}")

def on_end():
    print("End of stream.")

pw.io.subscribe(table, on_change, on_end)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)
{'pet': 'dog', 'owner': 'Alice', 'age': 10}, 0, True
{'pet': 'cat', 'owner': 'Alice', 'age': 8}, 2, True
{'pet': 'dog', 'owner': 'Bob', 'age': 7}, 4, True
{'pet': 'cat', 'owner': 'Alice', 'age': 8}, 6, False
End of stream.
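The example above uses only on_change and on_end. As a hedged sketch, the snippet below also wires up on_time_end and sort_by, printing each closed computation time and sorting rows by age inside every minibatch; the data is a toy table like the one above:

import pathway as pw

table = pw.debug.table_from_markdown('''
    | pet | owner | age | __time__ | __diff__
  1 | dog | Alice | 10  | 2        | 1
  2 | cat | Alice | 8   | 2        | 1
  3 | dog | Bob   | 7   | 4        | 1
''')

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    print(f"{row}, {time}, {is_addition}")

def on_time_end(time: int):
    # Called once for each closed time of the computation.
    print(f"time {time} closed")

pw.io.subscribe(table, on_change, on_time_end=on_time_end, sort_by=[table.age])
pw.run(monitoring_level=pw.MonitoringLevel.NONE)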