Part 4: Desktop App for Document QA with RAG - Indexing and Search
A DIY-style, step-by-step guide to building your own cutting-edge GenAI-powered document QA desktop app with RAG. In Part 4 we conclude the indexing flow and put together a basic search flow.
September 2, 2024 · 16 min · 3266 words
In this series we have been working on building our own Local Document QA application. So far we have implemented stella_en_1.5B_v5 as our embedding model, written our own mini vector store, and, in the previous post, integrated a model for document layout analysis and text extraction from .pdf files. We also laid the groundwork for text generation with a LLaMA 3.x model.
In this post we glue them together and build our query-to-answer flow.
The enum Event represents the operations supported by our app. We initialize the struct App with a tauri::async_runtime::channel (an mpsc channel) to facilitate communication from the tauri command handlers to our backend App, enabling non-blocking operations: the Sender is kept in the App state, while a listener on the Receiver end of the channel acts as an executor for incoming commands.
// code omitted
impl App {
    /// Create a new instance of the app
    pub fn new(appdir: &Path, models_dir: &Path) -> Result<Self> {
        let storage_dir = appdir.join("store");
        if !storage_dir.is_dir() {
            create_dir_all(&storage_dir)?;
        }

        let (send, recv) = channel::<Event>(32);

        let store = Arc::new(RwLock::new(Store::load_from_file(
            storage_dir.as_path(),
            None,
            None,
        )?));

        let embed = Arc::new(Mutex::new(Embed::new(Path::new("../models"))?));

        let app = Self {
            gen: Arc::new(Mutex::new(None)),
            send,
            store,
            embed,
            modeldir: models_dir.to_path_buf(),
        };

        let arced = Arc::new(app.clone());
        tauri::async_runtime::spawn(async move {
            Self::listen(arced, recv).await;
        });

        Ok(app)
    }

    /// A method to `send` events/ commands to the app state
    pub async fn send(&self, e: Event) -> Result<()> {
        self.send.send(e).await?;
        Ok(())
    }

    // The internal listener - executes incoming tasks received
    async fn listen(app: Arc<Self>, recv: Receiver<Event>) {
        let mut recv = recv;
        while let Some(evt) = recv.recv().await {
            match evt {
                Event::Search((qry, cfg, w)) => {
                    if let Err(e) = app.search(&qry, &cfg, &w).await {
                        eprintln!("Error while searching: {e:?}");
                    }
                }
                Event::Index((dir, w)) => {
                    if let Err(e) = app.index(dir.as_path(), &w).await {
                        eprintln!("Error while indexing {dir:?}: {e:?}");
                    }
                }
            }
        }
    }

    // Triggers indexing workflow
    async fn index(&self, path: &Path, w: &Window) -> Result<()> {
        // we'll work on this in the next section: Phase I: Indexing
        todo!()
    }

    // Triggers the `search / QA` workflow
    async fn search(&self, qry: &str, cfg: &SearchConfig, res_send: &Window) -> Result<()> {
        // We'll tackle this in detail in Phase II: Search
        todo!()
    }
}
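The Event enum itself is omitted above. Based on how its variants are matched in listen( .. ), a minimal sketch could look like the following; the payload types are assumptions inferred from those calls:

// Sketch of the Event enum, inferred from the `match` arms in `listen( .. )`.
// Payload types are assumptions based on how the variants are used.
enum Event {
    // A search request: the user query, the search configuration and the
    // window to which status updates and results should be emitted
    Search((String, SearchConfig, Window)),
    // An indexing request: the user-selected directory and the window to
    // which progress updates should be emitted
    Index((PathBuf, Window)),
}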
The first step of our RAG is to build the database of documents to retrieve from. For this project the flow is as follows:
User selects a directory containing a bunch of documents - .pdf and .txt files
The tauri client emits the directory to the backend
The backend then creates an estimate of the number of pages to be processed and emits the estimate back to the client
The backend starts processing the files - with .pdf files requiring Layout Analysis and extraction
The progress is emitted to the client
Note the repeated use of emit in the flow above: the parsing happens asynchronously, leaving the client free to execute other tasks - in our case, showing the progress.
Here’s the fn index( .. ) which serves as the entry point for the indexing operation.
src-tauri/src/lib.rs
// imports omitted
// Learn more about Tauri commands at https://tauri.app/v1/guides/features/command
#[tauri::command]
async fn index(window: Window, app: tauri::State<'_, App>, dir: &str) -> Result<(), String> {
    let selected = PathBuf::from_str(dir).map_err(|e| e.to_string())?;
    if !selected.is_dir() {
        return Err(format!("Selected `{dir}` is not a valid directory"));
    }

    app.send(app::Event::Index((selected, window)))
        .await
        .map_err(|e| e.to_string())?;

    Ok(())
}
Our tauri command handler accepts a tauri Window instance (the currently open window), the struct App state and the path to the user-selected directory. When fired, the command handler sends an Event over our channel, triggering the indexing flow. Let's elaborate on the method App::index( .. ).
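For completeness, the command also has to be registered with the tauri Builder and the App placed into tauri's managed state. The actual setup in this project may differ; a rough sketch of that wiring (the directory paths below are placeholders) could look like this:

// Sketch of wiring the `index` command into the tauri Builder; the real setup
// (paths, error handling, additional commands such as search) may differ.
fn main() {
    // Hypothetical locations for the app data and model directories
    let app_dir = PathBuf::from("path/to/appdata");
    let models_dir = PathBuf::from("path/to/models");

    tauri::Builder::default()
        .setup(move |tauri_app| {
            // Build our backend `App` and place it into tauri's managed state
            // so commands can receive it as `tauri::State<'_, App>`
            // (requires `use tauri::Manager;`)
            let backend = App::new(&app_dir, &models_dir).expect("failed to initialize App");
            tauri_app.manage(backend);
            Ok(())
        })
        .invoke_handler(tauri::generate_handler![index])
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}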
// code omitted
impl App {
    // methods omitted

    // Triggers indexing workflow
    async fn index(&self, path: &Path, w: &Window) -> Result<()> {
        {
            // Drop generator module to save some memory
            let has_gen = { self.gen.lock().await.is_some() };
            if has_gen {
                let mut l = self.gen.lock().await;
                *l = None;
            }
        }

        println!("Initializing indexing ..");

        let mut to_index = vec![];
        // Create list of files
        path.read_dir()?
            .filter_map(|f| {
                if let Ok(p) = f {
                    if p.metadata().map_or(false, |f| f.is_file()) {
                        Some(p)
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .for_each(|f| {
                let path = f.path();
                if let Some(ext) = path.extension() {
                    if ext == "txt" || ext == "pdf" {
                        to_index.push(path);
                    }
                }
            });

        let mut indexing = IndexStatus {
            msg: "Indexing",
            progress: 0.,
            pages: 0,
            files: to_index.len(),
        };

        Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;

        let mut f2t = vec![];

        // For simplicity let's assume that processing each page is 50% of the total processing
        {
            indexing.msg = "Analyzing page layout and extracting pages!";

            let (send, recv) = std::sync::mpsc::channel();
            let model_dir = self.modeldir.clone();
            let to_index = to_index;

            std::thread::spawn(move || {
                let model_dir = model_dir;
                let to_index = to_index;

                let extractor = Extractor::new(&model_dir, &to_index[..]).unwrap();
                extractor.estimate(send.clone());

                if let Err(e) = extractor.extract(send.clone()) {
                    eprintln!("App.index: trying to call extract: {e:?}");
                }
            });

            let mut pagesparsed = 0.;
            while let Ok(d) = recv.recv() {
                match d {
                    ExtractorEvt::Estimate(p) => {
                        indexing.pages = p;
                        Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;
                    }
                    ExtractorEvt::Page => {
                        pagesparsed += 1.;
                        indexing.progress = pagesparsed / indexing.pages as f32 * 50.;
                        Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;
                    }
                    ExtractorEvt::Data(d) => match d {
                        Ok(d) => {
                            f2t = d.concat();
                        }
                        Err(e) => {
                            eprintln!("App.index: error while parsing data: {e:?}");
                        }
                    },
                }
            }
        }

        let chunks = f2t.len() as f32;
        let mut chunks_done = 0.;

        indexing.msg = "Chunking, encoding and embedding extracted data!";

        let mut embed = self.embed.lock().await;
        let mut data = vec![];

        for (txt, f) in f2t.iter() {
            let t = embed
                .split_text_and_encode(txt)
                .iter()
                .map(|(s, t)| (s.to_owned(), t.to_owned(), f.to_owned()))
                .collect::<Vec<_>>();

            data = [data, t].concat();

            chunks_done += 1.;
            indexing.progress = ((chunks_done / chunks) * 50.) + 50.;
            Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;
        }

        let mut data = data.drain(..);
        let mut writer = self.store.write().await;
        let (mut f, _) = writer.files()?;

        writer.insert(&mut f, &mut data)?;

        println!("Indexing done ..");

        Ok(())
    }
}
Let’s break this down:
In the first part of this method, we drop the LLM from memory to save some RAM/VRAM; indexing and searching don't happen in parallel!
Then we collect the files that need to be indexed into the to_index variable, based on some simple conditions (regular files with a .txt or .pdf extension).
We emit an event to let the client know that the indexing operation has started.
Next, we start the layout analysis and extraction part of the flow; while at it, we keep sending status updates so the client can display some progress.
Once all the text is extracted, we mark the total progress as 50% complete and proceed to the chunking and embedding-generation part of our indexing flow.
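The indexing code above leans on two supporting types whose definitions are omitted: IndexStatus, the progress payload we keep emitting to the client, and ExtractorEvt, the messages the extractor thread sends back over the std::sync::mpsc channel. A minimal sketch consistent with how they are used (field and payload types are inferred, so treat them as assumptions) might be:

// Sketch of the progress payload emitted to the client during indexing.
// Field types are inferred from usage; the real definition may differ.
#[derive(Clone, Serialize)]
struct IndexStatus {
    msg: &'static str, // human-readable description of the current phase
    progress: f32,     // overall progress, 0.0 - 100.0
    pages: usize,      // estimated number of pages to process
    files: usize,      // number of files selected for indexing
}

// Sketch of the events the extractor thread sends back while working.
enum ExtractorEvt {
    // Estimated total number of pages across all selected files
    Estimate(usize),
    // A single page has been analyzed and its text extracted
    Page,
    // The final extracted data: per-file lists of (text, source file) pairs
    Data(anyhow::Result<Vec<Vec<(String, PathBuf)>>>),
}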
Getting the Client right!
I'm deliberately not detailing the client-side flow, which involves:
allowing users to select a directory
launching the tauri command to index the directory
listening to the events emitted by the backend and showing some progress
My version works with Svelte; if you need some inspiration for the same, refer to this code, beginning with the function folderPicker().
Let’s run our app with cargo tauri dev.
Note
This video has been sped up! The indexing process is slow and takes about 10 minutes on my system. The slowest part of indexing seems to be the Tensor::save() operation, and there isn't much we can do about that!
That concludes our indexing; on subsequent launches our indexed data will be ready for search.
For now, our struct SearchConfig is simple: it contains the ann_cutoff and max_result parameters we introduced while working on the ANN Search. This is good enough for now, but we'll soon need it to be a lot more elaborate for better results and control.
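Judging by how it is used in App::search( .. ) below, a minimal SearchConfig could be sketched as follows; note that with_bm25 and allow_without_evidence also show up in the search code, and the exact field types and derives are assumptions:

// Sketch of the search configuration, inferred from its usage in `App::search( .. )`.
#[derive(Clone, Deserialize)]
struct SearchConfig {
    // Minimum similarity score for a chunk to count as an ANN match
    ann_cutoff: Option<f32>,
    // Maximum number of chunks to retrieve
    max_result: usize,
    // Whether to combine the ANN search with BM25 scoring
    with_bm25: bool,
    // Whether to attempt an answer even when no supporting context was found
    allow_without_evidence: bool,
}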
Let’s code up a struct StatusData to represent the status messages we intend to emit to the client.
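Its definition (and that of the OpResult wrapper that send_event( .. ) matches on) is omitted from the snippet below; based on how the fields are filled in, a plausible shape, with assumed types, is:

// Sketch of the status payload emitted to the client; field types are inferred from usage.
#[derive(Clone, Default, Serialize)]
struct StatusData {
    head: String,        // short headline, e.g. "Loading LLaMA .."
    body: String,        // optional details (may contain simple HTML)
    time_s: Option<f32>, // time taken by the step, in seconds
}

// Sketch of the wrapper `send_event( .. )` uses to pick the event name to emit.
enum OpResult {
    Status(StatusData),
    Result(SearchResult),
    Error(String),
    Indexing(IndexStatus),
}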
// .. Code omitted ..
impl App {
    // A separate method to spawn and send events, the main thread seems to be blocking
    async fn send_event(window: &Window, msg: OpResult) -> Result<()> {
        println!("Sending event to window!");
        match msg {
            OpResult::Status(s) => window.emit("status", &s)?,
            OpResult::Result(s) => window.emit("result", &s)?,
            OpResult::Error(e) => window.emit("error", &e)?,
            OpResult::Indexing(m) => window.emit("indexing", &m)?,
        }

        Ok(())
    }

    // Trigger the search flow - the search pipeline
    async fn search(&self, qry: &str, cfg: &SearchConfig, res_send: &Window) -> Result<()> {
        let mut final_result = SearchResult {
            qry: qry.to_string(),
            ..Default::default()
        };

        {
            let mut gen = self.gen.lock().await;
            if gen.is_none() {
                Self::send_event(
                    res_send,
                    OpResult::Status(StatusData {
                        head: "Loading LLaMA ..".to_string(),
                        body: String::new(),
                        ..Default::default()
                    }),
                )
                .await?;

                let start = Instant::now();
                *gen = Some(Generator::new(&self.modeldir, &select_device()?)?);

                Self::send_event(
                    res_send,
                    OpResult::Status(StatusData {
                        head: "LLaMA Ready".to_string(),
                        body: String::new(),
                        time_s: Some((Instant::now() - start).as_secs_f32()),
                        ..Default::default()
                    }),
                )
                .await?;
            }
        }

        let (q_txt, q_tensor) = {
            let queries = vec![qry.to_string()];
            let mut emb = self.embed.lock().await;
            let t = emb.query(&queries)?;

            let tensor = (0..queries.len())
                .map(|i| {
                    t.i(i)
                        .unwrap()
                        .to_device(&candle_core::Device::Cpu)
                        .unwrap()
                        .unsqueeze(0)
                        .unwrap()
                })
                .collect::<Vec<_>>();

            (queries, tensor)
        };

        // Step 2: Approximate nearest neighbor search
        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: "Firing Approx. Nearest Neighbor search".to_string(),
                body: format!(
                    "<b>BM25:</b> false | <b>ANN Cutoff:</b> {}",
                    cfg.ann_cutoff.map_or(0., |c| c)
                ),
                ..Default::default()
            }),
        )
        .await?;

        let store = self.store.read().await;
        let (res, elapsed) = {
            let start = Instant::now();
            let res = store.search(
                &q_tensor,
                &[qry.to_string()],
                cfg.max_result,
                cfg.ann_cutoff,
                cfg.with_bm25,
            )?;

            (res, (Instant::now() - start).as_secs_f32())
        };

        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: format!("ANN Search yielded {} results", res.len()),
                body: String::new(),
                time_s: Some(elapsed),
                ..Default::default()
            }),
        )
        .await?;

        let ctx = {
            let context = res
                .iter()
                .map(|(_, _, txt, _)| txt.as_str())
                .collect::<Vec<_>>()
                .join("\n\n");

            context.trim().to_string()
        };

        println!("Final Context: {}", ctx);

        if ctx.is_empty() && !cfg.allow_without_evidence {
            return Self::send_event(res_send, OpResult::Error("Nothing found!".to_string())).await;
        }

        // Step 5: Finally the answer
        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: "Generating answer!".to_string(),
                body: String::new(),
                ..Default::default()
            }),
        )
        .await?;

        let (ans, elapsed) = {
            let mut gen = self.gen.lock().await;
            let llm = if let Some(gen) = gen.as_mut() {
                gen
            } else {
                return Err(anyhow!("generator not found"));
            };

            let start = Instant::now();
            let answer = llm.answer(qry, &ctx)?;

            (answer, (Instant::now() - start).as_secs_f32())
        };

        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: "Finally, generated answer!".to_string(),
                body: String::new(),
                time_s: Some(elapsed),
                ..Default::default()
            }),
        )
        .await?;

        final_result.answer = ans.answer().to_string();
        Self::send_event(res_send, OpResult::Result(final_result)).await?;

        Ok(())
    }
}
Breaking down the method App::search( .. ):
We initialize a variable to hold our final QA results.
Then we ensure that the LLM is ready for generation.
Then, we generate the embeddings from the user input query.
We fire the Approximate Nearest Neighbor Search from our store - note that we have already indexed a bunch of documents related to archaeology.
Then we iterate over the ANN search result documents and create a context - a string to be treated as a source of truth for our final answer generation.
We provide the user input query and the context to a method Generator::answer( .. ) for the final answer.
Finally, we send the answer to the client, which displays it.
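The SearchResult carrying the final payload back to the client isn't defined in the snippet either; from the fields App::search( .. ) touches, a minimal version might be:

// Sketch of the result payload sent back to the client; only the fields
// touched in `App::search( .. )` are shown, the real struct may carry more.
#[derive(Clone, Default, Serialize)]
struct SearchResult {
    qry: String,    // the original user query
    answer: String, // the generated answer
}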
We have not defined the method Generator::answer( .. ) yet! Let’s code it up.
// code omitted ..
#[derive(Debug, Deserialize)]
pub struct GeneratedAnswer {
    answer: String,
}

impl GeneratedAnswer {
    pub fn answer(&self) -> &str {
        &self.answer
    }
}

impl Generator {
    /// Given a `query` string and a `context` returns a response
    pub fn answer(&mut self, query: &str, context: &str) -> Result<GeneratedAnswer> {
        let prompt = format!(
            "<|start_header_id|>system<|end_header_id|>
You are a context-based question answering AI. You retrieve information from provided context to answer user queries. Based on the provided context, generate an informative, complete, relevant yet concise response to the given query by following the given requirements.<|eot_id|><|start_header_id|>user<|end_header_id|>
Text in the given context is extracted with approximate search of a text corpus. You want to find out information about Query only if present in the given context.
Context:
{context}
Query:
{query}
Requirements:
- Answer must be supported by at least one datapoint in the given context, extract the supporting text along with the associated source id as evidence.
- Use natural language summary for your answer and avoid copying from given context for your answer.
- Truthfully return empty string (\"\") for answer if the given context doesn't contain the answer to the query.
- Do not write an introduction or summary.
- Your response must be a valid json of the following Schema.
Schema:
{{ evidence: Array<{{source: int, text: string}}>,
  answer: string
}}
Your answer must be a valid json.<|eot_id|><|start_header_id|>assistant<|end_header_id|>
{{\"evidence\": [{{\"source\": "
        );

        let mut tk = self.generate(&prompt)?;
        // The model only completes the JSON we primed it with; close the object if needed
        if !tk.ends_with("}") {
            tk = format!("{tk}}}");
        }

        // println!("Op:\n{{\n \"evidence\": [{{\"source\": {tk}");

        // Stitch the primed prefix back onto the generated text before deserializing
        serde_json::from_str(format!("{{\n\"evidence\": [{{\"source\": {tk}").as_str()).map_err(
            |e| {
                println!("Answer.Error:\n{tk}");
                anyhow!(e)
            },
        )
    }
}
We simply write an elaborate prompt (I got this drafted by the Meta LLaMA chat that ships with WhatsApp 😉) and nudge the model to generate JSON for struct GeneratedAnswer. Note that the prompt ends with the opening of the expected JSON ({"evidence": [{"source": ), so the model only has to complete it; before parsing, we prepend that prefix back onto the generated text and append a closing brace if it is missing.
Justifying `struct GeneratedAnswer`
A simple string would have sufficed instead of JSON for our answer, but we intend to do a lot more with the generated answers in the next post, targeting accuracy and relevance. Just ignore the evidence-related details for now!
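As a quick illustration of how this slots into the pipeline, a standalone call - assuming a Generator has already been constructed as in App::search( .. ), and with a made-up context and query - would look roughly like:

// Hypothetical standalone usage of `Generator::answer( .. )`; in the app this is
// driven from `App::search( .. )` with the context assembled from the ANN results.
let mut llm = Generator::new(&models_dir, &select_device()?)?;
let context = "The excavation at the site began in 1922 ..";
let response = llm.answer("When did the excavation begin?", context)?;
println!("Answer: {}", response.answer());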
Let’s take the first version of our RAG for a spin!
cargo tauri dev -- --release
And!! Voilà .. our search works! 🥳🥳🎉🎉
On the right-hand side of the video you can see that I have println!()-ed the final context being sent to the method Generator::answer( .. ), and the search result shows up along with the logs on the left!
Note
This video has been sped up by 2x! Including the loading of the model (which would typically be a one-time event in real-world usage), this flow took a total of ~50s on my Mac M1 Pro hardware.
Ignore the warning messages displayed on the client side; they are just placeholders for things we'll be working on next!
The search results are very good and relevant for the first attempt! But there are a couple of observations and open questions:
We don’t know if the results are accurate - we have not identified the sources of the answer!
What happens if the context size blows up beyond the max sequence length of the model or the hardware we are using?
What can we do to give richer context and more information for the generator?
We'll work on tackling some of these questions and more in Part 5 of this series. We'll look at some of the SOTA retrieval and generation techniques outlined in the repo RAG Techniques and try to implement some of them in our app.