Apache Mina:如何编写 Http 服务器?

Apache Mina: How to write a Http Server?

我正在尝试使用 Apache Mina 编写一个 Http 服务器。

根据 Mina 的架构,这个任务应该有 2 个过滤器,一个用于 Http 请求传递,另一个用于处理请求并生成响应。所以使用 Mina 示例代码,我想出了以下代码,它有一个接受器、日志过滤器、Http 过滤器和一个用于处理请求的过滤器。

服务器启动正常运行,但请求没有到达 DummyHttpSever 过滤器。我试图调试,但找不到问题所在。这里出了什么问题?

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.api.AbstractIoFilter;
import org.apache.mina.api.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.http.DateUtil;
import org.apache.mina.http.HttpDecoderState;
import org.apache.mina.http.HttpServerDecoder;
import org.apache.mina.http.HttpServerEncoder;
import org.apache.mina.http.api.DefaultHttpResponse;
import org.apache.mina.http.api.HttpContentChunk;
import org.apache.mina.http.api.HttpEndOfContent;
import org.apache.mina.http.api.HttpMethod;
import org.apache.mina.http.api.HttpPdu;
import org.apache.mina.http.api.HttpRequest;
import org.apache.mina.http.api.HttpStatus;
import org.apache.mina.http.api.HttpVersion;
import org.apache.mina.transport.nio.NioTcpServer;

/**
 * Tiny HTTP server built on the MINA {@code NioTcpServer}: it binds to port 8080 and answers every
 * request with a fixed plain-text body, then closes the connection.
 */
public class HttpTest {

    /**
     * Configures the server, binds it to port 8080 and keeps the process alive until the sleep ends.
     *
     * @param args ignored
     * @throws Exception if the calling thread is interrupted while sleeping, or for any failure
     *                   raised by the server calls
     */
    public static void main(String[] args) throws Exception {

        NioTcpServer httpServer = new NioTcpServer();
        httpServer.setReuseAddress(true);
        // Filters are registered in this order: HTTP codec, logging, then the dummy HTTP handler.
        httpServer.setFilters(new ProtocolCodecFilter<HttpPdu, ByteBuffer, Void, HttpDecoderState>(new HttpServerEncoder(),
                        new HttpServerDecoder()), new LoggingFilter("DECODED"), new DummyHttpSever());


        httpServer.getSessionConfig().setTcpNoDelay(true);

        httpServer.bind(new InetSocketAddress(8080));

        // Keep the server up: 2,000,000,000 ms is roughly 23 days (not 20 seconds), then unbind.
        Thread.sleep(2000000000);
        httpServer.unbind();

    }

    /**
     * Last filter in the chain: reacts to decoded HTTP messages and writes back a canned response.
     */
    private static class DummyHttpSever extends AbstractIoFilter {

        // NOTE(review): main() registers a single instance of this filter. If the server shares that
        // instance between connections, the two fields below are shared as well and concurrent
        // requests can overwrite each other - confirm, and move this state to the session if so.

        /** Most recently received request head. */
        private HttpRequest incomingRequest;

        /** Body chunks collected since the most recent request head. */
        private List<ByteBuffer> body;

        /**
         * Handles one decoded message.
         * <ul>
         * <li>{@code HttpRequest}: remembers it and replies at once unless the method is POST or PUT.</li>
         * <li>{@code ByteBuffer}: stored as a body chunk.</li>
         * <li>{@code HttpEndOfContent}: replies to the remembered request.</li>
         * </ul>
         * Any other message type is ignored.
         */
        @Override
        public void messageReceived(IoSession session, Object message, ReadFilterChainController controller) {
            if (message instanceof HttpRequest) {
                System.out.println("This shit is working");


                incomingRequest = (HttpRequest) message;
                body = new ArrayList<ByteBuffer>();

                // check if this request is going to be followed by an HTTP body or not;
                // POST and PUT are answered later, once HttpEndOfContent arrives
                HttpMethod method = incomingRequest.getMethod();
                if (method != HttpMethod.POST && method != HttpMethod.PUT) {
                    sendResponse(session, incomingRequest);
                }
            } else if (message instanceof ByteBuffer) {
                // Guard: a chunk that shows up before any request head has no list to go into.
                if (body != null) {
                    body.add((ByteBuffer) message);
                }
            } else if (message instanceof HttpEndOfContent) {
                // we received all the post content, send the reply back
                sendResponse(session, incomingRequest);
            }

        }

