Apache Ignite Read Through Database Caching

Introduction

In my previous post blog post about apache ignite, we learnt how to use a traditional RDBMS as persistent store with apache ignite. As part of that tutorial, we demonstrated write through caching mechanism using JCache API extension provided by ignite and also using sql. In this blog post, we will extend and build upon previous blog post. We will learn to implement apache ignite read through caching with RDBMS as persistent layer and ignite as caching layer.

Tech Stack

JAVA version "1.8.0_144"
Apache Ignite version "2.0.0"
Eclipse Neon
Apache Maven
MySQL version "5.7"
DBeaver "4.0.5"
Windows 10 Laptop

Database Table

As mentioned, we will extend the previous tutorial, hence we don’t need to create anything extra. Instead we’ll use the same tables here as well.

What are we implementing?

This project will have three parts to it –

  • Inserting to DB table through ignite cache using write-through mechanism (already implemented in previous blog)
  • Reading the already inserted records using Cache API (read-through). This will actually read the data off cache.
  • We’ll insert a record in db table manually without affecting cache. We’ll try to read this data using same cache API. However, as the data is not present in cache, the implementation will demonstrate a true read-through behavior. It’ll fetch the data from DB.

Modify the Model Class: Orders POJO

We’ll modify the POJO class Orders.java inside com.iteritory.ignite.model. We’ll add a new variable “List<OrderLines> ol” in this class and add the getter/setter method for this variable. We’ll also create a new constructor that uses it as an input argument. Post modification, the Orders.java file looks like –

package com.iteritory.ignite.model;

import java.sql.Date;
import java.util.List;

public class Orders {
	int orderNumber;
	String orderType;
	Date orderFulfillmentDate;
	OrderLines orderLine[];
	List<OrderLines> ol;
	
	public Orders(int orderNumber, String orderType, Date orderFulfillmentDate, List<OrderLines> ol) {
		super();
		this.orderNumber = orderNumber;
		this.orderType = orderType;
		this.orderFulfillmentDate = orderFulfillmentDate;
		this.orderLine = ol.toArray(new OrderLines[ol.size()]);
	}
	
	public Orders(int orderNumber, String orderType, Date orderFulfillmentDate, OrderLines[] orderLine) {
		super();
		this.orderNumber = orderNumber;
		this.orderType = orderType;
		this.orderFulfillmentDate = orderFulfillmentDate;
		this.orderLine = orderLine;
	}
	public Orders(int orderNumber, String orderType, Date orderFulfillmentDate) {
		// TODO Auto-generated constructor stub
		super();
		this.orderNumber = orderNumber;
		this.orderType = orderType;
		this.orderFulfillmentDate = orderFulfillmentDate;
	}
	public int getOrderNumber() {
		return orderNumber;
	}
	public void setOrderNumber(int orderNumber) {
		this.orderNumber = orderNumber;
	}
	public String getOrderType() {
		return orderType;
	}
	public void setOrderType(String orderType) {
		this.orderType = orderType;
	}
	public Date getOrderFulfillmentDate() {
		return orderFulfillmentDate;
	}
	public void setOrderFulfillmentDate(Date orderFulfillmentDate) {
		this.orderFulfillmentDate = orderFulfillmentDate;
	}
	public OrderLines[] getOrderLine() {
		return orderLine;
	}
	public void setOrderLine(OrderLines[] orderLine) {
		this.orderLine = orderLine;
	}
	public List<OrderLines> getOl() {
		return ol;
	}
	public void setOl(List<OrderLines> ol) {
		this.ol = ol;
	}
}

Implement load (Long key) method in OrdersCacheStoreAdapter.java

Previously we added code snippet in the write method as part of our write-through behavior demonstration. Now, we will implement the load method to demonstrate read-through behavior. This load method will be called each time you call cache.get(args) method and the data is not present in the cache. So, in our demonstration scenario, for the first two cache.get(Args) calls, orders will already be present in the cache; for the third call, it will not be present in the cache. So, it will make a call to the DB.

Post implementing the load method, the entire class looks –

