BOSH bugfixes, added response timer/queue.

tong [09-12-09 02:08]
BOSH bugfixes, added response timer/queue.
Filename
jabber/BOSHConnection.hx
diff --git a/jabber/BOSHConnection.hx b/jabber/BOSHConnection.hx
index 50fff47..ec728f4 100644
--- a/jabber/BOSHConnection.hx
+++ b/jabber/BOSHConnection.hx
@@ -17,6 +17,7 @@
 */
 package jabber;

+import util.Timer;
 #if flash
 import flash.events.Event;
 import flash.events.HTTPStatusEvent;
@@ -44,9 +45,14 @@ import flash.events.SecurityErrorEvent;
 */
 class BOSHConnection extends jabber.stream.Connection {

+	static function __init__() {
+		XMLNS = "http://jabber.org/protocol/httpbind";
+		XMLNS_XMPP = "urn:xmpp:xbosh";
+	}
+
 	public static inline var BOSH_VERSION = "1.6";
-	public static inline var XMLNS = "http://jabber.org/protocol/httpbind";
-	public static inline var XMLNS_XMPP = "urn:xmpp:xbosh";
+	public static var XMLNS(default,null) : String;
+	public static var XMLNS_XMPP(default,null) : String;

 	/** BOSH path */
 	public var path(default,null) : String;
@@ -54,7 +60,7 @@ class BOSHConnection extends jabber.stream.Connection {
 	public var hold(default,null) : Int;
 	/** Longest time (in seconds) that the connection manager is allowed to wait before responding to any request during the session. */
 	public var wait(default,null) : Int;
-	/** BOSH Session id */
+	/** BOSH session id */
 	public var sid(default,null) : String;
 	/** */
 	public var maxConcurrentRequests(default,null) : Int;
@@ -63,17 +69,19 @@ class BOSHConnection extends jabber.stream.Connection {

 	var initialized : Bool;
 	var rid : Int;
-	var maxPause : Int;
 	var requestCount : Int;
-	var requestQueue : Array<String>;
-	var inactivity : Int;
-	var pauseEnabled : Bool;
+	var requestQueue : Array<Xml>;
+	var responseTimer : util.Timer;
+	var responseQueue : Array<Xml>;
 	var pollingEnabled : Bool;
-	//var responseTimer : Timer;
-	var pauseTimer : util.Timer;
+	var pauseEnabled : Bool;
+	var pauseTimer : Timer;
+	var inactivity : Int;
+	var maxPause : Int;

 	public function new( host : String, path : String,
-						 hold : Int = 1, wait : Int = 30,
+						 hold : Int = 1,
+						 wait : Int = 30,
 						 secure : Bool = true,
 						 maxConcurrentRequests : Int = 2 ) {
 		super( host );
@@ -83,18 +91,22 @@ class BOSHConnection extends jabber.stream.Connection {
 		this.secure = secure;
 		this.maxConcurrentRequests = maxConcurrentRequests;
 		initialized = false;
-		rid = Std.int( Math.random()*10000000 );
-		requestCount = 0;
-		requestQueue = new Array();
-		pauseEnabled = pollingEnabled = false;
+		pauseEnabled = false;
+		pollingEnabled = true;
 	}

 	/**
 	*/
 	public override function connect() {
-		if( connected ) {
+		if( initialized && connected ) {
 			restart();
 		} else {
+			initialized = true;
+			rid = Std.int( Math.random()*10000000 );
+			requestCount = 0;
+			requestQueue = new Array();
+			responseQueue = new Array();
+			responseTimer = new Timer( 1 );
 			var b = Xml.createElement( "body" );
 			b.set( 'xml:lang', 'en' );
 			b.set( 'xmlns', XMLNS );
@@ -106,8 +118,10 @@ class BOSHConnection extends jabber.stream.Connection {
 			b.set( 'wait', Std.string( wait ) );
 			b.set( 'to', host );
 			b.set( 'secure', Std.string( secure ) );
-			initialized = true;
-			sendRequests( b.toString() );
+			#if XMPP_DEBUG
+			XMPPDebug.out( "/BOSH/ "+b.toString() );
+			#end
+			sendRequests( b );
 		}
 	}

@@ -119,7 +133,7 @@ class BOSHConnection extends jabber.stream.Connection {
 			r.set( "type", "terminate" );
 			r.addChild( new xmpp.Presence(null,null,null,xmpp.PresenceType.unavailable).toXml() );
 			//sendQueuedRequests( r.toString() );
-			sendRequests( r.toString() );
+			sendRequests( r );
 			cleanup();
 			//onDisconnect();
 		}
@@ -128,7 +142,8 @@ class BOSHConnection extends jabber.stream.Connection {
 	/**
 	*/
 	public override function write( t : String ) : Bool {
-		return sendQueuedRequests( t );
+		 sendQueuedRequests( Xml.parse(t) ); //TODO not the xml parse again!
+		return true;
 	}

 	/**
@@ -143,10 +158,10 @@ class BOSHConnection extends jabber.stream.Connection {
 			secs = inactivity;
 		if( !pauseEnabled || secs > maxPause )
 			return false;
-		pollingEnabled = false;
+//		pollingEnabled = false;
 		var r = createRequest();
 		r.set( "pause", Std.string( secs ) );
-		sendRequests( r.toString() );
+		sendRequests( r );
 		pauseTimer = new util.Timer( secs*1000 );
 		pauseTimer.run = handlePauseTimeout;
 		return true;
@@ -159,167 +174,190 @@ class BOSHConnection extends jabber.stream.Connection {
 		r.set( 'xmlns', XMLNS );
 		r.set( "xml:lang", "en" );
 		r.set( "to", host );
-		sendRequests( r.toString() );
+		#if XMPP_DEBUG
+		XMPPDebug.out( "/BOSH/ "+r.toString() );
+		#end
+		sendRequests( r );
 	}

-	function sendQueuedRequests( ?t : String ) : Bool {
+	function sendQueuedRequests( ?t : Xml ) : Bool {
 		if( t != null )
 			requestQueue.push( t );
 		else if( requestQueue.length == 0 )
 			return false;
-		return sendRequests();
+		return sendRequests( null );
 	}

-	function sendRequests( ?t : String, poll : Bool = false ) : Bool {
+	function sendRequests( ?t : Xml, poll : Bool = false ) : Bool {
 		if( requestCount >= maxConcurrentRequests ) {
-			#if JABBER_DEBUG
-			trace( "maxConcurrentRequests limit reached ("+maxConcurrentRequests+")" );
-			#end
+			#if JABBER_DEBUG trace( "max concurrent request limit reached ("+requestCount+","+maxConcurrentRequests+")", "info" ); #end
+			//requestQueue.push(t);
 			return false;
 		}
 		requestCount++;
-		var out : Xml = null;
 		if( t == null ) {
 			if( poll ) {
-				out = createRequest();
+				t = createRequest();
 			} else {
 				var i = 0;
-				var tmp = new Array<String>();
+				var tmp = new Array<Xml>();
 				while( i++ < 10 && requestQueue.length > 0 )
 					tmp.push( requestQueue.shift() );
-				 out= createRequest( tmp );
+				 t = createRequest( tmp );
 			}
 		}
-		if( out == null )
-			out = untyped t; //HACK
-		#if flash
+		#if js
+		var r = new haxe.Http( getHTTPPath() );
+		//r.onStatus = handleHTTPStatus;
+		r.onError = handleHTTPError;
+		r.onData = handleHTTPData;
+		r.setPostData( t.toString() );
+		r.request( true );
+		#elseif flash
 		var r = new flash.net.URLRequest( getHTTPPath() );
 		r.method = flash.net.URLRequestMethod.POST;
 		r.contentType = "text/xml";
-		r.data = out.toString();
+		r.data = t.toString();
 		r.requestHeaders.push( new flash.net.URLRequestHeader( "Accept", "text/xml" ) );
 		var me = this;
 		var l = new flash.net.URLLoader();
 		l.addEventListener( Event.COMPLETE, function(e) me.handleHTTPData( e.target.data ) );
 		l.addEventListener( IOErrorEvent.IO_ERROR, handleHTTPError );
-		l.addEventListener( HTTPStatusEvent.HTTP_STATUS, function(e) me.handleHTTPStatus( e.status ) );
+		//l.addEventListener( HTTPStatusEvent.HTTP_STATUS, function(e) me.handleHTTPStatus( e.status ) );
 		l.addEventListener( SecurityErrorEvent.SECURITY_ERROR, handleHTTPError );
 		l.load( r );
-		#elseif js
-		var r = new haxe.Http( getHTTPPath() );
-		r.onStatus = handleHTTPStatus;
-		r.onError = handleHTTPError;
-		r.onData = handleHTTPData;
-	//	#if neko
-	//	r.setHeader( "Host", "127.0.0.1" );
-	//	r.setHeader( "User-Agent", "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.1.3) Gecko/20090910 Ubuntu/9.04 (jaunty) Shiretoko/3.5.3" );
-	//	r.setHeader( "Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" );
-	//	r.setHeader( "Accept-Encoding", "gzip,deflate" );
-	//	r.setHeader( "Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7" );
-	//	r.setHeader( "Keep-Alive", "300" );
-		//r.setHeader( "Referer", "http://127.0.0.1/xiki_3/www/" );
-	//	r.setHeader( "Content-Length", Std.string( t.toString().length ) );
-	//	r.setHeader( "Content-Type", "text/plain; charset=UTF-8" );
-	//	r.noShutdown = true;
-	//	#end
-		r.setPostData( out.toString() );
-		r.request( true );
 		#end
-	//	if( poll ) {
-	//	}
 		return true;
 	}

+	/*
 	function handleHTTPStatus( s : Int ) {
 		//trace( "handleHTTPStatus "+s );
 	}
+	*/

 	function handleHTTPError( e : String ) {
-		//trace("handleHTTPError "+e);
+		trace("handleHTTPError "+e);
 		cleanup();
-		onError( e );
+		__onError( e );
 	}

 	function handleHTTPData( t : String ) {
-		var x = Xml.parse( t ).firstElement();
+		var x : Xml = null;
+		try {
+			x = Xml.parse( t ).firstElement();
+		} catch( e : Dynamic ) {
+			#if JABBER_DEBUG trace( "Invalid XML" ); #end
+			return;
+		}
+		if( x.get( "xmlns" ) != XMLNS ) {
+			#if JABBER_DEBUG trace( "Invalid BOSH body" ); #end
+			return;
+		}
 		requestCount--;
 		if( connected ) {
 			switch( x.get( "type" ) ) {
 			case "terminate" :
 				cleanup();
-				onDisconnect();
+				#if JABBER_DEBUG
+				trace( "BOSH stream terminated by server", "warn" );
+				#end
+				__onDisconnect();
 				return;
 			case "error" :
-				//TODO
+				//TODO
+				return;
 			}
 			var c = x.firstElement();
 			if( c == null ) {
-				sendQueuedRequests();
-				if( requestCount == 0 )
-					poll();
-				/*
 				if( requestCount == 0 )
 					poll();
 				else
 					sendQueuedRequests();
-				*/
 				return;
-			}
-			var b = haxe.io.Bytes.ofString( c.toString() );
-			onData( b, 0, b.length );
-			if( requestCount == 0 && !sendQueuedRequests() ) {
-				poll();
+			}
+		//	var b = haxe.io.Bytes.ofString( c.toString() );
+		//	__onData( b, 0, b.length );
+			for( e in x.elements() ) {
+				responseQueue.push( e );
+			}
+			resetResponseProcessor();
+			if( requestCount == 0 &&
+				!sendQueuedRequests() ) {
+				( responseQueue.length > 0 ) ? Timer.delay( poll, 0 ) : poll();
 			}

 		} else {
-			if( initialized ) {
-				sid = x.get( "sid" );
-				if( sid == null ) {
-					cleanup();
-					onDisconnect();
-				}
-				wait = Std.parseInt( x.get( "wait" ) );
-				var t = x.get( "ver" );
-				if( t != null &&  t != BOSH_VERSION ) {
-					onError( "Invalid BOSH version ("+t+")" );
-					return;
-				}
-				var t = x.get( "maxpause" );
-				if( t != null ) {
-					maxPause = Std.parseInt( t )*1000;
-					pauseEnabled = true;
-				}
-				var t = x.get( "requests" );
-				if( t != null ) maxConcurrentRequests = Std.parseInt( t );
-				var t = x.get( "inactivity" );
-				if( t != null ) inactivity = Std.parseInt( t );
-				onConnect();
-				connected = true;
-				var b = haxe.io.Bytes.ofString( x.toString() );
-				onData( b, 0, b.length );
-			} else {
-				//trace("????????? "+ x );
+			if( !initialized )
+				return;
+			sid = x.get( "sid" );
+			if( sid == null ) {
+				cleanup();
+				__onError( "Invalid SID" );
+				return;
+			}
+			wait = Std.parseInt( x.get( "wait" ) );
+			var t = x.get( "ver" );
+			if( t != null &&  t != BOSH_VERSION ) {
+				cleanup();
+				__onError( "Invalid BOSH version ("+t+")" );
+				return;
+			}
+			t = null;
+			t = x.get( "maxpause" );
+			if( t != null ) {
+				maxPause = Std.parseInt( t )*1000;
+				pauseEnabled = true;
 			}
+			t = null;
+			t = x.get( "requests" );
+			if( t != null ) maxConcurrentRequests = Std.parseInt( t );
+			t = null;
+			t = x.get( "inactivity" );
+			if( t != null ) inactivity = Std.parseInt( t );
+			__onConnect();
+			connected = true;
+			var b = haxe.io.Bytes.ofString( x.toString() );
+			__onData( b, 0, b.length );
 		}
 	}

 	function handlePauseTimeout() {
 		pauseTimer.stop();
-		pauseTimer = null;
 		pollingEnabled = true;
 		poll();
 	}

-	function createRequest( ?t : Iterable<String> ) : Xml {
+	function processResponse() {
+		responseTimer.stop();
+		var x = responseQueue.shift();
+		var b = haxe.io.Bytes.ofString( x.toString() );
+		__onData( b, 0, b.length );
+		resetResponseProcessor();
+	}
+
+	function resetResponseProcessor() {
+		if( responseQueue.length > 0 ) {
+			responseTimer.stop();
+			responseTimer = new Timer( 0 );
+			responseTimer.run = processResponse;
+		}
+	}
+
+	function poll() {
+		if( !connected || !pollingEnabled || requestCount > 0 || sendQueuedRequests() )
+			return;
+		sendRequests( null, true );
+	}
+
+	function createRequest( ?t : Iterable<Xml> ) : Xml {
 		var x = Xml.createElement( "body" );
 		x.set( "xmlns", XMLNS );
 		x.set( "xml:lang", "en" );
 		x.set( "rid", Std.string( ++rid ) );
 		x.set( "sid", sid );
 		if( t != null ) {
-			for( e in t ) {
-				x.addChild( Xml.createPCData( e ) );
-			}
+			for( e in t ) { x.addChild( e ); }
 		}
 		return x;
 	}
@@ -333,20 +371,13 @@ class BOSHConnection extends jabber.stream.Connection {
 		return b.toString();
 	}

-	inline function poll() {
-		if( pollingEnabled ) sendRequests( null, true );
-	}
-
 	function cleanup() {
-		if( pauseTimer != null ) {
-			pauseTimer.stop();
-			pauseTimer = null;
-		}
+		responseTimer.stop();
 		connected = initialized = false;
 		sid = null;
-		rid = Std.int( Math.random()*10000000 );
 		requestCount = 0;
-		requestQueue = new Array();
+		requestQueue = null;
+		responseQueue = null;
 	}

 }
ViewGit