The Sequence API enables you to manipulate sequences of reliable messages. It sends requests and responses reliably by creating an output org.systinet.wasp.wsrm.sequence.OutputSequence associated with a particular org.systinet.wasp.webservice.ServiceClient.
Example 234 illustrates how to send a reliable request to a service. Three messages in a sequence are sent to the service.
Example 234. Sequential Reliable Synchronous Client Requests
ServiceClient sc = ServiceClient.create(WSDL_URL); Service service = (Service) sc.createProxy(Service.class); RegistryFactory factory = RegistryFactory.getInstance(); OutputSequenceRegistry outputRegistry = factory.getOutputSequenceRegistry(); OutputSequence sequence = outputRegistry.createSequence(sc); try { System.out.println(service.hello("Europe")); System.out.println(service.hello("Asia")); System.out.println(service.hello("America")); sequence.finish(); } catch (RemoteException e) { // Sequence broken? See if e.getCause() returns an instance of SequenceException if (e.getCause() instanceof SequenceException) { OneWaySequence brokenSequence = ((SequenceException) e.getCause()).getSequence(); System.err.println("Broken sequence: " + brokenSequence.toString()); e.printStackTrace(); } }
The service receives and acknowledges reliable requests automatically. If reliable responses are required, a client must create a duplex sequence org.systinet.wasp.wsrm.sequence.DuplexSequence, as shown in Example 235.
Example 235. Reliable Responses
ServiceClient sc = ServiceClient.create(WSDL_URL); Service service = (Service)sc.createProxy(Service.class); RegistryFactory factory = RegistryFactory.getInstance(); DuplexSequenceRegistry duplexRegistry = factory.getDuplexSequenceRegistry(); DuplexSequence duplexSequence = duplexRegistry.createSequence(sc); OutputSequence sequence = duplexSequence.getOutputSequence(); try { System.out.println(service.hello("Europe")); System.out.println(service.hello("Asia")); System.out.println(service.hello("America")); sequence.finish(); } catch(RemoteException e) { // Sequence broken? See if e.getCause() returns an instance of SequenceException if(e.getCause() instanceof SequenceException) { OneWaySequence brokenSequence = ((SequenceException)e.getCause()).getSequence(); System.err.println("Broken sequence: " + brokenSequence.toString()); e.printStackTrace(); } }
Input messages belong to input sequences, which are created by the reliability provider to determine the following:
Whether the input message has been secured by reliability
The sequence number
State changes
Example 236 is an example of such a sequence.
Example 236. Input Sequence
String hello(String who) { InputSequence sequence = SequenceRegistry.getActiveInputSequence(); if (sequence != null) { System.out.println("Request has a message number " + sequence.getCurrentMessageNumber() + " in a sequence " + sequence.getID() + "."); } return "Hello, " + who + "!"; }
When the application uses reliable messaging, it usually wants to be informed if the message could not be delivered (or more generally, when the required delivery assurances could not be fulfilled). On the client side, it is possible to catch an exception, as shown in Example 237
Example 237. Catching an Exception
ServiceClient sc = ServiceClient.create(WSDL_URL); Service service = (Service)sc.createProxy(Service.class); RegistryFactory factory = RegistryFactory.getInstance(); DuplexSequenceRegistry duplexRegistry = factory.getDuplexSequenceRegistry(); DuplexSequence duplexSequence = duplexRegistry.createSequence(sc); OutputSequence sequence = duplexSequence.getOutputSequence(); try { sequence.setLength(1); System.out.println(service.hello("World")); } catch(RemoteException e) { // Sequence broken? See if e.getCause() returns an instance of SequenceException. if(e.getCause() instanceof SequenceException) { OneWaySequence brokenSequence = ((SequenceException)e.getCause()).getSequence(); System.err.println("Broken sequence: " + brokenSequence.toString()); e.printStackTrace(); } }
This technique cannot be used for response messages. Instead, the proper solution is to register an org.systinet.wasp.wsrm.sequence.SequenceListener that will be notified when the sequence breaks. This is shown for the server side in Example 238.
Example 238. Listening for Sequence Exception
SequenceRegistry.addSequenceListener(new GenericSequenceListener() { public void onBreak(SequenceException e, OneWaySequence.State previous) { // sequence broken System.err.println("Broken sequence: " + e.getSequence().toString()); e.printStackTrace(); } });
A similar technique can be used on the client side but care should be taken to ignore caught exceptions so they are not processed twice.
A Sequence can be in any of following states:
Table 38. Sequence States
State | Description |
---|---|
OPEN: OPEN in org.systinet.wasp.wsrm.sequence.OneWaySequence.State | Some messages have not yet been sent and acknowledged (in output sequence) or acknowledged (in input sequence). |
DELIVERED: DELIVERED in org.systinet.wasp.wsrm.sequence.OneWaySequence.State | In an output sequence, all messages sent up to this moment have been acknowledged (in output sequence). In an input sequence, all messages received up to this moment have been confirmed. The last message has not yet been sent or received. |
COMPLETED: COMPLETED in org.systinet.wasp.wsrm.sequence.OneWaySequence.State | The sequence has been completed without errors. In an output sequence, all messages including the last message have been sent and acknowledged. In an input sequence, all messages including the last message have been confirmed. No more messages can be sent or received within this sequence. |
BROKEN: BROKEN in org.systinet.wasp.wsrm.sequence.OneWaySequence.State | The sequence was not able to fulfil the required delivery assurances or was aborted by the application. No more messages can be sent or received within this sequence. |
DESTROYED: DESTROYED in org.systinet.wasp.wsrm.sequence.OneWaySequence.State | State of destroyed sequences. Such a sequence should no longer be used since it will be neither updated nor used by the active Sequence provider. No more messages can be sent or received within this sequence. |
The following diagrams display the life-cycle of output and input sequences. Note that although they look similar, they differ slightly in the events causing transitions between states.
An output sequence is created by the application prior to sending the first message. Then it switches between DELIVERED and OPEN states as the application sends messages and the reliability provider delivers them and receives their acknowledgements. An output sequence ends its life either as COMPLETED after sending and acknowledging the last message, or as BROKEN when the application aborts the sequence or when the reliability provider is unable to fulfil required delivery assurances.
An input sequence is created by the reliability provider after receiving the first message. Then it switches between OPEN and DELIVERED states as the reliability provider receives messages and the application consumes and confirms them. An input sequence ends its life either as COMPLETED after delivering and confirming the last message, or as BROKEN when the application aborts it or when the reliability provider is unable to fulfil the required delivery assurances.
An application can watch sequence state changes by registering as an org.systinet.wasp.wsrm.sequence.SequenceListener. It is then notified whenever the sequence state changes. Such a SequenceListener is given in Example 239.
Example 239. Listening for Sequence State Changes
SequenceRegistry.addSequenceListener(new GenericSequenceListener() { public void onCreate(OneWaySequence sequence) { reportStateChange(sequence, null); } public void onComplete(OneWaySequence sequence, OneWaySequence.State previous) { reportStateChange(sequence, previous); } public void onBreak(SequenceException e, OneWaySequence.State previous) { reportStateChange(e.getSequence(), previous); } private void reportStateChange(OneWaySequence sequence, OneWaySequence.State previous) { System.out.println("Sequence " + sequence.getID() + " changed state from " + previous + " to " + sequence.getState() + "."); } });
When the receiver gets an input message, it returns an acknowledgement to the sender and enqueues the message to the input queue. The message is later taken from the queue and dispatched to the application, either as an invocation on the server side or as the response on the client side. The message is not removed from the queue until its delivery is confirmed. If delivery is not confirmed and the message remains in the queue, it can be dispatched to the application in the future.
By default, messages are confirmed implicitly by WSO2 SOA Enablement Server for Java. Requests to the service are confirmed when the implementation method of the service returns. Responses are confirmed just before the client application collects the response.
This default implicit mechanism can be turned off and the application can take over the message confirmation. The application can express its interest in explicit message confirmation by calling setExplicitConfirmation(boolean) in org.systinet.wasp.wsrm.sequence.InputSequence. Then it takes over all responsibility to confirm every message explicitly. Example 240 shows the explicit confirmation of messages on the server side:
Example 240. Explicit Message Confirmation
public class MyService implements MyServiceIface, ServiceEndpointListener { private SequenceListener listener = new GenericSequenceListener() { public void onCreate(OneWaySequence sequence) { if(!sequence.isOutput()) { // for input sequences only try { ((InputSequence)sequence).setExplicitConfirmation(true); } catch (SequenceException e) { System.out.println("Exception thrown when setting explicit confirmation: "+e); } } } }; public void onPublish(ServiceEndpoint serviceEndpoint) { SequenceRegistry.addSequenceListener(this.listener, serviceEndpoint); } public void onUnpublish(ServiceEndpoint serviceEndpoint) { SequenceRegistry.removeSequenceListener(this.listener, serviceEndpoint); } public String ping(String msg) { InputSequence sequence = SequenceRegistry.getActiveInputSequence(); // throw dice if (Math.random() > 0.5) { // confirm the message try { sequence.confirmCurrentMessage(); } catch (SequenceException e) { System.out.println("Exception thrown when confirming message: "+e); } } else { // message remains unconfirmed and will be dispatched again later } return "ping"; } }
Another use for explicit message confirmation is when the service invocation takes a long time and the service decides at some point that the request has been processed and that it does not wish to be invoked again with the same request when the system restarts. In such cases it can confirm the current message without turning the implicit message confirmation off. This is shown in Example 241.
Example 241. Explicit Message Confirmation for Lengthy Invocations
String ping(String msg) { InputSequence sequence = SequenceRegistry.getActiveInputSequence(); try { sequence.confirmCurrentMessage(); } catch (SequenceException e) { System.out.println("Exception thrown when confirming message: "+e); } // do something time-consuming here // ... }
In request-response scenarios it may be desirable to create two sequences. From the client's point of view, the Output sequence is used to send requests reliably, and the Input sequence is used to receive responses reliably. Both of these sequences are created at once and are related in one Duplex sequence. Example 242 shows how to use them.
Example 242. Use of Duplex Sequences
ServiceClient sc = ServiceClient.create(WSDL_URL); Service service = (Service)sc.createProxy(Service.class); RegistryFactory factory = RegistryFactory.getInstance(); DuplexSequenceRegistry duplexRegistry = factory.getDuplexSequenceRegistry(); DuplexSequence duplexSequence = duplexRegistry.createSequence(sc); OutputSequence outputSequence = duplexSequence.getOutputSequence(); InputSequence inputSequence = duplexSequence.getInputSequence();
For more examples of sequence correlation see the org.systinet.wasp.wsrm.sequence.DuplexSequence documentation.
Transient sequences are destroyed upon server restart, along with messages which have not been delivered. However, persistent sequences and their undelivered messages are stored. Undelivered messages from the output queue are transmitted after server restart. New messages can be sent within these older queues. The loading of a client-side persistent queue is shown in Example 243.
All sequences in WSO2 SOA Enablement Server are persistent. Therefore, the persistent store (persistent_store.jar) must be installed.
Example 243. Loading a Persistent Sequence on the Client Side
ServiceClient sc = ServiceClient.create(WSDL_URL); Service service = (Service) sc.createProxy(Service.class); RegistryFactory factory = RegistryFactory.getInstance(); OutputSequenceRegistry registry = factory.getOutputSequenceRegistry(); // load persistent sequence or create a new one (if it was not stored yet) OutputSequence sequence = registry.loadSequence(PERSISTENT_SEQUENCE_PREFIX, sc); if (sequence == null) { sequence = registry.createSequence(PERSISTENT_SEQUENCE_PREFIX, sc, true); }
Input sequences are stored and reloaded automatically.
Default time to live of persistent sequences is one year from their creation. When using persistent sequences, the application is strongly encouraged to set the proper - application specific - expiration dates of sequences. This saves the resources used to store all non-expired sequences (required to conform to the specification). The expiration date can be set either by setExpires(java.util.Date) in org.systinet.wasp.wsrm.sequence.Sequence or by the effective policy.
WSO2 SOA Enablement Server for Java stores persistent sequences in a database. WSO2 SOA Enablement Server for Java uses JDBC for database server integration. Performing this integration is described in Overview. Normally, tables are created automatically by SSJ. Manual creation is needed only if you want to run SSJ with an unsupported database or you are fine tuning index use.
Some databases have a limit on index size (the sum of the size of indexed columns). For example, on Sybase this is 600 bytes and on versions of MySQL prior to 4.1.2 this is 500 bytes. For these databases it is necessary to modify the database schema (generic SQL is given in Example 244) and create the tables manually. After this WSO2 SOA Enablement Server will use these modified tables and not try to create them automatically.
We recommend reducing the limit on the length of VARCHAR fields. You could instead customize index creation but this might degrade performance. With MySQL you could also use a newer version (4.1.2+) or recompile the server.
Example 244. Generic WS-RM Database Schema
CREATE TABLE DbOutputSequenceData ( waspInstancePK INTEGER NOT NULL, idPK VARCHAR(512) NOT NULL, wireId VARCHAR(512), endPointPath VARCHAR(512), processingId BIGINT, state INTEGER, expires BIGINT, length BIGINT, currentLength BIGINT, sequenceMetadata BLOB, PRIMARY KEY (waspInstancePK, idPK) ) CREATE INDEX ix_DbSequen1947015 ON DbOutputSequenceData(endPointPath) CREATE TABLE DbInputSequenceData ( waspInstancePK INTEGER NOT NULL, idPK VARCHAR(512) NOT NULL, wireId VARCHAR(512), endPointPath VARCHAR(512), processingId BIGINT, state INTEGER, expires BIGINT, explicitConfirmation BOOLEAN, length BIGINT, acksTo VARCHAR(512), createSequenceMessageId VARCHAR(512), sequenceMetadata BLOB, PRIMARY KEY (waspInstancePK, idPK) ) CREATE INDEX ix_DbSequen1947015 ON DbInputSequenceData(endPointPath) CREATE TABLE DbSequenceQueue ( waspInstancePK INTEGER NOT NULL, idPK VARCHAR(512) NOT NULL, outputPK BIT NOT NULL, queueIdPK VARCHAR(512) NOT NULL, priority BIGINT, PRIMARY KEY (waspInstancePK, idPK, outputPK, queueIdPK) ) CREATE INDEX ix_DbSequen4197474 ON DbSequenceQueue(waspInstancePK, queueIdPK, priority) CREATE TABLE DbSourceMessage ( waspInstancePK INTEGER NOT NULL, sequenceIdPK VARCHAR(512) NOT NULL, messageNumberPK BIGINT NOT NULL, blocked BOOLEAN, active BOOLEAN, timetoReschedule BIGINT, metadata BLOB, message BLOB, PRIMARY KEY (waspInstancePK, sequenceIdPK, messageNumberPK) ) CREATE INDEX ix_DbSource1084571 ON DbSourceMessage(waspInstancePK, blocked, active, timeRoReschedule) CREATE TABLE DbDestinationMessage( waspInstancePK INTEGER NOT NULL, sequenceIdPK VARCHAR(512) NOT NULL, messageNumberPK BIGINT NOT NULL, blocked BOOLEAN, active BOOLEAN, timeWhenReceived BIGINT, marked BOOLEAN, message BLOB, PRIMARY KEY (waspInstancePK, sequenceIdPK, messageNumberPK) ) CREATE INDEX ix_DbDestin3013127 ON DbDestinationMessage(waspInstancePK, blocked, active, marked, timeWhenReceived) CREATE TABLE UidGen ( guardPK INTEGER NOT NULL PRIMARY KEY, firstFree INTEGER ) CREATE TABLE WaspInstances ( waspInstanceNamePK VARCHAR(512) NOT NULL PRIMARY KEY, waspInstanceCode INTEGER )