From 9e98ddafd1a6e07d49f1ecdab2e2ba3f6071cb4a Mon Sep 17 00:00:00 2001 From: Jakub Skoczen Date: Fri, 28 Aug 2015 17:01:35 +0200 Subject: [PATCH] Add initial async support (Search, Record) --- src/main/java/org/yaz4j/AsyncConnection.java | 90 +++++++++++ src/main/java/org/yaz4j/AsyncConnections.java | 49 ++++++ src/main/java/org/yaz4j/Connection.java | 6 +- src/main/java/org/yaz4j/ResultSet.java | 12 +- src/main/swig/libyaz4j.i | 1 + src/test/java/org/yaz4j/AsyncConnectionsTest.java | 165 +++++++++++++++++++++ 6 files changed, 315 insertions(+), 8 deletions(-) create mode 100644 src/main/java/org/yaz4j/AsyncConnection.java create mode 100644 src/main/java/org/yaz4j/AsyncConnections.java create mode 100644 src/test/java/org/yaz4j/AsyncConnectionsTest.java diff --git a/src/main/java/org/yaz4j/AsyncConnection.java b/src/main/java/org/yaz4j/AsyncConnection.java new file mode 100644 index 0000000..107a00f --- /dev/null +++ b/src/main/java/org/yaz4j/AsyncConnection.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 1995-2015, Index Data + * All rights reserved. + * See the file LICENSE for details. + */ +package org.yaz4j; + +import org.yaz4j.exception.ZoomException; +import org.yaz4j.jni.yaz4jlib; +import static org.yaz4j.jni.yaz4jlib.*; + +/** + * + * @author jakub + */ +public class AsyncConnection extends Connection { + private ResultSet lastResultSet; + ErrorHandler eh; + SearchHandler sh; + RecordHandler rh; + + public interface SearchHandler { + public void handle(ResultSet rs); + } + + public interface RecordHandler { + public void handle(Record r); + } + + public interface ErrorHandler { + public void handle(ZoomException e); + } + + public AsyncConnection(String host, int port) { + super(host, port); + ZOOM_connection_option_set(zoomConnection, "async", "1"); + //what about piggy back? + ZOOM_connection_option_set(zoomConnection, "count", "100"); + ZOOM_connection_option_set(zoomConnection, "step", "20"); + closed = false; + } + + @Override + public ResultSet search(Query query) throws ZoomException { + lastResultSet = super.search(query); + return null; + } + + public AsyncConnection onSearch(SearchHandler sh) { + this.sh = sh; + return this; + } + + public AsyncConnection onRecord(RecordHandler rh) { + this.rh = rh; + return this; + } + + public AsyncConnection onError(ErrorHandler eh) { + this.eh = eh; + return this; + } + + //actuall handler, pkg-private + + void handleSearch() { + handleError(); + //handle search + if (sh != null) sh.handle(lastResultSet); + } + + void handleRecord() { + try { + if (rh != null) rh.handle(lastResultSet.getRecord(lastResultSet.asyncRecordOffset)); + } catch (ZoomException ex) { + if (eh != null) eh.handle(ex); + } finally { + lastResultSet.asyncRecordOffset++; + } + } + + void handleError() { + //handle error + ZoomException err = ExceptionUtil.getError(zoomConnection, host, port); + if (err != null) { + if (eh != null) eh.handle(err); + } + } + +} diff --git a/src/main/java/org/yaz4j/AsyncConnections.java b/src/main/java/org/yaz4j/AsyncConnections.java new file mode 100644 index 0000000..a7bdd44 --- /dev/null +++ b/src/main/java/org/yaz4j/AsyncConnections.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 1995-2015, Index Data + * All rights reserved. + * See the file LICENSE for details. + */ +package org.yaz4j; + +import java.util.ArrayList; +import java.util.List; +import org.yaz4j.jni.SWIGTYPE_p_p_ZOOM_connection_p; +import static org.yaz4j.jni.yaz4jlib.*; +import static java.lang.System.out; + +/** + * + * @author jakub + */ +public class AsyncConnections { + private List conns = new ArrayList(); + + public void add(AsyncConnection conn) { + conns.add(conn); + } + + public void start() { + SWIGTYPE_p_p_ZOOM_connection_p c_conns = new_zoomConnectionArray(conns.size()); + try { + for (int i=0; i { //for GC refcount - private Connection conn; - private SWIGTYPE_p_ZOOM_resultset_p resultSet; - private long size = 0; + SWIGTYPE_p_ZOOM_resultset_p resultSet; private boolean disposed = false; + int asyncRecordOffset = 0; ResultSet(SWIGTYPE_p_ZOOM_resultset_p resultSet, Connection conn) { + //do not copy anything to the java side at this point, it won't be valid + //in the async mode this.resultSet = resultSet; - size = yaz4jlib.ZOOM_resultset_size(this.resultSet); this.conn = conn; } @@ -121,7 +121,7 @@ public class ResultSet implements Iterable { private long cur; @Override public boolean hasNext() { - return cur < size; + return cur < getHitCount(); } @Override @@ -158,7 +158,7 @@ public class ResultSet implements Iterable { } public long getHitCount() { - return size; + return yaz4jlib.ZOOM_resultset_size(this.resultSet); } void _dispose() { diff --git a/src/main/swig/libyaz4j.i b/src/main/swig/libyaz4j.i index 97a74cb..aad35ec 100644 --- a/src/main/swig/libyaz4j.i +++ b/src/main/swig/libyaz4j.i @@ -11,6 +11,7 @@ %pointer_functions(size_t, size_tp); %include "carrays.i" %array_functions(ZOOM_record, zoomRecordArray); + %array_functions(ZOOM_connection, zoomConnectionArray); %typemap(jni) CharStarByteArray "jbyteArray" %typemap(jtype) CharStarByteArray "byte[]" %typemap(jstype) CharStarByteArray "byte[]" diff --git a/src/test/java/org/yaz4j/AsyncConnectionsTest.java b/src/test/java/org/yaz4j/AsyncConnectionsTest.java new file mode 100644 index 0000000..4053e81 --- /dev/null +++ b/src/test/java/org/yaz4j/AsyncConnectionsTest.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 1995-2015, Index Data + * All rights reserved. + * See the file LICENSE for details. + */ +package org.yaz4j; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import org.yaz4j.exception.ZoomException; + +import static java.lang.System.out; + +/** + * + * @author jakub + */ +public class AsyncConnectionsTest { + + class Box { + T item; + + public Box() { + } + + public Box(T item) { + this.item = item; + } + + T getItem() { + return item; + } + + void setItem(T item) { + this.item = item; + } + } + + public AsyncConnectionsTest() { + } + + @BeforeClass + public static void setUpClass() { + } + + @AfterClass + public static void tearDownClass() { + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + /** + * Test async ZOOM operation. + */ + @Test + public void testSingleTarget() { + out.println("Trying async connection..."); + AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/gils", 0); + AsyncConnections conns = new AsyncConnections(); + conns.add(conn); + int expectedHitCount = 9; + final Box actualHitCount = new Box(); + final Box actualRecordCounter = new Box(0); + try { + conn.setSyntax("sutrs"); + conn.connect(); + conn.search(new PrefixQuery("@attr 1=4 utah")); + conn + .onSearch(new AsyncConnection.SearchHandler() { + public void handle(ResultSet rs) { + out.println("Received search, hit count "+rs.getHitCount()); + actualHitCount.setItem(rs.getHitCount()); + } + }) + .onRecord(new AsyncConnection.RecordHandler() { + public void handle(Record r) { + out.println("Received a record of type "+r.getSyntax()); + actualRecordCounter.setItem(actualRecordCounter.getItem()+1); + } + }); + + } catch (ZoomException ex) { + fail(ex.getMessage()); + } + conns.start(); + assertEquals(expectedHitCount, actualHitCount.item); + assertEquals(expectedHitCount, actualRecordCounter.item); + + } + + + /** + * Test async ZOOM operation. + */ + @Test + public void testMulitTarget() { + out.println("Trying async with multile connections..."); + AsyncConnections conns = new AsyncConnections(); + AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/gils", 0); + conns.add(conn); + AsyncConnection conn2 = new AsyncConnection("z3950.indexdata.dk:210/marc", 0); + conns.add(conn2); + int expectedHitCount = 19; //for both + final Box actualHitCount = new Box(0L); + final Box actualRecordCounter = new Box(0); + try { + //we need to simplify the API for multiple + conn.setSyntax("sutrs"); + conn.connect(); + conn.search(new PrefixQuery("@attr 1=4 utah")); + conn + .onSearch(new AsyncConnection.SearchHandler() { + public void handle(ResultSet rs) { + out.println("Received search, hit count "+rs.getHitCount()); + actualHitCount.setItem(actualHitCount.getItem() + rs.getHitCount()); + } + }) + .onRecord(new AsyncConnection.RecordHandler() { + public void handle(Record r) { + out.println("Received a record of type "+r.getSyntax()); + actualRecordCounter.setItem(actualRecordCounter.getItem()+1); + } + }); + conn2.setSyntax("marc21"); + conn2.connect(); + conn2.search(new PrefixQuery("@attr 1=4 computer")); + conn2 + .onSearch(new AsyncConnection.SearchHandler() { + public void handle(ResultSet rs) { + out.println("Received search, hit count "+rs.getHitCount()); + actualHitCount.setItem(actualHitCount.getItem() + rs.getHitCount()); + } + }) + .onRecord(new AsyncConnection.RecordHandler() { + public void handle(Record r) { + out.println("Received a record of type "+r.getSyntax()); + actualRecordCounter.setItem(actualRecordCounter.getItem()+1); + } + }) + .onError(new AsyncConnection.ErrorHandler() { + + public void handle(ZoomException e) { + out.println("Caught error: "+e.getMessage()); + } + }); + + } catch (ZoomException ex) { + fail(ex.getMessage()); + } + conns.start(); + assertEquals(expectedHitCount, actualHitCount.item); + assertEquals(expectedHitCount, actualRecordCounter.item); + + } +} -- 1.7.10.4