        /**
         * Writes a 200 response with a fixed body (head, one content chunk, end-of-content marker)
         * and then closes the session.
         *
         * @param session session to write to and close
         * @param request request being answered; not read by this method
         */
        public void sendResponse(IoSession session, HttpRequest request) {
            Map<String, String> headers = new HashMap<String, String>();
            headers.put("Server", "Apache MINA Dummy test server/0.0.");
            headers.put("Date", DateUtil.getCurrentAsString());
            headers.put("Connection", "Close");
            String strContent = "Hello ! we reply to request !";
            // Encode with an explicit charset instead of the platform default so the bytes (and the
            // Content-Length below) do not depend on the machine the server runs on.
            ByteBuffer content = ByteBuffer.wrap(strContent.getBytes(java.nio.charset.Charset.forName("UTF-8")));

            // compute content len
            headers.put("Content-Length", String.valueOf(content.remaining()));
            session.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SUCCESS_OK, headers));
            session.write(new HttpContentChunk(content));
            session.write(new HttpEndOfContent());
            session.close(false);

        }
    }
}

此外,以下是我正在使用的依赖项。

<dependency>
            <groupId>org.apache.mina</groupId>
            <artifactId>mina-core</artifactId>
            <version>2.0.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.mina</groupId>
            <artifactId>mina-http</artifactId>
            <version>2.0.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.mina</groupId>
            <artifactId>mina-coap</artifactId>
            <version>2.0.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>LATEST</version>
        </dependency>

这是一个简单的 Http Web 服务器,您可以根据需要对其进行修改。本示例是在 Apache Mina(AsyncWeb)自带的 lightweight 示例基础上修改而来的。

Main.java

import java.net.InetSocketAddress;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.asyncweb.common.codec.HttpCodecFactory;
import org.apache.asyncweb.examples.lightweight.HttpProtocolHandler;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * Entry point: wires an HTTP codec and {@link HttpProtocolHandler} onto an NIO socket acceptor
 * and binds it to a fixed port.
 */
public class Main {
    /** TCP port the acceptor is bound to. */
    private static final int LISTEN_PORT = 9012;

    /** Value used for both the receive and the send buffer size. */
    private static final int SOCKET_BUFFER_SIZE = 1024;

    /** Backlog value handed to the acceptor. */
    private static final int ACCEPT_BACKLOG = 10240;

    public static void main(String[] args) throws Exception {
        SocketAcceptor server = new NioSocketAcceptor();

        installCodec(server);
        applySocketOptions(server);

        server.setHandler(new HttpProtocolHandler());
        server.bind(new InetSocketAddress(LISTEN_PORT));
    }

    /** Appends the HTTP codec filter to the acceptor's filter chain under the name "codec". */
    private static void installCodec(SocketAcceptor server) {
        ProtocolCodecFilter httpCodec = new ProtocolCodecFilter(new HttpCodecFactory());
        server.getFilterChain().addLast("codec", httpCodec);
    }

    /** Applies the acceptor-level and per-session socket settings, then the backlog. */
    private static void applySocketOptions(SocketAcceptor server) {
        server.setReuseAddress(true);
        server.getSessionConfig().setReuseAddress(true);
        server.getSessionConfig().setReceiveBufferSize(SOCKET_BUFFER_SIZE);
        server.getSessionConfig().setSendBufferSize(SOCKET_BUFFER_SIZE);
        server.getSessionConfig().setTcpNoDelay(true);
        server.getSessionConfig().setSoLinger(-1);
        server.setBacklog(ACCEPT_BACKLOG);
    }
}

