如何使用 SEDA(分阶段事件驱动架构)编写简单的 HTTP Echo 服务?
How to write Simple HTTP Echo Service Using SEDA (staged event driven architecture)?
在论文"SEDA: An Architecture for Well-Conditioned, Scalable Internet Services"中,SEDA首次发表。
SEDA 由一组阶段组成,其中每个阶段都有一个单独的线程池。
Sandstorm 是 SEDA 的 Java API,可在 https://github.com/chenhaodong/seda-sandstorm 中使用。此外,Apache MINA 内部使用 SEDA。然而,这些实现没有任何关于如何使用 SEDA 实现服务器的文档。
有谁知道如何使用 SEDA 构建一个非常简单的回显服务? (Java)
Apache MINA 是 SEDA 的开源实现。
有一个答案显示了如何使用 Apache MINA 构建一个简单的 http 服务。
Apache MINA 现在有点过时了(和 2019 年一样),那里使用的技术非常古老。因此,我从头开始编写了一个简单的新 SEDA 轻量级库和一个 Http Server 示例,如下所示。
SEDA-CORE
Event.java
import com.pasindu.queue.seda.handler.EventHandler;
public interface Event {
public EventHandler getHandler();
}
EventHandler.java
import com.pasindu.queue.seda.queue.Queue;
public interface EventHandler extends Runnable {
public void onEvent() throws InterruptedException ;
public void run();
public void setOutQueue(Queue queue);
public String getName();
public void setName(String name);
}
Logger.java
public class Logger {
public static void log(String className, String msg){
// System.out.println("SEDA-CORE LOG ----- "+ className+ "----- \t \t \t"+ msg+" -----");
}
}
Queue.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.helper.Logger;
import java.util.concurrent.ArrayBlockingQueue;
public class Queue {
private int capacity;
private ArrayBlockingQueue<Event> queue;
private String name;
public Queue (int capacity, String name){
this.setCapacity(capacity);
this.name = name;
setQueue(new ArrayBlockingQueue<Event>(capacity));
}
public String getName(){return this.name;}
public void enqueu(Event event) throws InterruptedException{
Logger.log(this.toString(), "Enqueing attempt for "+event.toString()+" to "+this.toString());
getQueue().put(event); // if queue is full the calling thread has to wait till this sucess (in our case the main thread or one of event handler threads in the executor pool)
}
public Event dequeue() throws InterruptedException{
Logger.log(this.toString(), "Dequeing attempt "+" from "+this.toString());
return this.getQueue().take(); // if queue is empty then the calling thread (stage thread) has to wait till the event becomes available
}
public int getCapacity() {
return capacity;
}
public void setCapacity(int capacity) {
this.capacity = capacity;
}
public ArrayBlockingQueue<Event> getQueue() {
return queue;
}
public void setQueue(ArrayBlockingQueue<Event> queue) {
this.queue = queue;
}
public int getNumElements(){
return queue.size();
}
}
Stage.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.thread.pool.ThreadPool;
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
public class Stage extends Thread {
private Queue inputQueue;
private Queue outputQueue;
private int batchSize;
private ThreadPool threadPool;
public Stage(Queue inputQueue, Queue outputQueue, int batchSize){
this.threadPool = new ThreadPool();
this.batchSize = batchSize;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run(){
while(true){
Event event = null;
try{
event = inputQueue.dequeue();
Logger.log(this.toString(), "Dequeued "+event.toString()+" from "+inputQueue.toString());
}catch (InterruptedException ex){
}
if(event != null) {
EventHandler handler = event.getHandler();
handler.setOutQueue(outputQueue);
handler.setName(this.getName()+"'s Event Handler");
threadPool.submit(handler);
Logger.log(this.toString(), "Enqueued " + event.toString() + " to " + outputQueue);
}else{
try {
Thread.sleep(10);
}catch(InterruptedException ex){
}
}
}
}
}
ThreadPool.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
ExecutorService executorService;
public ThreadPool (){
this.executorService = Executors.newFixedThreadPool(4);
}
public void submit(EventHandler handler){
Logger.log(this.toString(),"Calling submit of "+executorService.toString());
this.executorService.submit(handler);
}
}
SEDA-HTTP-SERVER
缓冲区Event.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
import handler.BufferEventHandler;
import java.nio.ByteBuffer;
public class BufferEvent implements Event {
private EventHandler handler;
private ByteBuffer buffer;
private String requestId;
private int numRead;
public BufferEvent(ByteBuffer byteBuffer, String requestId, int numRead){
this.setBuffer(byteBuffer);
this.setRequestId(requestId);
this.setNumRead(numRead);
this.setHandler(new BufferEventHandler(this));
}
public EventHandler getHandler(){
return this.handler;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public int getNumRead() {
return numRead;
}
public void setNumRead(int numRead) {
this.numRead = numRead;
}
public void setHandler(EventHandler handler) {
this.handler = handler;
}
}
字节数组Event.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
import handler.ByteArrayEventHandler;
import java.nio.ByteBuffer;
public class ByteArrayEvent implements Event {
private EventHandler handler;
private ByteBuffer buffer;
private String requestId;
private byte[] data ;
private int numRead;
public ByteArrayEvent(ByteBuffer byteBuffer, String requestId, byte[] data, int numRead ){
this.setBuffer(byteBuffer);
this.setRequestId(requestId);
this.setData(data);
this.setHandler(new ByteArrayEventHandler(this));
this.numRead = numRead;
}
public EventHandler getHandler(){
return this.handler;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public void setHandler(EventHandler handler) {
this.handler = handler;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public int getNumRead() {
return numRead;
}
public void setNumRead(int numRead) {
this.numRead = numRead;
}
}
HttpRequestEvent.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
import handler.HttpRequestEventHandler;
import java.nio.ByteBuffer;
public class HttpRequestEvent implements Event {
private EventHandler handler;
private ByteBuffer buffer;
private String requestId;
private String request;
public HttpRequestEvent(ByteBuffer byteBuffer, String requestId, String request){
this.setBuffer(byteBuffer);
this.setRequestId(requestId);
this.setRequest(request);
this.setHandler(new HttpRequestEventHandler(this));
}
public EventHandler getHandler(){
return this.handler;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public void setHandler(EventHandler handler) {
this.handler = handler;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
}
HttpResponseEvent.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
public class HttpResponseEvent implements Event {
private String requestId;
public HttpResponseEvent(String requestId){
this.setRequestId(requestId);
}
public EventHandler getHandler(){
return null;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
缓冲区EventHandler.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.BufferEvent;
import event.ByteArrayEvent;
import java.nio.ByteBuffer;
public class BufferEventHandler implements EventHandler {
private BufferEvent event;
private Queue outQueue;
private String name;
public BufferEventHandler(BufferEvent event){
this.event = event;
}
public String getName(){
return this.name;
}
public void setName(String name){
this.name = name;
}
public void setOutQueue(Queue queue){
this.outQueue = queue;
}
public void onEvent() throws InterruptedException{
ByteBuffer buffer = this.event.getBuffer();
String requestId = this.event.getRequestId();
int numRead = this.event.getNumRead();
Logger.log(this.toString(), "Recieved "+event.toString());
buffer.flip();
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
ByteArrayEvent byteEvent = new ByteArrayEvent(buffer, requestId, data, numRead );
Logger.log(this.toString(), "Set new object to "+byteEvent.toString());
outQueue.enqueu(byteEvent);
Logger.log(this.toString(), byteEvent.toString()+" added to "+outQueue.toString());
}
public void run(){
Logger.log(this.toString(), "Running "+ this.toString()+" for "+event.toString());
try{
this.onEvent();
}catch (InterruptedException ex){
}
}
}
字节数组EventHandler.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.ByteArrayEvent;
import event.HttpRequestEvent;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class ByteArrayEventHandler implements EventHandler {
private ByteArrayEvent event;
private Queue outQueue;
private String name;
public String getName(){
return this.name;
}
public void setName(String name){
this.name = name;
}
public ByteArrayEventHandler(ByteArrayEvent event){
this.event = event;
}
public void onEvent() throws InterruptedException{
Logger.log(this.toString(), "Recieved event "+event.toString());
ByteBuffer buffer = this.event.getBuffer();
String requestId = this.event.getRequestId();
byte[] data = this.event.getData();
int numRead = this.event.getNumRead();
String request = null;
try {
request = new String(data, "US-ASCII");
}catch (UnsupportedEncodingException ex){
}
request = request.split("\n")[0].trim();
HttpRequestEvent httpRequestEvent = new HttpRequestEvent(buffer, requestId, request);
outQueue.enqueu(httpRequestEvent);
Logger.log(this.toString(), "Enqueued "+httpRequestEvent.toString() +" to "+outQueue.toString());
}
public void setOutQueue(Queue queueu){
this.outQueue = queueu;
}
public void run(){
Logger.log(this.toString(), "Running "+ this.toString()+" for "+event.toString());
try{
this.onEvent();
}catch (InterruptedException ex){
}
}
}
HttpRequestHandler.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.HttpRequestEvent;
import event.HttpResponseEvent;
import java.nio.ByteBuffer;
import java.util.Dictionary;
public class HttpRequestEventHandler implements EventHandler {
private HttpRequestEvent event;
private Queue outQueue;
private String name;
public String getName(){
return this.name;
}
public void setName(String name){
this.name = name;
}
public HttpRequestEventHandler(HttpRequestEvent event){
this.event = event;
}
public void setOutQueue(Queue queue){
this.outQueue = queue;
}
private String serverRequest(String request) {
String response = "";
if (request.startsWith("GET")) {
// pass the request and generate response here
response = "response";
return response;
}
public void onEvent() throws InterruptedException{
Logger.log(this.toString(),"Recieved "+event.toString());
ByteBuffer buffer = this.event.getBuffer();
String requestId = this.event.getRequestId();
String request = this.event.getRequest();
Logger.log(this.toString(), "Recieved object inside is "+event);
String response = serverRequest(request);
buffer.clear();
buffer.put(response.getBytes());
HttpResponseEvent responseEvent= new HttpResponseEvent(requestId);
Logger.log(this.toString(), "Set new object inside "+event.toString());
outQueue.enqueu(responseEvent);
Logger.log(this.toString(), responseEvent.toString()+" added to "+outQueue.toString());
}
public void run(){
Logger.log(this.toString(), "Running "+ this.toString()+" for "+event.toString());
try{
this.onEvent();
}catch (InterruptedException ex){
}
}
}
QueueMonitor.java
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
public class QueueMonitor extends Thread {
private Queue[] queues;
public QueueMonitor(Queue[] queues){
this.queues= queues;
}
@Override
public void run(){
while(true){
try{
Thread.sleep(9000);
}catch(InterruptedException ex){
}
for(Queue queue: queues){
Logger.log(this.toString(), queue.getName()+" is "+queue.getNumElements());
}
}
}
}
ThreadMonitor.java
import com.pasindu.queue.seda.helper.Logger;
public class ThreadMonitor extends Thread{
private Thread [] threads;
public ThreadMonitor(Thread [] threads){
this.threads= threads;
}
@Override
public void run(){
while(true){
try{
Thread.sleep(11000);
}catch(InterruptedException ex){
}
for(Thread thread: threads){
Logger.log(this.toString(), thread.getName()+" is "+thread.getState());
}
}
}
}
HttpEventMain.java
import com.pasindu.queue.seda.queue.Queue;
import com.pasindu.queue.seda.stage.Stage;
import event.BufferEvent;
import monitor.QueueMonitor;
import monitor.ThreadMonitor;
import org.apache.commons.lang3.RandomStringUtils;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
public class HttpEventMain extends Thread
{
private InetAddress addr;
private int port;
private Selector selector;
private ConcurrentHashMap concurrentHashMapResponse;
private ConcurrentHashMap concurrentHashMapKey;
public HttpEventMain(InetAddress addr, int port) throws IOException {
this.setAddr(addr);
this.setPort(port);
this.setConcurrentHashMapResponse(new ConcurrentHashMap<>());
this.concurrentHashMapKey = new ConcurrentHashMap<>();
}
@Override
public void run(){
System.out.println("----- Running the server on machine with "+Runtime.getRuntime().availableProcessors()+" cores -----");
try {
System.out.println("\n====================Server Details====================");
System.out.println("Server Machine: "+ InetAddress.getLocalHost().getCanonicalHostName());
System.out.println("Port number: " + this.getPort());
} catch (UnknownHostException e1) {
e1.printStackTrace();
}
try {
this.startServer();
} catch (IOException e) {
System.err.println("Error occured in runnable.HttpEventMain:" + e.getMessage());
System.exit(0);
}
}
public static void main(String[] args) throws Exception
{
HttpEventMain server = new HttpEventMain(null, 4333);
server.start();
}
private void startServer() throws IOException {
this.selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("Server ready. Ctrl-C to stop.");
Queue inQueue = new Queue(100, "In Queue");
Queue outQueue1 = new Queue(100, "Out Queue 1");
Queue outQueue2 = new Queue(100, "Out Queue 2");
Queue outQueue3 = new Queue(100, "Out Queue 3");
int batchSize = 10;
// Stage stage = new Stage(inQueue, outQueue, batchSize);
this.setName("Event Main");
Stage bufferstage = new Stage(inQueue, outQueue1, batchSize);
bufferstage.setName("bufferstage");
Stage byteArrayStage = new Stage(outQueue1, outQueue2, batchSize);
byteArrayStage.setName("byteArrayStage");
Stage httpRequestStage = new Stage(outQueue2, outQueue3, batchSize);
httpRequestStage.setName("httpRequestStage");
ResponseMannager responseMannager = new ResponseMannager(concurrentHashMapResponse, outQueue3);
responseMannager.setName("responseMannager");
Thread [] threads = {this, bufferstage, byteArrayStage, httpRequestStage, responseMannager};
ThreadMonitor monitor = new ThreadMonitor(threads);
monitor.start();
Queue [] queues = {inQueue, outQueue1, outQueue2, outQueue3};
QueueMonitor queueMonitor = new QueueMonitor(queues);
queueMonitor.start();
bufferstage.start();
byteArrayStage.start();
httpRequestStage.start();
responseMannager.start();
while (true) {
this.selector.select();
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
}
else if (key.isReadable()) {
this.read(key, inQueue);
}
else if (key.isWritable()) {
this.write(key);
}
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
channel.register(this.selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key, Queue inQueue) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int numRead = -1;
try {
numRead = channel.read(buffer);
}
catch (IOException e) {
e.printStackTrace();
}
if (numRead == -1) {
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
channel.close();
key.cancel();
return;
}
String requestID = RandomStringUtils.random(32, true, true);
while(concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)){
requestID = RandomStringUtils.random(15, true, true);
}
concurrentHashMapKey.put(key, requestID);
try {
inQueue.enqueu(new BufferEvent(buffer, requestID, numRead));
}catch (InterruptedException ex){
}
concurrentHashMapResponse.put(requestID, false);
channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
}
private boolean responseReady(SelectionKey key){
String requestId = concurrentHashMapKey.get(key).toString();
Boolean response = (Boolean) concurrentHashMapResponse.get(requestId);
if(response==true){
concurrentHashMapKey.remove(key);
concurrentHashMapResponse.remove(requestId);
return true;
}else{
return false;
}
}
private void write(SelectionKey key) throws IOException {
if(responseReady(key)) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
inputBuffer.flip();
channel.write(inputBuffer);
channel.close();
key.cancel();
}else{
}
}
public ConcurrentHashMap getConcurrentHashMapResponse() {
return concurrentHashMapResponse;
}
public void setConcurrentHashMapResponse(ConcurrentHashMap concurrentHashMapResponse) {
this.concurrentHashMapResponse = concurrentHashMapResponse;
}
public InetAddress getAddr() {
return addr;
}
public void setAddr(InetAddress addr) {
this.addr = addr;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public Selector getSelector() {
return selector;
}
public void setSelector(Selector selector) {
this.selector = selector;
}
}
ResponseMannager.java
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.HttpResponseEvent;
import java.util.concurrent.ConcurrentHashMap;
public class ResponseMannager extends Thread{
ConcurrentHashMap concurrentHashMapResponse;
Queue inQueue;
public ResponseMannager(ConcurrentHashMap concurrentHashMap, Queue queue){
this.concurrentHashMapResponse = concurrentHashMap;
this.inQueue = queue;
}
@Override
public void run() {
while(true){
HttpResponseEvent event = null;
try {
event = (HttpResponseEvent) inQueue.dequeue();
}catch(InterruptedException ex){
}
if(event!=null) {
Logger.log(this.toString(), "Dequeued " + event.toString() + " from " + inQueue.toString());
concurrentHashMapResponse.put(event.getRequestId(), true);
Logger.log(this.toString(), "Set response availabliity for " + event.getRequestId() + " in " + concurrentHashMapResponse.toString());
}else{
try{
Thread.sleep(10);
}catch(InterruptedException ex){
}
}
}
}
}
在论文"SEDA: An Architecture for Well-Conditioned, Scalable Internet Services"中,SEDA首次发表。
SEDA 由一组阶段组成,其中每个阶段都有一个单独的线程池。
Sandstorm 是 SEDA 的 Java API,可在 https://github.com/chenhaodong/seda-sandstorm 中使用。此外,Apache MINA 内部使用 SEDA。然而,这些实现没有任何关于如何使用 SEDA 实现服务器的文档。
有谁知道如何使用 SEDA 构建一个非常简单的回显服务? (Java)
Apache MINA 是 SEDA 的开源实现。
Apache MINA 现在有点过时了(和 2019 年一样),那里使用的技术非常古老。因此,我从头开始编写了一个简单的新 SEDA 轻量级库和一个 Http Server 示例,如下所示。
SEDA-CORE
Event.java
import com.pasindu.queue.seda.handler.EventHandler;
public interface Event {
public EventHandler getHandler();
}
EventHandler.java
import com.pasindu.queue.seda.queue.Queue;
public interface EventHandler extends Runnable {
public void onEvent() throws InterruptedException ;
public void run();
public void setOutQueue(Queue queue);
public String getName();
public void setName(String name);
}
Logger.java
public class Logger {
public static void log(String className, String msg){
// System.out.println("SEDA-CORE LOG ----- "+ className+ "----- \t \t \t"+ msg+" -----");
}
}
Queue.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.helper.Logger;
import java.util.concurrent.ArrayBlockingQueue;
public class Queue {
private int capacity;
private ArrayBlockingQueue<Event> queue;
private String name;
public Queue (int capacity, String name){
this.setCapacity(capacity);
this.name = name;
setQueue(new ArrayBlockingQueue<Event>(capacity));
}
public String getName(){return this.name;}
public void enqueu(Event event) throws InterruptedException{
Logger.log(this.toString(), "Enqueing attempt for "+event.toString()+" to "+this.toString());
getQueue().put(event); // if queue is full the calling thread has to wait till this sucess (in our case the main thread or one of event handler threads in the executor pool)
}
public Event dequeue() throws InterruptedException{
Logger.log(this.toString(), "Dequeing attempt "+" from "+this.toString());
return this.getQueue().take(); // if queue is empty then the calling thread (stage thread) has to wait till the event becomes available
}
public int getCapacity() {
return capacity;
}
public void setCapacity(int capacity) {
this.capacity = capacity;
}
public ArrayBlockingQueue<Event> getQueue() {
return queue;
}
public void setQueue(ArrayBlockingQueue<Event> queue) {
this.queue = queue;
}
public int getNumElements(){
return queue.size();
}
}
Stage.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.thread.pool.ThreadPool;
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
public class Stage extends Thread {
private Queue inputQueue;
private Queue outputQueue;
private int batchSize;
private ThreadPool threadPool;
public Stage(Queue inputQueue, Queue outputQueue, int batchSize){
this.threadPool = new ThreadPool();
this.batchSize = batchSize;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run(){
while(true){
Event event = null;
try{
event = inputQueue.dequeue();
Logger.log(this.toString(), "Dequeued "+event.toString()+" from "+inputQueue.toString());
}catch (InterruptedException ex){
}
if(event != null) {
EventHandler handler = event.getHandler();
handler.setOutQueue(outputQueue);
handler.setName(this.getName()+"'s Event Handler");
threadPool.submit(handler);
Logger.log(this.toString(), "Enqueued " + event.toString() + " to " + outputQueue);
}else{
try {
Thread.sleep(10);
}catch(InterruptedException ex){
}
}
}
}
}
ThreadPool.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
ExecutorService executorService;
public ThreadPool (){
this.executorService = Executors.newFixedThreadPool(4);
}
public void submit(EventHandler handler){
Logger.log(this.toString(),"Calling submit of "+executorService.toString());
this.executorService.submit(handler);
}
}
SEDA-HTTP-SERVER
缓冲区Event.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
import handler.BufferEventHandler;
import java.nio.ByteBuffer;
public class BufferEvent implements Event {
private EventHandler handler;
private ByteBuffer buffer;
private String requestId;
private int numRead;
public BufferEvent(ByteBuffer byteBuffer, String requestId, int numRead){
this.setBuffer(byteBuffer);
this.setRequestId(requestId);
this.setNumRead(numRead);
this.setHandler(new BufferEventHandler(this));
}
public EventHandler getHandler(){
return this.handler;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public int getNumRead() {
return numRead;
}
public void setNumRead(int numRead) {
this.numRead = numRead;
}
public void setHandler(EventHandler handler) {
this.handler = handler;
}
}
字节数组Event.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
import handler.ByteArrayEventHandler;
import java.nio.ByteBuffer;
public class ByteArrayEvent implements Event {
private EventHandler handler;
private ByteBuffer buffer;
private String requestId;
private byte[] data ;
private int numRead;
public ByteArrayEvent(ByteBuffer byteBuffer, String requestId, byte[] data, int numRead ){
this.setBuffer(byteBuffer);
this.setRequestId(requestId);
this.setData(data);
this.setHandler(new ByteArrayEventHandler(this));
this.numRead = numRead;
}
public EventHandler getHandler(){
return this.handler;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public void setHandler(EventHandler handler) {
this.handler = handler;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public int getNumRead() {
return numRead;
}
public void setNumRead(int numRead) {
this.numRead = numRead;
}
}
HttpRequestEvent.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
import handler.HttpRequestEventHandler;
import java.nio.ByteBuffer;
public class HttpRequestEvent implements Event {
private EventHandler handler;
private ByteBuffer buffer;
private String requestId;
private String request;
public HttpRequestEvent(ByteBuffer byteBuffer, String requestId, String request){
this.setBuffer(byteBuffer);
this.setRequestId(requestId);
this.setRequest(request);
this.setHandler(new HttpRequestEventHandler(this));
}
public EventHandler getHandler(){
return this.handler;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public void setHandler(EventHandler handler) {
this.handler = handler;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
}
HttpResponseEvent.java
import com.pasindu.queue.seda.event.Event;
import com.pasindu.queue.seda.handler.EventHandler;
public class HttpResponseEvent implements Event {
private String requestId;
public HttpResponseEvent(String requestId){
this.setRequestId(requestId);
}
public EventHandler getHandler(){
return null;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
缓冲区EventHandler.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.BufferEvent;
import event.ByteArrayEvent;
import java.nio.ByteBuffer;
public class BufferEventHandler implements EventHandler {
private BufferEvent event;
private Queue outQueue;
private String name;
public BufferEventHandler(BufferEvent event){
this.event = event;
}
public String getName(){
return this.name;
}
public void setName(String name){
this.name = name;
}
public void setOutQueue(Queue queue){
this.outQueue = queue;
}
public void onEvent() throws InterruptedException{
ByteBuffer buffer = this.event.getBuffer();
String requestId = this.event.getRequestId();
int numRead = this.event.getNumRead();
Logger.log(this.toString(), "Recieved "+event.toString());
buffer.flip();
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
ByteArrayEvent byteEvent = new ByteArrayEvent(buffer, requestId, data, numRead );
Logger.log(this.toString(), "Set new object to "+byteEvent.toString());
outQueue.enqueu(byteEvent);
Logger.log(this.toString(), byteEvent.toString()+" added to "+outQueue.toString());
}
public void run(){
Logger.log(this.toString(), "Running "+ this.toString()+" for "+event.toString());
try{
this.onEvent();
}catch (InterruptedException ex){
}
}
}
字节数组EventHandler.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.ByteArrayEvent;
import event.HttpRequestEvent;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class ByteArrayEventHandler implements EventHandler {
private ByteArrayEvent event;
private Queue outQueue;
private String name;
public String getName(){
return this.name;
}
public void setName(String name){
this.name = name;
}
public ByteArrayEventHandler(ByteArrayEvent event){
this.event = event;
}
public void onEvent() throws InterruptedException{
Logger.log(this.toString(), "Recieved event "+event.toString());
ByteBuffer buffer = this.event.getBuffer();
String requestId = this.event.getRequestId();
byte[] data = this.event.getData();
int numRead = this.event.getNumRead();
String request = null;
try {
request = new String(data, "US-ASCII");
}catch (UnsupportedEncodingException ex){
}
request = request.split("\n")[0].trim();
HttpRequestEvent httpRequestEvent = new HttpRequestEvent(buffer, requestId, request);
outQueue.enqueu(httpRequestEvent);
Logger.log(this.toString(), "Enqueued "+httpRequestEvent.toString() +" to "+outQueue.toString());
}
public void setOutQueue(Queue queueu){
this.outQueue = queueu;
}
public void run(){
Logger.log(this.toString(), "Running "+ this.toString()+" for "+event.toString());
try{
this.onEvent();
}catch (InterruptedException ex){
}
}
}
HttpRequestHandler.java
import com.pasindu.queue.seda.handler.EventHandler;
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.HttpRequestEvent;
import event.HttpResponseEvent;
import java.nio.ByteBuffer;
import java.util.Dictionary;
public class HttpRequestEventHandler implements EventHandler {
private HttpRequestEvent event;
private Queue outQueue;
private String name;
public String getName(){
return this.name;
}
public void setName(String name){
this.name = name;
}
public HttpRequestEventHandler(HttpRequestEvent event){
this.event = event;
}
public void setOutQueue(Queue queue){
this.outQueue = queue;
}
private String serverRequest(String request) {
String response = "";
if (request.startsWith("GET")) {
// pass the request and generate response here
response = "response";
return response;
}
public void onEvent() throws InterruptedException{
Logger.log(this.toString(),"Recieved "+event.toString());
ByteBuffer buffer = this.event.getBuffer();
String requestId = this.event.getRequestId();
String request = this.event.getRequest();
Logger.log(this.toString(), "Recieved object inside is "+event);
String response = serverRequest(request);
buffer.clear();
buffer.put(response.getBytes());
HttpResponseEvent responseEvent= new HttpResponseEvent(requestId);
Logger.log(this.toString(), "Set new object inside "+event.toString());
outQueue.enqueu(responseEvent);
Logger.log(this.toString(), responseEvent.toString()+" added to "+outQueue.toString());
}
public void run(){
Logger.log(this.toString(), "Running "+ this.toString()+" for "+event.toString());
try{
this.onEvent();
}catch (InterruptedException ex){
}
}
}
QueueMonitor.java
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
public class QueueMonitor extends Thread {
private Queue[] queues;
public QueueMonitor(Queue[] queues){
this.queues= queues;
}
@Override
public void run(){
while(true){
try{
Thread.sleep(9000);
}catch(InterruptedException ex){
}
for(Queue queue: queues){
Logger.log(this.toString(), queue.getName()+" is "+queue.getNumElements());
}
}
}
}
ThreadMonitor.java
import com.pasindu.queue.seda.helper.Logger;
public class ThreadMonitor extends Thread{
private Thread [] threads;
public ThreadMonitor(Thread [] threads){
this.threads= threads;
}
@Override
public void run(){
while(true){
try{
Thread.sleep(11000);
}catch(InterruptedException ex){
}
for(Thread thread: threads){
Logger.log(this.toString(), thread.getName()+" is "+thread.getState());
}
}
}
}
HttpEventMain.java
import com.pasindu.queue.seda.queue.Queue;
import com.pasindu.queue.seda.stage.Stage;
import event.BufferEvent;
import monitor.QueueMonitor;
import monitor.ThreadMonitor;
import org.apache.commons.lang3.RandomStringUtils;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
public class HttpEventMain extends Thread
{
private InetAddress addr;
private int port;
private Selector selector;
private ConcurrentHashMap concurrentHashMapResponse;
private ConcurrentHashMap concurrentHashMapKey;
public HttpEventMain(InetAddress addr, int port) throws IOException {
this.setAddr(addr);
this.setPort(port);
this.setConcurrentHashMapResponse(new ConcurrentHashMap<>());
this.concurrentHashMapKey = new ConcurrentHashMap<>();
}
@Override
public void run(){
System.out.println("----- Running the server on machine with "+Runtime.getRuntime().availableProcessors()+" cores -----");
try {
System.out.println("\n====================Server Details====================");
System.out.println("Server Machine: "+ InetAddress.getLocalHost().getCanonicalHostName());
System.out.println("Port number: " + this.getPort());
} catch (UnknownHostException e1) {
e1.printStackTrace();
}
try {
this.startServer();
} catch (IOException e) {
System.err.println("Error occured in runnable.HttpEventMain:" + e.getMessage());
System.exit(0);
}
}
public static void main(String[] args) throws Exception
{
HttpEventMain server = new HttpEventMain(null, 4333);
server.start();
}
private void startServer() throws IOException {
this.selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("Server ready. Ctrl-C to stop.");
Queue inQueue = new Queue(100, "In Queue");
Queue outQueue1 = new Queue(100, "Out Queue 1");
Queue outQueue2 = new Queue(100, "Out Queue 2");
Queue outQueue3 = new Queue(100, "Out Queue 3");
int batchSize = 10;
// Stage stage = new Stage(inQueue, outQueue, batchSize);
this.setName("Event Main");
Stage bufferstage = new Stage(inQueue, outQueue1, batchSize);
bufferstage.setName("bufferstage");
Stage byteArrayStage = new Stage(outQueue1, outQueue2, batchSize);
byteArrayStage.setName("byteArrayStage");
Stage httpRequestStage = new Stage(outQueue2, outQueue3, batchSize);
httpRequestStage.setName("httpRequestStage");
ResponseMannager responseMannager = new ResponseMannager(concurrentHashMapResponse, outQueue3);
responseMannager.setName("responseMannager");
Thread [] threads = {this, bufferstage, byteArrayStage, httpRequestStage, responseMannager};
ThreadMonitor monitor = new ThreadMonitor(threads);
monitor.start();
Queue [] queues = {inQueue, outQueue1, outQueue2, outQueue3};
QueueMonitor queueMonitor = new QueueMonitor(queues);
queueMonitor.start();
bufferstage.start();
byteArrayStage.start();
httpRequestStage.start();
responseMannager.start();
while (true) {
this.selector.select();
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
}
else if (key.isReadable()) {
this.read(key, inQueue);
}
else if (key.isWritable()) {
this.write(key);
}
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
channel.register(this.selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key, Queue inQueue) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int numRead = -1;
try {
numRead = channel.read(buffer);
}
catch (IOException e) {
e.printStackTrace();
}
if (numRead == -1) {
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
channel.close();
key.cancel();
return;
}
String requestID = RandomStringUtils.random(32, true, true);
while(concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)){
requestID = RandomStringUtils.random(15, true, true);
}
concurrentHashMapKey.put(key, requestID);
try {
inQueue.enqueu(new BufferEvent(buffer, requestID, numRead));
}catch (InterruptedException ex){
}
concurrentHashMapResponse.put(requestID, false);
channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
}
private boolean responseReady(SelectionKey key){
String requestId = concurrentHashMapKey.get(key).toString();
Boolean response = (Boolean) concurrentHashMapResponse.get(requestId);
if(response==true){
concurrentHashMapKey.remove(key);
concurrentHashMapResponse.remove(requestId);
return true;
}else{
return false;
}
}
private void write(SelectionKey key) throws IOException {
if(responseReady(key)) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
inputBuffer.flip();
channel.write(inputBuffer);
channel.close();
key.cancel();
}else{
}
}
public ConcurrentHashMap getConcurrentHashMapResponse() {
return concurrentHashMapResponse;
}
public void setConcurrentHashMapResponse(ConcurrentHashMap concurrentHashMapResponse) {
this.concurrentHashMapResponse = concurrentHashMapResponse;
}
public InetAddress getAddr() {
return addr;
}
public void setAddr(InetAddress addr) {
this.addr = addr;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public Selector getSelector() {
return selector;
}
public void setSelector(Selector selector) {
this.selector = selector;
}
}
ResponseMannager.java
import com.pasindu.queue.seda.helper.Logger;
import com.pasindu.queue.seda.queue.Queue;
import event.HttpResponseEvent;
import java.util.concurrent.ConcurrentHashMap;
public class ResponseMannager extends Thread{
ConcurrentHashMap concurrentHashMapResponse;
Queue inQueue;
public ResponseMannager(ConcurrentHashMap concurrentHashMap, Queue queue){
this.concurrentHashMapResponse = concurrentHashMap;
this.inQueue = queue;
}
@Override
public void run() {
while(true){
HttpResponseEvent event = null;
try {
event = (HttpResponseEvent) inQueue.dequeue();
}catch(InterruptedException ex){
}
if(event!=null) {
Logger.log(this.toString(), "Dequeued " + event.toString() + " from " + inQueue.toString());
concurrentHashMapResponse.put(event.getRequestId(), true);
Logger.log(this.toString(), "Set response availabliity for " + event.getRequestId() + " in " + concurrentHashMapResponse.toString());
}else{
try{
Thread.sleep(10);
}catch(InterruptedException ex){
}
}
}
}
}