Happy new year
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <yaz/log.h>
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
25 #include <assert.h>
26
27 #ifndef ISAMB_DEBUG
28 #define ISAMB_DEBUG 0
29 #endif
30
31
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
47 #define INT_ENCODE 1
48
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 5000
51
52 /* max page size for _any_ isamb use */
53 #define ISAMB_MAX_PAGE 32768
54
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
58
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
61
62 /* CAT_MAX: _must_ be power of 2 */
63 #define CAT_MAX 4
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
66 #define CAT_NO 4
67
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
70 /* Size factor */
71 #define ISAMB_FAC_SIZE 4
72
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
75
76 struct ISAMB_cache_entry {
77     ISAM_P pos;
78     unsigned char *buf;
79     int dirty;
80     int hits;
81     struct ISAMB_cache_entry *next;
82 };
83
84 struct ISAMB_file {
85     BFile bf;
86     int head_dirty;
87     struct ISAMB_head head;
88     struct ISAMB_cache_entry *cache_entries;
89 };
90
91 struct ISAMB_s {
92     BFiles bfs;
93     ISAMC_M *method;
94
95     struct ISAMB_file *file;
96     int no_cat;
97     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98     int log_io;        /* log level for bf_read/bf_write calls */
99     int log_freelist;  /* log level for freelist handling */
100     zint skipped_numbers; /* on a leaf node */
101     zint returned_numbers; 
102     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104     zint number_of_int_splits;
105     zint number_of_leaf_splits;
106     int enable_int_count; /* whether we count nodes (or not) */
107     int cache_size; /* size of blocks to cache (if cache=1) */
108     int minor_version;
109     zint root_ptr;
110 };
111
112 struct ISAMB_block {
113     ISAM_P pos;
114     int cat;
115     int size;
116     int leaf;
117     int dirty;
118     int deleted;
119     int offset;
120     zint no_items;  /* number of nodes in this + children */
121     char *bytes;
122     char *cbuf;
123     unsigned char *buf;
124     void *decodeClientData;
125     int log_rw;
126 };
127
128 struct ISAMB_PP_s {
129     ISAMB isamb;
130     ISAM_P pos;
131     int level;
132     int maxlevel; /* total depth */
133     zint total_size;
134     zint no_blocks;
135     zint skipped_numbers; /* on a leaf node */
136     zint returned_numbers; 
137     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139     struct ISAMB_block **block;
140     int scope;  /* on what level we forward */
141 };
142
143
144 #define encode_item_len encode_ptr
145 #if ISAMB_PTR_CODEC
146 static void encode_ptr(char **dst, zint pos)
147 {
148     unsigned char *bp = (unsigned char*) *dst;
149
150     while (pos > 127)
151     {
152         *bp++ = (unsigned char) (128 | (pos & 127));
153         pos = pos >> 7;
154     }
155     *bp++ = (unsigned char) pos;
156     *dst = (char *) bp;
157 }
158 #else
159 static void encode_ptr(char **dst, zint pos)
160 {
161     memcpy(*dst, &pos, sizeof(pos));
162     (*dst) += sizeof(pos);
163 }
164 #endif
165
166 #define decode_item_len decode_ptr
167 #if ISAMB_PTR_CODEC
168 static void decode_ptr(const char **src, zint *pos)
169 {
170     zint d = 0;
171     unsigned char c;
172     unsigned r = 0;
173
174     while (((c = *(const unsigned char *)((*src)++)) & 128))
175     {
176         d += ((zint) (c & 127) << r);
177         r += 7;
178     }
179     d += ((zint) c << r);
180     *pos = d;
181 }
182 #else
183 static void decode_ptr(const char **src, zint *pos)
184 {
185     memcpy(pos, *src, sizeof(*pos));
186     (*src) += sizeof(*pos);
187 }
188 #endif
189
190
191 void isamb_set_int_count(ISAMB b, int v)
192 {
193     b->enable_int_count = v;
194 }
195
196 void isamb_set_cache_size(ISAMB b, int v)
197 {
198     b->cache_size = v;
199 }
200
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202                   int cache, int no_cat, int *sizes, int use_root_ptr)
203 {
204     ISAMB isamb = xmalloc(sizeof(*isamb));
205     int i;
206
207     assert(no_cat <= CAT_MAX);
208
209     isamb->bfs = bfs;
210     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211     memcpy(isamb->method, method, sizeof(*method));
212     isamb->no_cat = no_cat;
213     isamb->log_io = 0;
214     isamb->log_freelist = 0;
215     isamb->cache = cache;
216     isamb->skipped_numbers = 0;
217     isamb->returned_numbers = 0;
218     isamb->number_of_int_splits = 0;
219     isamb->number_of_leaf_splits = 0;
220     isamb->enable_int_count = 1;
221     isamb->cache_size = 40;
222
223     if (use_root_ptr)
224         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
225     else
226         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
227
228     isamb->root_ptr = 0;
229
230     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
232
233     if (cache == -1)
234     {
235         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
236     }
237     else
238     {
239         assert(cache == 0 || cache == 1);
240     }
241     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
242
243     for (i = 0; i < isamb->no_cat; i++)
244     {
245         isamb->file[i].bf = 0;
246         isamb->file[i].head_dirty = 0;
247         isamb->file[i].cache_entries = 0;
248     }
249
250     for (i = 0; i < isamb->no_cat; i++)
251     {
252         char fname[DST_BUF_SIZE];
253         char hbuf[DST_BUF_SIZE];
254
255         sprintf(fname, "%s%c", name, i+'A');
256         if (cache)
257             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
258                                         writeflag);
259         else
260             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
261
262         if (!isamb->file[i].bf)
263         {
264             isamb_close(isamb);
265             return 0;
266         }
267
268         /* fill-in default values (for empty isamb) */
269         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271         isamb->file[i].head.block_size = sizes[i];
272         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
273 #if ISAMB_PTR_CODEC
274         if (i == isamb->no_cat-1 || sizes[i] > 128)
275             isamb->file[i].head.block_offset = 8;
276         else
277             isamb->file[i].head.block_offset = 4;
278 #else
279         isamb->file[i].head.block_offset = 11;
280 #endif
281         isamb->file[i].head.block_max =
282             sizes[i] - isamb->file[i].head.block_offset;
283         isamb->file[i].head.free_list = 0;
284         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
285         {
286             /* got header assume "isamb"major minor len can fit in 16 bytes */
287             zint zint_tmp;
288             int major, minor, len, pos = 0;
289             int left;
290             const char *src = 0;
291             if (memcmp(hbuf, "isamb", 5))
292             {
293                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
294                 isamb_close(isamb);
295                 return 0;
296             }
297             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
298             {
299                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300                 isamb_close(isamb);
301                 return 0;
302             }
303             if (major != ISAMB_MAJOR_VERSION)
304             {
305                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306                         fname, major, ISAMB_MAJOR_VERSION);
307                 isamb_close(isamb);
308                 return 0;
309             }
310             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
311             {
312                 pos++;
313                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
314                 {
315                     yaz_log(YLOG_WARN, "truncated isamb header for " 
316                             "file=%s len=%d pos=%d",
317                             fname, len, pos);
318                     isamb_close(isamb);
319                     return 0;
320                 }
321             }
322             src = hbuf + 16;
323             decode_ptr(&src, &isamb->file[i].head.first_block);
324             decode_ptr(&src, &isamb->file[i].head.last_block);
325             decode_ptr(&src, &zint_tmp);
326             isamb->file[i].head.block_size = (int) zint_tmp;
327             decode_ptr(&src, &zint_tmp);
328             isamb->file[i].head.block_max = (int) zint_tmp;
329             decode_ptr(&src, &isamb->file[i].head.free_list);
330             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331                 decode_ptr(&src, &isamb->root_ptr);
332         }
333         assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334         /* must rewrite the header if root ptr is in use (bug #1017) */
335         if (use_root_ptr && writeflag)
336             isamb->file[i].head_dirty = 1;
337         else
338             isamb->file[i].head_dirty = 0;
339         assert(isamb->file[i].head.block_size == sizes[i]);
340     }
341 #if ISAMB_DEBUG
342     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
343 #endif
344     return isamb;
345 }
346
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
348                  int cache)
349 {
350     int sizes[CAT_NO];
351     int i, b_size = ISAMB_MIN_SIZE;
352     
353     for (i = 0; i<CAT_NO; i++)
354     {
355         sizes[i] = b_size;
356         b_size = b_size * ISAMB_FAC_SIZE;
357     }
358     return isamb_open2(bfs, name, writeflag, method, cache,
359                        CAT_NO, sizes, 0);
360 }
361
362 static void flush_blocks(ISAMB b, int cat)
363 {
364     while (b->file[cat].cache_entries)
365     {
366         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367         b->file[cat].cache_entries = ce_this->next;
368
369         if (ce_this->dirty)
370         {
371             yaz_log(b->log_io, "bf_write: flush_blocks");
372             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
373         }
374         xfree(ce_this->buf);
375         xfree(ce_this);
376     }
377 }
378
379 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
380 {
381     int cat = (int) (pos&CAT_MASK);
382     int off = (int) (((pos/CAT_MAX) & 
383                       (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384                      * b->file[cat].head.block_size);
385     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
386     int no = 0;
387     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
388
389     if (!b->cache)
390         return 0;
391
392     assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
394     {
395         ce_last = ce;
396         if ((*ce)->pos == norm)
397         {
398             ce_this = *ce;
399             *ce = (*ce)->next;   /* remove from list */
400             
401             ce_this->next = b->file[cat].cache_entries;  /* move to front */
402             b->file[cat].cache_entries = ce_this;
403             
404             if (wr)
405             {
406                 memcpy(ce_this->buf + off, userbuf, 
407                        b->file[cat].head.block_size);
408                 ce_this->dirty = 1;
409             }
410             else
411                 memcpy(userbuf, ce_this->buf + off,
412                        b->file[cat].head.block_size);
413             return 1;
414         }
415     }
416     if (no >= b->cache_size)
417     {
418         assert(ce_last && *ce_last);
419         ce_this = *ce_last;
420         *ce_last = 0;  /* remove the last entry from list */
421         if (ce_this->dirty)
422         {
423             yaz_log(b->log_io, "bf_write: cache_block");
424             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
425         }
426         xfree(ce_this->buf);
427         xfree(ce_this);
428     }
429     ce_this = xmalloc(sizeof(*ce_this));
430     ce_this->next = b->file[cat].cache_entries;
431     b->file[cat].cache_entries = ce_this;
432     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
433     ce_this->pos = norm;
434     yaz_log(b->log_io, "bf_read: cache_block");
435     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436         memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
437     if (wr)
438     {
439         memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
440         ce_this->dirty = 1;
441     }
442     else
443     {
444         ce_this->dirty = 0;
445         memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
446     }
447     return 1;
448 }
449
450
451 void isamb_close(ISAMB isamb)
452 {
453     int i;
454     for (i = 0; isamb->accessed_nodes[i]; i++)
455         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
456                 ZINT_FORMAT" skipped",
457                 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459             "skipped "ZINT_FORMAT,
460             isamb->skipped_numbers, isamb->returned_numbers);
461
462     for (i = 0; i<isamb->no_cat; i++)
463     {
464         flush_blocks(isamb, i);
465         if (isamb->file[i].head_dirty)
466         {
467             char hbuf[DST_BUF_SIZE];
468             int major = ISAMB_MAJOR_VERSION;
469             int len = 16;
470             char *dst = hbuf + 16;
471             int pos = 0, left;
472             int b_size = isamb->file[i].head.block_size;
473
474             encode_ptr(&dst, isamb->file[i].head.first_block);
475             encode_ptr(&dst, isamb->file[i].head.last_block);
476             encode_ptr(&dst, isamb->file[i].head.block_size);
477             encode_ptr(&dst, isamb->file[i].head.block_max);
478             encode_ptr(&dst, isamb->file[i].head.free_list);
479
480             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481                 encode_ptr(&dst, isamb->root_ptr);
482
483             memset(dst, '\0', b_size); /* ensure no random bytes are written */
484
485             len = dst - hbuf;
486
487             /* print exactly 16 bytes (including trailing 0) */
488             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489                     isamb->minor_version, len);
490
491             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
492
493             for (left = len - b_size; left > 0; left = left - b_size)
494             {
495                 pos++;
496                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
497             }
498         }
499         if (isamb->file[i].bf)
500             bf_close (isamb->file[i].bf);
501     }
502     xfree(isamb->file);
503     xfree(isamb->method);
504     xfree(isamb);
505 }
506
507 /* open_block: read one block at pos.
508    Decode leading sys bytes .. consisting of
509    Offset:Meaning
510    0: leader byte, != 0 leaf, == 0, non-leaf
511    1-2: used size of block
512    3-7*: number of items and all children
513    
514    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515    of items. We can thus have at most 2^40 nodes. 
516 */
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
518 {
519     int cat = (int) (pos&CAT_MASK);
520     const char *src;
521     int offset = b->file[cat].head.block_offset;
522     struct ISAMB_block *p;
523     if (!pos)
524         return 0;
525     p = xmalloc(sizeof(*p));
526     p->pos = pos;
527     p->cat = (int) (pos & CAT_MASK);
528     p->buf = xmalloc(b->file[cat].head.block_size);
529     p->cbuf = 0;
530
531     if (!cache_block (b, pos, p->buf, 0))
532     {
533         yaz_log(b->log_io, "bf_read: open_block");
534         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
535         {
536             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537                     (long) pos, (long) pos/CAT_MAX);
538             zebra_exit("isamb:open_block");
539         }
540     }
541     p->bytes = (char *)p->buf + offset;
542     p->leaf = p->buf[0];
543     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
544     if (p->size < 0)
545     {
546         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
547                 p->size, pos);
548     }
549     assert(p->size >= 0);
550     src = (char*) p->buf + 3;
551     decode_ptr(&src, &p->no_items);
552
553     p->offset = 0;
554     p->dirty = 0;
555     p->deleted = 0;
556     p->decodeClientData = (*b->method->codec.start)();
557     return p;
558 }
559
560 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
561 {
562     struct ISAMB_block *p;
563
564     p = xmalloc(sizeof(*p));
565     p->buf = xmalloc(b->file[cat].head.block_size);
566
567     if (!b->file[cat].head.free_list)
568     {
569         zint block_no;
570         block_no = b->file[cat].head.last_block++;
571         p->pos = block_no * CAT_MAX + cat;
572     }
573     else
574     {
575         p->pos = b->file[cat].head.free_list;
576         assert((p->pos & CAT_MASK) == cat);
577         if (!cache_block(b, p->pos, p->buf, 0))
578         {
579             yaz_log(b->log_io, "bf_read: new_block");
580             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
581             {
582                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
583                         (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
584                 zebra_exit("isamb:new_block");
585             }
586         }
587         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
588                 cat, p->pos/CAT_MAX);
589         memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
590     }
591     p->cat = cat;
592     b->file[cat].head_dirty = 1;
593     memset(p->buf, 0, b->file[cat].head.block_size);
594     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
595     p->leaf = leaf;
596     p->size = 0;
597     p->dirty = 1;
598     p->deleted = 0;
599     p->offset = 0;
600     p->no_items = 0;
601     p->decodeClientData = (*b->method->codec.start)();
602     return p;
603 }
604
605 struct ISAMB_block *new_leaf(ISAMB b, int cat)
606 {
607     return new_block(b, 1, cat);
608 }
609
610
611 struct ISAMB_block *new_int(ISAMB b, int cat)
612 {
613     return new_block(b, 0, cat);
614 }
615
616 static void check_block(ISAMB b, struct ISAMB_block *p)
617 {
618     assert(b); /* mostly to make the compiler shut up about unused b */
619     if (p->leaf)
620     {
621         ;
622     }
623     else
624     {
625         /* sanity check */
626         char *startp = p->bytes;
627         const char *src = startp;
628         char *endp = p->bytes + p->size;
629         ISAM_P pos;
630         void *c1 = (*b->method->codec.start)();
631             
632         decode_ptr(&src, &pos);
633         assert((pos&CAT_MASK) == p->cat);
634         while (src != endp)
635         {
636 #if INT_ENCODE
637             char file_item_buf[DST_ITEM_MAX];
638             char *file_item = file_item_buf;
639             (*b->method->codec.reset)(c1);
640             (*b->method->codec.decode)(c1, &file_item, &src);
641 #else
642             zint item_len;
643             decode_item_len(&src, &item_len);
644             assert(item_len > 0 && item_len < 128);
645             src += item_len;
646 #endif
647             decode_ptr(&src, &pos);
648             if ((pos&CAT_MASK) != p->cat)
649             {
650                 assert((pos&CAT_MASK) == p->cat);
651             }
652         }
653         (*b->method->codec.stop)(c1);
654     }
655 }
656
657 void close_block(ISAMB b, struct ISAMB_block *p)
658 {
659     if (!p)
660         return;
661     if (p->deleted)
662     {
663         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
664                 p->pos, p->cat, p->pos/CAT_MAX);
665         memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
666         b->file[p->cat].head.free_list = p->pos;
667         if (!cache_block(b, p->pos, p->buf, 1))
668         {
669             yaz_log(b->log_io, "bf_write: close_block (deleted)");
670             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
671         }
672     }
673     else if (p->dirty)
674     {
675         int offset = b->file[p->cat].head.block_offset;
676         int size = p->size + offset;
677         char *dst =  (char*)p->buf + 3;
678         assert(p->size >= 0);
679         
680         /* memset becuase encode_ptr usually does not write all bytes */
681         memset(p->buf, 0, b->file[p->cat].head.block_offset);
682         p->buf[0] = p->leaf;
683         p->buf[1] = size & 255;
684         p->buf[2] = size >> 8;
685         encode_ptr(&dst, p->no_items);
686         check_block(b, p);
687         if (!cache_block(b, p->pos, p->buf, 1))
688         {
689             yaz_log(b->log_io, "bf_write: close_block");
690             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
691         }
692     }
693     (*b->method->codec.stop)(p->decodeClientData);
694     xfree(p->buf);
695     xfree(p);
696 }
697
698 int insert_sub(ISAMB b, struct ISAMB_block **p,
699                void *new_item, int *mode,
700                ISAMC_I *stream,
701                struct ISAMB_block **sp,
702                void *sub_item, int *sub_size,
703                const void *max_item);
704
705 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
706                int *mode,
707                ISAMC_I *stream, struct ISAMB_block **sp,
708                void *split_item, int *split_size, const void *last_max_item)
709 {
710     char *startp = p->bytes;
711     const char *src = startp;
712     char *endp = p->bytes + p->size;
713     ISAM_P pos;
714     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
715     char sub_item[DST_ITEM_MAX];
716     int sub_size;
717     int more = 0;
718     zint diff_terms = 0;
719     void *c1 = (*b->method->codec.start)();
720
721     *sp = 0;
722
723     assert(p->size >= 0);
724     decode_ptr(&src, &pos);
725     while (src != endp)
726     {
727         int d;
728         const char *src0 = src;
729 #if INT_ENCODE
730         char file_item_buf[DST_ITEM_MAX];
731         char *file_item = file_item_buf;
732         (*b->method->codec.reset)(c1);
733         (*b->method->codec.decode)(c1, &file_item, &src);
734         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
735         if (d > 0)
736         {
737             sub_p1 = open_block(b, pos);
738             assert(sub_p1);
739             diff_terms -= sub_p1->no_items;
740             more = insert_sub(b, &sub_p1, lookahead_item, mode,
741                               stream, &sub_p2, 
742                               sub_item, &sub_size, file_item_buf);
743             diff_terms += sub_p1->no_items;
744             src = src0;
745             break;
746         }
747 #else
748         zint item_len;
749         decode_item_len(&src, &item_len);
750         d = (*b->method->compare_item)(src, lookahead_item);
751         if (d > 0)
752         {
753             sub_p1 = open_block(b, pos);
754             assert(sub_p1);
755             diff_terms -= sub_p1->no_items;
756             more = insert_sub(b, &sub_p1, lookahead_item, mode,
757                               stream, &sub_p2, 
758                               sub_item, &sub_size, src);
759             diff_terms += sub_p1->no_items;
760             src = src0;
761             break;
762         }
763         src += item_len;
764 #endif
765         decode_ptr(&src, &pos);
766     }
767     if (!sub_p1)
768     {
769         /* we reached the end. So lookahead > last item */
770         sub_p1 = open_block(b, pos);
771         assert(sub_p1);
772         diff_terms -= sub_p1->no_items;
773         more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
774                           sub_item, &sub_size, last_max_item);
775         diff_terms += sub_p1->no_items;
776     }
777     if (sub_p2)
778         diff_terms += sub_p2->no_items;
779     if (diff_terms)
780     {
781         p->dirty = 1;
782         p->no_items += diff_terms;
783     }
784     if (sub_p2)
785     {
786         /* there was a split - must insert pointer in this one */
787         char dst_buf[DST_BUF_SIZE];
788         char *dst = dst_buf;
789 #if INT_ENCODE
790         const char *sub_item_ptr = sub_item;
791 #endif
792         assert(sub_size < DST_ITEM_MAX && sub_size > 1);
793
794         memcpy(dst, startp, src - startp);
795                 
796         dst += src - startp;
797
798 #if INT_ENCODE
799         (*b->method->codec.reset)(c1);
800         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
801 #else
802         encode_item_len(&dst, sub_size);      /* sub length and item */
803         memcpy(dst, sub_item, sub_size);
804         dst += sub_size;
805 #endif
806
807         encode_ptr(&dst, sub_p2->pos);   /* pos */
808
809         if (endp - src)                   /* remaining data */
810         {
811             memcpy(dst, src, endp - src);
812             dst += endp - src;
813         }
814         p->size = dst - dst_buf;
815         assert(p->size >= 0);
816
817         if (p->size <= b->file[p->cat].head.block_max)
818         {
819             /* it fits OK in this block */
820             memcpy(startp, dst_buf, dst - dst_buf);
821
822             close_block(b, sub_p2);
823         }
824         else
825         {
826             /* must split _this_ block as well .. */
827             struct ISAMB_block *sub_p3;
828 #if INT_ENCODE
829             char file_item_buf[DST_ITEM_MAX];
830             char *file_item = file_item_buf;
831 #else
832             zint split_size_tmp;
833 #endif
834             zint no_items_first_half = 0;
835             int p_new_size;
836             const char *half;
837             src = dst_buf;
838             endp = dst;
839             
840             b->number_of_int_splits++;
841
842             p->dirty = 1;
843             close_block(b, sub_p2);
844
845             half = src + b->file[p->cat].head.block_size/2;
846             decode_ptr(&src, &pos);
847
848             if (b->enable_int_count)
849             {
850                 /* read sub block so we can get no_items for it */
851                 sub_p3 = open_block(b, pos);
852                 no_items_first_half += sub_p3->no_items;
853                 close_block(b, sub_p3);
854             }
855
856             while (src <= half)
857             {
858 #if INT_ENCODE
859                 file_item = file_item_buf;
860                 (*b->method->codec.reset)(c1);
861                 (*b->method->codec.decode)(c1, &file_item, &src);
862 #else
863                 decode_item_len(&src, &split_size_tmp);
864                 *split_size = (int) split_size_tmp;
865                 src += *split_size;
866 #endif
867                 decode_ptr(&src, &pos);
868
869                 if (b->enable_int_count)
870                 {
871                     /* read sub block so we can get no_items for it */
872                     sub_p3 = open_block(b, pos);
873                     no_items_first_half += sub_p3->no_items;
874                     close_block(b, sub_p3);
875                 }
876             }
877             /*  p is first half */
878             p_new_size = src - dst_buf;
879             memcpy(p->bytes, dst_buf, p_new_size);
880
881 #if INT_ENCODE
882             file_item = file_item_buf;
883             (*b->method->codec.reset)(c1);
884             (*b->method->codec.decode)(c1, &file_item, &src);
885             *split_size = file_item - file_item_buf;
886             memcpy(split_item, file_item_buf, *split_size);
887 #else
888             decode_item_len(&src, &split_size_tmp);
889             *split_size = (int) split_size_tmp;
890             memcpy(split_item, src, *split_size);
891             src += *split_size;
892 #endif
893             /*  *sp is second half */
894             *sp = new_int(b, p->cat);
895             (*sp)->size = endp - src;
896             memcpy((*sp)->bytes, src, (*sp)->size);
897
898             p->size = p_new_size;
899
900             /*  adjust no_items in first&second half */
901             (*sp)->no_items = p->no_items - no_items_first_half;
902             p->no_items = no_items_first_half;
903         }
904         p->dirty = 1;
905     }
906     close_block(b, sub_p1);
907     (*b->method->codec.stop)(c1);
908     return more;
909 }
910
911 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
912                 int *lookahead_mode, ISAMC_I *stream,
913                 struct ISAMB_block **sp2,
914                 void *sub_item, int *sub_size,
915                 const void *max_item)
916 {
917     struct ISAMB_block *p = *sp1;
918     char *endp = 0;
919     const char *src = 0;
920     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
921     int new_size;
922     void *c1 = (*b->method->codec.start)();
923     void *c2 = (*b->method->codec.start)();
924     int more = 1;
925     int quater = b->file[b->no_cat-1].head.block_max / 4;
926     char *mid_cut = dst_buf + quater * 2;
927     char *tail_cut = dst_buf + quater * 3;
928     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
929     char *half1 = 0;
930     char *half2 = 0;
931     char cut_item_buf[DST_ITEM_MAX];
932     int cut_item_size = 0;
933     int no_items = 0;    /* number of items (total) */
934     int no_items_1 = 0;  /* number of items (first half) */
935     int inserted_dst_bytes = 0;
936
937     if (p && p->size)
938     {
939         char file_item_buf[DST_ITEM_MAX];
940         char *file_item = file_item_buf;
941             
942         src = p->bytes;
943         endp = p->bytes + p->size;
944         (*b->method->codec.decode)(c1, &file_item, &src);
945         while (1)
946         {
947             const char *dst_item = 0; /* resulting item to be inserted */
948             char *lookahead_next;
949             char *dst_0 = dst;
950             int d = -1;
951             
952             if (lookahead_item)
953                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
954             
955             /* d now holds comparison between existing file item and
956                lookahead item 
957                d = 0: equal
958                d > 0: lookahead before file
959                d < 0: lookahead after file
960             */
961             if (d > 0)
962             {
963                 /* lookahead must be inserted */
964                 dst_item = lookahead_item;
965                 /* if this is not an insertion, it's really bad .. */
966                 if (!*lookahead_mode)
967                 {
968                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
969                     assert(*lookahead_mode);
970                 }
971             }
972             else if (d == 0 && *lookahead_mode == 2)
973             {
974                 /* For mode == 2, we insert the new key anyway - even 
975                    though the comparison is 0. */
976                 dst_item = lookahead_item;
977                 p->dirty = 1;
978             }
979             else
980                 dst_item = file_item_buf;
981
982             if (d == 0 && !*lookahead_mode)
983             {
984                 /* it's a deletion and they match so there is nothing
985                    to be inserted anyway .. But mark the thing dirty
986                    (file item was part of input.. The item will not be
987                    part of output */
988                 p->dirty = 1;
989             }
990             else if (!half1 && dst > mid_cut)
991             {
992                 /* we have reached the splitting point for the first time */
993                 const char *dst_item_0 = dst_item;
994                 half1 = dst; /* candidate for splitting */
995
996                 /* encode the resulting item */
997                 (*b->method->codec.encode)(c2, &dst, &dst_item);
998                 
999                 cut_item_size = dst_item - dst_item_0;
1000                 assert(cut_item_size > 0);
1001                 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1002                 
1003                 half2 = dst;
1004                 no_items_1 = no_items;
1005                 no_items++;
1006             }
1007             else
1008             {
1009                 /* encode the resulting item */
1010                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1011                 no_items++;
1012             }
1013
1014             /* now move "pointers" .. result has been encoded .. */
1015             if (d > 0)  
1016             {
1017                 /* we must move the lookahead pointer */
1018
1019                 inserted_dst_bytes += (dst - dst_0);
1020                 if (inserted_dst_bytes >=  quater)
1021                     /* no more room. Mark lookahead as "gone".. */
1022                     lookahead_item = 0;
1023                 else
1024                 {
1025                     /* move it really.. */
1026                     lookahead_next = lookahead_item;
1027                     if (!(*stream->read_item)(stream->clientData,
1028                                               &lookahead_next,
1029                                               lookahead_mode))
1030                     {
1031                         /* end of stream reached: no "more" and no lookahead */
1032                         lookahead_item = 0;
1033                         more = 0;
1034                     }
1035                     if (lookahead_item && max_item &&
1036                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1037                     {
1038                         /* the lookahead goes beyond what we allow in this
1039                            leaf. Mark it as "gone" */
1040                         lookahead_item = 0;
1041                     }
1042                     
1043                     p->dirty = 1;
1044                 }
1045             }
1046             else if (d == 0)
1047             {
1048                 /* exact match .. move both pointers */
1049
1050                 lookahead_next = lookahead_item;
1051                 if (!(*stream->read_item)(stream->clientData,
1052                                           &lookahead_next, lookahead_mode))
1053                 {
1054                     lookahead_item = 0;
1055                     more = 0;
1056                 }
1057                 if (src == endp)
1058                     break;  /* end of file stream reached .. */
1059                 file_item = file_item_buf; /* move file pointer */
1060                 (*b->method->codec.decode)(c1, &file_item, &src);
1061             }
1062             else
1063             {
1064                 /* file pointer must be moved */
1065                 if (src == endp)
1066                     break;
1067                 file_item = file_item_buf;
1068                 (*b->method->codec.decode)(c1, &file_item, &src);
1069             }
1070         }
1071     }
1072
1073     /* this loop runs when we are "appending" to a leaf page. That is
1074        either it's empty (new) or all file items have been read in
1075        previous loop */
1076
1077     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1078     while (lookahead_item)
1079     {
1080         char *dst_item;
1081         const char *src = lookahead_item;
1082         char *dst_0 = dst;
1083         
1084         /* if we have a lookahead item, we stop if we exceed the value of it */
1085         if (max_item &&
1086             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1087         {
1088             /* stop if we have reached the value of max item */
1089             break;
1090         }
1091         if (!*lookahead_mode)
1092         {
1093             /* this is append. So a delete is bad */
1094             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1095             assert(*lookahead_mode);
1096         }
1097         else if (!half1 && dst > tail_cut)
1098         {
1099             const char *src_0 = src;
1100             half1 = dst; /* candidate for splitting */
1101             
1102             (*b->method->codec.encode)(c2, &dst, &src);
1103             
1104             cut_item_size = src - src_0;
1105             assert(cut_item_size > 0);
1106             memcpy(cut_item_buf, src_0, cut_item_size);
1107             
1108             no_items_1 = no_items;
1109             half2 = dst;
1110         }
1111         else
1112             (*b->method->codec.encode)(c2, &dst, &src);
1113
1114         if (dst > maxp)
1115         {
1116             dst = dst_0;
1117             break;
1118         }
1119         no_items++;
1120         if (p)
1121             p->dirty = 1;
1122         dst_item = lookahead_item;
1123         if (!(*stream->read_item)(stream->clientData, &dst_item,
1124                                   lookahead_mode))
1125         {
1126             lookahead_item = 0;
1127             more = 0;
1128         }
1129     }
1130     new_size = dst - dst_buf;
1131     if (p && p->cat != b->no_cat-1 && 
1132         new_size > b->file[p->cat].head.block_max)
1133     {
1134         /* non-btree block will be removed */
1135         p->deleted = 1;
1136         close_block(b, p);
1137         /* delete it too!! */
1138         p = 0; /* make a new one anyway */
1139     }
1140     if (!p)
1141     {   /* must create a new one */
1142         int i;
1143         for (i = 0; i < b->no_cat; i++)
1144             if (new_size <= b->file[i].head.block_max)
1145                 break;
1146         if (i == b->no_cat)
1147             i = b->no_cat - 1;
1148         p = new_leaf(b, i);
1149     }
1150     if (new_size > b->file[p->cat].head.block_max)
1151     {
1152         char *first_dst;
1153         const char *cut_item = cut_item_buf;
1154
1155         assert(half1);
1156         assert(half2);
1157
1158         assert(cut_item_size > 0);
1159         
1160         /* first half */
1161         p->size = half1 - dst_buf;
1162         assert(p->size <=  b->file[p->cat].head.block_max);
1163         memcpy(p->bytes, dst_buf, half1 - dst_buf);
1164         p->no_items = no_items_1;
1165
1166         /* second half */
1167         *sp2 = new_leaf(b, p->cat);
1168
1169         (*b->method->codec.reset)(c2);
1170
1171         b->number_of_leaf_splits++;
1172
1173         first_dst = (*sp2)->bytes;
1174
1175         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1176
1177         memcpy(first_dst, half2, dst - half2);
1178
1179         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1180         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1181         (*sp2)->no_items = no_items - no_items_1;
1182         (*sp2)->dirty = 1;
1183         p->dirty = 1;
1184         memcpy(sub_item, cut_item_buf, cut_item_size);
1185         *sub_size = cut_item_size;
1186     }
1187     else
1188     {
1189         memcpy(p->bytes, dst_buf, dst - dst_buf);
1190         p->size = new_size;
1191         p->no_items = no_items;
1192     }
1193     (*b->method->codec.stop)(c1);
1194     (*b->method->codec.stop)(c2);
1195     *sp1 = p;
1196     return more;
1197 }
1198
1199 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1200                int *mode,
1201                ISAMC_I *stream,
1202                struct ISAMB_block **sp,
1203                void *sub_item, int *sub_size,
1204                const void *max_item)
1205 {
1206     if (!*p || (*p)->leaf)
1207         return insert_leaf(b, p, new_item, mode, stream, sp, sub_item, 
1208                            sub_size, max_item);
1209     else
1210         return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1211                           sub_size, max_item);
1212 }
1213
1214 int isamb_unlink(ISAMB b, ISAM_P pos)
1215 {
1216     struct ISAMB_block *p1;
1217
1218     if (!pos)
1219         return 0;
1220     p1 = open_block(b, pos);
1221     p1->deleted = 1;
1222     if (!p1->leaf)
1223     {
1224         zint sub_p;
1225         const char *src = p1->bytes + p1->offset;
1226 #if INT_ENCODE
1227         void *c1 = (*b->method->codec.start)();
1228 #endif
1229         decode_ptr(&src, &sub_p);
1230         isamb_unlink(b, sub_p);
1231         
1232         while (src != p1->bytes + p1->size)
1233         {
1234 #if INT_ENCODE
1235             char file_item_buf[DST_ITEM_MAX];
1236             char *file_item = file_item_buf;
1237             (*b->method->codec.reset)(c1);
1238             (*b->method->codec.decode)(c1, &file_item, &src);
1239 #else
1240             zint item_len;
1241             decode_item_len(&src, &item_len);
1242             src += item_len;
1243 #endif
1244             decode_ptr(&src, &sub_p);
1245             isamb_unlink(b, sub_p);
1246         }
1247 #if INT_ENCODE
1248         (*b->method->codec.stop)(c1);
1249 #endif
1250     }
1251     close_block(b, p1);
1252     return 0;
1253 }
1254
1255 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1256 {
1257     char item_buf[DST_ITEM_MAX];
1258     char *item_ptr;
1259     int i_mode;
1260     int more;
1261     int must_delete = 0;
1262
1263     if (b->cache < 0)
1264     {
1265         int more = 1;
1266         while (more)
1267         {
1268             item_ptr = item_buf;
1269             more =
1270                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1271         }
1272         *pos = 1;
1273         return;
1274     }
1275     item_ptr = item_buf;
1276     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1277     while (more)
1278     {
1279         struct ISAMB_block *p = 0, *sp = 0;
1280         char sub_item[DST_ITEM_MAX];
1281         int sub_size;
1282         
1283         if (*pos)
1284             p = open_block(b, *pos);
1285         more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1286                           sub_item, &sub_size, 0);
1287         if (sp)
1288         {    /* increase level of tree by one */
1289             struct ISAMB_block *p2 = new_int(b, p->cat);
1290             char *dst = p2->bytes + p2->size;
1291 #if INT_ENCODE
1292             void *c1 = (*b->method->codec.start)();
1293             const char *sub_item_ptr = sub_item;
1294 #endif
1295
1296             encode_ptr(&dst, p->pos);
1297             assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1298 #if INT_ENCODE
1299             (*b->method->codec.reset)(c1);
1300             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1301 #else
1302             encode_item_len(&dst, sub_size);
1303             memcpy(dst, sub_item, sub_size);
1304             dst += sub_size;
1305 #endif
1306             encode_ptr(&dst, sp->pos);
1307             
1308             p2->size = dst - p2->bytes;
1309             p2->no_items = p->no_items + sp->no_items;
1310             *pos = p2->pos;  /* return new super page */
1311             close_block(b, sp);
1312             close_block(b, p2);
1313 #if INT_ENCODE
1314             (*b->method->codec.stop)(c1);
1315 #endif
1316         }
1317         else
1318         {
1319             *pos = p->pos;   /* return current one (again) */
1320         }
1321         if (p->no_items == 0)
1322             must_delete = 1;
1323         else
1324             must_delete = 0;
1325         close_block(b, p);
1326     }
1327     if (must_delete)
1328     {
1329         isamb_unlink(b, *pos);
1330         *pos = 0;
1331     }
1332 }
1333
1334 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1335 {
1336     ISAMB_PP pp = xmalloc(sizeof(*pp));
1337     int i;
1338
1339     assert(pos);
1340
1341     pp->isamb = isamb;
1342     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1343
1344     pp->pos = pos;
1345     pp->level = 0;
1346     pp->maxlevel = 0;
1347     pp->total_size = 0;
1348     pp->no_blocks = 0;
1349     pp->skipped_numbers = 0;
1350     pp->returned_numbers = 0;
1351     pp->scope = scope;
1352     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1353         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1354     while (1)
1355     {
1356         struct ISAMB_block *p = open_block(isamb, pos);
1357         const char *src = p->bytes + p->offset;
1358         pp->block[pp->level] = p;
1359
1360         pp->total_size += p->size;
1361         pp->no_blocks++;
1362         if (p->leaf)
1363             break;
1364         decode_ptr(&src, &pos);
1365         p->offset = src - p->bytes;
1366         pp->level++;
1367         pp->accessed_nodes[pp->level]++; 
1368     }
1369     pp->block[pp->level+1] = 0;
1370     pp->maxlevel = pp->level;
1371     if (level)
1372         *level = pp->level;
1373     return pp;
1374 }
1375
1376 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1377 {
1378     return isamb_pp_open_x(isamb, pos, 0, scope);
1379 }
1380
1381 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1382 {
1383     int i;
1384     if (!pp)
1385         return;
1386     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1387             "skipped "ZINT_FORMAT,
1388             pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1389     for (i = pp->maxlevel; i>=0; i--)
1390         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1391             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1392                     ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1393                     pp->accessed_nodes[i], pp->skipped_nodes[i]);
1394     pp->isamb->skipped_numbers += pp->skipped_numbers;
1395     pp->isamb->returned_numbers += pp->returned_numbers;
1396     for (i = pp->maxlevel; i>=0; i--)
1397     {
1398         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1399         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1400     }
1401     if (size)
1402         *size = pp->total_size;
1403     if (blocks)
1404         *blocks = pp->no_blocks;
1405     for (i = 0; i <= pp->level; i++)
1406         close_block(pp->isamb, pp->block[i]);
1407     xfree(pp->block);
1408     xfree(pp);
1409 }
1410
1411 int isamb_block_info(ISAMB isamb, int cat)
1412 {
1413     if (cat >= 0 && cat < isamb->no_cat)
1414         return isamb->file[cat].head.block_size;
1415     return -1;
1416 }
1417
1418 void isamb_pp_close(ISAMB_PP pp)
1419 {
1420     isamb_pp_close_x(pp, 0, 0);
1421 }
1422
1423 /* simple recursive dumper .. */
1424 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1425                          int level)
1426 {
1427     char buf[1024];
1428     char prefix_str[1024];
1429     if (pos)
1430     {
1431         struct ISAMB_block *p = open_block(b, pos);
1432         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1433                 ZINT_FORMAT, level*2, "",
1434                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1435                 p->no_items);
1436         (*pr)(prefix_str);
1437         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1438         if (p->leaf)
1439         {
1440             while (p->offset < p->size)
1441             {
1442                 const char *src = p->bytes + p->offset;
1443                 char *dst = buf;
1444                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1445                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1446                 p->offset = src - (char*) p->bytes;
1447             }
1448             assert(p->offset == p->size);
1449         }
1450         else
1451         {
1452             const char *src = p->bytes + p->offset;
1453             ISAM_P sub;
1454
1455             decode_ptr(&src, &sub);
1456             p->offset = src - (char*) p->bytes;
1457
1458             isamb_dump_r(b, sub, pr, level+1);
1459             
1460             while (p->offset < p->size)
1461             {
1462 #if INT_ENCODE
1463                 char file_item_buf[DST_ITEM_MAX];
1464                 char *file_item = file_item_buf;
1465                 void *c1 = (*b->method->codec.start)();
1466                 (*b->method->codec.decode)(c1, &file_item, &src);
1467                 (*b->method->codec.stop)(c1);
1468                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1469 #else
1470                 zint item_len;
1471                 decode_item_len(&src, &item_len);
1472                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1473                 src += item_len;
1474 #endif
1475                 decode_ptr(&src, &sub);
1476                 
1477                 p->offset = src - (char*) p->bytes;
1478                 
1479                 isamb_dump_r(b, sub, pr, level+1);
1480             }           
1481         }
1482         close_block(b, p);
1483     }
1484 }
1485
1486 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1487 {
1488     isamb_dump_r(b, pos, pr, 0);
1489 }
1490
1491 int isamb_pp_read(ISAMB_PP pp, void *buf)
1492 {
1493     return isamb_pp_forward(pp, buf, 0);
1494 }
1495
1496
1497 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1498 { /* return an estimate of the current position and of the total number of */
1499   /* occureences in the isam tree, based on the current leaf */
1500     assert(total);
1501     assert(current);
1502     
1503     /* if end-of-stream PP may not be leaf */
1504
1505     *total = (double) (pp->block[0]->no_items);
1506     *current = (double) pp->returned_numbers;
1507 #if ISAMB_DEBUG
1508     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1509             ZINT_FORMAT, *current, *total, pp->returned_numbers);
1510 #endif
1511 }
1512
1513 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1514 {
1515     char *dst = buf;
1516     const char *src;
1517     struct ISAMB_block *p = pp->block[pp->level];
1518     ISAMB b = pp->isamb;
1519     if (!p)
1520         return 0;
1521  again:
1522     while (p->offset == p->size)
1523     {
1524         ISAM_P pos;
1525 #if INT_ENCODE
1526         const char *src_0;
1527         void *c1;
1528         char file_item_buf[DST_ITEM_MAX];
1529         char *file_item = file_item_buf;
1530 #else
1531         zint item_len;
1532 #endif
1533         while (p->offset == p->size)
1534         {
1535             if (pp->level == 0)
1536                 return 0;
1537             close_block(pp->isamb, pp->block[pp->level]);
1538             pp->block[pp->level] = 0;
1539             (pp->level)--;
1540             p = pp->block[pp->level];
1541             assert(!p->leaf);  
1542         }
1543
1544         assert(!p->leaf);
1545         src = p->bytes + p->offset;
1546        
1547 #if INT_ENCODE
1548         c1 = (*b->method->codec.start)();
1549         (*b->method->codec.decode)(c1, &file_item, &src);
1550 #else
1551         decode_ptr(&src, &item_len);
1552         src += item_len;
1553 #endif        
1554         decode_ptr(&src, &pos);
1555         p->offset = src - (char*) p->bytes;
1556
1557         src = p->bytes + p->offset;
1558
1559         while(1)
1560         {
1561             if (!untilb || p->offset == p->size)
1562                 break;
1563             assert(p->offset < p->size);
1564 #if INT_ENCODE
1565             src_0 = src;
1566             file_item = file_item_buf;
1567             (*b->method->codec.reset)(c1);
1568             (*b->method->codec.decode)(c1, &file_item, &src);
1569             if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1570             {
1571                 src = src_0;
1572                 break;
1573             }
1574 #else
1575             decode_item_len(&src, &item_len);
1576             if ((*b->method->compare_item)(untilb, src) < pp->scope)
1577                 break;
1578             src += item_len;
1579 #endif
1580             decode_ptr(&src, &pos);
1581             p->offset = src - (char*) p->bytes;
1582         }
1583
1584         pp->level++;
1585
1586         while (1)
1587         {
1588             pp->block[pp->level] = p = open_block(pp->isamb, pos);
1589
1590             pp->total_size += p->size;
1591             pp->no_blocks++;
1592             
1593             if (p->leaf) 
1594             {
1595                 break;
1596             }
1597             
1598             src = p->bytes + p->offset;
1599             while(1)
1600             {
1601                 decode_ptr(&src, &pos);
1602                 p->offset = src - (char*) p->bytes;
1603                 
1604                 if (!untilb || p->offset == p->size)
1605                     break;
1606                 assert(p->offset < p->size);
1607 #if INT_ENCODE
1608                 src_0 = src;
1609                 file_item = file_item_buf;
1610                 (*b->method->codec.reset)(c1);
1611                 (*b->method->codec.decode)(c1, &file_item, &src);
1612                 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1613                 {
1614                     src = src_0;
1615                     break;
1616                 }
1617 #else
1618                 decode_ptr(&src, &item_len);
1619                 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1620                     break;
1621                 src += item_len;
1622 #endif
1623             }
1624             pp->level++;
1625         }
1626 #if INT_ENCODE
1627         (*b->method->codec.stop)(c1);
1628 #endif
1629     }
1630     assert(p->offset < p->size);
1631     assert(p->leaf);
1632     while(1)
1633     {
1634         char *dst0 = dst;
1635         src = p->bytes + p->offset;
1636         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1637         p->offset = src - (char*) p->bytes;
1638         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1639             break;
1640         dst = dst0;
1641         if (p->offset == p->size) goto again;
1642     }
1643     pp->returned_numbers++; 
1644     return 1;
1645 }
1646
1647 zint isamb_get_int_splits(ISAMB b)
1648 {
1649     return b->number_of_int_splits;
1650 }
1651
1652 zint isamb_get_leaf_splits(ISAMB b)
1653 {
1654     return b->number_of_leaf_splits;
1655 }
1656
1657 zint isamb_get_root_ptr(ISAMB b)
1658 {
1659     return b->root_ptr;
1660 }
1661
1662 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1663 {
1664     b->root_ptr = root_ptr;
1665 }
1666
1667
1668 /*
1669  * Local variables:
1670  * c-basic-offset: 4
1671  * indent-tabs-mode: nil
1672  * End:
1673  * vim: shiftwidth=4 tabstop=8 expandtab
1674  */
1675