HttpProtocolHandler.java

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.future.WriteFuture;
import org.apache.asyncweb.common.HttpRequest;
import org.apache.asyncweb.common.HttpResponseStatus;
import org.apache.asyncweb.common.MutableHttpResponse;
import org.apache.asyncweb.common.DefaultHttpResponse;
import org.apache.asyncweb.common.HttpHeaderConstants;



/**
 * {@link IoHandler} that answers every decoded HTTP request with a fixed text body
 * ("No end point found"). It also carries two currently unused helper responses (a sized data
 * response and a timer-delayed response) that an endpoint dispatch can call.
 */
public class HttpProtocolHandler implements IoHandler {
    private static final int CONTENT_PADDING = 0; // 101

    /** Charset used to encode the response body. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Fixed Last-Modified header value. It is the HTTP-date of the 1164091960000 ms timestamp
     * embedded in the ETags below (the previous literal said "31 Nov", a day that does not exist).
     */
    private static final String LAST_MODIFIED = "Tue, 21 Nov 2006 06:52:40 GMT";

    /** Cache of response buffers keyed by their size; grows with every distinct size requested. */
    private final Map<Integer, IoBuffer> buffers = new ConcurrentHashMap<Integer, IoBuffer>();

    /** Daemon timer used to send delayed responses. */
    private final Timer timer;

    public HttpProtocolHandler() {
        timer = new Timer(true);
    }

    /**
     * Prints the stack trace of anything that is not an {@link IOException}, then closes the session.
     */
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        if (!(cause instanceof IOException)) {
            cause.printStackTrace();
        }
        session.close();
    }

    /**
     * Flattens a multi-valued parameter map into a dictionary holding the first value of each key.
     *
     * @param hashParameters map whose values are lists of parameter values; may be {@code null}
     * @return a new dictionary of key to first value; entries with a {@code null} key, a value that
     *         is not a {@link List}, an empty list or a {@code null} first element are left out
     *         (a {@link Hashtable} cannot store {@code null})
     */
    public Dictionary extractParameters(Map hashParameters){
        Dictionary<Object, Object> parameters = new Hashtable<Object, Object>();
        if (hashParameters == null) {
            return parameters;
        }
        for (Object rawEntry : hashParameters.entrySet()) {
            Map.Entry<?, ?> pair = (Map.Entry<?, ?>) rawEntry;
            Object key = pair.getKey();
            Object value = pair.getValue();
            if (key == null || !(value instanceof List)) {
                continue;
            }
            List<?> values = (List<?>) value;
            if (values.isEmpty() || values.get(0) == null) {
                continue;
            }
            parameters.put(key, values.get(0));
        }

        return parameters;
    }

    /**
     * Builds the fixed text response for the incoming request and writes it to the session.
     *
     * @param session session the request arrived on
     * @param message decoded message; cast to {@code HttpRequest}
     */
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        HttpRequest req = (HttpRequest) message;
        String path = req.getRequestUri().getPath(); //path: /echo
        Dictionary parameters = this.extractParameters(req.getParameters());

        // Endpoint dispatch goes here; path and parameters are the inputs for it. Sketch:
        //   switch (path) {
        //       case "/io":     response = new IOHandler().handleRequest(parameters);     break;
        //       case "/cpu":    response = new CPUHandler().handleRequest(parameters);    break;
        //       case "/db":     response = new DBHandler().handleRequest(parameters);     break;
        //       case "/memory": response = new MemoryHandler().handleRequest(parameters); break;
        //       default:        response = "No end point found";
        //   }
        // or, for the helper responses in this class:
        //   path.startsWith("/size/")                                -> doDataResponse(session, req)
        //   path.startsWith("/delay/") / path.startsWith("/adelay/") -> doAsynchronousDelayedResponse(session, req)
        String response = "No end point found";

        MutableHttpResponse res = new DefaultHttpResponse();

        IoBuffer bb = IoBuffer.allocate(1024);
        bb.setAutoExpand(true);
        bb.putString(response, UTF_8.newEncoder());
        bb.flip();
        res.setContent(bb);

