In this series we have been building our own Local Document QA application. So far we have implemented stella_en_1.5B_v5 as our embedding model, written our own mini vector store and, in the previous post, integrated a model for document layout analysis and text extraction from .pdf files. We also laid the groundwork for text generation with a LLaMA 3.x model.

In this post we glue them together and build our query-to-answer flow.

Series Snapshot
  • Part 1: we implement embedding generation from text data. We use Stella_en_1.5B_v5 via its Candle Transformers implementation as the embedding model, and the crate text-splitter to split our text into meaningful chunks.
  • Part 2: we build our own mini Vector Store inspired by Spotify’s ANNOY.
  • Part 3: we code up a pipeline to analyze and extract text from .pdf files and also set the foundation for text generation with a LLaMA Model.
  • Part 4 (this): we work on the retrieve-and-answer flow from our corpus.
  • Part 5: we implement and evaluate some techniques for a better RAG.

TL;DR

GitHub

Output

Note: This video has been sped up

The App

The struct App will represent the state of our application and bind all the independent components we have built so far together.

src-tauri/src/app.rs
// code omitted

pub enum Event {
    Search((String, SearchConfig, Window)),
    Index((PathBuf, Window)),
}

#[derive(Clone)]
pub struct App {
    gen: Arc<Mutex<Option<Generator>>>,
    send: Sender<Event>,
    store: Arc<RwLock<Store>>,
    embed: Arc<Mutex<Embed>>,
    modeldir: PathBuf,
}

With the enum Event we represent the operations supported by our app. We initialize the struct App with a tauri::async_runtime::channel (an mpsc channel) to facilitate non-blocking communication from the tauri command handlers to our backend App: the Sender is kept in the App state, while a listener on the Receiver side of the channel acts as the executor of incoming commands.
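
One piece of wiring not shown in this post is how App reaches the tauri command handlers: it is created once and registered as tauri managed state. A minimal sketch of that setup (the function name, error handling and directory resolution are placeholders, not the repo's exact code) could look like this:

use tauri::Manager; // brings `manage` into scope on the setup handle

pub fn run() {
    tauri::Builder::default()
        .setup(|tauri_app| {
            // Resolve the app's data directory; the models dir below is a placeholder
            let appdir = tauri_app
                .path_resolver()
                .app_data_dir()
                .expect("no app data dir");
            let models_dir = appdir.join("models");

            // Build our state and hand it to tauri so command handlers can
            // receive it as `tauri::State<'_, App>`
            let app = App::new(&appdir, &models_dir).expect("failed to initialize App");
            tauri_app.manage(app);
            Ok(())
        })
        .invoke_handler(tauri::generate_handler![index, search])
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}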

src-tauri/src/app.rs
// code omitted

impl App {
    /// Create a new instance of the app
    pub fn new(appdir: &Path, models_dir: &Path) -> Result<Self> {
        let storage_dir = appdir.join("store");
        if !storage_dir.is_dir() {
            create_dir_all(&storage_dir)?;
        }

        let (send, recv) = channel::<Event>(32);
        let store = Arc::new(RwLock::new(Store::load_from_file(
            storage_dir.as_path(),
            None,
            None,
        )?));
        let embed = Arc::new(Mutex::new(Embed::new(Path::new("../models"))?));

        let app = Self {
            gen: Arc::new(Mutex::new(None)),
            send,
            store,
            embed,
            modeldir: models_dir.to_path_buf(),
        };

        let arced = Arc::new(app.clone());
        tauri::async_runtime::spawn(async move {
            Self::listen(arced, recv).await;
        });

        Ok(app)
    }

    /// A method to `send` events/commands to the app state
    pub async fn send(&self, e: Event) -> Result<()> {
        self.send.send(e).await?;

        Ok(())
    }


    // The internal listener - executes incoming tasks as they are received
    async fn listen(app: Arc<Self>, mut recv: Receiver<Event>) {
        while let Some(evt) = recv.recv().await {
            match evt {
                Event::Search((qry, cfg, w)) => {
                    if let Err(e) = app.search(&qry, &cfg, &w).await {
                        eprintln!("Error while searching: {e:?}");
                    }
                }
                Event::Index((dir, w)) => {
                    if let Err(e) = app.index(dir.as_path(), &w).await {
                        eprintln!("Error while indexing {dir:?}: {e:?}");
                    }
                }
            }
        }
    }

    // Triggers indexing workflow
    async fn index(&self, path: &Path, w: &Window) -> Result<()> {
        // we'll work on this in the next section: Phase I: Indexing

        todo!()
    }

    // Triggers the `search / QA` workflow
    async fn search(&self, qry: &str, cfg: &SearchConfig, res_send: &Window) -> Result<()> {
        // We'll tackle this in detail in Phase II: Search
        todo!()
    }

}

Phase I: Indexing

The first step of our RAG is to build the database of documents to retrieve from. For this project the flow is as follows:

  1. User selects a directory containing a bunch of documents - .pdf and .txt files
  2. The tauri client emits the directory to the backend
  3. The backend then creates an estimate of the number of pages to be processed and emits the estimate back to the client
  4. The backend starts processing the files - with .pdf files requiring Layout Analysis and extraction
  5. The progress is emitted to the client

Note the repeated use of emit in the flow above: parsing happens asynchronously, leaving the client free to execute other tasks; in our case that means showing the progress.

Here’s the fn index( .. ) which serves as the entry point for the indexing operation.

src-tauri/src/lib.rs
// imports omitted

// Learn more about Tauri commands at https://tauri.app/v1/guides/features/command
#[tauri::command]
async fn index(window: Window, app: tauri::State<'_, App>, dir: &str) -> Result<(), String> {
    let selected = PathBuf::from_str(dir).map_err(|e| e.to_string())?;
    if !selected.is_dir() {
        return Err(format!("Selected `{dir}` is not a valid directory"));
    }

    app.send(app::Event::Index((selected, window)))
        .await
        .map_err(|e| e.to_string())?;

    Ok(())
}

Our tauri command handler accepts a tauri Window instance (the currently open window), the struct App state and the path to the user-selected directory. When fired, the command handler sends an event that triggers the indexing flow. Let's elaborate on the method App::index( .. ).

src-tauri/src/app.rs
// code omitted
impl App {
    // methods omitted
    // Triggers indexing workflow
    async fn index(&self, path: &Path, w: &Window) -> Result<()> {
        {
            // Drop generator module to save some memory
            let has_gen = { self.gen.lock().await.is_some() };

            if has_gen {
                let mut l = self.gen.lock().await;
                *l = None;
            }
        }

        println!("Initializing indexing ..");
        let mut to_index = vec![];
        // Create list of files
        path.read_dir()?
            .filter_map(|f| {
                if let Ok(p) = f {
                    if p.metadata().map_or(false, |f| f.is_file()) {
                        Some(p)
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .for_each(|f| {
                let path = f.path();
                if let Some(ext) = path.extension() {
                    if ext == "txt" || ext == "pdf" {
                        to_index.push(path);
                    }
                }
            });

        let mut indexing = IndexStatus {
            msg: "Indexing",
            progress: 0.,
            pages: 0,
            files: to_index.len(),
        };

        Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;

        let mut f2t = vec![];
        // For simplicity, let's assume the page-extraction phase accounts for 50% of the total processing
        {
            indexing.msg = "Analyzing page layout and extracting pages!";
            let (send, recv) = std::sync::mpsc::channel();
            let model_dir = self.modeldir.clone();
            let to_index = to_index;

            std::thread::spawn(move || {
                let model_dir = model_dir;
                let to_index = to_index;

                let extractor = Extractor::new(&model_dir, &to_index[..]).unwrap();
                extractor.estimate(send.clone());

                if let Err(e) = extractor.extract(send.clone()) {
                    eprintln!("App.index: trying to call extract: {e:?}");
                }
            });

            let mut pagesparsed = 0.;

            while let Ok(d) = recv.recv() {
                match d {
                    ExtractorEvt::Estimate(p) => {
                        indexing.pages = p;
                        Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;
                    }
                    ExtractorEvt::Page => {
                        pagesparsed += 1.;
                        indexing.progress = pagesparsed / indexing.pages as f32 * 50.;
                        Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;
                    }
                    ExtractorEvt::Data(d) => match d {
                        Ok(d) => {
                            f2t = d.concat();
                        }
                        Err(e) => {
                            eprintln!("App.index: error while parsing data: {e:?}");
                        }
                    },
                }
            }
        }

        let chunks = f2t.len() as f32;
        let mut chunks_done = 0.;

        indexing.msg = "Chunking, encoding and embedding extracted data!";

        let mut embed = self.embed.lock().await;

        let mut data = vec![];
        for (txt, f) in f2t.iter() {
            let t = embed
                .split_text_and_encode(txt)
                .iter()
                .map(|(s, t)| (s.to_owned(), t.to_owned(), f.to_owned()))
                .collect::<Vec<_>>();

            data = [data, t].concat();

            chunks_done += 1.;
            indexing.progress = ((chunks_done / chunks) * 50.) + 50.;

            Self::send_event(w, OpResult::Indexing(indexing.clone())).await?;
        }

        let mut data = data.drain(..);

        let mut writer = self.store.write().await;
        let (mut f, _) = writer.files()?;

        writer.insert(&mut f, &mut data)?;

        println!("Indexing done ..");

        Ok(())
    }
}

Let’s break this down:

  • In the first part of this method, we drop the LLM from memory to save some RAM/VRAM; indexing and searching don't happen in parallel!
  • Then we populate the to_index variable with the files that need to be indexed, based on some simple conditions.
  • We emit an event to let the client know that the indexing op has started (the payload types involved are sketched just after this list).
  • Next, we start the layout analysis and extraction part of the flow, and while at it we keep sending status events so the client can display some progress.
  • Once all the text is extracted, we treat that as 50% of the total progress and proceed to the chunking and embedding-generation part of our indexing flow.
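
The method above leans on a few types that aren't reproduced in these snippets: the client-facing progress payload IndexStatus (wrapped in the OpResult enum that send_event matches on; StatusData and SearchResult are defined in the Search phase below), and the ExtractorEvt events the extractor thread from Part 3 sends back over the std::sync::mpsc channel. A minimal sketch consistent with how they are used - the exact field and payload types here are my assumptions, not the repo's definitions - might look like this:

use std::path::PathBuf;
use serde::Serialize;

/// Progress payload emitted to the client while indexing
#[derive(Clone, Serialize)]
pub struct IndexStatus {
    msg: &'static str, // human-readable description of the current phase
    progress: f32,     // rough percentage, 0..100
    pages: usize,      // estimated number of pages to process
    files: usize,      // number of files picked up for indexing
}

/// Everything the backend can push to the client window
pub enum OpResult {
    Status(StatusData),
    Result(SearchResult),
    Error(String),
    Indexing(IndexStatus),
}

/// Events the extractor thread reports back to `App::index`
pub enum ExtractorEvt {
    /// Estimated total number of pages to be processed
    Estimate(usize),
    /// A single page has been parsed
    Page,
    /// The extracted text grouped per file; (text, source file) pairs assumed here
    Data(anyhow::Result<Vec<Vec<(String, PathBuf)>>>),
}
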
Getting the Client right!

I’m deliberately avoiding detailing of the client-side flow which involves:

  • allowing users to select a directory
  • launching the tauri command to index the directory
  • listening to the events emitted by the backend and showing some progress

My version is built with Svelte; if you need some inspiration, refer to this code beginning with the function folderPicker().

Let’s run our app with cargo tauri dev.

Note
This video has been sped up! The indexing process is slow and takes about 10 minutes on my system. The slowest part of the indexing seems to be the Tensor::save() operation, and there isn't much we can do about that!

That concludes our indexing; on subsequent launches the indexed data will already be in place, ready for search.

Now we'll set the foundation for our Search, which comprises the following steps:

  • The client issues a command containing the user-entered search query string along with some configuration.
  • The tauri command handler forwards this to the backend App.
  • After pre-processing the input query, an ANN search retrieves the relevant documents and data chunks.
  • We create the search context from the retrieved chunks and pass it on to the Generator along with the query.
  • The generator's results are sent across to the client.
  • Throughout these steps we keep emitting status events to the client for a better user experience.

Here’s our simple tauri Command handler:

src-tauri/src/lib.rs
// .. code omitted

#[tauri::command]
async fn search(
    window: Window,
    app: tauri::State<'_, App>,
    qry: &str,
    cfg: SearchConfig,
) -> Result<(), String> {
    app.send(app::Event::Search((qry.to_string(), cfg, window)))
        .await
        .map_err(|e| e.to_string())?;

    Ok(())
}

Now, we’ll define a bunch of structs to represent our SearchConfig and results.

src-tauri/src/app.rs
// code omitted ..

#[derive(Debug, Deserialize)]
pub struct SearchConfig {
    max_result: usize,
    ann_cutoff: Option<f32>,
    // These two flags are read further down in `App::search( .. )`
    with_bm25: bool,              // toggle BM25 keyword scoring in the store search
    allow_without_evidence: bool, // answer even when the retrieved context is empty
}

#[derive(Debug, Serialize, Default)]
pub struct SearchResult {
    qry: String,
    answer: String,
}

For now, our struct SearchConfig is simple: it carries the ann_cutoff and max_result parameters we introduced while working on the ANN Search, along with two flags the search flow below relies on - with_bm25 and allow_without_evidence. This is good enough for now, but we'll soon need it to be a lot more elaborate for better results and control.
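
As a concrete example, here's the kind of JSON payload the SearchConfig above deserializes from - the values are illustrative, and this little test is my sketch rather than part of the repo:

#[cfg(test)]
mod tests {
    use super::SearchConfig;

    #[test]
    fn search_config_from_json() {
        // Example payload a client might send along with the `search` command
        let raw = r#"{
            "max_result": 4,
            "ann_cutoff": 0.75,
            "with_bm25": false,
            "allow_without_evidence": false
        }"#;

        let cfg: SearchConfig = serde_json::from_str(raw).expect("valid SearchConfig json");
        assert_eq!(cfg.max_result, 4);
        assert_eq!(cfg.ann_cutoff, Some(0.75));
    }
}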

Let’s code up a struct StatusData to represent the status messages we intend to emit to the client.

src-tauri/src/app.rs
// code omitted ..

#[derive(Clone, Default, Serialize)]
pub struct StatusData {
    head: String,
    hint: Option<String>,
    body: String,
    time_s: Option<f32>,
}

Let’s modify our method App::search( .. ) to do some actual work!

src-tauri/src/app.rs
// .. Code omitted ..

impl App {
    // A separate method to spawn and send events, the main thread seems to be blocking
    async fn send_event(window: &Window, msg: OpResult) -> Result<()> {
        println!("Sending event to window!");
        match msg {
            OpResult::Status(s) => window.emit("status", &s)?,
            OpResult::Result(s) => window.emit("result", &s)?,
            OpResult::Error(e) => window.emit("error", &e)?,
            OpResult::Indexing(m) => window.emit("indexing", &m)?,
        }

        Ok(())
    }

    // Trigger the search flow - the search pipeline
    async fn search(&self, qry: &str, cfg: &SearchConfig, res_send: &Window) -> Result<()> {
        let mut final_result = SearchResult {
            qry: qry.to_string(),
            ..Default::default()
        };

        {
            let mut gen = self.gen.lock().await;  
            if gen.is_none() {
                Self::send_event(
                    res_send,
                    OpResult::Status(StatusData {
                        head: "Loading LLaMA ..".to_string(),
                        body: String::new(),
                        ..Default::default()
                    }),
                )
                .await?;

                let start = Instant::now();
                *gen = Some(Generator::new(&self.modeldir, &select_device()?)?);
                Self::send_event(
                    res_send,
                    OpResult::Status(StatusData {
                        head: "LLaMA Ready".to_string(),
                        body: String::new(),
                        time_s: Some((Instant::now() - start).as_secs_f32()),
                        ..Default::default()
                    }),
                )
                .await?;
            }
        }

        let (q_txt, q_tensor) = {
            let queries = vec![qry.to_string()];
            let mut emb = self.embed.lock().await;
            let t = emb.query(&queries)?;

            let tensor = (0..queries.len())
                .map(|i| {
                    t.i(i)
                        .unwrap()
                        .to_device(&candle_core::Device::Cpu)
                        .unwrap()
                        .unsqueeze(0)
                        .unwrap()
                })
                .collect::<Vec<_>>();

            (queries, tensor)
        };

        // Step 2: Approximate nearest neighbor search
        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: "Firing Approx. Nearest Neighbor search".to_string(),
                body: format!(
                    "<b>BM25:</b> {} | <b>ANN Cutoff:</b> {}",
                    cfg.with_bm25,
                    cfg.ann_cutoff.unwrap_or(0.)
                ),
                ..Default::default()
            }),
        )
        .await?;

        let store = self.store.read().await;
        let (res, elapsed) = {
            let start = Instant::now();

            let res = store.search(
                &q_tensor,
                &[qry.to_string()],
                cfg.max_result,
                cfg.ann_cutoff,
                cfg.with_bm25,
            )?;

            (res, (Instant::now() - start).as_secs_f32())
        };

        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: format!("ANN Search yielded {} results", res.len()),
                body: String::new(),
                time_s: Some(elapsed),
                ..Default::default()
            }),
        )
        .await?;

        let ctx = {
            let context = res
                .iter()
                .map(|(_, _, txt, _)| {
                    txt.as_str()
                })
                .collect::<Vec<_>>()
                .join("\n\n");

            context.trim().to_string()
        };

        println!("Final Context: {}", ctx);

        if ctx.is_empty() && !cfg.allow_without_evidence {
            return Self::send_event(res_send, OpResult::Error("Nothing found!".to_string())).await;
        }

        // Step 5: Finally the answer
        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: "Generating answer!".to_string(),
                body: String::new(),
                ..Default::default()
            }),
        )
        .await?;

        let (ans, elapsed) = {
            let mut gen = self.gen.lock().await;
            let llm = if let Some(gen) = gen.as_mut() {
                gen
            } else {
                return Err(anyhow!("generator not found"));
            };

            let start = Instant::now();
            let answer = llm.answer(qry, &ctx)?;

            (answer, (Instant::now() - start).as_secs_f32())
        };

        Self::send_event(
            res_send,
            OpResult::Status(StatusData {
                head: "Finally, generated answer!".to_string(),
                body: String::new(),
                time_s: Some(elapsed),
                ..Default::default()
            }),
        )
        .await?;

        final_result.answer = ans.answer().to_string();

        Self::send_event(res_send, OpResult::Result(final_result)).await?;

        Ok(())
    }
}

Breaking down the method App::search( .. ):

  1. We initialize a variable to hold our final QA results.
  2. Then we ensure that the LLM is ready for generation.
  3. Then, we generate the embeddings from the user input query.
  4. We fire the Approximate Nearest Neighbor Search from our store - note that we have already indexed a bunch of documents related to archaeology.
  5. Then we iterate over the ANN search result documents and create a context - a string to be treated as a source of truth for our final answer generation.
  6. We provide the user input query and the context to a method Generator::answer( .. ) for the final answer.
  7. We send the answer to the client and display it.

We have not defined the method Generator::answer( .. ) yet! Let’s code it up.

src-tauri/src/gen.rs
// code omitted ..

#[derive(Debug, Deserialize)]
pub struct GeneratedAnswer {
    answer: String,
}

impl GeneratedAnswer {
    pub fn answer(&self) -> &str {
        &self.answer
    }
}

impl Generator {
    /// Given a `query` string and a `context` returns a response
    pub fn answer(&mut self, query: &str, context: &str) -> Result<GeneratedAnswer> {
        let prompt = format!(
"<|start_header_id|>system<|end_header_id|>

You are a context-based question answering AI. You retrieve information from the provided context to answer user queries. Based on the provided context, generate an informative, complete, relevant yet concise response to the given query by following the given requirements.<|eot_id|><|start_header_id|>user<|end_header_id|>

Text in the given context is extracted via approximate search over a text corpus. You want to find information about the Query only if it is present in the given context.


Context:

{context}


Query:

{query}


Requirements:
- Answer must be supported by at least one datapoint in the given context, extract the supporting text along with the associated source id as evidence.
- Use natural language summary for your answer and avoid copying from given context for your answer.
- Truthfully return empty string (\"\") for answer if the given context doesn't contain the answer to the query.
- Do not write an introduction or summary.
- Your response must be a valid json of the following Schema.


Schema:

{{
    evidence: Array<{{source: int, text: string}}>,
    answer: string
}}
    
Your answer must be a valid json.<|eot_id|><|start_header_id|>assistant<|end_header_id|>

{{
    \"evidence\": [{{\"source\": "
        );

        let mut tk = self.generate(&prompt)?;

        if !tk.ends_with("}") {
            tk = format!("{tk}}}");
        }

        // println!("Op:\n{{\n  \"evidence\": [{{\"source\": {tk}");

        serde_json::from_str(format!("{{\n  \"evidence\": [{{\"source\": {tk}").as_str()).map_err(
            |e| {
                println!("Answer.Error:\n{tk}");
                anyhow!(e)
            },
        )
    }
}

We simply write an elaborate prompt (I had this drafted by the Meta LLaMA chat assistant that ships with WhatsApp 😉) and nudge the model to generate JSON matching struct GeneratedAnswer.

Justifying `struct GeneratedAnswer`
A simple string would have sufficed instead of a JSON for our answer, but we intend to do a lot more in the next post with the answers generated - targeting accuracy and relevance of answers. Just ignore the evidence related details for now!
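
To make the prefill trick concrete: the prompt already ends with the opening `{ "evidence": [{"source": ` fragment, the model only produces the continuation, and answer( .. ) stitches the prefix back on before handing the whole thing to serde_json. Since GeneratedAnswer only declares an answer field (and we haven't opted into deny_unknown_fields), the evidence part is simply ignored during deserialization. A tiny illustration with a made-up completion string:

#[cfg(test)]
mod tests {
    use super::GeneratedAnswer;

    #[test]
    fn stitch_and_parse_prefilled_json() {
        // Illustration only: a made-up model completion, stitched back together
        // the same way `Generator::answer` does, then parsed into `GeneratedAnswer`.
        let completion = r#"0, "text": "The tomb was first excavated in 1922."}],
    "answer": "The tomb was first excavated in 1922."
}"#;

        let full = format!("{{\n  \"evidence\": [{{\"source\": {completion}");
        let parsed: GeneratedAnswer = serde_json::from_str(&full).expect("valid json");
        assert_eq!(parsed.answer(), "The tomb was first excavated in 1922.");
    }
}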

Let’s take the first version of our RAG for a spin!

cargo tauri dev --release

And!! Voilà .. our search works! 🥳🥳🎉🎉

On the right-hand side of the video you can see that I've println!()ed the final context being sent to the method Generator::answer( .. ), and the search result shows up along with the logs on the left!

Note
  • This video has been sped up by 2x! Including the loading of the model (which would typically be a one-time event in real-world usage), this flow took a total of ~50s on my Mac M1 Pro hardware.
  • Ignore the warning messages displayed on the client side; they are just placeholders for stuff we'll be working on next!

The search results are very good and relevant for a first attempt! But there are a few observations and open questions:

  1. We don’t know if the results are accurate - we have not identified the sources of the answer!
  2. What happens if the context size blows up beyond the model's max sequence length, or beyond what our hardware can handle?
  3. What can we do to give the generator richer context and more information?

Next Steps:

We’ll work on tackling some of these questions and more in Part 5 of this series. We’ll look at some of the SOTA retrieval and generation techniques outlined in this repo RAG Techniques and try and implement some of them in our app.

Ride along …