View Javadoc

1   /*
2    * $HeadURL: $
3    * $Date: $
4    * $Revision: $
5    * $Author: $
6    * 
7    * Copyright (c) 2005 MindTree Consulting Ltd. 
8    * 
9    * This file is part of Insight.
10   * 
11   * Insight is free software: you can redistribute it and/or modify it under the 
12   * terms of the GNU General Public License as published by the Free Software 
13   * Foundation, either version 3 of the License, or (at your option) any later 
14   * version.
15   * 
16   * Insight is distributed in the hope that it will be useful, but 
17   * WITHOUT ANY WARRANTY; without even the implied warranty of 
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
19   * Public License for more details.
20   * 
21   * You should have received a copy of the GNU General Public License along with 
22   * Insight.  If not, see <http://www.gnu.org/licenses/>.
23   */
24  package com.mindtree.techworks.insight.receiver;
25  
26  import java.io.BufferedReader;
27  import java.io.IOException;
28  import java.io.Serializable;
29  import java.io.StringReader;
30  import java.net.DatagramPacket;
31  import java.net.DatagramSocket;
32  import java.net.SocketException;
33  import java.net.SocketTimeoutException;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.HashMap;
37  import java.util.Iterator;
38  import java.util.LinkedList;
39  import java.util.List;
40  import java.util.Map;
41  
42  import com.mindtree.techworks.insight.model.ReceiverFormat;
43  import com.mindtree.techworks.insight.remoteprotocol.RemoteProtocolException;
44  import com.mindtree.techworks.insight.remoteprotocol.spi.DataPacket;
45  import com.mindtree.techworks.insight.remoteprotocol.spi.LogMessageDataContent;
46  import com.mindtree.techworks.insight.remoteprotocol.spi.MessageType;
47  import com.mindtree.techworks.insight.remoteprotocol.spi.NullMessageDataContent;
48  import com.mindtree.techworks.insight.remoteprotocol.spi.PacketDataContent;
49  import com.mindtree.techworks.insight.remoteprotocol.spi.PacketHeader;
50  import com.mindtree.techworks.insight.remoteprotocol.spi.StartupMessageDataContent;
51  import com.mindtree.techworks.insight.spi.LogEvent;
52  import com.mindtree.techworks.insight.spi.LogNamespace;
53  
54  
55  /**
56   * This is a concrete implementation of the
57   * <code>com.mindtree.techworks.insight.receiver.AbstractReceiver</code> class. It
58   * supports listening on an UDP port and receiving log events in the form of
59   * DataPackets as defined in
60   * <code>com.mindtree.techworks.insight.remoteprotocol.spi.DataPacket</code>.
61   * <p>
62   * This class supports receiving from multiple namespaces at the same time, and
63   * also from multiple sources at the same time.
64   * </p>
65   * 
66   * @see com.mindtree.techworks.insight.receiver.AbstractReceiver
67   * @see com.mindtree.techworks.insight.remoteprotocol.spi.DataPacket
68   * @author <a href="mailto:bindul_bhowmik@mindtree.com">Bindul Bhowmik</a>
69   * @version $Revision: 27 $ $Date: 2007-12-16 04:58:03 -0700 (Sun, 16 Dec 2007) $
70   */
71  public class RemoteProtocolStreamReceiver extends AbstractReceiver {
72  	
73  	/**
74  	 * The default timeout set on the datagram socket
75  	 */
76  	private static final int SOCKET_TIMEOUT = 2000;
77  
78  	// -------------------------------------------------------------------------
79  	// Instance variables
80  	// -------------------------------------------------------------------------
81  
82  	/**
83  	 * The queue containing all packets that are read.
84  	 */
85  	protected LinkedList packetQueue = new LinkedList ();
86  
87  	/**
88  	 * The port on which the receiver is listening
89  	 */
90  	protected int portListeningOn;
91  
92  	/**
93  	 * Map of known sources from which this receiver is receiving data.
94  	 */
95  	private Map knownSources = new HashMap (4);
96  
97  	/**
98  	 * Map of unknown sources from which we are getting data.
99  	 */
100 	private Map unknownSources = new HashMap (2);
101 	
102 	/**
103 	 * Has a list of all loaded namespaces.
104 	 */
105 	private List loadedNamespaces = new ArrayList(2);
106 
107 	/**
108 	 * The current receiver format being used to interpret log messages.
109 	 */
110 	private ReceiverFormat[] receiverFormat;
111 
112 	/**
113 	 * Namespace used for messages for which no log namespace has been defined.
114 	 */
115 	private LogNamespace unknownNamespace;
116 
117 	/**
118 	 * The datagram socket used by this class to listen to a port
119 	 */
120 	private DatagramSocket socket;
121 
122 	// -------------------------------------------------------------------------
123 	// Constructor
124 	// -------------------------------------------------------------------------
125 
126 	/**
127 	 * Creates an instance of the Receiver
128 	 * 
129 	 * @param receiverFormat
130 	 *            The Receiver format used for the receiver.
131 	 * @param portListeningOn
132 	 *            The port on which to listen.
133 	 * @throws ReceiverInitializationException
134 	 *             If the Receiver could not be initialized properly.
135 	 */
136 	public RemoteProtocolStreamReceiver (ReceiverFormat[] receiverFormat,
137 			int portListeningOn) throws ReceiverInitializationException {
138 
139 		this.receiverFormat = receiverFormat;
140 		this.portListeningOn = portListeningOn;
141 		this.unknownNamespace = new LogNamespace ("unknown", "unknown",
142 				receiverFormat, "unknown");
143 		this.initialize (null, true);
144 	}
145 
146 	// -------------------------------------------------------------------------
147 	// Methods overridden from com.mindtree.techworks.insight.receiver.AbstractReceiver
148 	// -------------------------------------------------------------------------
149 
150 	/**
151 	 * @see com.mindtree.techworks.insight.receiver.AbstractReceiver#getNamespaces()
152 	 */
153 	public LogNamespace[] getNamespaces () {
154 
155 		if (loadedNamespaces.size() > 0) {
156 			LogNamespace[] namespaces = new LogNamespace[loadedNamespaces.size()];
157 			return (LogNamespace []) loadedNamespaces.toArray(namespaces);
158 		}
159 		return new LogNamespace[] {unknownNamespace};
160 	}
161 
162 	/**
163 	 * Always returns <code>true</code> since the RemoteProtocolStreamReceiver
164 	 * is by nature tailing.
165 	 * 
166 	 * @see com.mindtree.techworks.insight.receiver.AbstractReceiver#isTailing()
167 	 */
168 	public boolean isTailing () {
169 		return true;
170 	}
171 
172 	/**
173 	 * Initializes this instance. Both the parameters are ignored.
174 	 * 
175 	 * @see com.mindtree.techworks.insight.receiver.AbstractReceiver#initialize(com.mindtree.techworks.insight.spi.LogNamespace,
176 	 *      boolean)
177 	 */
178 	protected void initialize (LogNamespace namespace, boolean isTailing)
179 			throws ReceiverInitializationException {
180 		try {
181 			socket = new DatagramSocket (this.portListeningOn);
182 			socket.setSoTimeout(SOCKET_TIMEOUT);
183 		} catch (SocketException e) {
184 			throw new ReceiverInitializationException (
185 					"Could not open socket.", e);
186 		}		
187 	}
188 
189 	/**
190 	 * @see com.mindtree.techworks.insight.receiver.AbstractReceiver#deInitialize()
191 	 */
192 	protected void deInitialize () {
193 		this.socket.close();
194 	}
195 
196 	/**
197 	 * @see com.mindtree.techworks.insight.receiver.AbstractReceiver#getNextEvents()
198 	 */
199 	protected LogEvent [] getNextEvents () {
200 
201 		populatePacketQueue();
202 		
203 		// Check the current size of the packet queue and start processing.
204 		// There might be packets received during the period, but they will be
205 		// processed on the next call to this method.
206 		int packetCountToProcess = packetQueue.size ();
207 		ArrayList processedEvents = new ArrayList();
208 		for (int i = 0; i < packetCountToProcess; i++ ) {
209 			DataPacket currentPacket = null;
210 			synchronized (packetQueue) {
211 				currentPacket = (DataPacket) packetQueue.removeFirst ();
212 			}
213 			switch (currentPacket.getPacketHeader ().getMessageType ()
214 					.getType ()) {
215 				case MessageType.STARTUP_MESSAGE_TYPE:
216 					processStartupPacket (currentPacket);
217 					break;
218 				case MessageType.LOG_DATA_MESSAGE_TYPE:
219 					processedEvents.addAll (processLogPacket (currentPacket));
220 					break;
221 				case MessageType.SHUTDOWN_MESSAGE_TYPE:
222 					Short sourceId = new Short (currentPacket
223 							.getPacketHeader ().getSourceIdentifier ());
224 					knownSources.remove (sourceId);
225 					break;
226 				default:
227 			// do nothing!
228 			}
229 		}
230 
231 		if (null != processedEvents) {
232 			LogEvent [] loggingEvents = new LogEvent [processedEvents.size ()];
233 			return (LogEvent []) processedEvents.toArray (loggingEvents);
234 		}
235 		return null;
236 	}
237 
238 
239 	/**
240 	 * @see com.mindtree.techworks.insight.receiver.AbstractReceiver#hasMoreEvents()
241 	 */
242 	protected boolean hasMoreEvents () {
243 
244 		if ( !this.packetQueue.isEmpty ()) {
245 			for (Iterator packetItr = packetQueue.iterator (); packetItr
246 					.hasNext ();) {
247 				if (MessageType.LOG_DATA_MESSAGE
248 						.equals (((DataPacket) packetItr.next ())
249 								.getPacketHeader ().getMessageType ())) {
250 					return true;
251 				}
252 			}
253 			return false;
254 		}
255 		return false;
256 	}
257 
258 	// -------------------------------------------------------------------------
259 	// Private utility methods
260 	// -------------------------------------------------------------------------
261 
262 	/**
263 	 * This method processes the startup message from a receiver.
264 	 * 
265 	 * @param currentPacket
266 	 *            The current packet, must be a startup packet.
267 	 */
268 	private void processStartupPacket (DataPacket currentPacket) {
269 
270 		// Check the packet type
271 		if ( !MessageType.STARTUP_MESSAGE.equals (currentPacket
272 				.getPacketDataContent ().getMessageType ())) {
273 			return;
274 		}
275 
276 		// Get the source details object or create it
277 		Short sourceId = new Short (currentPacket.getPacketHeader ()
278 				.getSourceIdentifier ());
279 		SourceDetails sourceDetails = null;
280 		if ( !knownSources.containsKey (sourceId)) {
281 			// Check in unknown sources
282 			if (unknownSources.containsKey (sourceId)) {
283 				sourceDetails = (SourceDetails) unknownSources.get (sourceId);
284 				unknownSources.remove (sourceId);
285 			} else {
286 				sourceDetails = new SourceDetails (sourceId.shortValue ());
287 			}
288 		} else {
289 			sourceDetails = (SourceDetails) knownSources.get (sourceId);
290 		}
291 
292 		// Check if namespace is set
293 		if ( !sourceDetails.isLogNamespaceSet ()) {
294 			StartupMessageDataContent messageData = (StartupMessageDataContent) currentPacket
295 					.getPacketDataContent ();
296 			String namespace = messageData.getNamespace ();
297 			String nodeId = namespace.split ("/")[2];
298 
299 			LogNamespace logNamespace = new LogNamespace (namespace, null,
300 					receiverFormat, nodeId);
301 			sourceDetails.setLogNamespace (logNamespace);
302 
303 			LogInterpreter logInterpreter = new LogInterpreter (logNamespace);
304 			sourceDetails.setLogInterpreter (logInterpreter);
305 
306 			knownSources.put (sourceId, sourceDetails);
307 		}
308 
309 	}
310 
311 	/**
312 	 * Processes a log message packet
313 	 * 
314 	 * @param currentPacket
315 	 *            The log message packet to process
316 	 * @return The lsit of messages interpretted.
317 	 */
318 	private ArrayList processLogPacket (DataPacket currentPacket) {
319 
320 		// Check the packet type
321 		if ( !MessageType.LOG_DATA_MESSAGE.equals (currentPacket
322 				.getPacketDataContent ().getMessageType ())) {
323 			return new ArrayList (0);
324 		}
325 
326 		// Get the source details object
327 		Short sourceId = new Short (currentPacket.getPacketHeader ()
328 				.getSourceIdentifier ());
329 		SourceDetails sourceDetails = null;
330 		if ( !knownSources.containsKey (sourceId)) {
331 			// Check unknown sources
332 			if (unknownSources.containsKey (sourceId)) {
333 				sourceDetails = (SourceDetails) unknownSources.get (sourceId);
334 			} else {
335 				sourceDetails = new SourceDetails (sourceId.shortValue ());
336 				sourceDetails.setLogNamespace (unknownNamespace);
337 				sourceDetails.setLogInterpreter (new LogInterpreter (unknownNamespace));
338 				sourceDetails.setLogNamespaceSet (false);
339 
340 				unknownSources.put (sourceId, sourceDetails);
341 			}
342 		} else {
343 			sourceDetails = (SourceDetails) knownSources.get (sourceId);
344 		}
345 
346 		// Get the LogInterpreter and the message
347 		LogInterpreter logInterpreter = sourceDetails.getLogInterpreter ();
348 		LogMessageDataContent logMessageDataContent = (LogMessageDataContent) currentPacket
349 				.getPacketDataContent ();
350 		ArrayList logMessages = new ArrayList ();
351 		String logData = null;
352 		
353 		try {
354 			logData = logMessageDataContent.getLogMessage ();
355 		} catch (RemoteProtocolException e) {
356 			// TODO Log this here
357 			logData = "";
358 		}
359 		
360 		// Add the namespace to the loaded namespaces if not already existing
361 		if (!loadedNamespaces.contains(sourceDetails.getLogNamespace())) {
362 			loadedNamespaces.add(sourceDetails.getLogNamespace());
363 		}
364 
365 		BufferedReader logReader = new BufferedReader (new StringReader (
366 				logData));
367 		try {
368 			while (logReader.ready ()) {
369 				String line = logReader.readLine ();
370 				if (null != line) {
371 					LogEvent [] logEvents = logInterpreter
372 							.parseLogMessage(line);
373 					logMessages.addAll (Arrays.asList (logEvents));
374 				} else {
375 					break;
376 				}
377 			}
378 		} catch (IOException ioe) {
379 			// Just ignore it and send the messages we have interpretted!
380 		}
381 
382 		return logMessages;
383 	}
384 
385 		/**
386 		 * @see java.lang.Thread#run()
387 		 */
388 		private void populatePacketQueue () {
389 			DatagramPacket packet = new DatagramPacket (
390 					new byte [DataPacket.MAX_PACKET_SIZE],
391 					DataPacket.MAX_PACKET_SIZE);
392 			try {
393 				socket.receive (packet);
394 
395 				try {
396 					PacketHeader header = new PacketHeader (packet
397 							.getData ());
398 					// TODO Change this implementation to get the packet
399 					// TODO See java.nio
400 					byte [] messageData = new byte [packet.getLength ()];
401 					System.arraycopy (packet.getData (), 0, messageData, 0,
402 										packet.getLength ());
403 
404 					PacketDataContent content = null;
405 
406 					switch (header.getMessageType ().getType ()) {
407 						case MessageType.STARTUP_MESSAGE_TYPE:
408 							content = new StartupMessageDataContent (
409 									messageData, header);
410 							break;
411 						case MessageType.LOG_DATA_MESSAGE_TYPE:
412 							content = new LogMessageDataContent (
413 									messageData, header);
414 							break;
415 						case MessageType.SHUTDOWN_MESSAGE_TYPE:
416 							content = new NullMessageDataContent ();
417 							content
418 									.setMessageType (MessageType.SHUTDOWN_MESSAGE);
419 							break;
420 						default:
421 					// do nothing
422 					}
423 
424 					if (null != content) {
425 						DataPacket dataPacket = new DataPacket (content,
426 								header);
427 						synchronized (packetQueue) {
428 							packetQueue.addLast (dataPacket);
429 						}
430 					}
431 				} catch (RemoteProtocolException e) {
432 					// Do nothing
433 
434 				}					
435 			} catch (SocketTimeoutException ste) {
436 				// Try again.
437 			} catch (IOException ioex) {
438 				// Do nothing!
439 				ioex.printStackTrace ();
440 			} catch (Exception ie) {
441 				// Someone wants us to stop
442 				ie.printStackTrace();
443 			}
444 		}
445 
446 	/**
447 	 * Inner class to hold the different objects in relation to a particular
448 	 * source including the <code>LogNamespace</code>,
449 	 * <code>LogInterpreter</code> (including the <code>ReceiverFormat</code>)
450 	 * and if a startup message has been received for this source.
451 	 * 
452 	 * @see Serializable
453 	 * @author <a href="mailto:bindul_bhowmik@mindtree.com">Bindul Bhowmik</a>
454 	 * @version $Revision: 27 $ $Date: 2007-12-16 04:58:03 -0700 (Sun, 16 Dec 2007) $
455 	 */
456 	protected class SourceDetails implements Serializable {
457 
458 		// ---------------------------------------------------------------------
459 		// Class variables
460 		// ---------------------------------------------------------------------
461 
462 		/**
463 		 * The serial version UID for the class
464 		 */
465 		private static final long serialVersionUID = -4363523350633442160L;
466 
467 		// ---------------------------------------------------------------------
468 		// Instance variables
469 		// ---------------------------------------------------------------------
470 
471 		/**
472 		 * The namespace object associated with the source.
473 		 */
474 		private LogNamespace logNamespace;
475 
476 		/**
477 		 * The source id of the identifier
478 		 */
479 		private short sourceId;
480 
481 		/**
482 		 * Indicates if a startup message has been received for this source.
483 		 */
484 		private boolean isLogNamespaceSet = false;
485 
486 		/**
487 		 * The log interpreter used for this source.
488 		 */
489 		private LogInterpreter logInterpreter;
490 
491 		// ---------------------------------------------------------------------
492 		// Constructors
493 		// ---------------------------------------------------------------------
494 
495 		/**
496 		 * Creates an instance of this source details.
497 		 * 
498 		 * @param sourceId
499 		 *            The identifier of the source.
500 		 */
501 		public SourceDetails (short sourceId) {
502 
503 			this.sourceId = sourceId;
504 		}
505 
506 		// ---------------------------------------------------------------------
507 		// Accessors
508 		// ---------------------------------------------------------------------
509 
510 		/**
511 		 * Returns the isLogNamespaceSet
512 		 * 
513 		 * @return Returns the isLogNamespaceSet.
514 		 */
515 		public boolean isLogNamespaceSet () {
516 
517 			return isLogNamespaceSet;
518 		}
519 
520 
521 		/**
522 		 * Sets the isLogNamespaceSet
523 		 * 
524 		 * @param isLogNamespaceSet
525 		 *            The isLogNamespaceSet to set.
526 		 */
527 		public void setLogNamespaceSet (boolean isLogNamespaceSet) {
528 
529 			this.isLogNamespaceSet = isLogNamespaceSet;
530 		}
531 
532 
533 		/**
534 		 * Returns the logNamespace
535 		 * 
536 		 * @return Returns the logNamespace.
537 		 */
538 		public LogNamespace getLogNamespace () {
539 
540 			return logNamespace;
541 		}
542 
543 
544 		/**
545 		 * Sets the logNamespace
546 		 * 
547 		 * @param logNamespace
548 		 *            The logNamespace to set.
549 		 */
550 		public void setLogNamespace (LogNamespace logNamespace) {
551 
552 			this.logNamespace = logNamespace;
553 		}
554 
555 
556 		/**
557 		 * Returns the sourceId
558 		 * 
559 		 * @return Returns the sourceId.
560 		 */
561 		public short getSourceId () {
562 
563 			return sourceId;
564 		}
565 
566 
567 		/**
568 		 * Returns the logInterpreter
569 		 * 
570 		 * @return Returns the logInterpreter.
571 		 */
572 		public LogInterpreter getLogInterpreter () {
573 
574 			return logInterpreter;
575 		}
576 
577 
578 		/**
579 		 * Sets the logInterpreter
580 		 * 
581 		 * @param logInterpreter
582 		 *            The logInterpreter to set.
583 		 */
584 		public void setLogInterpreter (LogInterpreter logInterpreter) {
585 
586 			this.logInterpreter = logInterpreter;
587 		}
588 	}
589 }