Showing content from https://riverqueue.com/docs/python below:

Inserting jobs from Python | River Docs

River supports inserting jobs from Python and having them worked in Go. This can be desirable in performance-sensitive cases, since the jobs then run on Go's considerably faster runtime.

Insertion is supported through SQLAlchemy.

Basic usage

Your project should bundle the riverqueue package in its dependencies. How to go about this will depend on your toolchain, but for example in Rye, it'd look like:

    rye add riverqueue

Initialize a client with:

    import riverqueue
    import sqlalchemy
    from riverqueue.driver import riversqlalchemy

    engine = sqlalchemy.create_engine("postgresql://...")
    client = riverqueue.Client(riversqlalchemy.Driver(engine))

Define a job and insert it:

    import json
    from dataclasses import dataclass

    @dataclass
    class SortArgs:
        strings: list[str]

        # Job kind, used to route the job to the matching worker.
        kind: str = "sort"

        def to_json(self) -> str:
            return json.dumps({"strings": self.strings})

    insert_res = client.insert(
        SortArgs(strings=["whale", "tiger", "bear"]),
    )
    insert_res.job  # inserted job row

Job args should comply with the riverqueue.JobArgs protocol:

    class JobArgs(Protocol):
        kind: str

        def to_json(self) -> str:
            pass

They may also respond to insert_opts() with an instance of InsertOpts to define insertion options that'll be used for all jobs of the kind.

We recommend using dataclasses for job args since they should ideally be minimal sets of primitive properties with little other embellishment, and dataclasses provide a succinct way of accomplishing this.
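
As a rough illustration of the protocol and the dataclass recommendation above, the sketch below defines a second job args class and checks it against a local stand-in protocol. `JobArgsLike` and `EmailArgs` are hypothetical names invented for this example; the stand-in only mirrors the shape of riverqueue.JobArgs shown above so the snippet runs without the package installed. Following the `SortArgs` example, `kind` is left out of the JSON payload.

```python
import json
from dataclasses import asdict, dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class JobArgsLike(Protocol):
    """Local stand-in with the same shape as riverqueue.JobArgs (hypothetical name)."""

    kind: str

    def to_json(self) -> str: ...


@dataclass
class EmailArgs:
    """Hypothetical job args: a small set of primitive fields."""

    recipient: str
    subject: str

    kind: str = "email"

    def to_json(self) -> str:
        # Serialize every field except `kind`, as the SortArgs example does.
        payload = {k: v for k, v in asdict(self).items() if k != "kind"}
        return json.dumps(payload)


args = EmailArgs(recipient="user@example.com", subject="Welcome")
encoded = args.to_json()

# Structural check: the object has a `kind` attribute and a `to_json` method.
conforms = isinstance(args, JobArgsLike)
```

Building the payload from `asdict` keeps `to_json` in step with the fields as they are added or removed, at the cost of assuming every field is JSON-serializable.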

Insertion options

Inserts take an insert_opts parameter to customize features of the inserted job:

    insert_res = client.insert(
        SortArgs(strings=["whale", "tiger", "bear"]),
        insert_opts=riverqueue.InsertOpts(
            max_attempts=17,
            priority=3,
            queue="my_queue",
            tags=["custom"]
        ),
    )

Inserting unique jobs

Unique jobs are supported through the unique_opts property of InsertOpts, and can be made unique by args, period, queue, and state. If a job matching the unique properties is found on insert, the insert is skipped and the existing job is returned.

    insert_res = client.insert(
        SortArgs(strings=["whale", "tiger", "bear"]),
        insert_opts=riverqueue.InsertOpts(
            unique_opts=riverqueue.UniqueOpts(
                by_args=True,
                by_period=15*60,
                by_queue=True,
                by_state=[riverqueue.JobState.AVAILABLE]
            )
        ),
    )

    # contains either a newly inserted job, or an existing one if insertion was skipped
    insert_res.job

    # true if insertion was skipped
    insert_res.unique_skipped_as_duplicated
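
Callers often want to branch on whether a unique insert created a new row or returned an existing one. The helper below is a minimal sketch of that branching. `describe_insert` is a hypothetical name, and the `id` attribute on the job row is an assumption not shown above. It runs against stand-in objects rather than a real insert result.

```python
from types import SimpleNamespace
from typing import Any


def describe_insert(insert_res: Any) -> str:
    """Summarize an insert result, distinguishing new rows from duplicates."""
    # `job.id` is assumed to exist on the job row; adjust to the real field name.
    if insert_res.unique_skipped_as_duplicated:
        return f"skipped: job {insert_res.job.id} already exists"
    return f"inserted: job {insert_res.job.id}"


# Stand-ins shaped like an insert result, used here instead of a database.
fresh = SimpleNamespace(job=SimpleNamespace(id=1), unique_skipped_as_duplicated=False)
duplicate = SimpleNamespace(job=SimpleNamespace(id=1), unique_skipped_as_duplicated=True)

fresh_summary = describe_insert(fresh)
duplicate_summary = describe_insert(duplicate)
```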

Inserting jobs in bulk

Use insert_many() to bulk insert jobs as a single operation for improved efficiency:

    results = client.insert_many([
        SimpleArgs(job_num=1),
        SimpleArgs(job_num=2)
    ])

Or with InsertManyParams, which may include insertion options:

    results = client.insert_many([
        InsertManyParams(args=SimpleArgs(job_num=1), insert_opts=riverqueue.InsertOpts(max_attempts=5)),
        InsertManyParams(args=SimpleArgs(job_num=2), insert_opts=riverqueue.InsertOpts(queue="high_priority"))
    ])
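
The bulk examples use a `SimpleArgs` class that is not defined on this page. One plausible definition, following the same pattern as `SortArgs`, is sketched below; the `"simple"` kind string is an assumption. The batch is built with a list comprehension, and the actual insert call is left commented out because it needs a configured client and database.

```python
import json
from dataclasses import dataclass


@dataclass
class SimpleArgs:
    """Assumed shape for the SimpleArgs used in the bulk insert examples."""

    job_num: int

    kind: str = "simple"  # assumed kind name

    def to_json(self) -> str:
        return json.dumps({"job_num": self.job_num})


# Build the whole batch up front so it can go to the database in one operation.
batch = [SimpleArgs(job_num=n) for n in range(1, 4)]

# results = client.insert_many(batch)  # requires a client as initialized earlier
```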

Inserting in a transaction

To insert jobs in a transaction, open one in your driver, and pass it as the first argument to insert_tx() or insert_many_tx():

    with engine.begin() as session:
        insert_res = client.insert_tx(
            session,
            SortArgs(strings=["whale", "tiger", "bear"]),
        )
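
The usual reason to insert in a transaction is to make the job appear only if the surrounding database work commits. The helper below sketches that pattern. `insert_with_work` and its `work` callback are hypothetical names. It assumes `engine.begin()` behaves as in the example above (commit on normal exit, rollback on exception) and that `client.insert_tx()` takes the transaction as its first argument.

```python
from typing import Any, Callable


def insert_with_work(
    engine: Any,
    client: Any,
    args: Any,
    work: Callable[[Any], None],
) -> Any:
    """Run `work` and insert a job in the same transaction.

    If `work` raises, the context manager rolls back and the job is never
    inserted, so workers cannot pick up a job whose data was not committed.
    """
    with engine.begin() as tx:
        work(tx)  # application writes that the job depends on
        return client.insert_tx(tx, args)
```

A call might look like `insert_with_work(engine, client, SortArgs(strings=["whale"]), save_rows)`, where `save_rows` is whatever function performs the application's own writes on the open transaction.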

Asynchronous I/O (asyncio)

The package supports Python's asyncio (asynchronous I/O) through an alternate AsyncClient and riversqlalchemy.AsyncDriver. You'll need to use SQLAlchemy's alternative async engine and an asynchronous Postgres driver like asyncpg, but otherwise usage looks very similar to the synchronous client:

    import sqlalchemy.ext.asyncio

    engine = sqlalchemy.ext.asyncio.create_async_engine("postgresql+asyncpg://...")
    client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine))

    insert_res = await client.insert(
        SortArgs(strings=["whale", "tiger", "bear"]),
    )

With a transaction:

    async with engine.begin() as session:
        insert_res = await client.insert_tx(
            session,
            SortArgs(strings=["whale", "tiger", "bear"]),
        )
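
The async snippets use `await` at the top level, which only works inside a coroutine (or an async-aware REPL). A minimal sketch of wrapping an insert in a coroutine and driving it from synchronous code follows. `enqueue` is a hypothetical name, and `client` is assumed to be an AsyncClient built as shown above.

```python
import asyncio
from typing import Any


async def enqueue(client: Any, args: Any) -> Any:
    """Insert one job through an async client and return the insert result."""
    return await client.insert(args)


# From synchronous code, run the coroutine on a fresh event loop:
# insert_res = asyncio.run(enqueue(client, SortArgs(strings=["whale"])))
```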

MyPy and type checking

The package ships a py.typed marker file to indicate that it's typed, so MyPy can include it in static analysis.

Drivers

SQLAlchemy

Our read is that SQLAlchemy is the dominant ORM in the Python ecosystem, so it's the only driver available for River. Under the hood of SQLAlchemy, projects will also need a Postgres driver like psycopg2 or asyncpg (for async).

River's driver system should enable integration with other ORMs, so let us know if there's a good reason you need one, and we'll consider it.

