No files to process after Embedder. Here are my code and the issue. I need guidance to resolve it; please help me out.

import openai  # needed for openai.api_key used in the headers below
import weaviate

weaviate_url = ""
weaviate_key = ""

client = weaviate.connect_to_wcs(
    cluster_url=weaviate_url,
    auth_credentials=weaviate.auth.AuthApiKey(weaviate_key),
    headers={
        "X-OpenAI-Api-Key": openai.api_key  # Replace with your OpenAI key
    }
)

client.is_ready()

True

import weaviate.classes.config as wc
from weaviate.classes.config import ReferenceProperty

client.collections.create(
    name="UnstructuredDemo",
    vectorizer_config=wc.Configure.Vectorizer.text2vec_openai(
        model="ada",
        model_version="002",
        type_="text"
    ),
    generative_config=wc.Configure.Generative.openai(
        model="gpt-4"
    ),
    properties=[
        wc.Property(name="type", data_type=wc.DataType.TEXT),
        wc.Property(name="element_id", data_type=wc.DataType.TEXT, skip_vectorization=True),
        wc.Property(name="text", data_type=wc.DataType.TEXT),
        wc.Property(name="embeddings", data_type=wc.DataType.NUMBER_ARRAY, skip_vectorization=True),
    ]
)

import os

from unstructured_ingest.connector.fsspec.s3 import S3AccessConfig, S3WriteConfig, SimpleS3Config
from unstructured_ingest.connector.local import SimpleLocalConfig
from unstructured_ingest.connector.weaviate import (
    SimpleWeaviateConfig,
    WeaviateAccessConfig,
    WeaviateWriteConfig,
)
from unstructured_ingest.interfaces import (
    ChunkingConfig,
    EmbeddingConfig,
    PartitionConfig,
    ProcessorConfig,
    ReadConfig,
)
from unstructured_ingest.runner import LocalRunner, S3Runner
from unstructured_ingest.runner.writers.base_writer import Writer
from unstructured_ingest.runner.writers.fsspec.s3 import S3Writer
from unstructured_ingest.runner.writers.weaviate import WeaviateWriter


def get_writer() -> Writer:
    return WeaviateWriter(
        connector_config=SimpleWeaviateConfig(
            access_config=WeaviateAccessConfig(api_key=weaviate_key),
            host_url=weaviate_url,
            class_name="UnstructuredDemo",
        ),
        write_config=WeaviateWriteConfig(),
    )

writer = get_writer()


#output_path = "s3-output"
output_path = "s3://genairag/creditcard"

runner = S3Runner(
    processor_config=ProcessorConfig(
        verbose=True,
        output_dir=output_path,
        num_processes=40,
        reprocess=True,  # Ensures all files are reprocessed
    ),
    read_config=ReadConfig(),
    partition_config=PartitionConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_URL"),
    ),
    connector_config=SimpleS3Config(
        access_config=S3AccessConfig(
            key=os.getenv("AWS_KEY"),
            secret=os.getenv("AWS_SECRET"),
        ),
        remote_url="s3://genairag/",
    ),
    chunking_config=ChunkingConfig(
        chunk_elements=True,
        chunking_strategy="by_title",
        max_characters=8192,
        combine_text_under_n_chars=1000,
    ),
    embedding_config=EmbeddingConfig(
        provider="azure-openai",
        api_key=openai.api_key,
        model_name="text-embedding-ada-002",  # Ensure correct model
    ),
    writer=writer,
    writer_kwargs={},
)

runner.run()

2025-01-31 17:59:10,059 MainProcess DEBUG updating download directory to: C:\Users\dhanu\.cache\unstructured\ingest\s3\62e5a70d24
2025-01-31 17:59:10,061 MainProcess INFO running pipeline: DocFactory → Reader → Partitioner → Chunker → Embedder → Writer → Copier with config: {"reprocess": true, "verbose": true, "work_dir": "C:\Users\dhanu\.cache\unstructured\ingest\pipeline", "output_dir": "s3://genairag/creditcard", "num_processes": 40, "raise_on_error": false}
2025-01-31 17:59:10,212 MainProcess INFO Running doc factory to generate ingest docs. Source connector: {"processor_config": {"reprocess": true, "verbose": true, "work_dir": "C:\Users\dhanu\.cache\unstructured\ingest\pipeline", "output_dir": "s3://genairag/creditcard", "num_processes": 40, "raise_on_error": false}, "read_config": {"download_dir": "C:\Users\dhanu\.cache\unstructured\ingest\s3\62e5a70d24", "re_download": false, "preserve_downloads": false, "download_only": false, "max_docs": null}, "connector_config": {"remote_url": "s3://genairag/", "uncompress": false, "recursive": false, "file_glob": null, "access_config": {"anonymous": false, "endpoint_url": null, "key": null, "secret": null, "token": null}, "protocol": "s3", "path_without_protocol": "genairag/", "dir_path": "genairag", "file_path": ""}}
2025-01-31 17:59:10,229 MainProcess INFO processing 4 docs via 40 processes
2025-01-31 17:59:10,233 MainProcess INFO calling Reader with 4 docs
2025-01-31 17:59:10,236 MainProcess INFO Running source node to download data associated with ingest docs
2025-01-31 17:59:18,125 MainProcess INFO calling Partitioner with 4 docs
2025-01-31 17:59:18,127 MainProcess INFO Running partition node to extract content from json files. Config: {"pdf_infer_table_structure": false, "strategy": "auto", "ocr_languages": null, "encoding": null, "additional_partition_args": {}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": , "metadata_include": , "partition_endpoint": "https://api.unstructuredapp.io", "partition_by_api": true, "api_key": "", "hi_res_model_name": null}, partition kwargs: {}]
2025-01-31 17:59:18,130 MainProcess INFO creating C:\Users\dhanu\.cache\unstructured\ingest\pipeline\partitioned
2025-01-31 17:59:38,178 MainProcess INFO calling Chunker with 4 docs
2025-01-31 17:59:38,178 MainProcess INFO Running chunking node. Chunking config: {"chunking_strategy": "by_title", "combine_text_under_n_chars": 1000, "include_orig_elements": null, "max_characters": 8192, "multipage_sections": null, "new_after_n_chars": null, "overlap": null, "overlap_all": null}]
2025-01-31 17:59:38,180 MainProcess INFO creating C:\Users\dhanu\.cache\unstructured\ingest\pipeline\chunked
2025-01-31 17:59:47,338 MainProcess INFO calling Embedder with 4 docs
2025-01-31 17:59:47,340 MainProcess INFO Running embedding node. Embedding config: {"provider": "azure-openai", "api_key": "", "model_name": "text-embedding-ada-002", "aws_access_key_id": null, "aws_secret_access_key": null, "aws_region": null}]
2025-01-31 17:59:47,343 MainProcess INFO creating C:\Users\dhanu\.cache\unstructured\ingest\pipeline\embedded.py
2025-01-31 17:59:49,345 MainProcess INFO no files to process after Embedder

Hi @DhanushKumar_R,

I see that you’re using this integration:

GitHub - Unstructured Ingest

I haven’t come across this integration before, but it looks interesting. You might need a deeper look to fully understand how it interacts with Weaviate, as the error itself doesn’t seem to be coming from Weaviate DB.

Regarding "MainProcess INFO no files to process after Embedder": the log shows the Embedder being called with 4 docs, so the message suggests that nothing was left to pass on once the Embedder step had run. I tried to dig into this message but could not find any solid information on it. The ingest pipeline typically involves several steps: indexing, downloading, partitioning, chunking, embedding, and uploading.
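
One way to see where the documents disappear is to count the files each stage left in the pipeline work directory. The following is only a minimal sketch: the function name `count_stage_outputs` is hypothetical, and the stage folder names (`partitioned`, `chunked`, `embedded`) are assumptions taken from the "creating ..." lines in the log, so they may need adjusting for your version of the library.

```python
from pathlib import Path


def count_stage_outputs(work_dir, stages=("partitioned", "chunked", "embedded")):
    """Return {stage: number of files} for each stage folder under work_dir.

    A missing folder is reported as None so it can be told apart from an
    empty one. The default stage names are assumptions based on the log.
    """
    counts = {}
    for stage in stages:
        stage_dir = Path(work_dir) / stage
        if stage_dir.is_dir():
            # Count regular files only; sub-folders are ignored.
            counts[stage] = sum(1 for p in stage_dir.iterdir() if p.is_file())
        else:
            counts[stage] = None
    return counts


if __name__ == "__main__":
    # Assumed location, matching the "work_dir" value printed in the log.
    work_dir = Path.home() / ".cache" / "unstructured" / "ingest" / "pipeline"
    for stage, n in count_stage_outputs(work_dir).items():
        print(f"{stage}: {n}")
```

If the partitioned and chunked folders contain files but the embedded one is empty or missing, that would point at the embedding step itself (for example, the provider or API key settings) rather than at Weaviate.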

You may need to add debugging and logging statements throughout the code to narrow it down.
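
As a first check, it may help to confirm that the environment variables the script reads with `os.getenv` are actually set, since the log prints `"key": null` and `"secret": null` for the S3 access config. This is a minimal sketch under that assumption; the helper name `missing_settings` is hypothetical, and the variable names are the ones used in the script above.

```python
import os


def missing_settings(names, env=None):
    """Return the names whose value is unset or empty in env.

    env defaults to os.environ; passing a plain dict makes this easy to test.
    """
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    # Variable names taken from the os.getenv(...) calls in the script.
    required = ["UNSTRUCTURED_API_KEY", "UNSTRUCTURED_URL", "AWS_KEY", "AWS_SECRET"]
    missing = missing_settings(required)
    if missing:
        print("Not set:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

The log also shows `"raise_on_error": false` in the processor config. Assuming `ProcessorConfig` accepts that field, as the printed config suggests, setting `raise_on_error=True` might surface the underlying exception from the Embedder step instead of silently dropping the documents.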

@DudaNogueira, what do you think about this?

Regards,
Mohamed Shahin
Weaviate Support