package com.iteritory.ignite.adapter;

import java.sql.Connection;
//import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.jetbrains.annotations.Nullable;

import com.iteritory.ignite.model.*;

public class OrdersCacheStoreAdapter extends CacheStoreAdapter<Long, Orders> {

	// This is a key to store connection
	private static final String CONN_NAME = "CONN_STORE";

	@CacheStoreSessionResource
	private CacheStoreSession cacheStoreSession;

	@Override
	public Orders load(Long key) throws CacheLoaderException {
		Connection conn = null;
		PreparedStatement stOrderInfo, stOrderLineInfo;
		ResultSet rsOrder,rsOrderLines;
		Orders ord = null;
		List<OrderLines> ol;

		try{
			System.out.println("INFO: Cache Read through. The function is invoked as data is not available in cache.");
			conn = connection();
			stOrderInfo = conn.prepareStatement("select * from orders where order_number = ?");
			stOrderLineInfo = conn.prepareStatement("select * from order_line where order_number = ?");
			stOrderInfo.setLong(1, key);
			stOrderLineInfo.setLong(1, key);
			rsOrder = stOrderInfo.executeQuery();
			rsOrderLines = stOrderLineInfo.executeQuery();

			ol = new ArrayList<OrderLines>();
			if (rsOrder.next()){
				while (rsOrderLines.next()){
					ol.add(new OrderLines(rsOrderLines.getInt(1),rsOrderLines.getInt(2),rsOrderLines.getString(3), rsOrderLines.getInt(4)));
				}
				//create order object
				ord = new Orders (rsOrder.getInt(1), rsOrder.getString(2),rsOrder.getDate(3), ol);
			}
			return ord!=null ? ord : null;
		}catch (Exception e) {
		      throw new CacheLoaderException("Failed to load: " + key, e);
		}
	}

	@Override
	public void write(Entry<? extends Long, ? extends Orders> entry) throws CacheWriterException {
		// TODO Auto-generated method stub
		
		//Get the key and value for every insert call from the invokation class IgniteWtiteThroughCache
		Long key = entry.getKey();
		Orders value = entry.getValue();

		System.out.println("INFO: Inserting the record for order#:" + key);
		Connection conn = null;
		try {
			conn = connection();

			PreparedStatement stOrder, stOrderLine;
			
			// Delete the row if any from the orderlines table for the current key
			stOrder = conn.prepareStatement("delete from orders where order_number = ?");
			stOrderLine = conn.prepareStatement("delete from order_line where order_number = ?");
			stOrder.setLong(1, value.getOrderNumber());
			stOrderLine.setLong(1, value.getOrderNumber());
			stOrderLine.executeUpdate();
			stOrder.executeUpdate();

			// Insert the rows into table 
			stOrder = conn.prepareStatement("insert into orders (order_number, order_type, order_fulfillment_date) values (?, ?, ?)");
			stOrderLine = conn.prepareStatement("insert into order_line (order_number, order_line_number, item_name,item_qty) values (?, ?, ?,?)");
			stOrder.setLong(1, value.getOrderNumber());
			stOrder.setString(2, value.getOrderType());
			stOrder.setDate(3, value.getOrderFulfillmentDate());
			stOrder.executeUpdate();

			for (int i =0; i < value.getOrderLine().length; i ++){
				OrderLines currentOrderLine = value.getOrderLine()[i];
				stOrderLine.setInt(1, currentOrderLine.getOrderNumber());
				stOrderLine.setInt(2, currentOrderLine.getOrderLineNumber());
				stOrderLine.setString(3, currentOrderLine.getItemName());
				stOrderLine.setInt(4, currentOrderLine.getItemQty());
				stOrderLine.executeUpdate();
			}
		} catch (Exception ex) {
			throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + value + ']', ex);
		} finally {
			endConnection(conn);
		}
	}

	public void delete(Object key) throws CacheWriterException {
		// TODO Auto-generated method stub

	}

	@Override
	public void sessionEnd(boolean commit) {
		Map<String, Connection> connectionProperties = cacheStoreSession.properties();
		try {
			Connection conn = (Connection) connectionProperties.remove(CONN_NAME);
			if (conn != null) {
				//when the transaction is successfully committed [tx.commit() in IgniteWriteThroughCache],
				//the commit variable will be true
				if (commit)
					conn.commit();
				else
					conn.rollback();
			}
			System.out.println("END:Transaction ended successfully [commit=" + commit + ']');
		} catch (SQLException e) {
			throw new CacheWriterException("ERROR:Failed to end transaction: " + cacheStoreSession.transaction(), e);
		}
	}

	private Connection openConnection(boolean autocommitFlag) throws SQLException {
		Connection objConn = DriverManager
				.getConnection("jdbc:mysql://localhost:3306/mysql_ignite?user=root&password=password123");
		objConn.setAutoCommit(autocommitFlag);
		System.out.println("INFO:Connection object is created with autoCommitFlag as:" + autocommitFlag);
		return objConn;
	}

	private void endConnection(@Nullable Connection objConn) {
		if (!cacheStoreSession.isWithinTransaction() && objConn != null) {
			//Close connection as there is no transaction.
			try {
				objConn.close();
				System.out.println("INFO:Connection object is closed");
			} catch (SQLException ex) {
				ex.printStackTrace();
			}
		}
	}

	private Connection connection() throws SQLException {
		// Check if there is ongoing session; if so, we'll reuse it
		if (cacheStoreSession.isWithinTransaction()) {
			System.out.println("INFO:The cache store session is within Transaction");
			Map<Object, Object> connectionProperties = cacheStoreSession.properties();
			Connection conn = (Connection) connectionProperties.get(CONN_NAME);
			if (conn == null) {
				System.out.println("INFO:Connection does not exist; create a new connection with autoCommitFlag as False");
				conn = openConnection(false);
				connectionProperties.put(CONN_NAME, conn);
			}else{
				System.out.println("INFO:Connection exists; we'll reuse the same connection");
			}
			return conn;
		} else {
			System.out.println("INFO:The cache store session is NOT within Transaction; create a new connection");
			return openConnection(true);
		}
	}
}

