Google calendar

Tuesday, 28 June 2016

Communication WebSocket and concurrency

The goal is to achieve multiple notes to be downloaded from the peers. Hence the concurrency, the downloading should not block the main running thread.This is common to IpfsNotebookrepo or Bittorentrepo. So how should the design be ?

Here is the current IpfsNotebookrepo class.
The get(hash : Multihash) and get(url : MagnetURL) are blocking calls. It waits till it downloads from peer. Hence they have to be run in a thread. Hence various approaches are
  1. IpfsNotebookRepo implements Runnable and submit it to scheduler. But I will have to create new IpfsNotebookRepo instances everytime.
  2. Create a class IpfsDownloadTask implements Runnable/Callable . Should this class be nested , inner or a separate class. If it is a separate class it should contain IpfsNotebookRepo instance as a member to call .get method.
 I have created a separate example project just focusing on the main part. 


here is the code..
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import java.io.IOException;
public class WebSocketEx implements WebSocketListener {
private Session connection;
public void onWebSocketBinary(byte[] bytes, int i, int i1) {
}
public void onWebSocketClose(int i, String s) {
System.out.println("statuscode = [" + i + "], reason = [" + s + "]");
connection = null;
}
public void onWebSocketConnect(Session session) {
connection = session;
System.out.println("new connection = " + session.getRemoteAddress() + " connected");
}
public void onWebSocketError(Throwable throwable) {
throwable.printStackTrace();
}
public void onWebSocketText(String s) {
IpfsMessage message = IpfsMessage.deserilize(s);
switch (message.op) {
case DOWNLOAD_NOTE:
downloadNote(message);
break;
case LIST_DOWNLOAD:
listDownloads();
break;
case CANCEL_DOWNLOAD:
break;
default:
break;
}
}
private void listDownloads() {
IpfsMessage messageToSend = new IpfsMessage(IpfsOp.LIST_DOWNLOAD);
try {
send(messageToSend.serialize());
} catch (IOException e) {
e.printStackTrace();
}
}
private void downloadNote(final IpfsMessage message) {
final String hash = (String) message.get("hash");
if (hash == null)
return;
IpfsTask downloadJob = new IpfsTask(hash);
final ListenableScheduledFuture<String> noteFuture = ipfsDownloader.getinstance().add(downloadJob, 0);
try {
IpfsMessage messageToSend = new IpfsMessage(IpfsOp.ADDED_TO_DOWNLOAD);
messageToSend.put("hash", hash);
send(messageToSend.serialize());
} catch (IOException e) {
e.printStackTrace();
}
Futures.addCallback(noteFuture, new FutureCallback<String>() {
public void onSuccess(String s) {
try {
IpfsMessage messageToSend = new IpfsMessage(IpfsOp.DOWNLOAD_COMPLETE);
messageToSend.put("hash", hash);
messageToSend.put("output", s);
send(messageToSend.serialize());
} catch (IOException e) {
e.printStackTrace();
}
}
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
try {
send("download failed");
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
private void send(String msg) throws IOException {
if (isSessionOpen()) {
connection.getRemote().sendString(msg);
}
}
private boolean isSessionOpen() {
return connection != null && connection.isOpen();
}
}
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ipfsDownloader {
private static ipfsDownloader instance = null;
private final ListeningScheduledExecutorService scheduler;
private ipfsDownloader(int numThread) {
scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(numThread));
}
public static ipfsDownloader create(int numberOfThread) {
if (instance == null) {
instance = new ipfsDownloader(numberOfThread);
}
return instance;
}
public static ipfsDownloader getinstance() {
if (instance == null) {
instance = new ipfsDownloader(4);
}
return instance;
}
public ListenableScheduledFuture add(Callable v, long time) {
return scheduler.schedule(v, time, TimeUnit.SECONDS);
}
public ListenableScheduledFuture add(Runnable v, long time) {
return scheduler.schedule(v, time, TimeUnit.SECONDS);
}
}
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;
public class IpfsMessage {
private static final Gson gson = new Gson();
public IpfsOp op;
public Map<String, Object> data = new HashMap<>();
public IpfsMessage(IpfsOp op) {
this.op = op;
}
public void put(String key, Object o){
data.put(key,o);
}
public Object get(String key){
return data.get(key);
}
public static IpfsMessage deserilize(String message) {
return gson.fromJson(message,IpfsMessage.class);
}
public String serialize(){
return gson.toJson(this);
}
}
public enum IpfsOp {
DOWNLOAD_NOTE,
GET_PEER_LIST,
LIST_DOWNLOAD,
CANCEL_DOWNLOAD,
ADDED_TO_DOWNLOAD,
DOWNLOAD_COMPLETE,
DOWNLOAD_FAILED
}
view raw IpfsOp.java hosted with ❤ by GitHub
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
public class ipfsServlet extends WebSocketServlet {
@Override
public void configure(WebSocketServletFactory webSocketServletFactory) {
webSocketServletFactory.register(WebSocketEx.class);
}
}
import java.util.Random;
import java.util.concurrent.Callable;
/**
* Created by onkar on 21/6/16.
*/
public class IpfsTask implements Callable<String> {
Random rn;
String ipfsHash;
IpfsTask(String hash){
rn = new Random(0);
ipfsHash = hash;
}
public String getNote(){
return ipfsHash.toUpperCase();
}
public String call() throws Exception {
System.out.println("Task started");
String noteJson = null;
try {
Thread.sleep(10000);
noteJson = getNote();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Download Interrupted");
}
System.out.println("Task completed");
return noteJson;
}
}
view raw IpfsTask.java hosted with ❤ by GitHub
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
public class MyServer {
public static void main(String[] args) {
Server server = new Server(9110);
ServletContextHandler scontxt = new ServletContextHandler();
scontxt.setContextPath("/");
scontxt.addServlet(new ServletHolder("ws-events",ipfsServlet.class),"/download");
server.setHandler(scontxt);
try {
server.start();
server.dump(System.err);
server.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
view raw MyServer.java hosted with ❤ by GitHub
 
Currently I have used callbacks from google-gauva. After the download is complete  send method is called with appropriate operation to notify the user.

So here are my questions 
  1. IpfsTask class call method currently calls getNote which just returns uppercase, actually it will be returning the note in string from peer. Where should this class be ? inner, separate ? If separate , it should contain IpfsNotebookRepo instance ?
  2. After the note is downloaded I need to call the importNote from Notebook Server class which actually adds the note and broadcasts. How to achieve this ?
  3. Ipfs servlet listens on separate url path for websocket. Should it be part of Notebook server path ? 
I thinks design will be common to Bittorrent as well. So I would be grateful if you would give your help and advice on the design of communication.

1 comment:

  1. Good job dude, I would like to ask you about GSoC as you participated in it last year. Can you give your project code and your relevant notes on the project, it would greatly simplify my understanding of how to work with Open Source Organizations. Thanks in advance!

    ReplyDelete