1 /**
2  * Stream
3  * 
4  * Copyright:
5  * (C) 2012-2015 Tatsuhiro Tsujikawa
6  * (C) 2014-2015 Etienne Cimon
7  *
8  * License: 
9  * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project. 
10  * Consult the provided LICENSE.md file for details
11  */
12 module libhttp2.stream;
13 
14 import libhttp2.constants;
15 import libhttp2.types;
16 import libhttp2.frame;
17 import libhttp2.session;
18 import std.algorithm : max;
19 
20 const MAX_DEP_TREE_LENGTH = 100;
21 
22 class StreamRoots
23 {
24 	void free() { }
25 
26     void add(Stream stream) {
27         if (head) {
28             stream.m_root_next = head;
29             head.m_root_prev = stream;
30         }
31         
32         head = stream;
33     }
34     
35 	void remove(Stream stream) 
36     {
37         Stream root_prev, root_next;
38         
39         root_prev = stream.m_root_prev;
40         root_next = stream.m_root_next;
41         
42         if (root_prev) {
43             root_prev.m_root_next = root_next;
44             
45             if (root_next) {
46                 root_next.m_root_prev = root_prev;
47             }
48         } else {
49             if (root_next) {
50                 root_next.m_root_prev = null;
51             }
52             
53             head = root_next;
54         }
55         
56         stream.m_root_prev = null;
57         stream.m_root_next = null;
58     }
59     
60     void removeAll() {
61         Stream si, next;
62         
63         for (si = head; si;) {
64             next = si.m_root_next;
65             
66             si.m_root_prev = null;
67             si.m_root_next = null;
68             
69             si = next;
70         }
71         
72         head = null;
73     }
74 
75     Stream head;
76     int num_streams;
77 }
78 
79 class Stream {
80 
81 	this(int stream_id,
82 		StreamFlags flags,
83 		StreamState initial_state,
84 		int weight,
85 		StreamRoots roots,
86 		int remote_initial_window_size,
87 		int local_initial_window_size,
88 		void *stream_user_data)
89 	{
90 		initialize(stream_id, flags, initial_state, weight, roots, remote_initial_window_size, local_initial_window_size, stream_user_data);
91 	}
92 
93 	void free() { userData = null; } // We don't free stream.item. It is deleted in ActiveOutboundItem.reset(), Sessioin.free() or PriorityQueue
94 
95     package void initialize(int stream_id,
96 							StreamFlags flags,
97 							StreamState initial_state,
98 							int weight,
99 							StreamRoots roots,
100 							int remote_initial_window_size,
101 					        int local_initial_window_size,
102 					        void *stream_user_data) 
103 	{
104         m_id = stream_id;
105         m_flags = flags;
106         m_state = initial_state;
107         m_weight = weight;
108         m_effective_weight = m_weight;
109 		m_roots = roots;
110 		m_remote_window_size = remote_initial_window_size;
111 		m_local_window_size = local_initial_window_size;
112 		m_stream_user_data = stream_user_data;
113     }
114     
115     /*
116      * Disallow either further receptions or transmissions, or both.
117      * |flag| is bitwise OR of one or more of ShutdownFlag.
118      */
119     void shutdown(ShutdownFlag flag)
120 	{
121         m_shut_flags |= flag;
122     }
123 
124     /*
125      * Computes distributed weight of a stream of the |weight| under the
126      * $(D Stream) if $(D Stream) is removed from a dependency tree.  The result
127      * is computed using m_weight rather than m_effective_weight.
128      */
129     int distributedWeight(int weight) 
130 	{
131         weight = m_weight * weight / m_sum_dep_weight;
132         
133         return max(1, weight);
134     }
135     
136     /*
137 	 * Computes effective weight of a stream of the |weight| under the
138 	 * $(D Stream).  The result is computed using m_effective_weight
139 	 * rather than m_weight.  This function is used to determine
140 	 * weight in dependency tree.
141 	 */
142     int distributedEffectiveWeight(int weight) {
143         if (m_sum_norest_weight == 0)
144             return m_effective_weight;        
145         weight = m_effective_weight * weight / m_sum_norest_weight;
146         
147         return max(1, weight);
148     }
149     
150 
151     
152     /*
153 	 * Attaches |item| to $(D Stream).  Updates dpri members in this
154 	 * dependency tree.
155 	 */
156     void attachItem(OutboundItem item, Session session) 
157 	{
158         assert((m_flags & StreamFlags.DEFERRED_ALL) == 0);
159         assert(!m_item);
160         
161         LOGF("stream: stream=%d attach item=%s", m_id, item);
162         
163         m_item = item;
164         
165 		updateOnAttachItem(session);
166     }
167     
168     /*
169 	 * Detaches |m_item|.  Updates dpri members in this dependency
170 	 * tree.  This function does not free |m_item|.  The caller must
171 	 * free it.
172 	 */
173     void detachItem(Session session) 
174 	{
175         LOGF("stream: stream=%d detach item=%s", m_id, m_item);
176         
177         m_item = null;
178         m_flags &= ~StreamFlags.DEFERRED_ALL;
179         
180 		updateDepOnDetachItem(session);
181     }
182     
183     /*
184 	 * Defer |m_item|.  We won't call this function in the situation
185 	 * where |m_item| is null.  The |flags| is bitwise OR of zero or
186 	 * more of StreamFlags.DEFERRED_USER and
187 	 * StreamFlags.DEFERRED_FLOW_CONTROL.  The |flags| indicates
188 	 * the reason of this action.
189 	 */
190     void deferItem(StreamFlags flags, Session session) 
191 	{
192         assert(m_item);
193         
194         LOGF("stream: stream=%d defer item=%s cause=%02x", m_id, m_item, flags);
195         
196         m_flags |= flags;
197         
198 		updateDepOnDetachItem(session);
199     }
200     
201     /*
202 	 * Put back deferred data in this stream to active state.  The |flags|
203 	 * are one or more of bitwise OR of the following values:
204 	 * StreamFlags.DEFERRED_USER and
205 	 * StreamFlags.DEFERRED_FLOW_CONTROL and given masks are
206 	 * cleared if they are set.  So even if this function is called, if
207 	 * one of flag is still set, data does not become active.
208 	 */
209     void resumeDeferredItem(StreamFlags flag, Session session)
210 	{
211         assert(m_item);
212         
213         LOGF("stream: stream=%d resume item=%s flags=%02x", m_id, m_item, flags);
214         
215         m_flags &= ~flags;
216         
217         if (m_flags & StreamFlags.DEFERRED_ALL) {
218             return;
219         }
220         
221 		updateOnAttachItem(session);
222     }
223     
224     /*
225 	 * Returns nonzero if item is deferred by whatever reason.
226 	 */
227     bool isItemDeferred() 
228 	{
229         return m_item && (m_flags & StreamFlags.DEFERRED_ALL);
230     }
231     
232     /*
233 	 * Returns nonzero if item is deferred by flow control.
234 	 */
235     bool isDeferredByFlowControl() 
236 	{
237         return m_item && (m_flags & StreamFlags.DEFERRED_FLOW_CONTROL);
238     }
239 
240     
241     /*
242 	 * Updates the remote window size with the new value
243 	 * |new_initial_window_size|. The |old_initial_window_size| is used to
244 	 * calculate the current window size.
245 	 *
246 	 * This function returns true if it succeeds or false. The failure is due to
247 	 * overflow.
248 	 */
249     bool updateRemoteInitialWindowSize(int new_initial_window_size, int old_initial_window_size)
250 	{
251 		return updateInitialWindowSize(m_remote_window_size, new_initial_window_size, old_initial_window_size);
252     }
253     
254     /*
255 	 * Updates the local window size with the new value
256 	 * |new_initial_window_size|. The |old_initial_window_size| is used to
257 	 * calculate the current window size.
258 	 *
259 	 * This function returns true if it succeeds or false. The failure is due to
260 	 * overflow.
261 	 */
262     bool updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size) 
263 	{
264 		return updateInitialWindowSize(m_local_window_size, new_initial_window_size, old_initial_window_size);
265     }
266     
267     /*
268      * Call this function if promised stream $(D Stream) is replied with
269      * HEADERS.  This function changes the state of the $(D Stream) to
270      * OPENED.
271      */
272     void promiseFulfilled() {
273 		m_state = StreamState.OPENED;
274         m_flags &= ~StreamFlags.PUSH;
275     }
276     
277     /*
278      * Returns the stream positioned in root of the dependency tree the
279      * $(D Stream) belongs to.
280      */
281     Stream getRoot() {
282 		Stream stream = this;
283         for (;;) {
284             if (stream.m_sib_prev) {
285                 stream = stream.m_sib_prev;
286                 
287                 continue;
288             }
289             
290             if (stream.m_dep_prev) {
291                 stream = stream.m_dep_prev;
292                 
293                 continue;
294             }
295             
296             break;
297         }
298         
299         return stream;
300     }
301     
302     /*
303      * Returns true if |target| is found in subtree of $(D Stream).
304      */
305     bool subtreeContains(Stream target) {
306         
307         if (this is target)
308             return true;
309         
310 		if (m_sib_next && m_sib_next.subtreeContains(target))
311             return true;
312         
313 		return m_dep_next?m_dep_next.subtreeContains(target):false;
314     }
315     
316     /*
317      * Makes the $(D Stream) depend on the |dep_stream|.  This dependency is
318      * exclusive.  All existing direct descendants of |dep_stream| become
319      * the descendants of the $(D Stream).  This function assumes
320      * |m_data| is null and no dpri members are changed in this
321      * dependency tree.
322      */
323     void insert(Stream stream) {
324         Stream si;
325         Stream root_stream;
326         
327         assert(!m_item);
328         
329 		LOGF("stream: dep_insert dep_stream(%s)=%d, stream(%s)=%d", this, m_id, stream, stream.m_id);
330         
331 		stream.m_sum_dep_weight = m_sum_dep_weight;
332 		m_sum_dep_weight = stream.m_weight;
333         
334 		if (m_dep_next) {
335 			for (si = m_dep_next; si; si = si.m_sib_next) {
336                 stream.m_num_substreams += si.m_num_substreams;
337             }
338             
339 			stream.m_dep_next = m_dep_next;
340             stream.m_dep_next.m_dep_prev = stream;
341         }
342         
343 		m_dep_next = stream;
344         stream.m_dep_prev = this;
345         
346 		root_stream = updateLength(1);
347         
348 		root_stream.updateSumNorestWeight();
349 		root_stream.updateEffectiveWeight();
350         
351         ++stream.m_roots.num_streams;
352     }
353      
354     /*
355      * Makes the $(D Stream) depend on the |dep_stream|.  This dependency is
356      * not exclusive.  This function assumes |m_data| is null and no
357      * dpri members are changed in this dependency tree.
358      */
359     void add(Stream stream) {
360         Stream root_stream;
361         
362         assert(!stream.m_item);
363         
364         LOGF("stream: dep_add dep_stream(%s=%d, stream(%s)=%d", this, m_id, stream, stream.m_id);
365         
366         root_stream = updateLength(1);
367         
368 		m_sum_dep_weight += stream.m_weight;
369         
370         if (!m_dep_next) {
371             linkDependency(stream);
372         } else {
373             insertLinkDependency(stream);
374         }
375         
376 		root_stream.updateSumNorestWeight();
377 		root_stream.updateEffectiveWeight();
378         
379         ++stream.m_roots.num_streams;
380     }
381     
382     /*
383      * Removes the $(D Stream) from the current dependency tree.  This
384      * function assumes |m_data| is null.
385      */
386 	void remove() {
387         Stream prev, next, dep_prev, si, root_stream;
388         int sum_dep_weight_delta;
389         
390         LOGF("stream: dep_remove stream(%s=%d", this, m_id);
391         
392         /* Distribute weight of $(D Stream) to direct descendants */
393         sum_dep_weight_delta = -m_weight;
394         
395         for (si = m_dep_next; si; si = si.m_sib_next) {
396             si.m_weight = distributedWeight(si.m_weight);
397             
398             sum_dep_weight_delta += si.m_weight;
399         }
400         
401         prev = firstSibling();
402         
403         dep_prev = prev.m_dep_prev;
404         
405         if (dep_prev) {
406 			root_stream = dep_prev.updateLength(-1);
407             
408             dep_prev.m_sum_dep_weight += sum_dep_weight_delta;
409         }
410         
411         if (m_sib_prev) {
412             unlinkSibling();
413         } else if (m_dep_prev) {
414             unlinkDependency();
415         } else {
416             m_roots.remove(this);
417             
418             /* stream is a root of tree.  Removing stream makes its
419                 descendants a root of its own subtree. */
420             
421             for (si = m_dep_next; si;) {
422                 next = si.m_sib_next;
423                 
424                 si.m_dep_prev = null;
425                 si.m_sib_prev = null;
426                 si.m_sib_next = null;
427                 
428                 /* We already distributed weight of $(D Stream) to this. */
429                 si.m_effective_weight = si.m_weight;
430                 
431                 si.m_roots.add(si);
432                 
433                 si = next;
434             }
435         }
436         
437         if (root_stream) {
438 			root_stream.updateSumNorestWeight();
439 			root_stream.updateEffectiveWeight();
440         }
441         
442         m_num_substreams = 1;
443         m_sum_dep_weight = 0;
444         
445         m_dep_prev = null;
446         m_dep_next = null;
447         m_sib_prev = null;
448         m_sib_next = null;
449         
450         --m_roots.num_streams;
451     }
452     
453     /*
454      * Makes the $(D Stream) depend on the |dep_stream|.  This dependency is
455      * exclusive.  Updates dpri members in this dependency tree.
456      */
457     void insertSubtree(Stream stream, Session session) {
458         Stream last_sib;
459         Stream dep_next;
460         Stream root_stream;
461         size_t delta_substreams;
462         
463         LOGF("stream: dep_insert_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id);
464         
465 		delta_substreams = stream.m_num_substreams;
466         
467 		stream.updateSetRest();
468         
469         if (m_dep_next) {
470             /* m_num_substreams includes dep_stream itself */
471 			stream.m_num_substreams += m_num_substreams - 1;
472             
473 			stream.m_sum_dep_weight += m_sum_dep_weight;
474 			m_sum_dep_weight = stream.m_weight;
475             
476             dep_next = m_dep_next;
477             
478 			if (dep_next) dep_next.updateSetRest();
479             
480             linkDependency(stream);
481             
482 			if (stream.m_dep_next) {
483 				last_sib = stream.m_dep_next.lastSibling();
484                 
485 				last_sib.linkSibling(dep_next);
486                 
487                 dep_next.m_dep_prev = null;
488             } else {
489                 stream.linkDependency(dep_next);
490             }
491         } else {
492             linkDependency(stream);
493             
494             assert(m_sum_dep_weight == 0);
495 			m_sum_dep_weight = stream.m_weight;
496         }
497         
498         root_stream = updateLength(delta_substreams);
499         
500 		root_stream.updateSetTop();
501         
502 		root_stream.updateSumNorestWeight();
503 		root_stream.updateEffectiveWeight();
504         
505 		root_stream.updateQueueTop(session);
506     }
507     
508     
509     /*
510      * Makes the $(D Stream) depend on the |dep_stream|.  This dependency is
511      * not exclusive.  Updates dpri members in this dependency tree.
512      */
513     void addSubtree(Stream stream, Session session) 
514 	{
515         Stream root_stream;
516         
517         LOGF("stream: dep_add_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id);
518         
519         stream.updateSetRest();
520         
521         if (m_dep_next) {
522             m_sum_dep_weight += stream.m_weight;
523             
524             insertLinkDependency(stream);
525         } else {
526             linkDependency(stream);
527             
528             assert(m_sum_dep_weight == 0);
529 			m_sum_dep_weight = stream.m_weight;
530         }
531         
532         root_stream = updateLength(stream.m_num_substreams);
533         
534 		root_stream.updateSetTop();
535         
536 		root_stream.updateSumNorestWeight();
537 		root_stream.updateEffectiveWeight();
538         
539 		root_stream.updateQueueTop(session);
540     }
541     
542     /*
543      * Removes subtree whose root stream is $(D Stream).  Removing subtree
544      * does not change dpri values.  The effective_weight of streams in
545      * removed subtree is not updated.
546      */
547     void removeSubtree() 
548 	{
549         Stream prev, next, dep_prev, root_stream;
550         
551         LOGF("stream: dep_remove_subtree stream(%s=%d", this, m_id);
552         
553         if (m_sib_prev) {
554             prev = m_sib_prev;
555             
556             prev.m_sib_next = m_sib_next;
557             if (prev.m_sib_next) {
558                 prev.m_sib_next.m_sib_prev = prev;
559             }
560             
561 			prev = prev.firstSibling();
562             
563             dep_prev = prev.m_dep_prev;
564             
565         } else if (m_dep_prev) {
566             dep_prev = m_dep_prev;
567             next = m_sib_next;
568             
569             dep_prev.m_dep_next = next;
570             
571             if (next) {
572                 next.m_dep_prev = dep_prev;
573                 
574                 next.m_sib_prev = null;
575             }
576             
577         } else {
578 			m_roots.remove(this);
579             
580             dep_prev = null;
581         }
582         
583         if (dep_prev) {
584             dep_prev.m_sum_dep_weight -= m_weight;
585             
586 			root_stream = dep_prev.updateLength(-m_num_substreams);
587             
588 			root_stream.updateSumNorestWeight();
589 			root_stream.updateEffectiveWeight();
590         }
591         
592         m_sib_prev = null;
593         m_sib_next = null;
594         m_dep_prev = null;
595     }
596     
597     /*
598      * Makes the $(D Stream) as root.  Updates dpri members in this
599      * dependency tree.
600      */
601     void makeRoot(Session session)
602 	{
603         LOGF("stream: dep_make_root stream(%s=%d", this, m_id);
604         
605 		m_roots.add(this);
606         
607         updateSetRest();
608         
609         m_effective_weight = m_weight;
610         
611         updateSetTop();
612         
613         updateSumNorestWeight();
614         updateEffectiveWeight();
615         
616 		updateQueueTop(session);
617     }
618     
619     /*
620      * Makes the $(D Stream) as root and all existing root streams become
621      * direct children of $(D Stream).
622      */
623 	void makeTopmostRoot(Session session)
624     {
625         Stream first, si;
626         
627         LOGF("stream: ALL YOUR STREAM ARE BELONG TO US stream(%s=%d", this, m_id);
628         
629         first = m_roots.head;
630         
631         /* stream must not be include in m_roots.head list */
632         assert(first !is this);
633         
634         if (first) {
635             Stream prev;
636             
637             prev = first;
638             
639             LOGF("stream: root stream(%s=%d", first, first.m_id);
640             
641             m_sum_dep_weight += first.m_weight;
642             m_num_substreams += first.m_num_substreams;
643             
644             for (si = first.m_root_next; si; si = si.m_root_next) {
645                 
646                 assert(si !is this);
647                 
648                 LOGF("stream: root stream(%s=%d", si, si.m_id);
649                 
650                 m_sum_dep_weight += si.m_weight;
651                 m_num_substreams += si.m_num_substreams;
652                 
653 				prev.linkSibling(si);
654                 
655                 prev = si;
656             }
657             
658             if (m_dep_next) {
659                 Stream sib_next;
660                 
661                 sib_next = m_dep_next;
662                 
663                 sib_next.m_dep_prev = null;
664                 
665 				first.linkSibling(sib_next);
666 				linkDependency(prev);
667             } else {
668 				linkDependency(first);
669             }
670         }
671         
672         m_roots.removeAll();
673         
674 		makeRoot(session);
675     }
676     
677     /*
678      * Returns true if $(D Stream) is in any dependency tree.
679      */
680     bool inDepTree() {
681         return m_dep_prev || m_dep_next || m_sib_prev ||
682                m_sib_next || m_root_next || m_root_prev ||
683                m_roots.head is this;
684     }
685 
686 private:
687 
688 
689 	bool updateInitialWindowSize(ref int window_size, int new_initial_window_size, int old_initial_window_size)
690 	{
691 		long new_window_size = ( cast(long)window_size ) + new_initial_window_size - old_initial_window_size;
692 
693 		if (int.min > new_window_size || new_window_size > MAX_WINDOW_SIZE)
694 			return false;
695 
696 		window_size = cast(int) new_window_size;
697 
698 		return true;
699 	}
700 
701 	void pushItem(Session session) 
702 	{
703 		OutboundItem item;
704 		
705 		assert(m_item);
706 		assert(m_item.queued == 0);
707 		
708 		item = m_item;
709 		
710 		/* If item is now sent, don't push it to the queue.  Otherwise, we may push same item twice. */
711 		if (session.aob.item is item)
712 			return;
713 		
714 		if (item.weight > m_effective_weight)
715 			item.weight = m_effective_weight;
716 		
717 		item.cycle = session.last_cycle;
718 		
719 		switch (item.frame.hd.type) {
720 			case FrameType.DATA:
721 				session.ob_da_pq.push(item);
722 				break;
723 			case FrameType.HEADERS:
724 				if (m_state == StreamState.RESERVED)
725 					session.ob_ss_pq.push(item);
726 				else
727 					session.ob_pq.push(item);
728 				break;
729 			default:
730 				/* should not reach here */
731 				assert(0);
732 		}
733 		
734 		item.queued = 1;
735 	}
736 
737 	int distributedTopEffectiveWeight(int weight) {
738 		if (m_sum_top_weight == 0)
739 			return m_effective_weight;
740 
741 		weight = m_effective_weight * weight / m_sum_top_weight;
742 
743 		return max(1, weight);
744 	}
745 	
746 	/* Updates effective_weight of descendant streams in subtree of $(D Stream).  We assume that m_effective_weight is already set right. */
747 	void updateEffectiveWeight()
748 	{
749 		Stream si;
750 		
751 		LOGF("stream: update_dep_effective_weight "
752 				"stream(%s=%d, weight=%d, sum_norest_weight=%d, "
753 				"sum_top_weight=%d",
754 				this, m_id, m_weight,
755 				m_sum_norest_weight, m_sum_top_weight);
756 		
757 		/* m_sum_norest_weight == 0 means there is no StreamDPRI.TOP under stream */
758 		if (m_dpri != StreamDPRI.NO_ITEM ||
759 			m_sum_norest_weight == 0) {
760 			return;
761 		}
762 		
763 		/* If there is no direct descendant whose dpri is StreamDPRI.TOP, indirect descendants have
764 		 * the chance to send data, so recursively set weight for descendants. */
765 		if (m_sum_top_weight == 0) {
766 			for (si = m_dep_next; si; si = si.m_sib_next) {
767 				if (si.m_dpri != StreamDPRI.REST) {
768 					si.m_effective_weight =
769 						distributedEffectiveWeight(si.m_weight);
770 				}
771 				
772 				si.updateEffectiveWeight();
773 			}
774 			return;
775 		}
776 		
777 		/* If there is at least one direct descendant whose dpri is
778 		   StreamDPRI.TOP, we won't give a chance to indirect
779 		   descendants, since closed or blocked stream's weight is
780 		   distributed among its siblings */
781 		for (si = m_dep_next; si; si = si.m_sib_next) {
782 			if (si.m_dpri == StreamDPRI.TOP) {
783 				si.m_effective_weight = distributedTopEffectiveWeight(si.m_weight);				
784 				LOGF("stream: stream=%d top eweight=%d", si.m_id, si.m_effective_weight);
785 				
786 				continue;
787 			}
788 			
789 			if (si.m_dpri == StreamDPRI.NO_ITEM) {
790 				LOGF("stream: stream=%d no_item, ignored", si.m_id);
791 				
792 				/* Since we marked StreamDPRI.TOP under si, we make them StreamDPRI.REST again. */
793 				if (si.m_dep_next) si.m_dep_next.updateSetRest();
794 			} else {
795 				LOGF("stream: stream=%d rest, ignored", si.m_id);
796 			}
797 		}
798 	}
799 	
800 	void updateSetRest() 
801 	{
802 		LOGF("stream: stream=%d is rest", m_id);
803 		
804 		if (m_dpri == StreamDPRI.REST)
805 			return;
806 		
807 		if (m_dpri == StreamDPRI.TOP) 
808 		{
809 			m_dpri = StreamDPRI.REST;
810 			
811 			if (m_sib_next)
812 				m_sib_next.updateSetRest();
813 			
814 			return;
815 		}
816 
817 		if (m_sib_next)
818 			m_sib_next.updateSetRest();
819 		if (m_dep_next)
820 			m_dep_next.updateSetRest();
821 	}
822 	
823 	/*
824 	 * Performs dfs starting $(D Stream), search stream which can become
825 	 * StreamDPRI.TOP and set its dpri.
826 	 */
827 	void updateSetTop() 
828 	{
829 		Stream si;
830 		
831 		if (m_dpri == StreamDPRI.TOP)
832 			return;
833 		
834 		if (m_dpri == StreamDPRI.REST) 
835 		{
836 			LOGF("stream: stream=%d item is top", m_id);
837 			
838 			m_dpri = StreamDPRI.TOP;
839 			
840 			return;
841 		}
842 		
843 		for (si = m_dep_next; si; si = si.m_sib_next)
844 			si.updateSetTop();
845 
846 	}
847 	
848 	/*
849 	 * Performs dfs starting $(D Stream), and queue stream whose dpri is
850 	 * StreamDPRI.TOP and has not been queued yet.
851 	 */
852 	void updateQueueTop(Session session)
853 	{
854 		Stream si;
855 		
856 		if (m_dpri == StreamDPRI.REST) 
857 			return;
858 		
859 		if (m_dpri == StreamDPRI.TOP) {
860 			if (!m_item.queued) {
861 				LOGF("stream: stream=%d enqueue", m_id);
862 				pushItem(session);
863 			}
864 			
865 			return;
866 		}
867 		
868 		for (si = m_dep_next; si; si = si.m_sib_next)
869 			si.updateQueueTop(session);
870 		
871 
872 	}
873 	
874 	/*
875 	 * Updates m_sum_norest_weight and m_sum_top_weight
876 	 * recursively.  We have to gather effective sum of weight of
877 	 * descendants.  If m_dpri == StreamDPRI.NO_ITEM, we
878 	 * have to go deeper and check that any of its descendants has dpri
879 	 * value of StreamDPRI.TOP.  If so, we have to add weight of
880 	 * its direct descendants to m_sum_norest_weight.  To make this
881 	 * work, this function returns true if any of its descendants has dpri
882 	 * value of StreamDPRI.TOP, otherwise false.
883 	 *
884 	 * Calculating m_sum_top-weight is very simple compared to
885 	 * m_sum_norest_weight.  It just adds up the weight of direct
886 	 * descendants whose dpri is StreamDPRI.TOP.
887 	 */
888 	bool updateSumNorestWeight() 
889 	{
890 		Stream si;
891 		bool ret;
892 		
893 		m_sum_norest_weight = 0;
894 		m_sum_top_weight = 0;
895 		
896 		if (m_dpri == StreamDPRI.TOP) 
897 			return true;
898 		
899 		if (m_dpri == StreamDPRI.REST)
900 			return false;
901 		
902 		ret = false;
903 		
904 		for (si = m_dep_next; si; si = si.m_sib_next) {
905 			
906 			if (si.updateSumNorestWeight()) {
907 				ret = true;
908 				m_sum_norest_weight += si.m_weight;
909 			}
910 			
911 			if (si.m_dpri == StreamDPRI.TOP)
912 				m_sum_top_weight += si.m_weight;
913 		}
914 		
915 		return ret;
916 	}
917 	
918 	void updateOnAttachItem(Session session) 
919 	{
920 		Stream root_stream;
921 		
922 		m_dpri = StreamDPRI.REST;
923 		
924 		if (m_dep_next) m_dep_next.updateSetRest();
925 		
926 		root_stream = getRoot();
927 		
928 		LOGF("root=%s, stream=%s", root_stream, this);
929 		
930 		root_stream.updateSetTop();
931 		
932 		root_stream.updateSumNorestWeight();
933 		root_stream.updateEffectiveWeight();
934 		
935 		root_stream.updateQueueTop(session);
936 	}
937 	
938 	void updateDepOnDetachItem(Session session) {
939 		Stream root_stream;
940 		
941 		m_dpri = StreamDPRI.NO_ITEM;
942 
943 		root_stream = getRoot();
944 		
945 		root_stream.updateSetTop();
946 
947 		root_stream.updateSumNorestWeight();
948 		root_stream.updateEffectiveWeight();
949 		
950 		root_stream.updateQueueTop(session);
951 	}
952 
953 	void linkDependency(Stream stream) {
954 		m_dep_next = stream;
955 		stream.m_dep_prev = this;
956 	}
957 	
958 	void linkSibling(Stream stream) 
959 	{
960 		m_sib_next = stream;
961 		stream.m_sib_prev = this;
962 	}
963 	
964 	void insertLinkDependency(Stream stream)
965 	{
966 		Stream sib_next;
967 		
968 		assert(!stream.m_sib_prev);
969 		
970 		sib_next = m_dep_next;
971 		
972 		stream.linkSibling(sib_next);
973 		
974 		sib_next.m_dep_prev = null;
975 		
976 		linkDependency(stream);
977 	}
978 
979 	Stream firstSibling() 
980 	{
981 		Stream stream = this;
982 		for (; stream.m_sib_prev; stream = stream.m_sib_prev)
983 			continue;
984 		
985 		return stream;
986 	}
987 	
988 	Stream lastSibling()
989 	{
990 		Stream stream = this;
991 		for (; stream.m_sib_next; stream = stream.m_sib_next)
992 			continue;
993 
994 		return stream;
995 	}
996 	
997 	Stream updateLength(size_t delta) 
998 	{
999 		m_num_substreams += delta;
1000 
1001 		Stream stream = firstSibling();
1002 		
1003 		if (stream.m_dep_prev)
1004 			return stream.m_dep_prev.updateLength(delta);
1005 
1006 		return stream;
1007 	}
1008 
1009 	void unlinkSibling() {
1010 		Stream prev, next, dep_next;
1011 		
1012 		prev = m_sib_prev;
1013 		dep_next = m_dep_next;
1014 		
1015 		assert(prev);
1016 		
1017 		if (dep_next) {
1018 			/*
1019              *  prev--stream(--sib_next--...)
1020              *         |
1021              *        dep_next
1022              */
1023 			dep_next.m_dep_prev = null;
1024 			
1025 			prev.linkSibling(dep_next);
1026 			
1027 			if (m_sib_next) {
1028 				dep_next.lastSibling().linkSibling(m_sib_next);
1029 			}
1030 		} else {
1031 			/*
1032              *  prev--stream(--sib_next--...)
1033              */
1034 			next = m_sib_next;
1035 			
1036 			prev.m_sib_next = next;
1037 			
1038 			if (next) {
1039 				next.m_sib_prev = prev;
1040 			}
1041 		}
1042 	}
1043 	
1044 	void unlinkDependency() {
1045 		Stream prev, next, dep_next;
1046 		
1047 		prev = m_dep_prev;
1048 		dep_next = m_dep_next;
1049 		
1050 		assert(prev);
1051 		
1052 		if (dep_next) {
1053 			/*
1054              * prev
1055              *   |
1056              * stream(--sib_next--...)
1057              *   |
1058              * dep_next
1059              */
1060 			prev.linkDependency(dep_next);
1061 			
1062 			if (m_sib_next) {
1063 				dep_next.lastSibling().linkSibling(m_sib_next);
1064 			}
1065 		} else if (m_sib_next) {
1066 			/*
1067              * prev
1068              *   |
1069              * stream--sib_next
1070              */
1071 			next = m_sib_next;
1072 			
1073 			next.m_sib_prev = null;
1074 			
1075 			prev.linkDependency(next);
1076 		} else {
1077 			prev.m_dep_next = null;
1078 		}
1079 	}
1080 
1081 package:
1082 	/*
1083 	 * This function is called when trailer header (for both request and
1084 	 * response) is received.  This function performs validation and
1085 	 * returns true if it succeeds, or false.
1086 	 */
1087 	bool validateTrailerHeaders(in Frame frame) const
1088 	{
1089 		if ((frame.hd.flags & FrameFlags.END_STREAM) == 0)
1090 			return false;
1091 		
1092 		return true;
1093 	}
1094 	
1095 	/*
1096 	 * This function is called when END_STREAM flag is seen in incoming
1097 	 * frame.  This function performs validation and returns true if it
1098 	 * succeeds, or false.
1099 	 */
1100 	bool validateRemoteEndStream() const
1101 	{
1102 		if (m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) 
1103 			return false;
1104 		
1105 		if (m_content_length != -1 && m_content_length != m_recv_content_length)
1106 			return false;
1107 
1108 		return true;
1109 	}
1110 	
1111 	/*
1112 	 * This function is called when chunk of data is received.  This
1113 	 * function also performs validation and returns true if it succeeds, or false.
1114 	 */
1115 	bool onDataChunk(size_t n)
1116 	{
1117 		m_recv_content_length += n;
1118 
1119 		if ((m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) ||
1120 			(m_content_length != -1 && m_recv_content_length > m_content_length))
1121 		{
1122 			return false;
1123 		}
1124 		
1125 		return true;
1126 	}
1127 	
1128 	/*
1129 	 * This function inspects header field in |frame| and records its
1130 	 * method in stream.http_flags.  If frame.hd.type is neither
1131 	 * FrameType.HEADERS nor FrameType.PUSH_PROMISE, this function does
1132 	 * nothing.
1133 	 */
1134 	void setRequestMethod(Frame frame)
1135 	{
1136 		HeaderField[] hfa;
1137 		size_t i;
1138 		
1139 		with(FrameType) switch (frame.hd.type) {
1140 			case HEADERS:
1141 				hfa = frame.headers.hfa;
1142 				break;
1143 			case PUSH_PROMISE:
1144 				hfa = frame.push_promise.hfa;
1145 				break;
1146 			default:
1147 				return;
1148 		}
1149 		
1150 		/* TODO we should do this strictly. */
1151 		foreach(ref hf; hfa) {
1152 			import libhttp2.helpers : parseToken;
1153 			if (parseToken(hf.name) != Token._METHOD) {
1154 				continue;
1155 			}
1156 			if (hf.value == "CONNECT") {
1157 				m_http_flags |= HTTPFlags.METH_CONNECT;
1158 				return;
1159 			}
1160 			if (hf.value == "HEAD") {
1161 				m_http_flags |= HTTPFlags.METH_HEAD;
1162 				return;
1163 			}
1164 			return;
1165 		}
1166 	}
1167 
1168 	/*
1169 	 * This function is called when request header is received.  
1170 	 * This function performs validation and returns true if it succeeds, or false.
1171  	 */
1172 	bool onRequestHeaders(Frame frame) 
1173 	{
1174 		if (m_http_flags & HTTPFlags.METH_CONNECT) 
1175 		{
1176 			if ((m_http_flags & HTTPFlags._AUTHORITY) == 0) 
1177 				return false;
1178 			
1179 			m_content_length = -1;
1180 
1181 		} else {
1182 			if ((m_http_flags & HTTPFlags.REQ_HEADERS) != HTTPFlags.REQ_HEADERS ||
1183 				(m_http_flags & (HTTPFlags._AUTHORITY | HTTPFlags.HOST)) == 0) 
1184 			{
1185 				return false;
1186 			}
1187 			if (!checkPath())
1188 				return false;
1189 		}
1190 		
1191 		if (frame.hd.type == FrameType.PUSH_PROMISE) {
1192 			/* we are going to reuse data fields for upcoming response. Clear them now, except for method flags. */
1193 			m_http_flags &= HTTPFlags.METH_ALL;
1194 			m_content_length = -1;
1195 		}
1196 		
1197 		return true;
1198 	}
1199 
1200 	/*
1201  	 * This function is called when response header is received.  This
1202  	 * function performs validation and returns true if it succeeds, or false.
1203  	 */
1204 	bool onResponseHeaders() {
1205 		if ((m_http_flags & HTTPFlags._STATUS) == 0)
1206 			return false;
1207 		
1208 		if (m_status_code / 100 == 1)
1209 		{
1210 			/* non-final response */
1211 			m_http_flags = (m_http_flags & HTTPFlags.METH_ALL) | HTTPFlags.EXPECT_FINAL_RESPONSE;
1212 			m_content_length = -1;
1213 			m_status_code = -1;
1214 			return true;
1215 		}
1216 		
1217 		m_http_flags &= ~HTTPFlags.EXPECT_FINAL_RESPONSE;
1218 		bool has_response_body = (m_http_flags & HTTPFlags.METH_HEAD) == 0 && m_status_code / 100 != 1 && m_status_code != 304 && m_status_code != 204;
1219 		if (!has_response_body)
1220 			m_content_length = 0;
1221 		else if (m_http_flags & HTTPFlags.METH_CONNECT)
1222 			m_content_length = -1;
1223 		return true;
1224 	}
1225 
1226 	/* For "http" or "https" URIs, OPTIONS request may have "*" in :path
1227 	   header field to represent system-wide OPTIONS request.  Otherwise,
1228 	   :path header field value must start with "/".  This function must
1229 	   be called after ":method" header field was received.  This function
1230 	   returns nonzero if path is valid.*/
1231 	bool checkPath() {
1232 		return (httpFlags & HTTPFlags.SCHEME_HTTP) == 0 ||
1233 				((httpFlags & HTTPFlags.PATH_REGULAR)   ||
1234 				((httpFlags & HTTPFlags.METH_OPTIONS)   &&
1235 				(httpFlags & HTTPFlags.PATH_ASTERISK)));
1236 	}
1237 
1238 private:
1239 
1240 
1241     /// Stream ID
1242     int m_id;
1243 
1244     /// Pointers to form dependency tree.  If multiple streams depend on a stream, only one stream (left most) has non-null dep_prev 
1245     /// which points to the stream it depends on. The remaining streams are linked using sib_prev and sib_next.  
1246     /// The stream which has non-null dep_prev always null sib_prev.  The right most stream has null sib_next.  If this stream is
1247     /// a root of dependency tree, dep_prev and sib_prev are null.
1248     Stream m_dep_prev, m_dep_next;
1249     Stream m_sib_prev, m_sib_next;
1250 
1251     /// pointers to track dependency tree root streams.  This is doubly-linked list and first element is pointed by roots.head.
1252     Stream m_root_prev, m_root_next;
1253     /* When stream is kept after closure, it may be kept in doubly
1254      linked list pointed by Session.closed_stream_head.
1255      closed_next points to the next stream object if it is the element
1256      of the list. */
1257     Stream m_closed_prev, m_closed_next;
1258 
1259     /// pointer to roots, which tracks dependency tree roots
1260     StreamRoots m_roots;
1261 
1262     /// The arbitrary data provided by user for this stream.
1263     void *m_stream_user_data;
1264 
1265     /// Item to send
1266     OutboundItem m_item;
1267 
1268     /// categorized priority of this stream.  Only stream bearing $(D TOP) can send item.
1269 	StreamDPRI m_dpri = StreamDPRI.NO_ITEM;
1270 
1271     /// the number of streams in subtree 
1272 	int m_num_substreams = 1;
1273 
1274     /// Current remote window size. This value is computed against the current initial window size of remote endpoint. 
1275     int m_remote_window_size;
1276 
1277     /// Keep track of the number of bytes received without WINDOW_UPDATE.
1278     /// This could be negative after submitting negative value to WINDOW_UPDATE
1279     int m_recv_window_size;
1280 
1281     /// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE.  
1282     /// This is only used when auto WINDOW_UPDATE is turned off.
1283     int m_consumed_size;
1284 
1285     /// The amount of recv_window_size cut using submitting negative value to WINDOW_UPDATE
1286     int m_recv_reduction;
1287 
1288     /// window size for local flow control. It is initially set to INITIAL_WINDOW_SIZE and could be increased/decreased by
1289     /// submitting WINDOW_UPDATE. See submit_window_update().
1290     int m_local_window_size;
1291 
1292     /// weight of this stream 
1293     int m_weight;
1294 
1295     /// effective weight of this stream in belonging dependency tree
1296     int m_effective_weight;
1297 
1298     /// sum of weight (not effective_weight) of direct descendants
1299     int m_sum_dep_weight;
1300 
1301     /// sum of weight of direct descendants which have at least one descendant with dpri == $(D StreamDPRI.TOP).  We use this value to calculate effective weight.
1302     int m_sum_norest_weight;
1303 
1304     /// sum of weight of direct descendants whose dpri value is $(D StreamDPRI.TOP)
1305     int m_sum_top_weight;
1306 
1307     StreamState m_state;
1308 
1309     /// This is bitwise-OR of 0 or more of StreamFlags.
1310     StreamFlags m_flags;
1311 
1312     /// Bitwise OR of zero or more ShutdownFlag values
1313 	ShutdownFlag m_shut_flags = ShutdownFlag.NONE;
1314 
1315     /// Content-Length of request/response body. -1 if unknown.
1316 	long m_content_length = -1;
1317 
1318     /// Received body so far 
1319     long m_recv_content_length;
1320 
1321     /// status code from remote server
1322 	short m_status_code = -1;
1323 
1324     /// Bitwise OR of zero or more HTTPFlags values 
1325 	HTTPFlags m_http_flags = HTTPFlags.NONE;
1326 
1327 package: // used by Session
1328 	@property int id() { return m_id; }
1329 	@property StreamDPRI dpri() { return m_dpri; }
1330 	@property StreamState state() { return m_state; }
1331 	@property void state(StreamState state) { m_state = state; }
1332 	@property OutboundItem item() { return m_item; }
1333 	@property int effectiveWeight() { return m_effective_weight; }
1334 	@property int remoteWindowSize() { return m_remote_window_size; }
1335 	@property void remoteWindowSize(int rws) { m_remote_window_size = rws; }
1336 	@property ref int localWindowSize() { return m_local_window_size; }
1337 	@property ref int recvWindowSize() { return m_recv_window_size; }
1338 	@property ref int recvReduction() { return m_recv_reduction; }
1339 	@property void recvWindowSize(int sz) { m_recv_window_size = sz; }
1340 	@property ref int consumedSize() { return m_consumed_size; }
1341 	@property void consumedSize(int sz) { m_consumed_size = sz; }
1342 	@property void* userData() { return m_stream_user_data; }
1343 	@property void userData(void* ptr) { m_stream_user_data = ptr; }
1344 	@property ShutdownFlag shutFlags() { return m_shut_flags; }
1345 	@property void shutFlags(ShutdownFlag sf) { m_shut_flags = sf; }
1346 	@property HTTPFlags httpFlags() { return m_http_flags; }
1347 	@property void httpFlags(HTTPFlags flags) { m_http_flags = flags; }
1348 	@property StreamFlags flags() { return m_flags; }
1349 	@property void flags(StreamFlags f) { m_flags = f; }
1350 	@property void weight(int w) { m_weight = w; }
1351 	@property int weight() { return m_weight; }
1352 	@property Stream closedPrev() { return m_closed_prev; }
1353 	@property Stream closedNext() { return m_closed_next; } 
1354 	@property void closedPrev(Stream s) { m_closed_prev = s; }
1355 	@property void closedNext(Stream s) { m_closed_next = s; } 
1356 	@property long contentLength() { return m_content_length; }
1357 	@property short statusCode() { return m_status_code; }
1358 	@property void statusCode(short status) { m_status_code = status; }
1359 	@property void contentLength(long len) { m_content_length = len; }
1360 	// tests
1361 	@property int subStreams() { return m_num_substreams; }
1362 	@property int sumDepWeight() { return m_sum_dep_weight; }
1363 	@property int sumNorestWeight() { return m_sum_norest_weight; }
1364 	@property Stream rootNext() { return m_root_next; } 
1365 	@property Stream depPrev() { return m_dep_prev; } 
1366 	@property Stream depNext() { return m_dep_next; } 
1367 	@property Stream sibPrev() { return m_sib_prev; }
1368 	@property Stream sibNext() { return m_sib_next; } 
1369 
1370 }