[esb-java-dev] Splitter and Aggregator into ESB
Ruwan Linton
ruwan at wso2.com
Mon Aug 20 01:34:40 PDT 2007
Hi all,
First of all sorry for this long email.
I have implemented a SplittMediator together with an
AggregateMediator. Basically, the splitter splits one message into a
number of messages using the specified XPATH expression and forwards
them to a sequence for mediation (in which those messages will mostly be
sent out to an actual service). The aggregator aggregates messages on
the correlation property (for responses to split messages this is the
MESSAGE_SEQUENCE property together with the CORELATION property) and
creates a single message according to the given aggregation algorithm.
Here is the proposed configuration for the splitter:
<split continueParent=(true | false)>
  <iterate expression="XPATH expression">
    <target [to="To address to be set"] [soapAction="soap action"]
            [sequence="sequence-ref"] [endpoint="endpoint-ref"]>
      <sequence> ( mediator +) </sequence>?
      <endpoint> (endpoint) </endpoint>?
    </target>
  </iterate>
</split>
This configuration iterates over the message using the given XPATH
expression, extracts the elements matched by the expression and
creates a new message for each extracted element. If the configuration
specifies "to" and "soapAction", the splitter assigns those values to
the new messages; if not, the values are inherited from the parent
message.
Ultimately these new messages are handed over to the specified target
sequence, which can be an inline sequence or a reference to an existing
sequence, for further mediation. While splitting the message and
creating the new messages, the splitter adds two MC properties,
MESSAGE_SEQUENCE and CORELATION, which specify the message number
together with the total count of split messages, and the parent message
ID, respectively, so that the aggregator knows which responses it has to
wait for.
The continueParent attribute on the split element specifies whether to
continue the mediation of the parent message or to drop that message.
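To make the proposed syntax concrete, a split configuration could look
roughly like the sketch below. The XPATH expression, the "to" address,
the soapAction and the mediator inside the inline sequence are
hypothetical values chosen only for illustration, and namespace
declarations are left out.

```xml
<!-- Illustrative only: expression, address and action are made up -->
<split continueParent="false">
  <iterate expression="//orders/order">
    <target to="http://localhost:9000/services/OrderService"
            soapAction="urn:placeOrder">
      <sequence>
        <!-- any mediators may go here; a send is assumed so that
             each new message is dispatched to the service -->
        <send/>
      </sequence>
    </target>
  </iterate>
</split>
```

With such a configuration, a message carrying three matching elements
would yield three new messages, each tagged with the MESSAGE_SEQUENCE
and CORELATION properties, and the parent message would be dropped
because continueParent is false.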
Here is the AggregateMediator syntax in generic format:
<aggregate>
  <corelateOn expression="XPATH-expression"/>
  <completeCondition timeout="time-in-seconds">
    <messageCount min="int-min" max="int-max"/>
  </completeCondition>
  <aggregator type="default" expression="XPATH-expression"/>
  <onComplete sequence="sequence-ref">
    (mediator +)?
  </onComplete>
  <invalidate sequence="sequence-ref" timeout="time-in-seconds">
    (mediator +)?
  </invalidate>
</aggregate>
This mediator syntax is a little complex because the mediator is
written to handle generic aggregation tasks rather than just the
aggregation of split messages. The syntax for aggregating split
messages will not include the corelateOn element, because those
messages are correlated on the MESSAGE_SEQUENCE and CORELATION
properties.
The <corelateOn> element specifies the XPATH expression used to
correlate messages which do not contain a MESSAGE_SEQUENCE property.
(For example, a particular service may expect a bundle of aggregated
messages rather than having every single message forwarded to it from
the ESB. In that case we can put in an aggregator, collect a specified
number of messages which contain an element matching the XPATH
expression and send them to the service as one bundled message.)
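As a sketch of that bundling case under the proposed syntax, the
configuration below collects ten messages before forwarding them. The
XPATH expressions, the counts, the timeouts and the sequence names are
hypothetical, and referring to sequences through empty elements is an
assumption based on the inline mediators being optional in the grammar.

```xml
<!-- Illustrative only: expressions, numbers and names are made up -->
<aggregate>
  <!-- correlate messages that contain an order element -->
  <corelateOn expression="//order"/>
  <completeCondition timeout="60">
    <messageCount min="10" max="10"/>
  </completeCondition>
  <!-- copy each order element into the body of the bundled message -->
  <aggregator type="default" expression="//order"/>
  <onComplete sequence="sendBundleToService"/>
  <invalidate sequence="logLateMessage" timeout="120"/>
</aggregate>
```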
The <completeCondition> element specifies when the aggregation is
complete, which can be when all the messages have arrived in the split
case, or when a minimum number of messages has arrived, or else we can
set a timeout to complete the aggregation.
The <aggregator> element specifies the aggregation algorithm and the
relevant parameters for the aggregation, such as the XPATH expression
which is used to extract elements from each message; those elements are
set into the body of a newly created message to form the aggregated
message.
The <onComplete> element specifies the sequence which will be invoked
after the aggregation is complete and the aggregated message is
generated. (Until the aggregation's complete condition is satisfied,
matching correlated messages will be held in the AggregateMediator.)
This sequence can be inlined or referred.
The <invalidate> element specifies the sequence for expired messages.
For example, say a particular message is split into 4 messages and
sent out, and only 3 responses are available before the aggregation
timeout. When the aggregation times out it is treated as a completed
aggregation, and the aggregated message with the 3 responses is sent on
for mediation. Some time later the remaining response arrives, and
because the aggregation is already completed this message is invalid
for the aggregator. In order to detect this invalidity the
AggregateMediator keeps completed aggregations inside itself for a
certain time. This time period can be configured via the timeout
attribute of the invalidate element.
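The scenario above could be configured roughly as follows for the
responses to a split message. The timeout values, the XPATH expression
and the sequence names are hypothetical; leaving out messageCount for
the split case and counting the invalidate period from the moment the
aggregation completes are assumptions, not confirmed behaviour.

```xml
<!-- Illustrative only: names and timeout values are made up -->
<aggregate>
  <!-- no corelateOn: split responses are correlated on the
       MESSAGE_SEQUENCE and CORELATION properties -->
  <completeCondition timeout="10"/>
  <aggregator type="default" expression="//order/status"/>
  <onComplete sequence="replyToClient"/>
  <!-- completed aggregations are remembered for 60 seconds -->
  <invalidate sequence="handleLateResponse" timeout="60"/>
</aggregate>
```

Under these values, if only 3 of the 4 responses arrive within 10
seconds, the aggregate of those 3 goes to replyToClient. A fourth
response arriving while the completed aggregation is still remembered
would be recognised as late and handed to handleLateResponse.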
I am done with the implementation, but thought I would get some ideas
on the configuration language and some of the logic as well.
Any comments? Sorry for the long email.
Thanks,
Ruwan