1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  package com.mindtree.techworks.insight.receiver;
25  
26  import java.io.BufferedReader;
27  import java.io.IOException;
28  import java.io.Serializable;
29  import java.io.StringReader;
30  import java.net.DatagramPacket;
31  import java.net.DatagramSocket;
32  import java.net.SocketException;
33  import java.net.SocketTimeoutException;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.HashMap;
37  import java.util.Iterator;
38  import java.util.LinkedList;
39  import java.util.List;
40  import java.util.Map;
41  
42  import com.mindtree.techworks.insight.model.ReceiverFormat;
43  import com.mindtree.techworks.insight.remoteprotocol.RemoteProtocolException;
44  import com.mindtree.techworks.insight.remoteprotocol.spi.DataPacket;
45  import com.mindtree.techworks.insight.remoteprotocol.spi.LogMessageDataContent;
46  import com.mindtree.techworks.insight.remoteprotocol.spi.MessageType;
47  import com.mindtree.techworks.insight.remoteprotocol.spi.NullMessageDataContent;
48  import com.mindtree.techworks.insight.remoteprotocol.spi.PacketDataContent;
49  import com.mindtree.techworks.insight.remoteprotocol.spi.PacketHeader;
50  import com.mindtree.techworks.insight.remoteprotocol.spi.StartupMessageDataContent;
51  import com.mindtree.techworks.insight.spi.LogEvent;
52  import com.mindtree.techworks.insight.spi.LogNamespace;
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  public class RemoteProtocolStreamReceiver extends AbstractReceiver {
72  	
73  	
74  
75  
76  	private static final int SOCKET_TIMEOUT = 2000;
77  
78  	
79  	
80  	
81  
82  	
83  
84  
85  	protected LinkedList packetQueue = new LinkedList ();
86  
87  	
88  
89  
90  	protected int portListeningOn;
91  
92  	
93  
94  
95  	private Map knownSources = new HashMap (4);
96  
97  	
98  
99  
100 	private Map unknownSources = new HashMap (2);
101 	
102 	
103 
104 
105 	private List loadedNamespaces = new ArrayList(2);
106 
107 	
108 
109 
110 	private ReceiverFormat[] receiverFormat;
111 
112 	
113 
114 
115 	private LogNamespace unknownNamespace;
116 
117 	
118 
119 
120 	private DatagramSocket socket;
121 
122 	
123 	
124 	
125 
126 	
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 	public RemoteProtocolStreamReceiver (ReceiverFormat[] receiverFormat,
137 			int portListeningOn) throws ReceiverInitializationException {
138 
139 		this.receiverFormat = receiverFormat;
140 		this.portListeningOn = portListeningOn;
141 		this.unknownNamespace = new LogNamespace ("unknown", "unknown",
142 				receiverFormat, "unknown");
143 		this.initialize (null, true);
144 	}
145 
146 	
147 	
148 	
149 
150 	
151 
152 
153 	public LogNamespace[] getNamespaces () {
154 
155 		if (loadedNamespaces.size() > 0) {
156 			LogNamespace[] namespaces = new LogNamespace[loadedNamespaces.size()];
157 			return (LogNamespace []) loadedNamespaces.toArray(namespaces);
158 		}
159 		return new LogNamespace[] {unknownNamespace};
160 	}
161 
162 	
163 
164 
165 
166 
167 
168 	public boolean isTailing () {
169 		return true;
170 	}
171 
172 	
173 
174 
175 
176 
177 
178 	protected void initialize (LogNamespace namespace, boolean isTailing)
179 			throws ReceiverInitializationException {
180 		try {
181 			socket = new DatagramSocket (this.portListeningOn);
182 			socket.setSoTimeout(SOCKET_TIMEOUT);
183 		} catch (SocketException e) {
184 			throw new ReceiverInitializationException (
185 					"Could not open socket.", e);
186 		}		
187 	}
188 
189 	
190 
191 
192 	protected void deInitialize () {
193 		this.socket.close();
194 	}
195 
196 	
197 
198 
199 	protected LogEvent [] getNextEvents () {
200 
201 		populatePacketQueue();
202 		
203 		
204 		
205 		
206 		int packetCountToProcess = packetQueue.size ();
207 		ArrayList processedEvents = new ArrayList();
208 		for (int i = 0; i < packetCountToProcess; i++ ) {
209 			DataPacket currentPacket = null;
210 			synchronized (packetQueue) {
211 				currentPacket = (DataPacket) packetQueue.removeFirst ();
212 			}
213 			switch (currentPacket.getPacketHeader ().getMessageType ()
214 					.getType ()) {
215 				case MessageType.STARTUP_MESSAGE_TYPE:
216 					processStartupPacket (currentPacket);
217 					break;
218 				case MessageType.LOG_DATA_MESSAGE_TYPE:
219 					processedEvents.addAll (processLogPacket (currentPacket));
220 					break;
221 				case MessageType.SHUTDOWN_MESSAGE_TYPE:
222 					Short sourceId = new Short (currentPacket
223 							.getPacketHeader ().getSourceIdentifier ());
224 					knownSources.remove (sourceId);
225 					break;
226 				default:
227 			
228 			}
229 		}
230 
231 		if (null != processedEvents) {
232 			LogEvent [] loggingEvents = new LogEvent [processedEvents.size ()];
233 			return (LogEvent []) processedEvents.toArray (loggingEvents);
234 		}
235 		return null;
236 	}
237 
238 
239 	
240 
241 
242 	protected boolean hasMoreEvents () {
243 
244 		if ( !this.packetQueue.isEmpty ()) {
245 			for (Iterator packetItr = packetQueue.iterator (); packetItr
246 					.hasNext ();) {
247 				if (MessageType.LOG_DATA_MESSAGE
248 						.equals (((DataPacket) packetItr.next ())
249 								.getPacketHeader ().getMessageType ())) {
250 					return true;
251 				}
252 			}
253 			return false;
254 		}
255 		return false;
256 	}
257 
258 	
259 	
260 	
261 
262 	
263 
264 
265 
266 
267 
268 	private void processStartupPacket (DataPacket currentPacket) {
269 
270 		
271 		if ( !MessageType.STARTUP_MESSAGE.equals (currentPacket
272 				.getPacketDataContent ().getMessageType ())) {
273 			return;
274 		}
275 
276 		
277 		Short sourceId = new Short (currentPacket.getPacketHeader ()
278 				.getSourceIdentifier ());
279 		SourceDetails sourceDetails = null;
280 		if ( !knownSources.containsKey (sourceId)) {
281 			
282 			if (unknownSources.containsKey (sourceId)) {
283 				sourceDetails = (SourceDetails) unknownSources.get (sourceId);
284 				unknownSources.remove (sourceId);
285 			} else {
286 				sourceDetails = new SourceDetails (sourceId.shortValue ());
287 			}
288 		} else {
289 			sourceDetails = (SourceDetails) knownSources.get (sourceId);
290 		}
291 
292 		
293 		if ( !sourceDetails.isLogNamespaceSet ()) {
294 			StartupMessageDataContent messageData = (StartupMessageDataContent) currentPacket
295 					.getPacketDataContent ();
296 			String namespace = messageData.getNamespace ();
297 			String nodeId = namespace.split ("/")[2];
298 
299 			LogNamespace logNamespace = new LogNamespace (namespace, null,
300 					receiverFormat, nodeId);
301 			sourceDetails.setLogNamespace (logNamespace);
302 
303 			LogInterpreter logInterpreter = new LogInterpreter (logNamespace);
304 			sourceDetails.setLogInterpreter (logInterpreter);
305 
306 			knownSources.put (sourceId, sourceDetails);
307 		}
308 
309 	}
310 
311 	
312 
313 
314 
315 
316 
317 
318 	private ArrayList processLogPacket (DataPacket currentPacket) {
319 
320 		
321 		if ( !MessageType.LOG_DATA_MESSAGE.equals (currentPacket
322 				.getPacketDataContent ().getMessageType ())) {
323 			return new ArrayList (0);
324 		}
325 
326 		
327 		Short sourceId = new Short (currentPacket.getPacketHeader ()
328 				.getSourceIdentifier ());
329 		SourceDetails sourceDetails = null;
330 		if ( !knownSources.containsKey (sourceId)) {
331 			
332 			if (unknownSources.containsKey (sourceId)) {
333 				sourceDetails = (SourceDetails) unknownSources.get (sourceId);
334 			} else {
335 				sourceDetails = new SourceDetails (sourceId.shortValue ());
336 				sourceDetails.setLogNamespace (unknownNamespace);
337 				sourceDetails.setLogInterpreter (new LogInterpreter (unknownNamespace));
338 				sourceDetails.setLogNamespaceSet (false);
339 
340 				unknownSources.put (sourceId, sourceDetails);
341 			}
342 		} else {
343 			sourceDetails = (SourceDetails) knownSources.get (sourceId);
344 		}
345 
346 		
347 		LogInterpreter logInterpreter = sourceDetails.getLogInterpreter ();
348 		LogMessageDataContent logMessageDataContent = (LogMessageDataContent) currentPacket
349 				.getPacketDataContent ();
350 		ArrayList logMessages = new ArrayList ();
351 		String logData = null;
352 		
353 		try {
354 			logData = logMessageDataContent.getLogMessage ();
355 		} catch (RemoteProtocolException e) {
356 			
357 			logData = "";
358 		}
359 		
360 		
361 		if (!loadedNamespaces.contains(sourceDetails.getLogNamespace())) {
362 			loadedNamespaces.add(sourceDetails.getLogNamespace());
363 		}
364 
365 		BufferedReader logReader = new BufferedReader (new StringReader (
366 				logData));
367 		try {
368 			while (logReader.ready ()) {
369 				String line = logReader.readLine ();
370 				if (null != line) {
371 					LogEvent [] logEvents = logInterpreter
372 							.parseLogMessage(line);
373 					logMessages.addAll (Arrays.asList (logEvents));
374 				} else {
375 					break;
376 				}
377 			}
378 		} catch (IOException ioe) {
379 			
380 		}
381 
382 		return logMessages;
383 	}
384 
385 		
386 
387 
388 		private void populatePacketQueue () {
389 			DatagramPacket packet = new DatagramPacket (
390 					new byte [DataPacket.MAX_PACKET_SIZE],
391 					DataPacket.MAX_PACKET_SIZE);
392 			try {
393 				socket.receive (packet);
394 
395 				try {
396 					PacketHeader header = new PacketHeader (packet
397 							.getData ());
398 					
399 					
400 					byte [] messageData = new byte [packet.getLength ()];
401 					System.arraycopy (packet.getData (), 0, messageData, 0,
402 										packet.getLength ());
403 
404 					PacketDataContent content = null;
405 
406 					switch (header.getMessageType ().getType ()) {
407 						case MessageType.STARTUP_MESSAGE_TYPE:
408 							content = new StartupMessageDataContent (
409 									messageData, header);
410 							break;
411 						case MessageType.LOG_DATA_MESSAGE_TYPE:
412 							content = new LogMessageDataContent (
413 									messageData, header);
414 							break;
415 						case MessageType.SHUTDOWN_MESSAGE_TYPE:
416 							content = new NullMessageDataContent ();
417 							content
418 									.setMessageType (MessageType.SHUTDOWN_MESSAGE);
419 							break;
420 						default:
421 					
422 					}
423 
424 					if (null != content) {
425 						DataPacket dataPacket = new DataPacket (content,
426 								header);
427 						synchronized (packetQueue) {
428 							packetQueue.addLast (dataPacket);
429 						}
430 					}
431 				} catch (RemoteProtocolException e) {
432 					
433 
434 				}					
435 			} catch (SocketTimeoutException ste) {
436 				
437 			} catch (IOException ioex) {
438 				
439 				ioex.printStackTrace ();
440 			} catch (Exception ie) {
441 				
442 				ie.printStackTrace();
443 			}
444 		}
445 
446 	
447 
448 
449 
450 
451 
452 
453 
454 
455 
456 	protected class SourceDetails implements Serializable {
457 
458 		
459 		
460 		
461 
462 		
463 
464 
465 		private static final long serialVersionUID = -4363523350633442160L;
466 
467 		
468 		
469 		
470 
471 		
472 
473 
474 		private LogNamespace logNamespace;
475 
476 		
477 
478 
479 		private short sourceId;
480 
481 		
482 
483 
484 		private boolean isLogNamespaceSet = false;
485 
486 		
487 
488 
489 		private LogInterpreter logInterpreter;
490 
491 		
492 		
493 		
494 
495 		
496 
497 
498 
499 
500 
501 		public SourceDetails (short sourceId) {
502 
503 			this.sourceId = sourceId;
504 		}
505 
506 		
507 		
508 		
509 
510 		
511 
512 
513 
514 
515 		public boolean isLogNamespaceSet () {
516 
517 			return isLogNamespaceSet;
518 		}
519 
520 
521 		
522 
523 
524 
525 
526 
527 		public void setLogNamespaceSet (boolean isLogNamespaceSet) {
528 
529 			this.isLogNamespaceSet = isLogNamespaceSet;
530 		}
531 
532 
533 		
534 
535 
536 
537 
538 		public LogNamespace getLogNamespace () {
539 
540 			return logNamespace;
541 		}
542 
543 
544 		
545 
546 
547 
548 
549 
550 		public void setLogNamespace (LogNamespace logNamespace) {
551 
552 			this.logNamespace = logNamespace;
553 		}
554 
555 
556 		
557 
558 
559 
560 
561 		public short getSourceId () {
562 
563 			return sourceId;
564 		}
565 
566 
567 		
568 
569 
570 
571 
572 		public LogInterpreter getLogInterpreter () {
573 
574 			return logInterpreter;
575 		}
576 
577 
578 		
579 
580 
581 
582 
583 
584 		public void setLogInterpreter (LogInterpreter logInterpreter) {
585 
586 			this.logInterpreter = logInterpreter;
587 		}
588 	}
589 }