Maven Dependency

Here we want to print the Order object as JSON, hence we will add few dependency for Jackson as well. Post modification, my pom.xml looks as –

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.iteritory.ignite</groupId>
	<artifactId>ignite-tutorial</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>ignite-tutorial</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-core</artifactId>
			<version>2.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>2.0.0</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>6.0.5</version>
		</dependency>
		<dependency>
    		<groupId>com.fasterxml.jackson.core</groupId>
    		<artifactId>jackson-core</artifactId>
    		<version>2.9.2</version>
		</dependency>
		<dependency>
    		<groupId>com.fasterxml.jackson.core</groupId>
    		<artifactId>jackson-databind</artifactId>
    		<version>2.9.2</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.6.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Modify IgniteWriteThroughCache.java to implement read-through behavior

Here we’ll set the read through property in the main program. We’ll also write method for reading data. Post modification, the class looks like –

package com.iteritory.ignite.impl;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.transactions.Transaction;

import com.iteritory.ignite.adapter.OrdersCacheStoreAdapter;
import com.iteritory.ignite.model.OrderLines;
import com.iteritory.ignite.model.Orders;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;

import java.text.SimpleDateFormat;

import javax.cache.configuration.FactoryBuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IgniteWriteThroughCache {
	/** Orders cache name. */
	private static final String ORDERS_CACHE = "Orders_Cache";

	public static void main(String[] args) {
		try {
			// Set the node start mode as client; so, this node will join the
			// apache
			// cluster as client
			Ignition.setClientMode(true);

			// Here, we provide the cache configuration file
			Ignite objIgnite = Ignition.start("F:\\apache-ignite-fabric-2.0.0-bin\\config\\itc-poc-config.xml");
			CacheConfiguration<Long, Orders> ordersCacheCfg = new CacheConfiguration<>(ORDERS_CACHE);

			// Set atomicity as transaction
			ordersCacheCfg.setAtomicityMode(TRANSACTIONAL);

			// Configure JDBC store.
			ordersCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(OrdersCacheStoreAdapter.class));

			//Set as read-thorugh cache
			ordersCacheCfg.setReadThrough(true);

			// Set as write-thorugh cache
			ordersCacheCfg.setWriteThrough(true);

			IgniteCache<Long, Orders> cache = objIgnite.getOrCreateCache(ordersCacheCfg);
			// Start transaction and execute several cache operations with
			// write-through to persistent store.
			persistData(cache);
			// write-through to persistent store.			
			readData(cache);
		} catch (Exception ex) {
			ex.printStackTrace();
		}

	}

	private static void persistData(IgniteCache<Long, Orders> cache) {
		try {

			Transaction tx = Ignition.ignite().transactions().txStart();
			System.out.println("START:Write");
			// Put/Insert first order
			OrderLines olBanana = new OrderLines(11, 1, "Banana", 12);
			OrderLines olApple = new OrderLines(11, 2, "Apple", 6);
			OrderLines[] ol1 = new OrderLines[] { olBanana, olApple };

			java.util.Date utilDate1 = new SimpleDateFormat("dd-MM-yyyy").parse("01-01-2017");
			java.sql.Date sqlDate1 = new java.sql.Date(utilDate1.getTime());
			Orders ord1 = new Orders(11, "EcomDeliveryOrder", sqlDate1, ol1);
			cache.put((long) 11, ord1);

			// Put/Insert second order
			OrderLines olMosambi = new OrderLines(22, 1, "Mosambi", 3);
			OrderLines olMango = new OrderLines(22, 2, "Mango", 4);
			OrderLines[] ol2 = new OrderLines[] { olMosambi, olMango };
			java.util.Date utilDate2 = new SimpleDateFormat("dd-MM-yyyy").parse("02-01-2017");
			java.sql.Date sqlDate2 = new java.sql.Date(utilDate2.getTime());
			Orders ord2 = new Orders(22, "StorePickupOrder", sqlDate2, ol2);
			cache.put((long) 22, ord2);
			tx.commit();
			System.out.println("END: Write");
			
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
	
	private static void readData(IgniteCache<Long, Orders> cache){
		//Transaction tx = Ignition.ignite().transactions().txStart();
		System.out.println("START:Read");
		Orders ord1 = cache.get((long)11);
		Orders ord2 = cache.get((long)22);
		Orders ord3 = cache.get((long)33);
		ObjectMapper mapper = new ObjectMapper();
		try {
			//reading from cache
            String json = mapper.writeValueAsString(ord1);
            System.out.println("JSON (hits the cache) = " + json);
            
            //reading from cache
            json = mapper.writeValueAsString(ord2);
            System.out.println("JSON (hits the cache) = " + json);
            
            //order is not avialble in cache; reading from database
            json = mapper.writeValueAsString(ord3);
            System.out.println("JSON (hits the DB) = " + json);
            
            //After the first get call, order is now avialble in cache; reading from cache
            Orders ord4 = cache.get((long)33);
            json = mapper.writeValueAsString(ord4);
            System.out.println("JSON (hits the cache) = " + json);
            
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
		
		System.out.println("END:Read");
	
	}
}

Insert an Order record in DB

In order to demonstrate that our implementation is indeed read-through, we’ll insert an order record using DBEaver (you can use sql client of your own choice) with order number 33. This record will not be in the cache initially as it was manually inserted in the DB. So, when we execute the program, for the first time, it will try to get it from cache; however, it won’t be available in cache. It will then try to fetch it from the DB and also store in the cache for anu subsequent call.

insert into orders values(33, 'EcomDeliveryOrder', '2017-05-05')
insert into order_line values(33,1,'Ptato', 2)
insert into order_line values(33,2,'Onion', 2)
commit

 

Execute the program

  • we’ll do a maven update [Right Click ignite-tutorial –> Maven –> Update Project …] and then will do a maven build [Right Click ignite-tutorial –>Run As –> Maven Build… ; set Goals as “clean install”]
  • This will create a jar file in the target directory.
  • Important step: Copy the jar file created in above step to the %IGNITE_HOME%/libs folder. This is essential for our program to run.
  • Start a single node apache ignite server by executing following command –>ignite.bat F:\apache-ignite-fabric-2.0.0-bin\config\itc-poc-config.xml from %IGNITE_HOME%/bin directory.
  • Now run the IgniteWriteThroughCache.java program. Upon successful execution, you will see execution result something similar to below on the eclipse console:-
    [01:08:01] Ignite node started OK (id=f6d7f63d)
    [01:08:01] Topology snapshot [ver=2, servers=1, clients=1, CPUs=4, heap=2.8GB]
    START:Write
    INFO: Inserting the record for order#:11
    INFO:The cache store session is within Transaction
    INFO:Connection does not exist; create a new connection with autoCommitFlag as False
    Mon Nov 20 01:08:02 IST 2017 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
    INFO:Connection object is created with autoCommitFlag as:false
    INFO: Inserting the record for order#:22
    INFO:The cache store session is within Transaction
    INFO:Connection exists; we'll reuse the same connection
    END:Transaction ended successfully [commit=true]
    END: Write
    START:Read
    JSON (hits the cache) = {"orderNumber":11,"orderType":"EcomDeliveryOrder","orderFulfillmentDate":1483209000000,"orderLine":[{"orderNumber":11,"orderLineNumber":1,"itemName":"Banana","itemQty":12},{"orderNumber":11,"orderLineNumber":2,"itemName":"Apple","itemQty":6}],"ol":null}
    JSON (hits the cache) = {"orderNumber":22,"orderType":"StorePickupOrder","orderFulfillmentDate":1483295400000,"orderLine":[{"orderNumber":22,"orderLineNumber":1,"itemName":"Mosambi","itemQty":3},{"orderNumber":22,"orderLineNumber":2,"itemName":"Mango","itemQty":4}],"ol":null}
    JSON (hits the DB) = {"orderNumber":33,"orderType":"EcomDeliveryOrder","orderFulfillmentDate":1493922600000,"orderLine":[{"orderNumber":33,"orderLineNumber":1,"itemName":"Ptato","itemQty":2},{"orderNumber":33,"orderLineNumber":2,"itemName":"Onion","itemQty":2}],"ol":null}
    JSON (hits the cache) = {"orderNumber":33,"orderType":"EcomDeliveryOrder","orderFulfillmentDate":1493922600000,"orderLine":[{"orderNumber":33,"orderLineNumber":1,"itemName":"Ptato","itemQty":2},{"orderNumber":33,"orderLineNumber":2,"itemName":"Onion","itemQty":2}],"ol":null}
    END:Read
    
  • Also check the ignite log in the terminal –

    ignite-console-log-read-through

    ignite-console-log-read-through

  • If you notice, the DB was hit only the first time cache.get((long) 33) was called. For the subsequent call to the same key does not hit DB. It hits the cache. So, in this case, when the data wasn’t available in the cache, it went to DB to fetch the data and then stored the same in cache as well. Hence, the next call to same key fetches the result from cache itself.

Conclusion

In this blog post, we implemented and also covered some concepts around read through cache mode. happy reading!! 🙂

 

141

14 Responses

  1. Rob
    July 8, 2018
  2. Sadruddin Md
    July 9, 2018
  3. IGUser
    July 10, 2018
  4. IGUser
    July 10, 2018
  5. Sadruddin Md
    July 18, 2018
  6. Sadruddin Md
    July 18, 2018
  7. Nek
    July 27, 2018
  8. Nek
    July 27, 2018
  9. Ali
    July 28, 2018
  10. Sadruddin Md
    July 28, 2018
  11. Sadruddin Md
    July 28, 2018
  12. Ali
    July 28, 2018
  13. Ali
    July 28, 2018
  14. Sadruddin Md
    July 29, 2018

Write a response

This site uses Akismet to reduce spam. Learn how your comment data is processed.