Showing content from https://github.com/iterative/datachain below:

iterative/datachain: ETL, Analytics, Versioning for Unstructured Data

DataChain

DataChain is a Python-based AI-data warehouse for transforming and analyzing unstructured data like images, audio, videos, text and PDFs. It integrates with external storage (e.g. S3) to process data efficiently without data duplication and manages metadata in an internal database for easy and efficient querying.

  1. ETL. A Pythonic framework for describing and running transformations and enrichments of unstructured data, and for applying models, including LLMs, to that data.
  2. Analytics. A DataChain dataset is a table that combines all the information about data objects in one place. It provides a dataframe-like API and a vectorized engine for running analytics on these tables at scale.
  3. Versioning. DataChain doesn't store data or require moving or copying it (unlike DVC). The ideal use case is a bucket with thousands or millions of images, videos, audio files, or PDFs.
  4. Incremental Processing. DataChain's delta and retry features allow for efficient processing workflows: delta processes only new or changed files, and retry reprocesses records whose previous run reported an error.

Visit Quick Start and Docs to get started with DataChain and learn more.

Example: Download Subset of Files Based on Metadata

Sometimes users only need to download a specific subset of files from cloud storage, rather than the entire dataset. For example, you could use a JSON file's metadata to download just cat images with high confidence scores.

import datachain as dc

meta = dc.read_json("gs://datachain-demo/dogs-and-cats/*json", column="meta", anon=True)
images = dc.read_storage("gs://datachain-demo/dogs-and-cats/*jpg", anon=True)

images_id = images.map(id=lambda file: file.path.split('.')[-2])
annotated = images_id.merge(meta, on="id", right_on="meta.id")

likely_cats = annotated.filter(
    (dc.Column("meta.inference.confidence") > 0.93)
    & (dc.Column("meta.inference.class_") == "cat")
)
likely_cats.to_storage("high-confidence-cats/", signal="file")
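
The join key in the example above is derived from the file path: `file.path.split('.')[-2]` takes the second-to-last dot-separated component. The following plain-Python sketch illustrates that extraction on its own, without DataChain. The sample paths are hypothetical and assume a `<label>.<id>.<ext>` naming scheme; they are not taken from the bucket listing.

```python
def extract_id(path: str) -> str:
    """Return the second-to-last dot-separated component of a path."""
    return path.split(".")[-2]

# Hypothetical paths, assumed to follow a <label>.<id>.<ext> naming scheme.
sample_paths = ["dogs-and-cats/cat.1.jpg", "dogs-and-cats/dog.1000.jpg"]
ids = [extract_id(p) for p in sample_paths]
print(ids)  # ['1', '1000']
```

Note that the extracted id is a string, and that a path with only one dot (for example `a/b.jpg`) would yield everything before the extension (`a/b`) rather than a numeric id, so this approach depends on the naming scheme holding for every file.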

Example: Incremental Processing with Error Handling

This example shows how to use both delta and retry processing for efficient handling of large datasets that evolve over time and may occasionally have processing errors.

import datachain as dc
from datachain import C, File

def process_file(file: File):
    """Process a file, which may occasionally fail."""
    try:
        # Your processing logic here. analyze_content is a placeholder for
        # your own function and is not defined in this snippet.
        content = file.read_text()
        result = analyze_content(content)
        return {
            "content": content,
            "result": result,
            "error": None  # No error
        }
    except Exception as e:
        # Return an error that will trigger reprocessing next time
        return {
            "content": None,
            "result": None,
            "error": str(e)  # Error field will trigger retry
        }

# Process files efficiently with delta and retry
chain = (
    dc.read_storage(
        "data/",
        update=True,
        delta=True,              # Process only new/changed files
        delta_on="file.path",    # Identify files by path
        retry_on="error"         # Field that indicates errors
    )
    .map(processed_result=process_file)
    .mutate(
        content=C("processed_result.content"),
        result=C("processed_result.result"),
        error=C("processed_result.error")
    )
    .save(name="processed_data")
)
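
To make the combined behaviour concrete, here is a conceptual sketch in plain Python of which files a delta-plus-retry run would pick up: files that are new, files that changed, and files whose last result carried an error. This is an illustration of the idea only, not DataChain's implementation; the `version` field and the `select_for_processing` helper are hypothetical, and the source does not specify how DataChain detects changes.

```python
def select_for_processing(current, previous):
    """Pick paths to (re)process.

    current:  {path: version} for files now in storage.
    previous: {path: {"version": ..., "error": ...}} from the last run.
    """
    selected = []
    for path, version in current.items():
        record = previous.get(path)
        if record is None:                    # new file (delta)
            selected.append(path)
        elif record["version"] != version:    # changed file (delta)
            selected.append(path)
        elif record["error"] is not None:     # failed last time (retry)
            selected.append(path)
    return selected

# Hypothetical state: b.txt failed last time, c.txt changed, d.txt is new.
previous = {
    "data/a.txt": {"version": 1, "error": None},
    "data/b.txt": {"version": 1, "error": "timeout"},
    "data/c.txt": {"version": 1, "error": None},
}
current = {"data/a.txt": 1, "data/b.txt": 1, "data/c.txt": 2, "data/d.txt": 1}

print(select_for_processing(current, previous))
# ['data/b.txt', 'data/c.txt', 'data/d.txt']
```

Unchanged files that succeeded earlier (`data/a.txt` here) are skipped, which is where the savings on large, slowly evolving datasets come from.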

Example: LLM-based text-file evaluation

In this example, an LLM evaluates chatbot conversations stored in text files.

$ pip install mistralai # Requires version >=1.0.0
$ export MISTRAL_API_KEY=_your_key_

Python code:

import os
from mistralai import Mistral
import datachain as dc

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialogue(file: dc.File) -> bool:
    client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
    response = client.chat.complete(
        model="open-mixtral-8x22b",
        messages=[{"role": "system", "content": PROMPT},
                  {"role": "user", "content": file.read()}])
    result = response.choices[0].message.content
    return result.lower().startswith("success")

chain = (
    dc.read_storage("gs://datachain-demo/chatbot-KiT/", column="file", anon=True)
    .settings(parallel=4, cache=True)
    .map(is_success=eval_dialogue)
    .save("mistral_files")
)

successful_chain = chain.filter(dc.Column("is_success") == True)
successful_chain.to_storage("./output_mistral")

print(f"{successful_chain.count()} files were exported")

With the prompt above, the Mistral model judges 31 of the 50 files to contain successful dialogues:

$ ls output_mistral/datachain-demo/chatbot-KiT/
1.txt  15.txt 18.txt 2.txt  22.txt 25.txt 28.txt 33.txt 37.txt 4.txt  41.txt ...
$ ls output_mistral/datachain-demo/chatbot-KiT/ | wc -l
31
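
The count above depends on how the model's reply is turned into a boolean. The sketch below isolates the check used in `eval_dialogue` (a case-insensitive prefix match) so it can be tried offline without an API key. The `is_success` helper name and the sample replies are hypothetical and serve only as an illustration.

```python
def is_success(reply: str) -> bool:
    """Mirror the check in eval_dialogue: case-insensitive prefix match."""
    return reply.lower().startswith("success")

# Hypothetical model replies, for illustration only.
replies = ["Success", "success.", "Failure", " Success", "The dialog was a success"]
verdicts = [is_success(r) for r in replies]
print(verdicts)  # [True, True, False, False, False]
```

A reply with leading whitespace or extra wording counts as a failure under this check, so a stricter prompt or a `reply.strip()` before matching may be worth considering if the model does not always answer with a single word.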

📂 Multimodal Dataset Versioning.
🐍 Python-friendly.
🧠 Data Enrichment and Processing.

Contributions are very welcome. To learn more, see the Contributor Guide.

DataChain Studio Platform

DataChain Studio is a proprietary solution for teams that offers:

