The goal is to let multiple notes be downloaded from peers concurrently, so the downloading must not block the main running thread. This requirement is common to both IpfsNotebookrepo and Bittorentrepo. So how should the design look?
Here is the current IpfsNotebookrepo class.
The get(hash : Multihash) and get(url : MagnetURL) methods are blocking calls: each one waits until the download from the peer has finished. Hence they have to be run in a separate thread. The possible approaches are:
here is the code..
Currently I use callbacks from Google Guava. After the download is complete, the send method is called with the appropriate operation to notify the user.
So here are my questions
Here is the current IpfsNotebookrepo class.
The get(hash : Multihash) and get(url : MagnetURL) methods are blocking calls: each one waits until the download from the peer has finished. Hence they have to be run in a separate thread. The possible approaches are:
- IpfsNotebookRepo implements Runnable and is submitted to the scheduler. But then I would have to create a new IpfsNotebookRepo instance every time.
- Create a class IpfsDownloadTask that implements Runnable/Callable. Should this class be a static nested class, an inner class, or a separate class? If it is a separate class, it would have to hold an IpfsNotebookRepo instance as a member in order to call its get method.
here is the code..
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.util.concurrent.FutureCallback; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableScheduledFuture; | |
import org.eclipse.jetty.websocket.api.Session; | |
import org.eclipse.jetty.websocket.api.WebSocketListener; | |
import java.io.IOException; | |
public class WebSocketEx implements WebSocketListener { | |
private Session connection; | |
public void onWebSocketBinary(byte[] bytes, int i, int i1) { | |
} | |
public void onWebSocketClose(int i, String s) { | |
System.out.println("statuscode = [" + i + "], reason = [" + s + "]"); | |
connection = null; | |
} | |
public void onWebSocketConnect(Session session) { | |
connection = session; | |
System.out.println("new connection = " + session.getRemoteAddress() + " connected"); | |
} | |
public void onWebSocketError(Throwable throwable) { | |
throwable.printStackTrace(); | |
} | |
public void onWebSocketText(String s) { | |
IpfsMessage message = IpfsMessage.deserilize(s); | |
switch (message.op) { | |
case DOWNLOAD_NOTE: | |
downloadNote(message); | |
break; | |
case LIST_DOWNLOAD: | |
listDownloads(); | |
break; | |
case CANCEL_DOWNLOAD: | |
break; | |
default: | |
break; | |
} | |
} | |
private void listDownloads() { | |
IpfsMessage messageToSend = new IpfsMessage(IpfsOp.LIST_DOWNLOAD); | |
try { | |
send(messageToSend.serialize()); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
private void downloadNote(final IpfsMessage message) { | |
final String hash = (String) message.get("hash"); | |
if (hash == null) | |
return; | |
IpfsTask downloadJob = new IpfsTask(hash); | |
final ListenableScheduledFuture<String> noteFuture = ipfsDownloader.getinstance().add(downloadJob, 0); | |
try { | |
IpfsMessage messageToSend = new IpfsMessage(IpfsOp.ADDED_TO_DOWNLOAD); | |
messageToSend.put("hash", hash); | |
send(messageToSend.serialize()); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
Futures.addCallback(noteFuture, new FutureCallback<String>() { | |
public void onSuccess(String s) { | |
try { | |
IpfsMessage messageToSend = new IpfsMessage(IpfsOp.DOWNLOAD_COMPLETE); | |
messageToSend.put("hash", hash); | |
messageToSend.put("output", s); | |
send(messageToSend.serialize()); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
public void onFailure(Throwable throwable) { | |
throwable.printStackTrace(); | |
try { | |
send("download failed"); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
}); | |
} | |
private void send(String msg) throws IOException { | |
if (isSessionOpen()) { | |
connection.getRemote().sendString(msg); | |
} | |
} | |
private boolean isSessionOpen() { | |
return connection != null && connection.isOpen(); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.util.concurrent.ListenableScheduledFuture; | |
import com.google.common.util.concurrent.ListeningScheduledExecutorService; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
public class ipfsDownloader { | |
private static ipfsDownloader instance = null; | |
private final ListeningScheduledExecutorService scheduler; | |
private ipfsDownloader(int numThread) { | |
scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(numThread)); | |
} | |
public static ipfsDownloader create(int numberOfThread) { | |
if (instance == null) { | |
instance = new ipfsDownloader(numberOfThread); | |
} | |
return instance; | |
} | |
public static ipfsDownloader getinstance() { | |
if (instance == null) { | |
instance = new ipfsDownloader(4); | |
} | |
return instance; | |
} | |
public ListenableScheduledFuture add(Callable v, long time) { | |
return scheduler.schedule(v, time, TimeUnit.SECONDS); | |
} | |
public ListenableScheduledFuture add(Runnable v, long time) { | |
return scheduler.schedule(v, time, TimeUnit.SECONDS); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.gson.Gson; | |
import java.util.HashMap; | |
import java.util.Map; | |
public class IpfsMessage { | |
private static final Gson gson = new Gson(); | |
public IpfsOp op; | |
public Map<String, Object> data = new HashMap<>(); | |
public IpfsMessage(IpfsOp op) { | |
this.op = op; | |
} | |
public void put(String key, Object o){ | |
data.put(key,o); | |
} | |
public Object get(String key){ | |
return data.get(key); | |
} | |
public static IpfsMessage deserilize(String message) { | |
return gson.fromJson(message,IpfsMessage.class); | |
} | |
public String serialize(){ | |
return gson.toJson(this); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Operation names carried in the {@code op} field of an {@code IpfsMessage}.
 *
 * <p>Constant order is kept as originally declared so that ordinals do not change.
 */
public enum IpfsOp {
    /** Request to download a note. */
    DOWNLOAD_NOTE,
    /** Peer-list operation. */
    GET_PEER_LIST,
    /** Request for, and reply with, the list of downloads. */
    LIST_DOWNLOAD,
    /** Request to cancel a download. */
    CANCEL_DOWNLOAD,
    /** Notification that a download request was accepted and scheduled. */
    ADDED_TO_DOWNLOAD,
    /** Notification that a download finished. */
    DOWNLOAD_COMPLETE,
    /** Notification that a download failed. */
    DOWNLOAD_FAILED
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.eclipse.jetty.websocket.servlet.WebSocketServlet; | |
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; | |
public class ipfsServlet extends WebSocketServlet { | |
@Override | |
public void configure(WebSocketServletFactory webSocketServletFactory) { | |
webSocketServletFactory.register(WebSocketEx.class); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Random; | |
import java.util.concurrent.Callable; | |
/**
 * Placeholder download job: waits for a fixed time and then returns the upper-cased hash in
 * place of the real note contents.
 *
 * <p>Created by onkar on 21/6/16.
 */
public class IpfsTask implements Callable<String> {

    /** How long {@link #call()} sleeps to imitate a download, in milliseconds. */
    private static final long SIMULATED_DOWNLOAD_MILLIS = 10000L;

    // Seeded generator; nothing in this class reads it.
    Random rn;
    String ipfsHash;

    /**
     * @param hash hash of the note to fetch
     */
    IpfsTask(String hash) {
        rn = new Random(0);
        ipfsHash = hash;
    }

    /**
     * Produces the stand-in note contents.
     *
     * @return the hash converted to upper case, independent of the default locale
     */
    public String getNote() {
        // Locale.ROOT keeps the result stable under locales with special casing rules.
        return ipfsHash.toUpperCase(java.util.Locale.ROOT);
    }

    /**
     * Runs the simulated download.
     *
     * @return the note contents
     * @throws InterruptedException if the thread is interrupted while waiting; the interrupt
     *     flag is restored and the task fails instead of completing with a {@code null} note
     */
    @Override
    public String call() throws Exception {
        System.out.println("Task started");
        try {
            Thread.sleep(SIMULATED_DOWNLOAD_MILLIS);
        } catch (InterruptedException e) {
            // Restore the flag for the executor and propagate so callers see a failure.
            Thread.currentThread().interrupt();
            System.out.println("Download Interrupted");
            throw e;
        }
        String noteJson = getNote();
        System.out.println("Task completed");
        return noteJson;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.eclipse.jetty.server.Server; | |
import org.eclipse.jetty.servlet.ServletContextHandler; | |
import org.eclipse.jetty.servlet.ServletHolder; | |
public class MyServer { | |
public static void main(String[] args) { | |
Server server = new Server(9110); | |
ServletContextHandler scontxt = new ServletContextHandler(); | |
scontxt.setContextPath("/"); | |
scontxt.addServlet(new ServletHolder("ws-events",ipfsServlet.class),"/download"); | |
server.setHandler(scontxt); | |
try { | |
server.start(); | |
server.dump(System.err); | |
server.join(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Currently I use callbacks from Google Guava. After the download is complete, the send method is called with the appropriate operation to notify the user.
So here are my questions
- The call method of IpfsTask currently calls getNote, which just returns the hash in upper case; in the real implementation it will return the note, as a string, fetched from a peer. Where should this class live: as an inner class or as a separate class? If it is separate, should it hold an IpfsNotebookRepo instance?
- After the note is downloaded, I need to call importNote from the Notebook Server class, which actually adds the note and broadcasts it. How can this be achieved?
- The IPFS servlet listens for WebSocket connections on a separate URL path. Should it be part of the Notebook Server path instead?