/* This file is part of GNUnet. Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ package org.gnunet.util; import org.gnunet.construct.Construct; import org.gnunet.construct.MessageLoader; import org.gnunet.construct.ProtocolViolationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; /** * Extract GNUnet messages from a stream of bytes. */ public class MessageStreamTokenizer { private static final Logger logger = LoggerFactory .getLogger(MessageStreamTokenizer.class); private MstCalllback mstCalllback; private ByteBuffer buffer; GnunetMessage.Header msgh; public MessageStreamTokenizer(MstCalllback mstCalllback) { this.mstCalllback = mstCalllback; // large enough buffer for a message header this.buffer = ByteBuffer.allocate(4); } public void readAndDispatch() { Class unionClass = null; boolean found = true; try { unionClass = MessageLoader.getUnionClass(GnunetMessage.Body.class, msgh.messageType); } catch (ProtocolViolationException e) { found = false; } logger.debug("dispatching message"); if (found) { GnunetMessage msg; int oldPos = buffer.position(); try { msg = Construct.parseAs(buffer, GnunetMessage.class); } catch (OutOfMemoryError e) { throw new OutOfMemoryError("oom while parsing " + unionClass); } int parsedSize = buffer.position() - oldPos; if (parsedSize != msg.header.messageSize) { throw new AssertionError( String.format("mismatch between parsed message and header for %s: parsed size %s, header size %s", msg.body.getClass(), parsedSize, msg.header.messageSize)); } mstCalllback.onKnownMessage(msg); } else { UnknownMessageBody b = new UnknownMessageBody(); b.id = msgh.messageType; mstCalllback.onUnknownMessage(b); } } /** * Try to extract one message from the MST, call appropriate callbacks. * * @return true if message could be extracted, false if not enough data is available */ public boolean extractOne() { if (msgh == null && buffer.position() >= 4) { // prepare for reading buffer.flip(); msgh = Construct.parseAs(buffer, GnunetMessage.Header.class); // undo read buffer.position(0); logger.debug("got header in mst (t: " + msgh.messageType + ", s: " + msgh.messageSize + " (" + buffer.limit() + "/" + msgh.messageSize + " read)"); if (buffer.capacity() < msgh.messageSize) { ByteBuffer newBuf = ByteBuffer.allocate(msgh.messageSize); newBuf.put(buffer); buffer = newBuf; } else { // set pos to limit and limit to capacity and buffer.compact(); } logger.debug("buffer pos is now " + buffer.position()); } if (msgh != null && buffer.position() >= msgh.messageSize) { buffer.flip(); readAndDispatch(); msgh = null; buffer.compact(); return true; } return false; } /** * Read from a channel into the mst. Does not call any callbacks. * * @param source channel to read from * @return -1 on end of stream, number of bytes read otherwise */ public int readFrom(ReadableByteChannel source, boolean oneShot) throws IOException { int n; logger.debug("reading in mst from channel"); n = source.read(buffer); logger.debug("read {} bytes from channel", n); if (oneShot) { extractOne(); } else { while (extractOne()) { // loop } } return n; } }