Large Scale Data Transfer with Data Services Streaming

Do you need large data sets in your data service results? How about faster response times? In this short tutorial, Anjana Fernando, Software Engineer at WSO2, introduces data streaming, with which the client receives the data service response as an instantaneous, continuous flow of data.

Date: Tue, 5th Jan, 2010
Level: Intermediate
Anjana Fernando

WSO2 Inc.

Applies To


WSO2 Data-Services Server 2.2.0

Objective

The objective of this tutorial is to present the data streaming capability of WSO2 Data Services Server. With data streaming, there is theoretically no limit to the size of a data service response. The main advantages of this approach are:

  • Memory efficiency – The server does not build up the full result in memory before responding. Instead, the data is streamed to the client as it is generated.
  • Low response time – Since data is returned as soon as the server generates it, the client starts receiving the response almost instantly and can process the data in real time as it is streamed.
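To make these two advantages concrete, here is a conceptual Java sketch that is unrelated to the server's actual internals: a producer thread hands rows to a consumer through a small bounded buffer (java.util.concurrent.ArrayBlockingQueue). Memory use is bounded by the buffer capacity rather than by the total number of rows, and the consumer starts working on the first row as soon as it is produced. The class name, the end-of-stream sentinel and the use of plain ids as "rows" are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Conceptual sketch only (not WSO2 Data Services Server code): rows flow from a
 * producer to a consumer through a bounded buffer instead of being collected
 * into one big in-memory result first.
 */
public class StreamingSketch {

    // Sentinel marking the end of the stream; ids start at 1, so -1 never collides
    private static final long END_OF_STREAM = -1L;

    /** Streams ids 1..rowCount through a buffer of the given capacity and returns their sum. */
    public static long streamAndSumIds(long rowCount, int bufferCapacity) {
        BlockingQueue<Long> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        Thread producer = new Thread(() -> {
            try {
                for (long id = 1; id <= rowCount; id++) {
                    buffer.put(id);            // blocks while the buffer is full
                }
                buffer.put(END_OF_STREAM);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long idSum = 0;
        try {
            while (true) {
                long id = buffer.take();       // blocks until the next row is available
                if (id == END_OF_STREAM) {
                    break;
                }
                idSum += id;                   // process each row as soon as it arrives
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while streaming", e);
        }
        return idSum;
    }

    public static void main(String[] args) {
        System.out.println(streamAndSumIds(1000, 16));
    }
}
```

Calling streamAndSumIds(1000, 16) returns 500500, the sum of 1 to 1000, while the buffer never holds more than 16 rows at a time.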

Using a sample MySQL database, a data service will be created to show data streaming in action.

Prerequisites

  • Download the latest build of WSO2 Data Services Server from http://wso2.org/downloads/data-services-server.
  • Download and install Apache Ant from http://ant.apache.org/.
  • Have a MySQL server available; Step 1 creates the sample database on it.
  • Install WSO2 Data Services Server as a stand-alone server (the install location is referred to as DS_HOME hereafter).
  • Start the server (run DS_HOME/bin/wso2server.bat on Windows or DS_HOME/bin/wso2server.sh on Unix/Linux).
  • Open a web browser and navigate to https://localhost:9443/carbon.
  • If you are a first-time user, log in using the default credentials: username=admin, password=admin.

Step 1 – Create Database

We will create a sample database containing a table with an id field and two text fields, each holding 1 KB of data. The table has 524288 records, which adds up to a gigabyte of textual data. The following script creates the database and populates the data.
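The gigabyte figure works out as follows, assuming one byte per character (which holds for 'x' in single-byte and UTF-8 character sets) and ignoring row and index overhead:

```latex
524288 \times 2 \times 1024\ \text{B} = 2^{19} \cdot 2^{1} \cdot 2^{10}\ \text{B} = 2^{30}\ \text{B} = 1\ \text{GiB}
```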

stream_db.sql

DROP DATABASE IF EXISTS StreamTestDB;
CREATE DATABASE StreamTestDB;
-- Note: MySQL 8.0 and later no longer accept IDENTIFIED BY in GRANT; there,
-- run CREATE USER 'user1'@'localhost' IDENTIFIED BY 'pass1'; first and drop
-- the IDENTIFIED BY clause below.
GRANT all on StreamTestDB.* to 'user1'@'localhost' identified by 'pass1';
USE StreamTestDB;
CREATE TABLE Data (id INT, field1 VARCHAR(1024), field2 VARCHAR(1024));

DELIMITER $$
DROP PROCEDURE IF EXISTS PopulateData$$
CREATE PROCEDURE PopulateData()
BEGIN
    DECLARE count INT;
    DECLARE strDataEntry VARCHAR(1024);
    -- Build a 1 KB string of 'x' characters
    SET strDataEntry = REPEAT('x', 1024);
    -- Insert 524288 rows (ids 1..524288), each with two 1 KB text fields
    SET count = 1;
    WHILE count <= 524288 DO
        INSERT INTO Data VALUES (count, strDataEntry, strDataEntry);
        SET count = count + 1;
    END WHILE;
END$$
DELIMITER ;

CALL PopulateData();

Run the script with the following command:

# mysql -u root -p < stream_db.sql

You will first be prompted for the MySQL root password; the database is then created and populated.
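Before deploying the data service, you may want to confirm that the table was fully populated. The following Java sketch is one way to do that over JDBC. It assumes the MySQL Connector/J driver is on the classpath and reuses the JDBC URL and the user1/pass1 credentials from the data service configuration in Step 2; the class and method names are illustrative. It compares the row count and the id sum with n(n+1)/2, the same check the client performs in Step 3.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Hypothetical sanity check for the StreamTestDB.Data table.
 * Assumes MySQL Connector/J is available on the classpath at run time.
 */
public class VerifyStreamTestDB {

    static final long EXPECTED_ROWS = 524288L;

    /** True if the row count and id sum are consistent with ids 1..expectedRows. */
    public static boolean looksComplete(long rowCount, long idSum, long expectedRows) {
        return rowCount == expectedRows && idSum == expectedRows * (expectedRows + 1) / 2;
    }

    public static void main(String[] args) throws SQLException {
        // Same URL and credentials as the "ds1" config in StreamingTestDS.dbs
        String url = "jdbc:mysql://localhost:3306/StreamTestDB";
        try (Connection con = DriverManager.getConnection(url, "user1", "pass1");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT COUNT(*), SUM(id) FROM Data")) {
            rs.next();                       // an aggregate query always returns one row
            long rows = rs.getLong(1);
            long idSum = rs.getLong(2);      // SUM of an empty table is NULL, read as 0
            boolean ok = looksComplete(rows, idSum, EXPECTED_ROWS);
            System.out.println("Rows: " + rows + ", ID sum: " + idSum
                    + (ok ? " (complete)" : " (incomplete)"));
        }
    }
}
```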

Step 2 – Create the Data Service

Here we define a simple data service with a single operation, getData, which returns all the records in the table we created earlier. The data service definition is as follows.

StreamingTestDS.dbs

<?xml version="1.0" encoding="UTF-8"?> 
<data name="StreamingTestDS"> 
    <config id="ds1"> 
        <property name="org.wso2.ws.dataservice.driver">com.mysql.jdbc.Driver</property> 
        <property name="org.wso2.ws.dataservice.protocol">jdbc:mysql://localhost:3306/StreamTestDB</property> 
        <property name="org.wso2.ws.dataservice.user">user1</property> 
        <property name="org.wso2.ws.dataservice.password">pass1</property> 
        <property name="org.wso2.ws.dataservice.minpoolsize">2</property> 
        <property name="org.wso2.ws.dataservice.maxpoolsize">10</property> 
    </config> 
    <query id="dataQuery" useConfig="ds1"> 
        <sql>select id, field1, field2 from Data</sql> 
        <result element="Data" rowName="DataElement" defaultNamespace="http://org.test.streaming"> 
            <element name="id" column="id" xsdType="xs:integer"/> 
            <element name="field1" column="field1" xsdType="xs:string"/> 
            <element name="field2" column="field2" xsdType="xs:string"/> 
        </result> 
    </query> 
    <operation name="getData"> 
        <call-query href="dataQuery"/> 
    </operation> 
</data>

Save the above contents to a file called 'StreamingTestDS.dbs' and upload it to the Data Services Server as shown in Figure 2.1.


Figure 2.1: Data Service Upload Screen
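Based on the result mapping in StreamingTestDS.dbs (result element Data, row element DataElement, default namespace http://org.test.streaming), the getData response should look roughly like the output of the sketch below. This Java sketch uses the standard StAX XMLStreamWriter and is not the Data Services Server's own code; the server's exact serialization (namespace declarations, whitespace, SOAP envelope) may differ. Because each row is flushed to the Writer as soon as it is complete, only a bounded amount of data needs to be held in memory when the Writer is backed by a network stream. The class and method names are assumptions for illustration.

```java
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

/** Illustrative sketch of the getData response shape, written row by row. */
public class ResponseShapeSketch {

    // Taken from defaultNamespace in the <result> element of StreamingTestDS.dbs
    static final String NS = "http://org.test.streaming";

    /** Writes a response with ids 1..rows, each row carrying fieldValue in field1 and field2. */
    public static void writeResponse(Writer out, int rows, String fieldValue) {
        try {
            XMLStreamWriter w = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            w.writeStartElement("Data");             // element="Data"
            w.writeDefaultNamespace(NS);             // xmlns="http://org.test.streaming"
            for (int id = 1; id <= rows; id++) {
                w.writeStartElement("DataElement");  // rowName="DataElement"
                writeField(w, "id", Integer.toString(id));
                writeField(w, "field1", fieldValue);
                writeField(w, "field2", fieldValue);
                w.writeEndElement();                 // </DataElement>
                w.flush();                           // push the finished row downstream
            }
            w.writeEndElement();                     // </Data>
            w.flush();
            w.close();                               // does not close the underlying Writer
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Could not write response", e);
        }
    }

    private static void writeField(XMLStreamWriter w, String name, String value)
            throws XMLStreamException {
        w.writeStartElement(name);
        w.writeCharacters(value);
        w.writeEndElement();
    }

    public static void main(String[] args) throws Exception {
        Writer out = new OutputStreamWriter(System.out, StandardCharsets.UTF_8);
        writeResponse(out, 2, "xxx");
        out.write(System.lineSeparator());
        out.flush();
    }
}
```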

Step 3 – Create Service Client

Once the data service is deployed, we need a service client to access it. Here we write an Apache Axis2 client that consumes the data service.

StreamingDSClient.java

import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;

public class StreamingDSClient {

	/* builds the request payload: an empty <getData/> element */
	private static OMElement getPayload() {
		OMFactory fac = OMAbstractFactory.getOMFactory();
		return fac.createOMElement(new QName("getData"));
	}

	public static void main(String[] args) throws Exception {
		/* service target end-point */
		String epr = "http://localhost:9763/services/StreamingTestDS";

		/* create the service client and set the endpoint and SOAP action */
		ServiceClient client = new ServiceClient();
		Options options = new Options();
		options.setTo(new EndpointReference(epr));
		options.setAction("urn:getData");
		/* a longer timeout (here 10 minutes) may be needed for long-running
		   calls; one reader reported a timeout error without it */
		options.setTimeOutInMilliSeconds(600000);
		client.setOptions(options);

		/* invoke service */
		long beginTime = System.currentTimeMillis();
		OMElement res = client.sendReceive(getPayload());

		/* process the result: getXMLStreamReaderWithoutCaching() pulls events
		   from the incoming stream without building an object tree in memory,
		   so the response is processed while it is still arriving */
		XMLStreamReader reader = res.getXMLStreamReaderWithoutCaching();
		int eventType;
		long kbRead = 0, idSum = 0;  /* kbRead: field data read so far, in KB */
		boolean isId = false;
		while (reader.hasNext()) {
			eventType = reader.next();
			if (eventType == XMLStreamConstants.START_ELEMENT) {
				if (reader.getLocalName().equals("id")) {
					isId = true;
					/* each record carries two 1 KB text fields */
					kbRead += 2;
					if (kbRead % 1024 == 0) {
						System.out.println("Data Read: " + kbRead / 1024 + " MB");
					}
				}
			} else if (isId && eventType == XMLStreamConstants.CHARACTERS) {
				/* the text content of the <id> element */
				idSum += Integer.parseInt(reader.getText().trim());
				isId = false;
			}
		}
		long endTime = System.currentTimeMillis();
		System.out.println("\nID Sum: " + idSum);
		System.out.println("Time: " + ((endTime - beginTime) / 1000) + " seconds.");
	}
}
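The pull-parsing loop in the client needs a running server only to get its input. To try the same idea locally, the following sketch runs a standard StAX XMLStreamReader (javax.xml.stream, not Axiom) over an in-memory document shaped like the getData response and returns the record count and id sum. As a variation on the client above, it reads each id with getElementText(), which collects all text up to the closing tag instead of relying on a single CHARACTERS event. The class name and the sample document are assumptions for illustration.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/** Hypothetical local test harness for the client's id-summing logic. */
public class IdSumParser {

    /** Returns {records, idSum}: one record per <id> element, summing the id values. */
    public static long[] countAndSumIds(String xml) {
        try {
            XMLStreamReader reader =
                    XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            long records = 0;
            long idSum = 0;
            while (reader.hasNext()) {
                // Pull one event at a time; no document tree is built
                if (reader.next() == XMLStreamConstants.START_ELEMENT
                        && reader.getLocalName().equals("id")) {
                    // getElementText() reads all text up to </id>, even if the
                    // parser reports it as more than one CHARACTERS event
                    idSum += Long.parseLong(reader.getElementText().trim());
                    records++;
                }
            }
            reader.close();
            return new long[] {records, idSum};
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed response", e);
        }
    }

    public static void main(String[] args) {
        String row = "<DataElement><id>%d</id><field1>x</field1><field2>x</field2></DataElement>";
        StringBuilder xml = new StringBuilder("<Data xmlns=\"http://org.test.streaming\">");
        for (int id = 1; id <= 3; id++) {
            xml.append(String.format(row, id));
        }
        xml.append("</Data>");
        long[] result = countAndSumIds(xml.toString());
        // prints: Records: 3, ID sum: 6
        System.out.println("Records: " + result[0] + ", ID sum: " + result[1]);
    }
}
```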

We will use Ant to build and run the client. The following build file is used for that purpose.

build.xml

<?xml version="1.0" encoding="UTF-8"?> 
<project default="run"> 
    <property environment="env"/> 
    <property name="axis2.home" value="${env.AXIS2_HOME}"/> 
    <path id="class.path"> 
        <pathelement path="."/> 
        <pathelement path="${java.class.path}"/> 
        <fileset dir="${axis2.home}"> 
            <include name="lib/*.jar"/> 
        </fileset> 
    </path> 
    <target name="compile"> 
        <javac fork="true" destdir="." srcdir="."> 
            <classpath refid="class.path"/> 
        </javac> 
    </target> 
    <target name="run" depends="compile"> 
        <java fork="true" classname="StreamingDSClient"> 
            <classpath> 
                <path refid="class.path"/> 
            </classpath> 
        </java> 
    </target> 
</project>

Keep build.xml and StreamingDSClient.java in the same directory. Before running the client, set the AXIS2_HOME environment variable to the DS_HOME/repository path, so that the build picks up the JARs in its lib directory.

For example:

Unix/Linux:

# export AXIS2_HOME=/home/laf/dev/bin/wso2dataservices-2.2.0/repository

Windows:

# set AXIS2_HOME=c:\wso2dataservices-2.2.0\repository

To build and run the client, navigate to the directory containing the source code and type 'ant'. This compiles and runs the client. The output should resemble the following.

 

.
.
.
     [java] Data Read: 1019 MB 
     [java] Data Read: 1020 MB 
     [java] Data Read: 1021 MB 
     [java] Data Read: 1022 MB 
     [java] Data Read: 1023 MB 
     [java] Data Read: 1024 MB 
     [java] 
     [java] ID Sum: 137439215616 
     [java] Time: 92 seconds. 

BUILD SUCCESSFUL 
Total time: 1 minute 34 seconds 

The service client prints a message for every megabyte of streamed data. It counts 2 KB per record (the two 1 KB text fields), so this figure tracks the field data rather than the bytes on the wire, which also include the XML markup. The client also calculates the sum of all the 'id' fields it reads, which confirms that all 524288 records were received: the sum of the consecutive integers from 1 to n is n(n+1)/2, which equals 137439215616 for n = 524288.
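Both checks in the output can be reproduced by hand: the id sum is an arithmetic series, and the final progress message follows from the client adding 2 KB per record.

```latex
\begin{aligned}
\sum_{i=1}^{n} i &= \frac{n(n+1)}{2}, \qquad n = 524288 \\
\frac{524288 \cdot 524289}{2} &= 262144 \cdot 524289 = 137439215616 \\
524288 \times 2\ \text{KB} &= 1048576\ \text{KB} = 1024\ \text{MB}
\end{aligned}
```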

So, in a single web service invocation that lasted 92 seconds (the time will most likely differ for each user, depending on the environment), we streamed a gigabyte of data, roughly 11 MB of field data per second.

Summary

This tutorial looked at the streaming functionality of WSO2 Data Services Server. Its advantages are low memory consumption on the server and quick response times for the client. Effectively, it allows a client to make data service calls that return data sets of practically unlimited size.


Author
Anjana Fernando, Software Engineer, WSO2 Inc. anjana at wso2 dot com

 

Attachment: ds_streaming_files.zip (2.56 KB)

It works!

Hello, thank you for the example. I ran it against a PostgreSQL database server on Windows 7 (64-bit) with wso2dataservices-2.5.1, using non-identical, fully inserted rows, and it works!

     [java] Data Read: 1021 MB
     [java] Data Read: 1022 MB
     [java] Data Read: 1023 MB
     [java] Data Read: 1024 MB
     [java]
     [java] ID Sum: 137439215616
     [java] Time: 183 seconds.

BUILD SUCCESSFUL
Total time: 3 minutes 7 seconds

On the other hand, in your client source StreamingDSClient.java I had to add options.setTimeOutInMilliSeconds(600000); in the main method, because a timeout error occurred, and I also had to increase the heap size to 2 GB (64-bit Java).

Jakub