1 /* This file is part of the Zebra server.
2 Copyright (C) 1994-2009 Index Data
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 5000
52 /* max page size for _any_ isamb use */
53 #define ISAMB_MAX_PAGE 32768
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
62 /* CAT_MAX: _must_ be power of 2 */
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
71 #define ISAMB_FAC_SIZE 4
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
76 struct ISAMB_cache_entry {
81 struct ISAMB_cache_entry *next;
87 struct ISAMB_head head;
88 struct ISAMB_cache_entry *cache_entries;
95 struct ISAMB_file *file;
97 int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98 int log_io; /* log level for bf_read/bf_write calls */
99 int log_freelist; /* log level for freelist handling */
100 zint skipped_numbers; /* on a leaf node */
101 zint returned_numbers;
102 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104 zint number_of_int_splits;
105 zint number_of_leaf_splits;
106 int enable_int_count; /* whether we count nodes (or not) */
107 int cache_size; /* size of blocks to cache (if cache=1) */
120 zint no_items; /* number of nodes in this + children */
124 void *decodeClientData;
132 int maxlevel; /* total depth */
135 zint skipped_numbers; /* on a leaf node */
136 zint returned_numbers;
137 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139 struct ISAMB_block **block;
140 int scope; /* on what level we forward */
144 #define encode_item_len encode_ptr
146 static void encode_ptr(char **dst, zint pos)
148 unsigned char *bp = (unsigned char*) *dst;
152 *bp++ = (unsigned char) (128 | (pos & 127));
155 *bp++ = (unsigned char) pos;
159 static void encode_ptr(char **dst, zint pos)
161 memcpy(*dst, &pos, sizeof(pos));
162 (*dst) += sizeof(pos);
166 #define decode_item_len decode_ptr
168 static void decode_ptr(const char **src, zint *pos)
174 while (((c = *(const unsigned char *)((*src)++)) & 128))
176 d += ((zint) (c & 127) << r);
179 d += ((zint) c << r);
183 static void decode_ptr(const char **src, zint *pos)
185 memcpy(pos, *src, sizeof(*pos));
186 (*src) += sizeof(*pos);
191 void isamb_set_int_count(ISAMB b, int v)
193 b->enable_int_count = v;
196 void isamb_set_cache_size(ISAMB b, int v)
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202 int cache, int no_cat, int *sizes, int use_root_ptr)
204 ISAMB isamb = xmalloc(sizeof(*isamb));
207 assert(no_cat <= CAT_MAX);
210 isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211 memcpy(isamb->method, method, sizeof(*method));
212 isamb->no_cat = no_cat;
214 isamb->log_freelist = 0;
215 isamb->cache = cache;
216 isamb->skipped_numbers = 0;
217 isamb->returned_numbers = 0;
218 isamb->number_of_int_splits = 0;
219 isamb->number_of_leaf_splits = 0;
220 isamb->enable_int_count = 1;
221 isamb->cache_size = 40;
224 isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
226 isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
230 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231 isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
235 yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
239 assert(cache == 0 || cache == 1);
241 isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
243 for (i = 0; i < isamb->no_cat; i++)
245 isamb->file[i].bf = 0;
246 isamb->file[i].head_dirty = 0;
247 isamb->file[i].cache_entries = 0;
250 for (i = 0; i < isamb->no_cat; i++)
252 char fname[DST_BUF_SIZE];
253 char hbuf[DST_BUF_SIZE];
255 sprintf(fname, "%s%c", name, i+'A');
257 isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
260 isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
262 if (!isamb->file[i].bf)
268 /* fill-in default values (for empty isamb) */
269 isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270 isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271 isamb->file[i].head.block_size = sizes[i];
272 assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
274 if (i == isamb->no_cat-1 || sizes[i] > 128)
275 isamb->file[i].head.block_offset = 8;
277 isamb->file[i].head.block_offset = 4;
279 isamb->file[i].head.block_offset = 11;
281 isamb->file[i].head.block_max =
282 sizes[i] - isamb->file[i].head.block_offset;
283 isamb->file[i].head.free_list = 0;
284 if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
286 /* got header assume "isamb"major minor len can fit in 16 bytes */
288 int major, minor, len, pos = 0;
291 if (memcmp(hbuf, "isamb", 5))
293 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
297 if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
299 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
303 if (major != ISAMB_MAJOR_VERSION)
305 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306 fname, major, ISAMB_MAJOR_VERSION);
310 for (left = len - sizes[i]; left > 0; left = left - sizes[i])
313 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
315 yaz_log(YLOG_WARN, "truncated isamb header for "
316 "file=%s len=%d pos=%d",
323 decode_ptr(&src, &isamb->file[i].head.first_block);
324 decode_ptr(&src, &isamb->file[i].head.last_block);
325 decode_ptr(&src, &zint_tmp);
326 isamb->file[i].head.block_size = (int) zint_tmp;
327 decode_ptr(&src, &zint_tmp);
328 isamb->file[i].head.block_max = (int) zint_tmp;
329 decode_ptr(&src, &isamb->file[i].head.free_list);
330 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331 decode_ptr(&src, &isamb->root_ptr);
333 assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334 /* must rewrite the header if root ptr is in use (bug #1017) */
335 if (use_root_ptr && writeflag)
336 isamb->file[i].head_dirty = 1;
338 isamb->file[i].head_dirty = 0;
339 assert(isamb->file[i].head.block_size == sizes[i]);
342 yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
351 int i, b_size = ISAMB_MIN_SIZE;
353 for (i = 0; i<CAT_NO; i++)
356 b_size = b_size * ISAMB_FAC_SIZE;
358 return isamb_open2(bfs, name, writeflag, method, cache,
362 static void flush_blocks(ISAMB b, int cat)
364 while (b->file[cat].cache_entries)
366 struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367 b->file[cat].cache_entries = ce_this->next;
371 yaz_log(b->log_io, "bf_write: flush_blocks");
372 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
379 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
381 int cat = (int) (pos&CAT_MASK);
382 int off = (int) (((pos/CAT_MAX) &
383 (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384 * b->file[cat].head.block_size);
385 zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
387 struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
392 assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393 for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
396 if ((*ce)->pos == norm)
399 *ce = (*ce)->next; /* remove from list */
401 ce_this->next = b->file[cat].cache_entries; /* move to front */
402 b->file[cat].cache_entries = ce_this;
406 memcpy(ce_this->buf + off, userbuf,
407 b->file[cat].head.block_size);
411 memcpy(userbuf, ce_this->buf + off,
412 b->file[cat].head.block_size);
416 if (no >= b->cache_size)
418 assert(ce_last && *ce_last);
420 *ce_last = 0; /* remove the last entry from list */
423 yaz_log(b->log_io, "bf_write: cache_block");
424 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
429 ce_this = xmalloc(sizeof(*ce_this));
430 ce_this->next = b->file[cat].cache_entries;
431 b->file[cat].cache_entries = ce_this;
432 ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
434 yaz_log(b->log_io, "bf_read: cache_block");
435 if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436 memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
439 memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
445 memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
451 void isamb_close(ISAMB isamb)
454 for (i = 0; isamb->accessed_nodes[i]; i++)
455 yaz_log(YLOG_DEBUG, "isamb_close level leaf-%d: "ZINT_FORMAT" read, "
456 ZINT_FORMAT" skipped",
457 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458 yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459 "skipped "ZINT_FORMAT,
460 isamb->skipped_numbers, isamb->returned_numbers);
462 for (i = 0; i<isamb->no_cat; i++)
464 flush_blocks(isamb, i);
465 if (isamb->file[i].head_dirty)
467 char hbuf[DST_BUF_SIZE];
468 int major = ISAMB_MAJOR_VERSION;
470 char *dst = hbuf + 16;
472 int b_size = isamb->file[i].head.block_size;
474 encode_ptr(&dst, isamb->file[i].head.first_block);
475 encode_ptr(&dst, isamb->file[i].head.last_block);
476 encode_ptr(&dst, isamb->file[i].head.block_size);
477 encode_ptr(&dst, isamb->file[i].head.block_max);
478 encode_ptr(&dst, isamb->file[i].head.free_list);
480 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481 encode_ptr(&dst, isamb->root_ptr);
483 memset(dst, '\0', b_size); /* ensure no random bytes are written */
487 /* print exactly 16 bytes (including trailing 0) */
488 sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489 isamb->minor_version, len);
491 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
493 for (left = len - b_size; left > 0; left = left - b_size)
496 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
499 if (isamb->file[i].bf)
500 bf_close (isamb->file[i].bf);
503 xfree(isamb->method);
507 /* open_block: read one block at pos.
508 Decode leading sys bytes .. consisting of
510 0: leader byte, != 0 leaf, == 0, non-leaf
511 1-2: used size of block
512 3-7*: number of items and all children
514 * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515 of items. We can thus have at most 2^40 nodes.
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
519 int cat = (int) (pos&CAT_MASK);
521 int offset = b->file[cat].head.block_offset;
522 struct ISAMB_block *p;
525 p = xmalloc(sizeof(*p));
527 p->cat = (int) (pos & CAT_MASK);
528 p->buf = xmalloc(b->file[cat].head.block_size);
531 if (!cache_block (b, pos, p->buf, 0))
533 yaz_log(b->log_io, "bf_read: open_block");
534 if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
536 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537 (long) pos, (long) pos/CAT_MAX);
538 zebra_exit("isamb:open_block");
541 p->bytes = (char *)p->buf + offset;
543 p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
546 yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
549 assert(p->size >= 0);
550 src = (char*) p->buf + 3;
551 decode_ptr(&src, &p->no_items);
556 p->decodeClientData = (*b->method->codec.start)();
560 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
562 struct ISAMB_block *p;
564 p = xmalloc(sizeof(*p));
565 p->buf = xmalloc(b->file[cat].head.block_size);
567 if (!b->file[cat].head.free_list)
570 block_no = b->file[cat].head.last_block++;
571 p->pos = block_no * CAT_MAX + cat;
575 p->pos = b->file[cat].head.free_list;
576 assert((p->pos & CAT_MASK) == cat);
577 if (!cache_block(b, p->pos, p->buf, 0))
579 yaz_log(b->log_io, "bf_read: new_block");
580 if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
582 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
583 (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
584 zebra_exit("isamb:new_block");
587 yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
588 cat, p->pos/CAT_MAX);
589 memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
592 b->file[cat].head_dirty = 1;
593 memset(p->buf, 0, b->file[cat].head.block_size);
594 p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
601 p->decodeClientData = (*b->method->codec.start)();
605 struct ISAMB_block *new_leaf(ISAMB b, int cat)
607 return new_block(b, 1, cat);
611 struct ISAMB_block *new_int(ISAMB b, int cat)
613 return new_block(b, 0, cat);
616 static void check_block(ISAMB b, struct ISAMB_block *p)
618 assert(b); /* mostly to make the compiler shut up about unused b */
626 char *startp = p->bytes;
627 const char *src = startp;
628 char *endp = p->bytes + p->size;
630 void *c1 = (*b->method->codec.start)();
632 decode_ptr(&src, &pos);
633 assert((pos&CAT_MASK) == p->cat);
637 char file_item_buf[DST_ITEM_MAX];
638 char *file_item = file_item_buf;
639 (*b->method->codec.reset)(c1);
640 (*b->method->codec.decode)(c1, &file_item, &src);
643 decode_item_len(&src, &item_len);
644 assert(item_len > 0 && item_len < 128);
647 decode_ptr(&src, &pos);
648 if ((pos&CAT_MASK) != p->cat)
650 assert((pos&CAT_MASK) == p->cat);
653 (*b->method->codec.stop)(c1);
657 void close_block(ISAMB b, struct ISAMB_block *p)
663 yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
664 p->pos, p->cat, p->pos/CAT_MAX);
665 memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
666 b->file[p->cat].head.free_list = p->pos;
667 if (!cache_block(b, p->pos, p->buf, 1))
669 yaz_log(b->log_io, "bf_write: close_block (deleted)");
670 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
675 int offset = b->file[p->cat].head.block_offset;
676 int size = p->size + offset;
677 char *dst = (char*)p->buf + 3;
678 assert(p->size >= 0);
680 /* memset becuase encode_ptr usually does not write all bytes */
681 memset(p->buf, 0, b->file[p->cat].head.block_offset);
683 p->buf[1] = size & 255;
684 p->buf[2] = size >> 8;
685 encode_ptr(&dst, p->no_items);
687 if (!cache_block(b, p->pos, p->buf, 1))
689 yaz_log(b->log_io, "bf_write: close_block");
690 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
693 (*b->method->codec.stop)(p->decodeClientData);
698 int insert_sub(ISAMB b, struct ISAMB_block **p,
699 void *new_item, int *mode,
701 struct ISAMB_block **sp,
702 void *sub_item, int *sub_size,
703 const void *max_item);
705 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
707 ISAMC_I *stream, struct ISAMB_block **sp,
708 void *split_item, int *split_size, const void *last_max_item)
710 char *startp = p->bytes;
711 const char *src = startp;
712 char *endp = p->bytes + p->size;
714 struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
715 char sub_item[DST_ITEM_MAX];
719 void *c1 = (*b->method->codec.start)();
723 assert(p->size >= 0);
724 decode_ptr(&src, &pos);
728 const char *src0 = src;
730 char file_item_buf[DST_ITEM_MAX];
731 char *file_item = file_item_buf;
732 (*b->method->codec.reset)(c1);
733 (*b->method->codec.decode)(c1, &file_item, &src);
734 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
737 sub_p1 = open_block(b, pos);
739 diff_terms -= sub_p1->no_items;
740 more = insert_sub(b, &sub_p1, lookahead_item, mode,
742 sub_item, &sub_size, file_item_buf);
743 diff_terms += sub_p1->no_items;
749 decode_item_len(&src, &item_len);
750 d = (*b->method->compare_item)(src, lookahead_item);
753 sub_p1 = open_block(b, pos);
755 diff_terms -= sub_p1->no_items;
756 more = insert_sub(b, &sub_p1, lookahead_item, mode,
758 sub_item, &sub_size, src);
759 diff_terms += sub_p1->no_items;
765 decode_ptr(&src, &pos);
769 /* we reached the end. So lookahead > last item */
770 sub_p1 = open_block(b, pos);
772 diff_terms -= sub_p1->no_items;
773 more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
774 sub_item, &sub_size, last_max_item);
775 diff_terms += sub_p1->no_items;
778 diff_terms += sub_p2->no_items;
782 p->no_items += diff_terms;
786 /* there was a split - must insert pointer in this one */
787 char dst_buf[DST_BUF_SIZE];
790 const char *sub_item_ptr = sub_item;
792 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
794 memcpy(dst, startp, src - startp);
799 (*b->method->codec.reset)(c1);
800 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
802 encode_item_len(&dst, sub_size); /* sub length and item */
803 memcpy(dst, sub_item, sub_size);
807 encode_ptr(&dst, sub_p2->pos); /* pos */
809 if (endp - src) /* remaining data */
811 memcpy(dst, src, endp - src);
814 p->size = dst - dst_buf;
815 assert(p->size >= 0);
817 if (p->size <= b->file[p->cat].head.block_max)
819 /* it fits OK in this block */
820 memcpy(startp, dst_buf, dst - dst_buf);
822 close_block(b, sub_p2);
826 /* must split _this_ block as well .. */
827 struct ISAMB_block *sub_p3;
829 char file_item_buf[DST_ITEM_MAX];
830 char *file_item = file_item_buf;
834 zint no_items_first_half = 0;
840 b->number_of_int_splits++;
843 close_block(b, sub_p2);
845 half = src + b->file[p->cat].head.block_size/2;
846 decode_ptr(&src, &pos);
848 if (b->enable_int_count)
850 /* read sub block so we can get no_items for it */
851 sub_p3 = open_block(b, pos);
852 no_items_first_half += sub_p3->no_items;
853 close_block(b, sub_p3);
859 file_item = file_item_buf;
860 (*b->method->codec.reset)(c1);
861 (*b->method->codec.decode)(c1, &file_item, &src);
863 decode_item_len(&src, &split_size_tmp);
864 *split_size = (int) split_size_tmp;
867 decode_ptr(&src, &pos);
869 if (b->enable_int_count)
871 /* read sub block so we can get no_items for it */
872 sub_p3 = open_block(b, pos);
873 no_items_first_half += sub_p3->no_items;
874 close_block(b, sub_p3);
877 /* p is first half */
878 p_new_size = src - dst_buf;
879 memcpy(p->bytes, dst_buf, p_new_size);
882 file_item = file_item_buf;
883 (*b->method->codec.reset)(c1);
884 (*b->method->codec.decode)(c1, &file_item, &src);
885 *split_size = file_item - file_item_buf;
886 memcpy(split_item, file_item_buf, *split_size);
888 decode_item_len(&src, &split_size_tmp);
889 *split_size = (int) split_size_tmp;
890 memcpy(split_item, src, *split_size);
893 /* *sp is second half */
894 *sp = new_int(b, p->cat);
895 (*sp)->size = endp - src;
896 memcpy((*sp)->bytes, src, (*sp)->size);
898 p->size = p_new_size;
900 /* adjust no_items in first&second half */
901 (*sp)->no_items = p->no_items - no_items_first_half;
902 p->no_items = no_items_first_half;
906 close_block(b, sub_p1);
907 (*b->method->codec.stop)(c1);
911 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
912 int *lookahead_mode, ISAMC_I *stream,
913 struct ISAMB_block **sp2,
914 void *sub_item, int *sub_size,
915 const void *max_item)
917 struct ISAMB_block *p = *sp1;
920 char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
922 void *c1 = (*b->method->codec.start)();
923 void *c2 = (*b->method->codec.start)();
925 int quater = b->file[b->no_cat-1].head.block_max / 4;
926 char *mid_cut = dst_buf + quater * 2;
927 char *tail_cut = dst_buf + quater * 3;
928 char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
931 char cut_item_buf[DST_ITEM_MAX];
932 int cut_item_size = 0;
933 int no_items = 0; /* number of items (total) */
934 int no_items_1 = 0; /* number of items (first half) */
935 int inserted_dst_bytes = 0;
939 char file_item_buf[DST_ITEM_MAX];
940 char *file_item = file_item_buf;
943 endp = p->bytes + p->size;
944 (*b->method->codec.decode)(c1, &file_item, &src);
947 const char *dst_item = 0; /* resulting item to be inserted */
948 char *lookahead_next;
953 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
955 /* d now holds comparison between existing file item and
958 d > 0: lookahead before file
959 d < 0: lookahead after file
963 /* lookahead must be inserted */
964 dst_item = lookahead_item;
965 /* if this is not an insertion, it's really bad .. */
966 if (!*lookahead_mode)
968 yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
969 assert(*lookahead_mode);
972 else if (d == 0 && *lookahead_mode == 2)
974 /* For mode == 2, we insert the new key anyway - even
975 though the comparison is 0. */
976 dst_item = lookahead_item;
980 dst_item = file_item_buf;
982 if (d == 0 && !*lookahead_mode)
984 /* it's a deletion and they match so there is nothing
985 to be inserted anyway .. But mark the thing dirty
986 (file item was part of input.. The item will not be
990 else if (!half1 && dst > mid_cut)
992 /* we have reached the splitting point for the first time */
993 const char *dst_item_0 = dst_item;
994 half1 = dst; /* candidate for splitting */
996 /* encode the resulting item */
997 (*b->method->codec.encode)(c2, &dst, &dst_item);
999 cut_item_size = dst_item - dst_item_0;
1000 assert(cut_item_size > 0);
1001 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1004 no_items_1 = no_items;
1009 /* encode the resulting item */
1010 (*b->method->codec.encode)(c2, &dst, &dst_item);
1014 /* now move "pointers" .. result has been encoded .. */
1017 /* we must move the lookahead pointer */
1019 inserted_dst_bytes += (dst - dst_0);
1020 if (inserted_dst_bytes >= quater)
1021 /* no more room. Mark lookahead as "gone".. */
1025 /* move it really.. */
1026 lookahead_next = lookahead_item;
1027 if (!(*stream->read_item)(stream->clientData,
1031 /* end of stream reached: no "more" and no lookahead */
1035 if (lookahead_item && max_item &&
1036 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1038 /* the lookahead goes beyond what we allow in this
1039 leaf. Mark it as "gone" */
1048 /* exact match .. move both pointers */
1050 lookahead_next = lookahead_item;
1051 if (!(*stream->read_item)(stream->clientData,
1052 &lookahead_next, lookahead_mode))
1058 break; /* end of file stream reached .. */
1059 file_item = file_item_buf; /* move file pointer */
1060 (*b->method->codec.decode)(c1, &file_item, &src);
1064 /* file pointer must be moved */
1067 file_item = file_item_buf;
1068 (*b->method->codec.decode)(c1, &file_item, &src);
1073 /* this loop runs when we are "appending" to a leaf page. That is
1074 either it's empty (new) or all file items have been read in
1077 maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1078 while (lookahead_item)
1081 const char *src = lookahead_item;
1084 /* if we have a lookahead item, we stop if we exceed the value of it */
1086 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1088 /* stop if we have reached the value of max item */
1091 if (!*lookahead_mode)
1093 /* this is append. So a delete is bad */
1094 yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1095 assert(*lookahead_mode);
1097 else if (!half1 && dst > tail_cut)
1099 const char *src_0 = src;
1100 half1 = dst; /* candidate for splitting */
1102 (*b->method->codec.encode)(c2, &dst, &src);
1104 cut_item_size = src - src_0;
1105 assert(cut_item_size > 0);
1106 memcpy(cut_item_buf, src_0, cut_item_size);
1108 no_items_1 = no_items;
1112 (*b->method->codec.encode)(c2, &dst, &src);
1122 dst_item = lookahead_item;
1123 if (!(*stream->read_item)(stream->clientData, &dst_item,
1130 new_size = dst - dst_buf;
1131 if (p && p->cat != b->no_cat-1 &&
1132 new_size > b->file[p->cat].head.block_max)
1134 /* non-btree block will be removed */
1137 /* delete it too!! */
1138 p = 0; /* make a new one anyway */
1141 { /* must create a new one */
1143 for (i = 0; i < b->no_cat; i++)
1144 if (new_size <= b->file[i].head.block_max)
1150 if (new_size > b->file[p->cat].head.block_max)
1153 const char *cut_item = cut_item_buf;
1158 assert(cut_item_size > 0);
1161 p->size = half1 - dst_buf;
1162 assert(p->size <= b->file[p->cat].head.block_max);
1163 memcpy(p->bytes, dst_buf, half1 - dst_buf);
1164 p->no_items = no_items_1;
1167 *sp2 = new_leaf(b, p->cat);
1169 (*b->method->codec.reset)(c2);
1171 b->number_of_leaf_splits++;
1173 first_dst = (*sp2)->bytes;
1175 (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1177 memcpy(first_dst, half2, dst - half2);
1179 (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1180 assert((*sp2)->size <= b->file[p->cat].head.block_max);
1181 (*sp2)->no_items = no_items - no_items_1;
1184 memcpy(sub_item, cut_item_buf, cut_item_size);
1185 *sub_size = cut_item_size;
1189 memcpy(p->bytes, dst_buf, dst - dst_buf);
1191 p->no_items = no_items;
1193 (*b->method->codec.stop)(c1);
1194 (*b->method->codec.stop)(c2);
1199 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1202 struct ISAMB_block **sp,
1203 void *sub_item, int *sub_size,
1204 const void *max_item)
1206 if (!*p || (*p)->leaf)
1207 return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
1208 sub_size, max_item);
1210 return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1211 sub_size, max_item);
1214 int isamb_unlink(ISAMB b, ISAM_P pos)
1216 struct ISAMB_block *p1;
1220 p1 = open_block(b, pos);
1225 const char *src = p1->bytes + p1->offset;
1227 void *c1 = (*b->method->codec.start)();
1229 decode_ptr(&src, &sub_p);
1230 isamb_unlink(b, sub_p);
1232 while (src != p1->bytes + p1->size)
1235 char file_item_buf[DST_ITEM_MAX];
1236 char *file_item = file_item_buf;
1237 (*b->method->codec.reset)(c1);
1238 (*b->method->codec.decode)(c1, &file_item, &src);
1241 decode_item_len(&src, &item_len);
1244 decode_ptr(&src, &sub_p);
1245 isamb_unlink(b, sub_p);
1248 (*b->method->codec.stop)(c1);
1255 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1257 char item_buf[DST_ITEM_MAX];
1261 int must_delete = 0;
1268 item_ptr = item_buf;
1270 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1275 item_ptr = item_buf;
1276 more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1279 struct ISAMB_block *p = 0, *sp = 0;
1280 char sub_item[DST_ITEM_MAX];
1284 p = open_block(b, *pos);
1285 more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1286 sub_item, &sub_size, 0);
1288 { /* increase level of tree by one */
1289 struct ISAMB_block *p2 = new_int(b, p->cat);
1290 char *dst = p2->bytes + p2->size;
1292 void *c1 = (*b->method->codec.start)();
1293 const char *sub_item_ptr = sub_item;
1296 encode_ptr(&dst, p->pos);
1297 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1299 (*b->method->codec.reset)(c1);
1300 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1302 encode_item_len(&dst, sub_size);
1303 memcpy(dst, sub_item, sub_size);
1306 encode_ptr(&dst, sp->pos);
1308 p2->size = dst - p2->bytes;
1309 p2->no_items = p->no_items + sp->no_items;
1310 *pos = p2->pos; /* return new super page */
1314 (*b->method->codec.stop)(c1);
1319 *pos = p->pos; /* return current one (again) */
1321 if (p->no_items == 0)
1329 isamb_unlink(b, *pos);
1334 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1336 ISAMB_PP pp = xmalloc(sizeof(*pp));
1342 pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1349 pp->skipped_numbers = 0;
1350 pp->returned_numbers = 0;
1352 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1353 pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1356 struct ISAMB_block *p = open_block(isamb, pos);
1357 const char *src = p->bytes + p->offset;
1358 pp->block[pp->level] = p;
1360 pp->total_size += p->size;
1364 decode_ptr(&src, &pos);
1365 p->offset = src - p->bytes;
1367 pp->accessed_nodes[pp->level]++;
1369 pp->block[pp->level+1] = 0;
1370 pp->maxlevel = pp->level;
1376 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1378 return isamb_pp_open_x(isamb, pos, 0, scope);
1381 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1386 yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
1387 "skipped "ZINT_FORMAT,
1388 pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1389 for (i = pp->maxlevel; i>=0; i--)
1390 if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1391 yaz_log(YLOG_DEBUG, "isamb_pp_close level leaf-%d: "
1392 ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1393 pp->accessed_nodes[i], pp->skipped_nodes[i]);
1394 pp->isamb->skipped_numbers += pp->skipped_numbers;
1395 pp->isamb->returned_numbers += pp->returned_numbers;
1396 for (i = pp->maxlevel; i>=0; i--)
1398 pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1399 pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1402 *size = pp->total_size;
1404 *blocks = pp->no_blocks;
1405 for (i = 0; i <= pp->level; i++)
1406 close_block(pp->isamb, pp->block[i]);
1411 int isamb_block_info(ISAMB isamb, int cat)
1413 if (cat >= 0 && cat < isamb->no_cat)
1414 return isamb->file[cat].head.block_size;
1418 void isamb_pp_close(ISAMB_PP pp)
1420 isamb_pp_close_x(pp, 0, 0);
1423 /* simple recursive dumper .. */
1424 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1428 char prefix_str[1024];
1431 struct ISAMB_block *p = open_block(b, pos);
1432 sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1433 ZINT_FORMAT, level*2, "",
1434 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1437 sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1440 while (p->offset < p->size)
1442 const char *src = p->bytes + p->offset;
1444 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1445 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1446 p->offset = src - (char*) p->bytes;
1448 assert(p->offset == p->size);
1452 const char *src = p->bytes + p->offset;
1455 decode_ptr(&src, &sub);
1456 p->offset = src - (char*) p->bytes;
1458 isamb_dump_r(b, sub, pr, level+1);
1460 while (p->offset < p->size)
1463 char file_item_buf[DST_ITEM_MAX];
1464 char *file_item = file_item_buf;
1465 void *c1 = (*b->method->codec.start)();
1466 (*b->method->codec.decode)(c1, &file_item, &src);
1467 (*b->method->codec.stop)(c1);
1468 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1471 decode_item_len(&src, &item_len);
1472 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1475 decode_ptr(&src, &sub);
1477 p->offset = src - (char*) p->bytes;
1479 isamb_dump_r(b, sub, pr, level+1);
1486 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1488 isamb_dump_r(b, pos, pr, 0);
1491 int isamb_pp_read(ISAMB_PP pp, void *buf)
1493 return isamb_pp_forward(pp, buf, 0);
1497 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1498 { /* return an estimate of the current position and of the total number of */
1499 /* occureences in the isam tree, based on the current leaf */
1503 /* if end-of-stream PP may not be leaf */
1505 *total = (double) (pp->block[0]->no_items);
1506 *current = (double) pp->returned_numbers;
1508 yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1509 ZINT_FORMAT, *current, *total, pp->returned_numbers);
1513 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1517 struct ISAMB_block *p = pp->block[pp->level];
1518 ISAMB b = pp->isamb;
1522 while (p->offset == p->size)
1528 char file_item_buf[DST_ITEM_MAX];
1529 char *file_item = file_item_buf;
1533 while (p->offset == p->size)
1537 close_block(pp->isamb, pp->block[pp->level]);
1538 pp->block[pp->level] = 0;
1540 p = pp->block[pp->level];
1545 src = p->bytes + p->offset;
1548 c1 = (*b->method->codec.start)();
1549 (*b->method->codec.decode)(c1, &file_item, &src);
1551 decode_ptr(&src, &item_len);
1554 decode_ptr(&src, &pos);
1555 p->offset = src - (char*) p->bytes;
1557 src = p->bytes + p->offset;
1561 if (!untilb || p->offset == p->size)
1563 assert(p->offset < p->size);
1566 file_item = file_item_buf;
1567 (*b->method->codec.reset)(c1);
1568 (*b->method->codec.decode)(c1, &file_item, &src);
1569 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1575 decode_item_len(&src, &item_len);
1576 if ((*b->method->compare_item)(untilb, src) < pp->scope)
1580 decode_ptr(&src, &pos);
1581 p->offset = src - (char*) p->bytes;
1588 pp->block[pp->level] = p = open_block(pp->isamb, pos);
1590 pp->total_size += p->size;
1598 src = p->bytes + p->offset;
1601 decode_ptr(&src, &pos);
1602 p->offset = src - (char*) p->bytes;
1604 if (!untilb || p->offset == p->size)
1606 assert(p->offset < p->size);
1609 file_item = file_item_buf;
1610 (*b->method->codec.reset)(c1);
1611 (*b->method->codec.decode)(c1, &file_item, &src);
1612 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1618 decode_ptr(&src, &item_len);
1619 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1627 (*b->method->codec.stop)(c1);
1630 assert(p->offset < p->size);
1635 src = p->bytes + p->offset;
1636 (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1637 p->offset = src - (char*) p->bytes;
1638 if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1641 if (p->offset == p->size) goto again;
1643 pp->returned_numbers++;
1647 zint isamb_get_int_splits(ISAMB b)
1649 return b->number_of_int_splits;
1652 zint isamb_get_leaf_splits(ISAMB b)
1654 return b->number_of_leaf_splits;
1657 zint isamb_get_root_ptr(ISAMB b)
1662 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1664 b->root_ptr = root_ptr;
1671 * indent-tabs-mode: nil
1673 * vim: shiftwidth=4 tabstop=8 expandtab