DataChain is a Python-based AI-data warehouse for transforming and analyzing unstructured data like images, audio, videos, text and PDFs. It integrates with external storage (e.g. S3) to process data efficiently without data duplication and manages metadata in an internal database for easy and efficient querying.
Visit Quick Start and Docs to get started with DataChain and learn more.
Example: Download Subset of Files Based on Metadata

Sometimes users only need to download a specific subset of files from cloud storage rather than the entire dataset. For example, you could use a JSON file's metadata to download just the cat images with high confidence scores.
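To see why the merge in the example below lines up, note how the `id` signal is derived: the lambda splits each path on `.` and keeps the second-to-last piece, so an image and its JSON annotation produce the same id. A plain-Python sketch of just that extraction (the sample paths are illustrative, not taken from the bucket):

```python
# Toy illustration of the id extraction used in the example below.
# Only the split logic matters; the paths are made up.
def extract_id(path: str) -> str:
    # "cat.1009.jpg" -> ["cat", "1009", "jpg"] -> "1009"
    return path.split(".")[-2]

print(extract_id("dogs-and-cats/cat.1009.jpg"))   # -> 1009
print(extract_id("dogs-and-cats/cat.1009.json"))  # -> 1009
```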
```python
import datachain as dc

meta = dc.read_json("gs://datachain-demo/dogs-and-cats/*json", column="meta", anon=True)
images = dc.read_storage("gs://datachain-demo/dogs-and-cats/*jpg", anon=True)

images_id = images.map(id=lambda file: file.path.split('.')[-2])
annotated = images_id.merge(meta, on="id", right_on="meta.id")

likely_cats = annotated.filter((dc.Column("meta.inference.confidence") > 0.93)
                               & (dc.Column("meta.inference.class_") == "cat"))
likely_cats.to_storage("high-confidence-cats/", signal="file")
```

Example: Incremental Processing with Error Handling
This example shows how to use both delta and retry processing for efficient handling of large datasets that evolve over time and may occasionally have processing errors.
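The bookkeeping that delta and retry provide can be modeled in plain Python. This is a toy sketch of the behavior described above, not DataChain's implementation: rows that succeeded are kept, rows whose error field is set are re-run, and only paths not yet processed are picked up.

```python
# Toy model of delta + retry semantics (an illustration of the behavior
# described above, not DataChain internals).
def incremental_pass(previous, paths, process):
    # Keep rows that already succeeded (error is None)...
    kept = [row for row in previous if row["error"] is None]
    done = {row["path"] for row in kept}
    # ...and (re)process new paths plus previously failed ones.
    return kept + [process(p) for p in paths if p not in done]

def flaky(path, fail=frozenset()):
    # Stand-in for real processing; fails for paths listed in `fail`.
    if path in fail:
        return {"path": path, "result": None, "error": "boom"}
    return {"path": path, "result": path.upper(), "error": None}

first = incremental_pass([], ["a.txt", "b.txt"], lambda p: flaky(p, {"b.txt"}))
second = incremental_pass(first, ["a.txt", "b.txt", "c.txt"], flaky)
# The second pass keeps a.txt, retries the failed b.txt, and adds c.txt.
```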
```python
import datachain as dc
from datachain import C, File

def process_file(file: File):
    """Process a file, which may occasionally fail."""
    try:
        # Your processing logic here
        content = file.read_text()
        result = analyze_content(content)
        return {
            "content": content,
            "result": result,
            "error": None,  # No error
        }
    except Exception as e:
        # Return an error that will trigger reprocessing next time
        return {
            "content": None,
            "result": None,
            "error": str(e),  # Error field will trigger retry
        }

# Process files efficiently with delta and retry
chain = (
    dc.read_storage(
        "data/",
        update=True,
        delta=True,            # Process only new/changed files
        delta_on="file.path",  # Identify files by path
        retry_on="error",      # Field that indicates errors
    )
    .map(processed_result=process_file)
    .mutate(
        content=C("processed_result.content"),
        result=C("processed_result.result"),
        error=C("processed_result.error"),
    )
    .save(name="processed_data")
)
```

Example: LLM-based text-file evaluation
In this example, we evaluate chatbot conversations stored in text files using LLM-based evaluation.
```shell
$ pip install mistralai  # Requires version >=1.0.0
$ export MISTRAL_API_KEY=_your_key_
```
Python code:
```python
import os
from mistralai import Mistral
import datachain as dc

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialogue(file: dc.File) -> bool:
    client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
    response = client.chat.complete(
        model="open-mixtral-8x22b",
        messages=[
            {"role": "system", "content": PROMPT},
            {"role": "user", "content": file.read()},
        ],
    )
    result = response.choices[0].message.content
    return result.lower().startswith("success")

chain = (
    dc.read_storage("gs://datachain-demo/chatbot-KiT/", column="file", anon=True)
    .settings(parallel=4, cache=True)
    .map(is_success=eval_dialogue)
    .save("mistral_files")
)

successful_chain = chain.filter(dc.Column("is_success") == True)
successful_chain.to_storage("./output_mistral")

print(f"{successful_chain.count()} files were exported")
```
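The boolean signal comes from a simple prefix check on the model's reply: anything that does not start with "Success" counts as a failure. A standalone sketch of that check (the sample replies are illustrative):

```python
# The same prefix check eval_dialogue applies to the model's reply.
def reply_is_success(reply: str) -> bool:
    return reply.lower().startswith("success")

print(reply_is_success("Success"))                  # -> True
print(reply_is_success("Failure. The user left."))  # -> False
```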
With the instruction above, the Mistral model judges 31 of the 50 files to contain successful dialogues:
```shell
$ ls output_mistral/datachain-demo/chatbot-KiT/
1.txt  15.txt  18.txt  2.txt  22.txt  25.txt  28.txt  33.txt  37.txt  4.txt  41.txt  ...
$ ls output_mistral/datachain-demo/chatbot-KiT/ | wc -l
31
```
Contributions are very welcome. To learn more, see the Contributor Guide.
DataChain Studio is a proprietary solution for teams.