//        res.setHeader("Pragma", "no-cache");
//        res.setHeader("Cache-Control", "no-cache");
        res.setStatus(HttpResponseStatus.OK);

        // Unlike writeResponse(), this path does not call res.normalize(req).
        writeAndCloseUnlessKeepAlive(session, res);
    }

    /** Normalizes the response against the request, then writes it. */
    private void writeResponse(IoSession session, HttpRequest req,
            MutableHttpResponse res) {
        res.normalize(req);
        writeAndCloseUnlessKeepAlive(session, res);
    }

    /**
     * Writes the response and registers a close listener on the write future unless the response's
     * connection header carries the keep-alive value.
     */
    private void writeAndCloseUnlessKeepAlive(IoSession session, MutableHttpResponse res) {
        WriteFuture future = session.write(res);
        if (!HttpHeaderConstants.VALUE_KEEP_ALIVE.equalsIgnoreCase(
                res.getHeader( HttpHeaderConstants.KEY_CONNECTION))) {
            future.addListener(IoFutureListener.CLOSE);
        }
    }

    /**
     * Parses the text after the last '/' of the path as an int.
     *
     * @throws NumberFormatException if that text is not a valid int
     */
    private static int trailingInt(String path) {
        return Integer.parseInt(path.substring(path.lastIndexOf('/') + 1));
    }

    /**
     * Responds with a buffer whose size is the number at the end of the request path plus
     * {@code CONTENT_PADDING}. Buffers are cached per size and a duplicate is handed to each response.
     */
    private void doDataResponse(IoSession session, HttpRequest req) {
        int size = trailingInt(req.getRequestUri().getPath()) + CONTENT_PADDING;

        MutableHttpResponse res = new DefaultHttpResponse();
        res.setStatus(HttpResponseStatus.OK);
        res.setHeader("ETag", "W/\"" + size + "-1164091960000\"");
        res.setHeader("Last-Modified", LAST_MODIFIED);

        // NOTE(review): the size comes from the request path and every distinct value stays cached.
        IoBuffer buf = buffers.get(size);
        if (buf == null) {
            buf = IoBuffer.allocate(size);
            buffers.put(size, buf);
        }

        res.setContent(buf.duplicate());
        writeResponse(session, req, res);
    }

    /**
     * Schedules a response on the timer; the delay in milliseconds is the number at the end of
     * the request path.
     */
    private void doAsynchronousDelayedResponse(final IoSession session,
            final HttpRequest req) {
        int delay = trailingInt(req.getRequestUri().getPath());

        final MutableHttpResponse res = new DefaultHttpResponse();
        res.setStatus(HttpResponseStatus.OK);
        res.setHeader("ETag", "W/\"0-1164091960000\"");
        res.setHeader("Last-Modified", LAST_MODIFIED);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                writeResponse(session, req, res);
            }
        }, delay);
    }

    /** No action on sent messages. */
    public void messageSent(IoSession session, Object message) throws Exception {
    }

    /** No action when a session closes. */
    public void sessionClosed(IoSession session) throws Exception {
    }

    /** No action when a session is created. */
    public void sessionCreated(IoSession session) throws Exception {
    }

    /** Closes a session as soon as it is reported idle. */
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        session.close();
    }

    /** Sets the BOTH_IDLE idle time of a newly opened session to 30 (presumably seconds - confirm). */
    public void sessionOpened(IoSession session) throws Exception {
        session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
    }
}