Saturday, August 8, 2020

Apache Ignite Extensions




Last November we started working on Apache Ignite Extensions so that the Apache Ignite codebase hosts only the core module capabilities, while 3rd party integrations migrate to a separate repository.


The migration effort started with the following motivation:


  1. To give Apache Ignite core modules and extension modules separate release lifecycles.
  2. To deprecate the few integrations that are no longer in use.
  3. To help the Apache Ignite community support core and extensions separately (test, release, fix, continue development).


The following extensions are currently undergoing migration and won't be maintained by the Apache Ignite community for every core release. If the community later sees demand for an unsupported integration, it can be taken back and officially supported (testing, dev, releases, compatibility with the core) as an Ignite module.


  1. Flink - Ignite Flink Streamer consumes messages from an Apache Flink consumer endpoint and feeds them into an Ignite cache.
  2. Flume - IgniteSink is a Flume sink that extracts events from an associated Flume channel and injects them into an Ignite cache.
  3. Twitter - Ignite Twitter Streamer consumes messages from the Twitter Streaming API and inserts them into an Ignite cache.
  4. ZeroMQ - Ignite ZeroMQ Streamer consumes messages from a ZeroMQ consumer endpoint and feeds them into an Ignite cache.
  5. RocketMQ - Ignite RocketMQ Streamer consumes messages from an Apache RocketMQ consumer endpoint and feeds them into an Ignite cache.
  6. Storm - Ignite Storm Streamer consumes messages from an Apache Storm consumer endpoint and feeds them into an Ignite cache.
  7. MQTT - Ignite MQTT Streamer consumes messages from an MQTT topic and feeds transformed key-value pairs into an Ignite cache.
  8. Camel - Ignite Camel Streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite cache.
  9. JMS - Ignite JMS Data Streamer consumes messages from JMS brokers and inserts them into Ignite caches.

We considered the following requirement guidelines for each Ignite extension:


  1. An extension can be released separately from Apache Ignite core.
  2. An extension has to be tested with existing testing tools like TeamCity and Travis.
  3. Each extension is validated against every Apache Ignite core release, and a new version of the extension is released along with Apache Ignite core if changes are required.
  4. Extensions can keep their own release versions, which need not be aligned with the Apache Ignite core release version.

We identified risks in the migration effort associated with modifying the existing build pipeline and testing procedures. Release policies also have to be updated to ensure that the compatibility matrix of module and core versions is kept up to date.


We also have new extensions contributed by the Apache Ignite community to the Ignite Extensions project:


  1. Pub-Sub - Pub/Sub module is a streaming connector that injects Pub/Sub data into an Ignite cache.
  2. Spring Boot Autoconfigure - Apache Ignite Spring Boot Autoconfigure module provides autoconfiguration capabilities for Spring Boot based applications.
  3. Spring Boot Thin Client Autoconfigure - Apache Ignite Client Spring Boot Autoconfigure module provides autoconfiguration capabilities for the Ignite thin client in Spring Boot based applications.

At present we have released the Spring Boot Autoconfigure and Spring Boot Thin Client Autoconfigure extensions, and they are available for download from Maven:


<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring-boot-autoconfigure-ext</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring-boot-thin-client-autoconfigure-ext</artifactId>
    <version>1.0.0</version>
</dependency>


The modules migrated so far are hosted at https://github.com/apache/ignite-extensions

Sunday, August 26, 2018

Data streaming using Apache Flink and Apache Ignite

Stream processing topology



Apache IgniteSink offers a streaming connector to inject Flink data into the Ignite cache. The sink emits its input data to the Ignite cache. The key feature to note is the performance and scale both Apache Flink and Apache Ignite offer. Apache Flink can process unbounded and bounded data sets and has been designed to run stateful streaming applications at scale. Application computation is distributed and concurrently executed in clusters. Apache Flink is also optimized for local state access for tasks and does checkpointing of local state for durability. Apache Ignite provides streaming capabilities that allow data ingestion at high scale in its in-memory data grid.

In this article, we will discuss how to build a data streaming application using Apache Flink and Apache Ignite. Such an application can ingest large finite and infinite volumes of data into the Ignite cluster in an optimized and fault-tolerant way. The data ingestion rate is very high and can scale up to millions of events per second.

Setup: Download and Start Flink

Download a binary from the downloads page. You can pick any Hadoop/Scala combination you like. If you plan to just use the local file system, any Hadoop version will work fine. Go to the download directory.

Unpack the downloaded archive.

$ cd ~/Downloads        # Go to download directory
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.5.0

Start a Local Flink Cluster

$ ./bin/start-cluster.sh  # Start Flink

Check the Dispatcher’s web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

You can also verify that the system is running by checking the log files in the log directory:

$ tail log/flink-*-standalonesession-*.log

Download Kafka

Download a binary from the downloads page (https://kafka.apache.org/downloads). You can pick Apache Kafka version 0.10.2.2 with Scala 2.11.

Unpack the Downloaded Archive

$ cd ~/Downloads        # Go to download directory
$ tar xzf kafka_2.11-0.10.2.2.tgz   # Unpack the downloaded archive
$ cd kafka_2.11-0.10.2.2

Start ZooKeeper server

$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

Start broker

$ ./bin/kafka-server-start.sh ./config/server.properties

Create topic “mytopic”

$ ./bin/kafka-topics.sh --create --topic mytopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

Describe topic "mytopic"

$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic

Produce something into the topic (write something and hit enter)

$ ./bin/kafka-console-producer.sh --topic mytopic --broker-list localhost:9092

Consume from the topic using the console consumer

$ ./bin/kafka-console-consumer.sh --topic mytopic --zookeeper localhost:2181

Clone Apache Ignite

At the time of writing, IgniteSink support for data streaming applications in a Flink cluster is available in the master branch.

$ git clone https://github.com/apache/ignite

Build Apache Ignite

$ mvn clean package install -DskipTests

Build the Flink program:

$ mvn clean package

Submit the Flink program:

$ ./bin/flink run streamers-1.0-SNAPSHOT.jar

Produce something into the topic (write something and hit enter)

$ ./bin/kafka-console-producer.sh --topic mytopic --broker-list localhost:9092

The .out file will print the counts at the end of each time window as long as words keep flowing in, e.g.:

$ tail -f log/flink-*-taskexecutor-*.out
lorem : 1
bye : 1
ipsum : 4

Ignite REST service

To check the cache key values you can use the Ignite REST service:
$ curl -X GET http://localhost:8080/ignite\?cmd\=getall\&k1\=jam\&cacheName\=testCache

Scan cache

To scan all the entries of an Ignite cache, the following REST call can be used (the URL is quoted so that the shell does not interpret the & characters):
$ curl -X GET "http://localhost:8080/ignite?cmd=qryscanexe&pageSize=10&cacheName=testCache"
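
The same REST URLs can also be assembled from code instead of being typed by hand. The following is a minimal sketch, assuming the host, port, commands and parameter names used in the curl calls above; the class name IgniteRestUrl and its build method are hypothetical helpers written for this post, not part of Ignite. It only builds the URL string and percent-encodes the values; sending the request is left to whatever HTTP client you prefer.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not an Ignite class): builds URLs of the form
// http://host:port/ignite?cmd=...&name=value as used in the curl calls above.
class IgniteRestUrl {

    /** Joins the command and its parameters into an encoded query string. */
    static String build(String baseUrl, String cmd, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&", baseUrl + "/ignite?", "");
        query.add("cmd=" + encode(cmd));
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return query.toString();
    }

    private static String encode(String s) {
        // Form-style encoding: a space becomes "+", reserved characters become %XX
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order
        Map<String, String> scan = new LinkedHashMap<>();
        scan.put("pageSize", "10");
        scan.put("cacheName", "testCache");
        System.out.println(build("http://localhost:8080", "qryscanexe", scan));
    }
}
```

Building the URL this way avoids the shell-quoting pitfalls of & and ? and keeps cache names with special characters safe.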

Ignite Web Console

Ignite Web Console Build Instructions

  1. Install MongoDB (version >=3.2.0 <=3.4.15) using instructions from http://docs.mongodb.org/manual/installation.
  2. Install Node.js (version >=8.0.0) using installer from https://nodejs.org/en/download/current for your OS.
  3. Change directory to 'modules/web-console/backend' and run "npm install --no-optional" to download backend dependencies.
  4. Change directory to 'modules/web-console/frontend' and run "npm install --no-optional" to download frontend dependencies.
  5. Build the ignite-web-agent module following the instructions in 'modules/web-console/web-agent/README.txt'.
  6. Copy ignite-web-agent-.zip from 'modules/web-console/web-agent/target' to the 'modules/web-console/backend/agent_dists' folder.
  7. Unzip ignite-web-agent-.zip in 'modules/web-console/backend/agent_dists'.
  8. Run './ignite-web-agent.sh' inside the ignite-web-agent- folder.
Steps 1 - 4 should be executed once.

Ignite Web Console Run In Development Mode

  1. Configure MongoDB to run as a service, or in a terminal change directory to $MONGO_INSTALL_DIR/server/3.2/bin and start MongoDB by executing "mongod".
  2. In a new terminal change directory to 'modules/web-console/backend'. If dependencies have changed, run "npm install --no-optional", then run "npm start" to start the backend.
  3. In a new terminal change directory to 'modules/web-console/frontend'. If dependencies have changed, run "npm install --no-optional", then start webpack in development mode with "npm run dev".
  4. In browser open: http://localhost:9000

The Web Console can be used to scan a cache and view all of its contents.

To Stop Flink When You’re Done, Type:

$ ./bin/stop-cluster.sh

Summary

We covered how to build a simple data streaming application using Apache Flink and Apache Ignite, and how to create a stream processing topology that streams data in a distributed, scalable and fault-tolerant way and can process unbounded data sets consisting of millions of events.

Tuesday, June 12, 2012

PickerBot - Flipkart Hack Day 2012


"Johnny Sokko and His Flying Robot": does it ring a bell? It was my favorite program back in school. I have always been fascinated by robots, especially the small ones that can be used as utility bots.

This time at Flipkart Hack Day 2012 we planned to build a bot to serve a similar purpose. The idea struck me when I saw an Arduino-based implementation of bitbeambot (http://bitbeam.org/bitbeambot). "Robots that can play Angry Birds": that's awesome.

Our idea was to build a bot that can pick items from one point, let's say a picking point, and deliver them to a particular shelf in a warehouse. Such a bot can be used to organise products in warehouses, reducing manual effort and minimising errors. The extended plan was to build robotic arms that pick items, scan them and drop them on the PickerBot, which then delivers them to a particular location. The PickerBot would be smart enough to avoid obstacles on the way with the help of sharp IR sensors. It would carry a wireless radio for real-time monitoring. Finally, equipped with a barcode reader, it would be able to identify the deliverables. At the delivery shelf another robotic arm would pick up the product and arrange it on the shelf. Sounds cool already...

Well, I had two alternative plans for the bot as well.

If(success){
 The bot can be used to deliver snacks from the kitchen while I am on a difficult mission on my PS3.
}else {
 The non functional bot will be a toy for my newborn daughter.
}


This was my first hardware hack and I was already nervous about the fate of our robot. We soon realised that the motor drivers were not working as desired, which meant we had to hack around them too. We connected the motors directly to the Arduino but then realised that the Arduino output would not deliver sufficient voltage to power both motors. Time was running out: it was a 24-hour event and it was already 6 pm. We had only a slim chance of buying another motor driver, and even if we bought one there was no guarantee that it would work as desired. Our worry was that we probably wouldn't be able to stop the bot without the motor driver. We then connected the Arduino to the motors and scripted the motor-controlling functions on the Arduino. The Arduino board, connected directly to the motors, was able to control them. Victory! However small it was, it raised our hopes a little.
   
A few minutes later, we started assembling the parts of the bot. We connected the small motors to the wheels and added a caster wheel at the front to support the bot and let it turn left and right. We mounted the breadboard and the Arduino board, connected the batteries and started testing. The bot moved but was very slow. The second problem was that one motor ran slower than the other, so the bot turned in circles rather than moving in a straight line. By then it was 9 pm: dinner time.

After dinner we connected the large DC motors and large wheels so that, given more power, the bot would move faster. To our dismay, the bot didn't move at all. :( The DC motors were heavy and the wheels had better grip; on top of that, the carpeted floor was not helping. Our hopes sank again.

Post midnight, the rolls arrived. After a short break we planned to add more batteries and attach the lighter wheels at the front. Once we powered up the bot again, it started rolling on the floor. This also reduced the moving-in-circles problem. By 3 am we were so happy that we added LEDs to celebrate our victory.

Finally, we connected the servo motor and attached the IR sensors, and our PickerBot was ready for the demo.


Thursday, May 10, 2012

Http profiler using Selenium to track omniture calls


We often come across issues like missing tracking calls for Omniture or beacon tracking. These calls are useful for various reasons: beacon tracking records users' mouse clicks and can be used to generate a hit map, and Omniture calls track business metrics such as page views, number of orders placed online and user engagement with page objects.

Traditionally, during development we verify the calls using tools like Firebug in Firefox or HttpWatch in IE, but checking the calls for an end-to-end order placement flow, or across many pages, in an automated way can get tricky.

I would like to share how Selenium can be used for this purpose. DefaultSelenium has a captureNetworkTraffic() method that can be used to track Omniture and beacon tracking calls for complete page views as well as Ajax calls.


Sample code
 
package com.example.common;

import com.thoughtworks.selenium.Wait;
import fitlibrary.SequenceFixture;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.StringReader;
import java.net.URL;
import java.net.URLDecoder;
import java.util.*;
import java.util.logging.Logger;

public class Omniture extends SequenceFixture {
    public static final String TIMEOUT = "120000";
    Logger logger = Logger.getLogger(Omniture.class.getName());
    static Properties props = new Properties();
 
    String omnitureCall;
 
    // Validates the presence of an Omniture call in the traffic
    // captured for an action like openPage
 
    public boolean validateOmnitureCall(String calls) {
        BufferedReader reader = new BufferedReader(new StringReader(calls));
        String strline;
        try {
            while ((strline = reader.readLine()) != null) {


                if (strline.contains("200 GET http://domain-name.d1.sc.net/b/ss/")) {

                    omnitureCall = strline;
                    return true;
                }
                if (strline.contains("200 GET https://domain-name.d1.sc.net:443/b/ss/")) {

                    omnitureCall = strline;
                    return true;
                }


            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        logger.info("omniture call not found.");
        return false;
    }

    // Validates each Omniture parameter listed in the Omniture.properties file
    // against the actual Omniture call captured for an action like openPage.
    // Call validateOmnitureCall first so that omnitureCall is set.
    public boolean validateOmnitureParams(String page, String action) {
        if (omnitureCall == null) {
            logger.info("omniture call not captured.");
            return false;
        }

        // try-with-resources closes the file even on an early return
        try (FileInputStream f = new FileInputStream(config.locators + "/Omniture.properties")) {
            props.load(f);

            String omnitureQueryParams = omnitureCall.substring(omnitureCall.indexOf("?") + 1);
            StringTokenizer stringTokenizer = new StringTokenizer(omnitureQueryParams, "&");

            while (stringTokenizer.hasMoreTokens()) {
                // Split on the first "=" only, so a value may be empty or contain "="
                String[] paramArray = stringTokenizer.nextToken().split("=", 2);
                // Decode after splitting, so an encoded "&" or "=" cannot break the parsing
                String paramName = URLDecoder.decode(paramArray[0], "UTF-8");
                String paramValue = paramArray.length > 1 ? URLDecoder.decode(paramArray[1], "UTF-8") : "";

                String value = props.getProperty(page + "." + action + "." + paramName);

                if (value != null && !paramValue.contains(value)) {
                    logger.info("page " + page + " action " + action + " paramName " + paramName);
                    logger.info("Omniture param value defined in Omniture.properties for " + paramName + " is " + value);
                    logger.info("Omniture param value in omniture call for " + paramName + " is " + paramValue);
                    return false;
                }
            }
        } catch (Exception e) {
            // A properties file that cannot be read must not count as a pass
            e.printStackTrace();
            return false;
        }

        return true;
    }


    // Opens a web page with Selenium and validates the Omniture call it triggers.
    // config (Selenium instance, locator path) and setWalletBalance() are
    // project-specific and not shown in this sample.
    public boolean openPage(String page, String url) throws Exception {
        config.selenium.open(url);
        config.selenium.waitForPageToLoad(TIMEOUT);
        // Capture the traffic after the page has loaded so that late tracking calls are included
        String calls = config.selenium.captureNetworkTraffic("plain");
        setWalletBalance();

        return validateOmnitureCall(calls) && validateOmnitureParams(page, "openPage");
    }
}


Sample properties file
       
 Omniture.properties
         
 Homepage.openPage.v1 = Home
 Homepage.openPage.v2 = Home
 Homepage.openPage.v3 = Homepage
 Homepage.openPage.pageName = Homepage
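
The parameter check can also be tried out without Selenium or FitNesse. The following is a minimal sketch using only the JDK, assuming the tracking URL has already been extracted from the captured traffic; the class TrackingParams, its method names and the example URL values are hypothetical and chosen only for illustration. Unlike the fixture above, this sketch also reports an expected parameter that is missing from the call.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: parses a tracking URL and compares it with expected values.
class TrackingParams {

    /** Parses the query string of a URL into decoded name/value pairs. */
    static Map<String, String> parseQuery(String url) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = url.indexOf('?');
        if (q < 0) {
            return params;
        }
        for (String pair : url.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            // Split on the first "=" only; a parameter without "=" gets an empty value
            String[] kv = pair.split("=", 2);
            String value = kv.length > 1 ? decode(kv[1]) : "";
            params.put(decode(kv[0]), value);
        }
        return params;
    }

    /** Returns the expected names that are missing or whose value does not contain the expected text. */
    static List<String> mismatches(Map<String, String> expected, Map<String, String> actual) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, String> e : expected.entrySet()) {
            String value = actual.get(e.getKey());
            if (value == null || !value.contains(e.getValue())) {
                bad.add(e.getKey());
            }
        }
        return bad;
    }

    private static String decode(String s) {
        return URLDecoder.decode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Example call with made-up values; v3 is deliberately absent
        String call = "http://domain-name.d1.sc.net/b/ss/?v1=Home&v2=Home&pageName=Homepage%20v2";

        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("v1", "Home");
        expected.put("v3", "Homepage");
        expected.put("pageName", "Homepage");

        System.out.println("Mismatched params: " + mismatches(expected, parseQuery(call)));
    }
}
```

Returning the list of mismatched names, rather than a single boolean, makes it easier to see every failing parameter in one run.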

Monday, April 2, 2012

Parallel execution of Fitnesse tests

We often face the problem of Selenium tests running very slowly. If you use FitNesse as your test case manager and test runner, running tests in a suite probably takes a long time for you.

We faced a similar situation when we integrated our test suite into a continuous integration system. Slow-running tests end up slowing down the build and, in turn, the shipping of features to the production environment.

The solution we would recommend is to use the Trinidad package shipped with FitNesse version 20100103. More info at the following URL:

http://www.fitnesse.info/trinidad

We use the Trinidad test runner to run our tests in parallel threads. The FitNesse test names are maintained in a text file; reading that file as input, we create threads to run the tests.



Sample code


package travelQa;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import fit.Counts;
import fitnesse.trinidad.*;

public class InProcessRunner {
    // Number of tests handed to each thread
    private static final int BATCH_SIZE = 1;
    // Totals are shared by all worker threads, so updates are synchronized on LOCK
    private static final Object LOCK = new Object();
    private static int totalright, totalwrong, totalexceptions;
    static final UUID batchId = UUID.randomUUID();

    // Starts a thread that runs the given tests once per browser listed in the environment table
    public static void startProcessing(final List<String> tests) {
        Thread t = new Thread() {
            @Override
            public void run() {
                String dbhost = "jdbc:mysql://localhost/automation?user=root";
                try {
                    Class.forName("com.mysql.jdbc.Driver").newInstance();
                    // try-with-resources closes the connection when the thread is done
                    try (Connection conn = DriverManager.getConnection(dbhost)) {
                        for (String next : tests) {
                            try (Statement stmt = conn.createStatement();
                                 ResultSet rs = stmt.executeQuery("select * from environment")) {
                                while (rs.next()) {
                                    runTest(conn, next, rs.getString("browserCode"), rs.getString("browserName"));
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t.start();
    }

    // Runs one test for one browser and stores the result in the report table
    private static void runTest(Connection conn, String test, String browserCode, String browserName)
            throws Exception {
        // Note: system properties are JVM-wide, so threads running in parallel share this value
        System.setProperty("browser", browserCode);

        String outputDir = "C:\\wamp\\www\\output\\" + batchId + "\\" + browserName;
        boolean status = new File(outputDir).mkdirs();
        System.out.println(status);

        TestRunner tdd = new TestRunner(new FitNesseRepository("C:\\root\\fitnesse"),
                new FitTestEngine(), outputDir);
        Counts cs = tdd.runTest(test);

        synchronized (LOCK) {
            totalright += cs.right;
            totalwrong += cs.wrong;
            totalexceptions += cs.exceptions;
            System.out.println(" Test Passed : " + totalright);
            System.out.println(" Test Failed : " + totalwrong);
            System.out.println(" Test Exceptions : " + totalexceptions);
        }

        String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        // Bind the values as parameters instead of concatenating them into the SQL string
        String sql = "insert into report(testName,passed,failed,exception,createdTime,batchId,browser)"
                + " values (?,?,?,?,?,?,?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, test);
            ps.setInt(2, cs.right);
            ps.setInt(3, cs.wrong);
            ps.setInt(4, cs.exceptions);
            ps.setString(5, currentTime);
            ps.setString(6, batchId.toString());
            ps.setString(7, browserName);
            ps.executeUpdate();
        }
    }

    public static void main(String[] args) throws IOException {
        File file = new File("C:\\root\\fitnesse\\TestList.txt");

        try (BufferedReader bufRdr = new BufferedReader(new FileReader(file))) {
            List<String> testList = new ArrayList<String>();
            String line;

            // Read one test name per line and start a thread for every full batch
            while ((line = bufRdr.readLine()) != null) {
                testList.add(line);
                if (testList.size() == BATCH_SIZE) {
                    startProcessing(testList);
                    testList = new ArrayList<String>();
                }
            }
            if (!testList.isEmpty()) {
                startProcessing(testList);
            }
        }
    }
}
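
Starting one thread per test can overload the machine when the test list grows. A fixed-size thread pool is one way to cap how many tests run at the same time. The following is a minimal sketch of that idea using only java.util.concurrent; the class PooledTestRunner, the TestTask interface and the test names are hypothetical stand-ins, and the dummy task in main takes the place of the real Trinidad call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: runs tests on a bounded pool instead of one thread per test.
class PooledTestRunner {

    /** Stand-in for a real test run; returns true when the test passes. */
    interface TestTask {
        boolean run(String testName) throws Exception;
    }

    /** Runs every test on a pool of the given size and returns how many passed. */
    static int runAll(List<String> tests, int threads, TestTask task) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Boolean>> jobs = new ArrayList<>();
            for (String test : tests) {
                jobs.add(() -> task.run(test));
            }
            int passed = 0;
            // invokeAll blocks until every job has finished
            for (Future<Boolean> result : pool.invokeAll(jobs)) {
                if (result.get()) {
                    passed++;
                }
            }
            return passed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Made-up test names, for illustration only
        List<String> tests = Arrays.asList("SuiteA.TestOne", "SuiteA.TestTwo", "SuiteB.TestThree");
        // Dummy task: pretend that tests whose name ends in "Two" fail
        int passed = runAll(tests, 2, name -> !name.endsWith("Two"));
        System.out.println("Passed " + passed + " of " + tests.size());
    }
}
```

Because the results are collected from the futures in the calling thread, no shared counters are needed and the totals cannot be corrupted by concurrent updates.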


Thursday, June 18, 2009

AOL Innovation Competition 2



I won the AOL Innovation Competition 2009. This time the competition was announced 2 months prior to the date of the event, which gave developers enough time to think, innovate and implement their ideas. There were 150 ideas posted and my app was selected as one of the best ideas.

I had an opportunity to explain the idea to our CEO Timothy Armstrong.