A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/aelassas/wexflow/wiki/Python-SSE-Client below:

Python SSE Client · aelassas/wexflow Wiki · GitHub

Here is a sample Python client sse.py:

import requests
import sseclient
import json

base_url = 'http://localhost:8000/api/v1'
username = 'admin'
password = 'wexflow2018'
workflow_id = 41

def login(user, passwd, stay_connected=False):
    url = f'{base_url}/login'
    headers = {'Content-Type': 'application/json'}
    payload = {
        'username': user,
        'password': passwd,
        'stayConnected': stay_connected
    }

    response = requests.post(url, json=payload, headers=headers)
    if response.status_code != 200:
        raise Exception(f'HTTP {response.status_code} - {response.reason}')
    
    return response.json()['access_token']

def start_workflow(token, workflow_id):
    url = f'{base_url}/start?w={workflow_id}'
    headers = {
        'Authorization': f'Bearer {token}'
    }

    response = requests.post(url, headers=headers)
    if response.status_code != 200:
        raise Exception(f'HTTP {response.status_code} - {response.reason}')
    
    return response.json()

def listen_to_sse(url, token):
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'text/event-stream'
    }
    response = requests.get(url, headers=headers, stream=True)

    client = sseclient.SSEClient(response)
    print('SSE connection opened')

    for event in client.events():
        try:
            data = json.loads(event.data)
            print('Received SSE JSON:')
            print(json.dumps(data, indent=2))
            break  # Stop after first message
        except Exception as e:
            print(f'Failed to parse SSE JSON: {e}')
            break

    print('SSE connection closed')

# Main execution
try:
    token = login(username, password)
    job_id = start_workflow(token, workflow_id)
    print(f'Workflow {workflow_id} started successfully. Job ID: {job_id}')

    sse_url = f'{base_url}/sse/{workflow_id}/{job_id}'
    listen_to_sse(sse_url, token)

except Exception as e:
    print(f'Error: {e}')

To run the client, use the following command:


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4