The Pathway programming framework is organized around work with data tables. This page contains reference for the Pathway Table class.
Collection of named columns over identical universes.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
isinstance(t1, pw.Table)
property C: ColumnNamespace
Returns the namespace of all the columns of a joinable. Allows accessing column names that might otherwise be a reserved methods.
import pathway as pw
tab = pw.debug.table_from_markdown('''
age | owner | pet | filter
10 | Alice | dog | True
9 | Bob | dog | True
8 | Alice | cat | False
7 | Bob | dog | True
''')
isinstance(tab.C.age, pw.ColumnReference)
pw.debug.compute_and_print(tab.filter(tab.C.filter), include_id=False)
age | owner | pet | filter
7 | Bob | dog | True
9 | Bob | dog | True
10 | Alice | dog | True
assert_append_only()
source Sets the append_only property of all columns from a table to True
.
Sometimes Pathway canât automatically deduce that a table is append only. If you know that the table is append-only (contains only insertions), you can tell Pathway about it by using this method. At runtime Pathway will check if the table is really append-only and exit with an error otherwise.
True
.Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
a | b | __time__ | __diff__
1 | 2 | 2 | 1
3 | 4 | 2 | 1
5 | 6 | 4 | 1
3 | 4 | 4 | -1
3 | 5 | 4 | 1
''',
id_from=["a"],
) # t is not append only due to the update (row with a=3)
t.is_append_only
t_filtered = t.filter(pw.this.a != 3)
t_append_only = t_filtered.assert_append_only()
t_append_only.is_append_only
await_futures()
source Waits for the results of asynchronous computation.
It strips the Future
wrapper from table columns where applicable. In practice, it filters out the Pending
values and produces a column with a data type that was the argument of Future.
Columns of Future data type are produced by fully asynchronous UDFs. Columns of this type can be propagated further, but canât be used in most expressions (e.g. arithmetic operations). You can wait for their results using this method and later use the results in expressions you want.
Example:
import pathway as pw
import asyncio
t = pw.debug.table_from_markdown(
'''
a | b
1 | 2
3 | 4
5 | 6
'''
)
@pw.udf(executor=pw.udfs.fully_async_executor())
async def long_running_async_function(a: int, b: int) -> int:
c = a * b
await asyncio.sleep(0.1 * c)
return c
result = t.with_columns(res=long_running_async_function(pw.this.a, pw.this.b))
print(result.schema)
id | a | b | res
ANY_POINTER | INT | INT | Future(INT)
awaited_result = result.await_futures()
print(awaited_result.schema)
id | a | b | res
ANY_POINTER | INT | INT | INT
pw.debug.compute_and_print(awaited_result, include_id=False)
a | b | res
1 | 2 | 2
3 | 4 | 12
5 | 6 | 30
buffer(time_column, threshold)
source Buffers the values until the condition time_column <= max(time_column) - threshold
is met.
This is a stateful operator. It stores the entries if their time_column > max(time_column) - threshold
. Otherwise the entries can pass immediately. Once the current time (defined as max over all time_column
values so far) advances and some of the stored entries start to satisfy the condition, they are sent for further processing.
ColumnExpression
) â ColumnExpression
that specifies the event time.Union
[int
, float
, timedelta
]) â value used to determine which entries are old enough to be sent for further processing. Should match the type of the time_column
(int -> int
, float -> float
, datetime -> timedelta
).Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
t | v | __time__
1 | 1 | 2
2 | 2 | 4
5 | 3 | 6
2 | 4 | 8
7 | 5 | 10
'''
)
res = t.buffer(pw.this.t, 3)
pw.debug.compute_and_print_update_stream(res)
| t | v | __time__ | __diff__
^X1MXHYY... | 1 | 1 | 6 | 1
^YYY4HAB... | 2 | 2 | 6 | 1
^3CZ78B4... | 2 | 4 | 8 | 1
^Z3QWT29... | 5 | 3 | 18446744073709551614 | 1
^3HN31E1... | 7 | 5 | 18446744073709551614 | 1
The values of processing time for rows with event time 5, 7 are equal to 18446744073709551614 because thereâs no more input and they are released only at the end of the processing. 18446744073709551614 is the maximum possible time.
cast_to_types(**kwargs)source Casts columns to types.
concat(*others)source Concats self with every other â others.
Semantics:
if self.id and other.id collide, throws an exception.
Requires:
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
concat_reindex(*tables)
source Concatenate contents of several tables.
This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.
Table
) â List of tables to concatenate. All tables must have the same schema.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
| pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus
copy()
source Returns a copy of a table.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7 | Bob | dog
8 | Alice | cat
9 | Bob | dog
10 | Alice | dog
deduplicate(*, value, instance=None, acceptor, name=None)
source Deduplicates rows in self on value column using acceptor function.
It keeps rows which where accepted by the acceptor function. Acceptor operates on two arguments - CURRENT value and PREVIOUS value.
Union
[ColumnExpression
, None
, int
, float
, str
, bytes
, bool
, Pointer
, datetime
, timedelta
, ndarray
, Json
, dict
[str
, Any
], tuple
[Any
, ...
], Error
, Pending
]) â column expression used for deduplication.ColumnExpression
| None
) â Grouping column. For rows with different values in this column, deduplication will be performed separately. Defaults to None.Callable
[[TypeVar
(T
, bound= Union
[None
, int
, float
, str
, bytes
, bool
, Pointer
, datetime
, timedelta
, ndarray
, Json
, dict
[str
, Any
], tuple
[Any
, ...
], Error
, Pending
]), TypeVar
(T
, bound= Union
[None
, int
, float
, str
, bytes
, bool
, Pointer
, datetime
, timedelta
, ndarray
, Json
, dict
[str
, Any
], tuple
[Any
, ...
], Error
, Pending
])], bool
]) â callback telling whether two values are different.str
| None
) â An identifier, under which the state of the table will be persisted or None
, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their name
. This way itâs possible to configure the start of computations from the moment they were terminated last time.Example:
import pathway as pw
table = pw.debug.table_from_markdown(
'''
val | __time__
1 | 2
2 | 4
3 | 6
4 | 8
'''
)
def acceptor(new_value, old_value) -> bool:
return new_value >= old_value + 2
result = table.deduplicate(value=pw.this.val, acceptor=acceptor)
pw.debug.compute_and_print_update_stream(result, include_id=False)
val | __time__ | __diff__
1 | 2 | 1
1 | 6 | -1
3 | 6 | 1
table = pw.debug.table_from_markdown(
'''
val | instance | __time__
1 | 1 | 2
2 | 1 | 4
3 | 2 | 6
4 | 1 | 8
4 | 2 | 8
5 | 1 | 10
'''
)
def acceptor(new_value, old_value) -> bool:
return new_value >= old_value + 2
result = table.deduplicate(
value=pw.this.val, instance=pw.this.instance, acceptor=acceptor
)
pw.debug.compute_and_print_update_stream(result, include_id=False)
val | instance | __time__ | __diff__
1 | 1 | 2 | 1
3 | 2 | 6 | 1
1 | 1 | 8 | -1
4 | 1 | 8 | 1
diff(timestamp, *values, instance=None)
source Compute the difference between the values in the values
columns and the previous values according to the order defined by the column timestamp
.
pw.ColumnReference[int | float | datetime | str | bytes]
) â The column reference to the timestamp
column on which the order is computed.pw.ColumnReference[int | float | datetime]
) â Variable-length argument representing the column references to the values
columns.pw.ColumnReference
) â Can be used to group the values. The difference is only computed between rows with the same instance
value.Table
â A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.NOTE: * The value of the âfirstâ value (the row with the lowest value in the timestamp
column) is None
.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1 | 1
2 | 2
3 | 4
4 | 7
5 | 11
6 | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values | diff_values
1 | 1 |
2 | 2 | 1
3 | 4 | 2
4 | 7 | 3
5 | 11 | 4
6 | 16 | 5
table = pw.debug.table_from_markdown(
'''
timestamp | instance | values
1 | 0 | 1
2 | 1 | 2
3 | 1 | 4
3 | 0 | 7
6 | 1 | 11
6 | 0 | 16
'''
)
table += table.diff(pw.this.timestamp, pw.this.values, instance=pw.this.instance)
pw.debug.compute_and_print(table, include_id=False)
timestamp | instance | values | diff_values
1 | 0 | 1 |
2 | 1 | 2 |
3 | 0 | 7 | 6
3 | 1 | 4 | 2
6 | 0 | 16 | 9
6 | 1 | 11 | 7
difference(other)
source Restrict self universe to keys not appearing in the other table.
Table
) â table with ids to remove from self.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10 | Alice | 1
empty()
source Creates an empty table with a schema specified by kwargs.
DType
) â Dict whose keys are column names and values are column types.Example:
import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
filter(filter_expression)
source Filter a table according to filter_expression condition.
ColumnExpression
) â ColumnExpression that specifies the filtering condition.Example:
import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
1 3
7 0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
flatten(to_flatten, *, origin_id=None)
source Performs a flatmap operation on a column or expression given as a first argument. Datatype of this column or expression has to be iterable or Json array. Other columns of the table are duplicated as many times as the length of the iterable.
It is possible to get ids of source rows by passing origin_id argument, which is a new name of the column with the source ids.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| pet | age
1 | Dog | 2
7 | Cat | 5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet | age
C | 5
D | 2
a | 5
g | 2
o | 2
t | 5
forget(time_column, threshold)
source Remove old entries when they start to satisfy time_column <= max(time_column) - threshold
.
This operator is useful for removing old entries from the stateful operators downstream (like joins, groupbys etc.). It stores the entries and when the current time (defined as max over all time_column
values so far) reaches their time plus threshold
, a deletion of entries is emitted.
ColumnExpression
) â ColumnExpression
that specifies the event time.Union
[int
, float
, timedelta
]) â value used to determine which entries are old enough to be removed. Should match the type of the time_column
(int -> int
, float -> float
, datetime -> timedelta
).Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
t | v | __time__
1 | 1 | 2
2 | 1 | 2
4 | 2 | 4
3 | 3 | 6
'''
)
t_with_forgetting = t.forget(pw.this.t, 3)
s = pw.debug.table_from_markdown(
'''
v | a | __time__
1 | 1 | 2
2 | 2 | 4
1 | 3 | 8
'''
)
res = t_with_forgetting.join(s, pw.left.v == pw.right.v).select(
pw.left.t, pw.left.v, pw.right.a
)
pw.debug.compute_and_print_update_stream(res)
| t | v | a | __time__ | __diff__
^YYYD8ZW... | 1 | 1 | 1 | 2 | 1
^YYY47FZ... | 2 | 1 | 1 | 2 | 1
^Z3QTSKY... | 4 | 2 | 2 | 4 | 1
^YYYD8ZW... | 1 | 1 | 1 | 6 | -1
^YYY822X... | 2 | 1 | 3 | 8 | 1
The entry t=1,v=1
is forgotten at the processing time 6. It gets removed from the join. When at the processing time 8, thereâs a new entry with the join key equal to 1, it only gets joined with t=2,v=1
entry because the other entry was already removed.
The removal of t=1,v=1
entry resulted in the retraction of all its results from a join (only t=1,v=1,a=1
in this case). If you would like to filter out retractions, you can do to_stream().filter(pw.this.is_upsert)
on the result of a join.
source Build a table from columns.
All columns must have the same ids. Columnsâ names must be pairwise distinct.
ColumnReference
) â List of columns.ColumnReference
) â Columns with their new names.Example:
import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float).with_universe_of(t1)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
from_streams(deletion_stream)
source Converts streams of changes (updates and deletions) into a table.
This method reconstructs the current state of the table from such streams by applying the updates and deletions in order. It is a stateful operation: the operator keeps track of the latest value for each id. If there are multiple events for a single id in a single batch in the input streams, the order of applying the actions is not specified.
Table
) â A stream with deletions. Only ids in this stream are important. The columns donât have to be compatible with the updates stream.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
id | pet | age | __time__
1 | cat | 3 | 2
2 | dog | 11 | 2
1 | cat | 4 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
id | pet | __time__
2 | dog | 4
'''
)
t3 = pw.Table.from_streams(t1, t2)
pw.debug.compute_and_print_update_stream(t3, include_id=False)
pet | age | __time__ | __diff__
cat | 3 | 2 | 1
dog | 11 | 2 | 1
cat | 3 | 4 | -1
dog | 11 | 4 | -1
cat | 4 | 4 | 1
groupby(*args, id=None, sort_by=None, instance=None, )
source Groups table by columns from args.
NOTE: Usually followed by .reduce() that aggregates the result and returns a table.
ColumnReference
) â columns to group by.ColumnReference
| None
) â if provided, is the column used to set idâs of the rows of the resultColumnReference
| None
) â if provided, column values are used as sorting keys for particular reducersColumnReference
| None
) â optional argument describing partitioning of the data into separate instancesExample:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob | dog | 16
property id: ColumnReference
Get reference to pseudocolumn containing idâs of a table.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.select(ids = t1.id)
t2.typehints()['ids']
<class 'pathway.engine.Pointer'>
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
ignore_late(time_column, threshold)
source Filter out entries that satisfy time_column <= max(time_column) - threshold
.
In contrast to forget
, this operator doesnât store the entries. It just checks if the entries match the condition and, if they do, allows them to pass. The only value stored by this operator is the current time (defined as max over all time_column
values so far).
Please note that if the table is non-append-only and thereâs a difference in processing time between an insertion and a deletion for some key, the insertion may pass through but the deletion may be filtered out. Itâll happen if the max value in time_column
advanced between the insertion and deletion and the insertion didnât satisfy the filtering-out criterion but the deletion did.
ColumnExpression
) â ColumnExpression
that specifies the event time.Union
[int
, float
, timedelta
]) â value used to determine which entries should be filtered out. Should match the type of the time_column
(int -> int
, float -> float
, datetime -> timedelta
).Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
t | v | __time__
1 | 1 | 2
2 | 2 | 4
5 | 3 | 6
2 | 4 | 8
7 | 5 | 10
'''
)
res = t.ignore_late(pw.this.t, 3)
pw.debug.compute_and_print_update_stream(res)
| t | v | __time__ | __diff__
^X1MXHYY... | 1 | 1 | 2 | 1
^YYY4HAB... | 2 | 2 | 4 | 1
^Z3QWT29... | 5 | 3 | 6 | 1
^3HN31E1... | 7 | 5 | 10 | 1
inactivity_detection(allowed_inactivity_period, refresh_rate=Timedelta('0 days 00:00:01'), instance=None)
source Monitor append only table additions to detect inactivity periods and identify when activity resumes, optionally with instance argument.
This function periodically checks for table additions according to the provided refresh rate. It is limited to append only tables since the function is mostly intended to monitor input data streams. Inactivity periods that exceed the specified threshold are reported. The output table lists the inactivity periods with the UTC timestamp of the last detected activity before the threshold was exceeded and the UTC timestamp of the first detected activity that ends the inactivity period, or None if the inactivity period not yet ended.
Note: the inactivity period limits may differ from the actual values when the refresh rate is lower than the table update rate. It is also assumed that the system latency is neglectable compared to the specified threshold. When used with instance, an inactivity period since the stream start (i.e. no incoming data) is reported with a None value in the instance column.
pw.Duration
) â maximum allowed inactivity duration. If no activity occurs within this duration, an inactivity period is flagged.pw.Duration, optional
) â frequency with which table activities are checked to detect an inactivity period. Defaults to 1 second.pw.ColumnExpression | None, optional
) â group column to detect inactivity periods separately. Defaults to None.source Interpolates missing values in a column using the previous and next values based on a timestamps column.
ColumnReference
) â Reference to the column containing timestamps.ColumnReference
) â References to the columns containing values to be interpolated.InterpolateMode, optional
) â The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.NOTE: * The interpolation is performed based on linear interpolation between the previous and next values.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1 | 1 | 10
2 | |
3 | 3 |
4 | |
5 | |
6 | 6 | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values_a | values_b
1 | 1.0 | 10.0
2 | 2.0 | 20.0
3 | 3.0 | 30.0
4 | 4.0 | 40.0
5 | 5.0 | 50.0
6 | 6.0 | 60.0
intersect(*tables)
source Restrict self universe to keys appearing in all of the tables.
Table
) â tables keys of which are used to restrict universe.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
ix(expression, *, optional=False, context=None, allow_misses=False)
source Reindexes the table using expression values as keys. Uses keys from context, or tries to infer proper context from the expression. If optional is True, then None in expression values result in None values in the result columns. Missing values in table keys result in RuntimeError. If allow_misses
is set to True, they result in None value on the output.
Context can be anything that allows for select or reduce, or pathway.this construct (latter results in returning a delayed operation, and should be only used when using ix inside join().select() or groupby().reduce() sequence).
Example:
import pathway as pw
t_animals = pw.debug.table_from_markdown('''
| epithet | genus
1 | upupa | epops
2 | acherontia | atropos
3 | bubo | scandiacus
4 | dynastes | hercules
''')
t_birds = pw.debug.table_from_markdown('''
| desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
desc | latin
hoopoe | atropos
owl | hercules
ix_ref(*args, optional=False, context=None, instance=None, allow_misses=False)
source Reindexes the table using expressions as primary keys. Uses keys from context, or tries to infer proper context from the expression. If optional
is True, then None in expression values result in None values in the result columns. Missing values in table keys result in RuntimeError. If allow_misses
is set to True, they result in None value on the output.
Context can be anything that allows for select or reduce, or pathway.this construct (latter results in returning a delayed operation, and should be only used when using ix inside join().select() or groupby().reduce() sequence).
Union
[ColumnExpression
, None
, int
, float
, str
, bytes
, bool
, Pointer
, datetime
, timedelta
, ndarray
, Json
, dict
[str
, Any
], tuple
[Any
, ...
], Error
, Pending
]) â Column references.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
name | pet | new_value
Alice | dog | dog
Bob | cat | dog
Carole | cat | dog
David | dog | dog
Tables obtained by a groupby/reduce scheme always have primary keys:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(t1.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
name | pet | new_value
Alice | dog | 1
Bob | cat | 3
Carole | cat | 3
David | cat | 3
Single-row tables can be accessed via ix_ref():
import pathway as pw
t1 = pw.debug.table_from_markdown('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | cat
''')
t2 = t1.reduce(count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(context=t1).count)
pw.debug.compute_and_print(t3, include_id=False)
name | pet | new_value
Alice | dog | 4
Bob | cat | 4
Carole | cat | 4
David | cat | 4
join(other, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None, left_exactly_once=False, right_exactly_once=False)
source Join self with other using the given join expression.
Joinable
) â the right side of the join, Table
or JoinResult
.ColumnExpression
) â a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.ColumnReference
| None
) â optional argument for id of result, can be only self.id or other.idJoinMode
) â by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER} correspond to inner, left, right and outer join respectively.bool
) â if you can guarantee that each row on the left side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.bool
) â if you can guarantee that each row on the right side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(
t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9 | Bob | L
join_inner(other, *on, id=None, left_instance=None, right_instance=None, left_exactly_once=False, right_exactly_once=False)
source Inner-joins two tables or join results.
Joinable
) â the right side of the join, Table
or JoinResult
.ColumnExpression
) â a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.ColumnReference
| None
) â optional argument for id of result, can be only self.id or other.idbool
) â if you can guarantee that each row on the left side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.bool
) â if you can guarantee that each row on the right side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9 | Bob | L
join_left(other, *on, id=None, left_instance=None, right_instance=None, left_exactly_once=False, right_exactly_once=False)
source Left-joins two tables or join results.
Joinable
) â the right side of the join, Table
or JoinResult
.ColumnExpression
) â Columns to join, syntax self.col1 == other.col2ColumnReference
| None
) â optional id column of the resultbool
) â if you can guarantee that each row on the left side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.bool
) â if you can guarantee that each row on the right side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)
Behavior:
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| a | b
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| c | d
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)),
include_id=False)
a | t2_c | s
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |
join_outer(other, *on, id=None, left_instance=None, right_instance=None, left_exactly_once=False, right_exactly_once=False)
source Outer-joins two tables or join results.
Joinable
) â the right side of the join, Table
or JoinResult
.ColumnExpression
) â Columns to join, syntax self.col1 == other.col2ColumnReference
| None
) â optional id column of the resultbool
) â if you can guarantee that each row on the left side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.bool
) â if you can guarantee that each row on the right side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)
Behavior:
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| a | b
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| c | d
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)),
include_id=False)
a | t2_c | s
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |
join_right(other, *on, id=None, left_instance=None, right_instance=None, left_exactly_once=False, right_exactly_once=False)
source Outer-joins two tables or join results.
Joinable
) â the right side of the join, Table
or JoinResult
.ColumnExpression
) â Columns to join, syntax self.col1 == other.col2ColumnReference
| None
) â optional id column of the resultbool
) â if you can guarantee that each row on the left side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.bool
) â if you can guarantee that each row on the right side of the join will be joined at most once, then you can set this parameter to True
. Then each row after getting a match is removed from the join state. As a result, less memory is needed.Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)
Behavior:
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| a | b
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| c | d
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b,0) + t2.d,t1.id)),
include_id=False)
a | t2_c | s
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
source Allows for plotting contents of the table visually in e.g. jupyter. If the table depends only on the bounded data sources, the plot will be generated right away. Otherwise (in streaming scenario), the plot will be auto-updating after running pw.run()
pw.Table
) â a table serving as a source of dataCallable[[ColumnDataSource], Plot]
) â function for creating plot from ColumnDataSourceExample:
import pathway as pw
from bokeh.plotting import figure
def func(source):
plot = figure(height=400, width=400, title="CPU usage over time")
plot.scatter('a', 'b', source=source, line_width=3, line_alpha=0.6)
return plot
viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).plot(func)
type(viz)
<class 'panel.layout.base.Column'>
pointer_from(*args, optional=False, instance=None)
source Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)
promise_universe_is_equal_to(other)
source Asserts to Pathway that an universe of self is a subset of universe of each of the others.
Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.
NOTE: The assertion works in place.
Example:
import pathway as pw
import pytest
t1 = pw.debug.table_from_markdown(
'''
| age | owner | pet
1 | 8 | Alice | cat
2 | 9 | Bob | dog
3 | 15 | Alice | tortoise
4 | 99 | Bob | seahorse
'''
).filter(pw.this.age<30)
t2 = pw.debug.table_from_markdown(
'''
| age | owner
1 | 11 | Alice
2 | 12 | Tom
3 | 7 | Eve
'''
)
t3 = t2.filter(pw.this.age > 10)
with pytest.raises(ValueError):
t1.update_cells(t3)
t1 = t1.promise_universe_is_equal_to(t2)
result = t1.update_cells(t3)
pw.debug.compute_and_print(result, include_id=False)
age | owner | pet
11 | Alice | cat
12 | Tom | dog
15 | Alice | tortoise
promise_universe_is_subset_of(other)
source Asserts to Pathway that an universe of self is a subset of universe of each of the other.
Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
promise_universes_are_disjoint(other)
source Asserts to Pathway that an universe of self is disjoint from universe of other.
Semantics: Used in situations where Pathway cannot deduce universes are disjoint.
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
reduce(*args, **kwargs)
source Reduce a table to a single row.
Equivalent to self.groupby().reduce(*args, **kwargs).
ColumnReference
) â reducer to reduce the table withColumnExpression
) â reducer to reduce the table with. Its key is the new name of a column.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
remove_errors()
source Filters out rows that contain errors.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
a | b
3 | 3
4 | 0
5 | 5
6 | 2
'''
)
t2 = t1.with_columns(x=pw.this.a // pw.this.b)
res = t2.remove_errors()
pw.debug.compute_and_print(res, include_id=False, terminate_on_error=False)
a | b | x
3 | 3 | 1
5 | 5 | 1
6 | 2 | 3
rename(names_mapping=None, **kwargs)
source Rename columns according either a dictionary or kwargs.
If a mapping is provided using a dictionary, rename_by_dict
will be used. Otherwise, rename_columns
will be used with kwargs. Columns not in keys(kwargs) are not changed. New name of a column must not be id
.
dict
[str
| ColumnReference
, str
] | None
) â mapping from old column names to new names.ColumnExpression
) â mapping from old column names to new names.source Rename columns according to a dictionary.
Columns not in keys(kwargs) are not changed. New name of a column must not be id.
dict
[str
| ColumnReference
, str
]) â mapping from old column names to new names.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.rename_by_dict({"age": "years_old", t1.pet: "animal"})
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8 | 2
Alice | 10 | 1
Bob | 9 | 1
rename_columns(**kwargs)
source Rename columns according to kwargs.
Columns not in keys(kwargs) are not changed. New name of a column must not be id.
str
| ColumnReference
) â mapping from old column names to new names.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8 | 2
Alice | 10 | 1
Bob | 9 | 1
restrict(other)
source Restrict self universe to keys appearing in other.
TableLike
) â table which universe is used to restrict universe of self.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
'''
)
t2 = pw.debug.table_from_markdown(
'''
| cost
2 | 100
3 | 200
'''
)
t2.promise_universe_is_subset_of(t1)
<pathway.Table schema={'cost': <class 'int'>}>
t3 = t1.restrict(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
property schema: type[pathway.internals.schema.Schema]
Get schema of the table.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.schema
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}, id_type=<class 'pathway.engine.Pointer'>>
select(*args, **kwargs)
source Build a new table with columns specified by kwargs.
Output columnsâ names are keys(kwargs). values(kwargs) can be raw values, boxed values, columns. Assigning to id reindexes the table.
ColumnReference
) â Column references.Any
) â Column expressions with their new assigned names.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
animal | desc
Cat | fluffy
Dog | fluffy
show(*, snapshot=True, include_id=True, short_pointers=True, sorters=None)
source Allows for displaying table visually in e.g. jupyter. If the table depends only on the bounded data sources, the table preview will be generated right away. Otherwise (in streaming scenario), the table will be auto-updating after running pw.run()
pw.Table
) â a table to be displayedbool, optional
) â whether only current snapshot or all changes to the table should be displayed. Defaults to True.bool, optional
) â whether to show ids of rows. Defaults to True.bool, optional
) â whether to shorten printed ids. Defaults to True.Example:
import pathway as pw
table_viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).show()
type(table_viz)
<class 'panel.layout.base.Column'>
property slice: TableSlice
Creates a collection of references to self columns. Supports basic column manipulation methods.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.slice.without("age")
TableSlice({'owner': <table1>.owner, 'pet': <table1>.pet})
sort(key, instance=None)
source Sorts a table by the specified keys.
ColumnExpression
[int | float | datetime | str | bytes]
) â An expression to sort by.ColumnExpression
| None
) â ColumnReference or None An expression with instance. Rows are sorted within an instance. prev
and next
columns will only point to rows that have the same instance.prev
and next
, containing the pointers to the previous and next rows.Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
name | age | score
Alice | 25 | 80
Bob | 20 | 90
Charlie | 30 | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table, include_id=True)
| name | age | score | prev | next
^GBSDEEW... | Alice | 25 | 80 | ^EDPSSB1... | ^DS9AT95...
^EDPSSB1... | Bob | 20 | 90 | | ^GBSDEEW...
^DS9AT95... | Charlie | 30 | 80 | ^GBSDEEW... |
table = pw.debug.table_from_markdown('''
name | age | score
Alice | 25 | 80
Bob | 20 | 90
Charlie | 30 | 80
David | 35 | 90
Eve | 15 | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age, instance=pw.this.score)
pw.debug.compute_and_print(table, include_id=True)
| name | age | score | prev | next
^GBSDEEW... | Alice | 25 | 80 | ^T0B95XH... | ^DS9AT95...
^EDPSSB1... | Bob | 20 | 90 | | ^RT0AZWX...
^DS9AT95... | Charlie | 30 | 80 | ^GBSDEEW... |
^RT0AZWX... | David | 35 | 90 | ^EDPSSB1... |
^T0B95XH... | Eve | 15 | 80 | | ^GBSDEEW...
split(split_expression)
source Split a table according to split_expression condition.
ColumnExpression
) â ColumnExpression that specifies the split condition.Example:
import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
1 3
7 0
''')
positive, negative = vertices.split(vertices.outdegree == 0)
pw.debug.compute_and_print(positive, include_id=False)
pw.debug.compute_and_print(negative, include_id=False)
stream_to_table(is_upsert)
source Converts a stream of changes (updates and deletions) into a table.
In Pathway, a stream is a sequence of row changes, where each row has an id and a boolean column (e.g., âis_upsertâ) indicating whether the row is an update (True
) or a deletion (False
).
This method reconstructs the current state of the table from such a stream by applying the updates and deletions in order. It is a stateful operation: the operator keeps track of the latest value for each id. If there are multiple events for a single id in a single batch in a stream, the order of applying the actions is not specified. For deletions, only ids are important. The values in columns are ignored.
ColumnExpression
) â An expression that evaluates to a boolean value. True
means the row is an upsert (insert or update), False
means the row is a deletion.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
id | pet | age | is_upsert | __time__
1 | cat | 3 | True | 2
2 | dog | 11 | True | 2
1 | cat | 4 | True | 4
2 | dog | 0 | False | 4
'''
)
t2 = t1.stream_to_table(pw.this.is_upsert)
pw.debug.compute_and_print_update_stream(t2, include_id=False)
pet | age | is_upsert | __time__ | __diff__
cat | 3 | True | 2 | 1
dog | 11 | True | 2 | 1
cat | 3 | True | 4 | -1
dog | 11 | True | 4 | -1
cat | 4 | True | 4 | 1
to_stream(upsert_column_name='is_upsert')
source Converts a table to a stream of changes.
If in a given batch there is:
True
in the update_column_name
column is producedFalse
in the update_column_name
column is produced.The values in all other columns are kept. This is a stateless operation.
str
) â name of the boolean column that will be added to the table and contain information about the type of action.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
id | age | owner | pet | __time__ | __diff__
1 | 10 | Alice | dog | 2 | 1
2 | 9 | Bob | cat | 2 | 1
1 | 10 | Alice | dog | 4 | -1
1 | 11 | Alice | dog | 4 | 1
2 | 9 | Bob | cat | 4 | -1
2 | 10 | Bob | cat | 4 | 1
1 | 11 | Alice | dog | 6 | -1
1 | 12 | Alice | dog | 6 | 1
2 | 10 | Bob | cat | 6 | -1
''')
t2 = t1.to_stream()
pw.debug.compute_and_print_update_stream(t2, include_id=False)
age | owner | pet | is_upsert | __time__ | __diff__
9 | Bob | cat | True | 2 | 1
10 | Alice | dog | True | 2 | 1
10 | Bob | cat | True | 4 | 1
11 | Alice | dog | True | 4 | 1
10 | Bob | cat | False | 6 | 1
12 | Alice | dog | True | 6 | 1
typehints()
source Return the types of the columns as a dictionary.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.typehints()
mappingproxy({'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>})
update_cells(other, )
source Updates cells of self, breaking ties in favor of the values in other.
Semantics:
* result.columns == self.columns
* result.id == self.id
* conflicts are resolved preferring otherâs values
Requires:
* other.columns â self.columns
* other.id â self.id
Table
) â the other table.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
age | owner | pet
1 | 10 | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
update_rows(other)
source Updates rows of self, breaking ties in favor for the rows in other.
Semantics:
Requires:
Table
[TypeVar
(TSchema
, bound= Schema
)]) â the other table.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 30
12 | 12 | Tom | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
12 | Tom | 40
update_types(**kwargs)
source Updates types in schema. Has no effect on the runtime.
with_columns(*args, **kwargs)source Updates columns of self, according to args and kwargs. See table.select specification for evaluation of args and kwargs.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| owner | pet | size
1 | Tom | 1 | 10
2 | Bob | 1 | 9
3 | Tom | 2 | 8
''')
t3 = t1.with_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet | size
8 | Tom | 2 | 8
9 | Bob | 1 | 9
10 | Tom | 1 | 10
with_id(new_index)
source Set new ids based on another column containing id-typed values.
To generate ids based on arbitrary valued columns, use with_id_from.
Values assigned must be row-wise unique. The uniqueness is not checked by pathway. Failing to provide unique ids can cause unexpected errors downstream.
Example:
import pytest; pytest.xfail("with_id is hard to test")
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
| new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)
age owner pet
^2 10 Alice 1
^3 9 Bob 1
^4 8 Alice 2
with_id_from(*args, instance=None)
source Compute new ids based on values in columns. Ids computed from columns must be row-wise unique. The uniqueness is not checked by pathway. Failing to provide unique ids can cause unexpected errors downstream.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3)
| age | owner | pet | old_id
^... | 8 | Alice | 2 | ^...
^... | 9 | Bob | 1 | ^...
^... | 10 | Alice | 1 | ^...
t4 = t3.select(t3.age, t3.owner, t3.pet, same_as_old=(t3.id == t3.old_id),
same_as_new=(t3.id == t3.pointer_from(t3.age)))
pw.debug.compute_and_print(t4)
| age | owner | pet | same_as_old | same_as_new
^... | 8 | Alice | 2 | False | True
^... | 9 | Bob | 1 | False | True
^... | 10 | Alice | 1 | False | True
with_prefix(prefix)
source Rename columns by adding prefix to each name of column.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.with_prefix("u_")
pw.debug.compute_and_print(t2, include_id=False)
u_age | u_owner | u_pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
with_suffix(suffix)
source Rename columns by adding suffix to each name of column.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.with_suffix("_current")
pw.debug.compute_and_print(t2, include_id=False)
age_current | owner_current | pet_current
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
with_universe_of(other)
source Returns a copy of self with exactly the same universe as others.
Semantics: Required precondition self.universe == other.universe Used in situations where Pathway cannot deduce equality of universes, but those are equal as verified during runtime.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
| age
1 | 10
7 | 3
8 | 100
''')
t3 = t2.filter(pw.this.age < 30).with_universe_of(t1)
t4 = t1 + t3
pw.debug.compute_and_print(t4, include_id=False)
pet | age
Cat | 3
Dog | 10
without(*columns)
source Selects all columns without named column references.
str
| ColumnReference
) â columns to be dropped provided by table.column_name notation.Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4