1 /**
2  * Stream
3  * 
4  * Copyright:
5  * (C) 2012-2015 Tatsuhiro Tsujikawa
6  * (C) 2014-2015 Etienne Cimon
7  *
8  * License: 
9  * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project. 
10  * Consult the provided LICENSE.md file for details
11  */
12 module libhttp2.stream;
13 
14 import libhttp2.constants;
15 import libhttp2.types;
16 import libhttp2.frame;
17 import libhttp2.session;
18 import std.algorithm : max;
19 
/*
 * Numeric limit whose name refers to the length of a dependency tree.  It is
 * not referenced anywhere in the visible part of this module.
 * NOTE(review): presumably the maximum number of streams allowed in one
 * dependency tree, enforced by the session code -- confirm against callers.
 */
const MAX_DEP_TREE_LENGTH = 100;
21 
/*
 * Doubly linked list of streams, threaded through each stream's
 * m_root_prev / m_root_next members, with |head| as the first element.
 *
 * |num_streams| is only stored here: none of the methods of this class
 * read or modify it.
 */
align(8)
final class StreamRoots
{
	/* Intentionally does nothing. */
	void free() { }

	/* Pushes |stream| onto the front of the list. */
	void add(Stream stream) {
		Stream old_head = head;

		if (old_head) {
			stream.m_root_next = old_head;
			old_head.m_root_prev = stream;
		}

		head = stream;
	}

	/*
	 * Unlinks |stream| from the list and clears its two list links.
	 * When |stream| has no predecessor, |head| moves to its successor.
	 */
	void remove(Stream stream)
	{
		Stream before = stream.m_root_prev;
		Stream after = stream.m_root_next;

		/* The successor, if any, points back at the predecessor (possibly null). */
		if (after)
			after.m_root_prev = before;

		if (before)
			before.m_root_next = after;
		else
			head = after;

		stream.m_root_prev = null;
		stream.m_root_next = null;
	}

	/* Clears the list links of every listed stream and empties the list. */
	void removeAll() {
		Stream cursor = head;

		while (cursor) {
			Stream following = cursor.m_root_next;

			cursor.m_root_prev = null;
			cursor.m_root_next = null;

			cursor = following;
		}

		head = null;
	}

	Stream head;
	int num_streams;
}
79 
80 align(8)
81 final class Stream {
82 
83 	this(int stream_id,
84 		StreamFlags flags,
85 		StreamState initial_state,
86 		int weight,
87 		StreamRoots roots,
88 		int remote_initial_window_size,
89 		int local_initial_window_size,
90 		void *stream_user_data)
91 	{
92 		initialize(stream_id, flags, initial_state, weight, roots, remote_initial_window_size, local_initial_window_size, stream_user_data);
93 	}
94 
95 	void free() { userData = null; } // We don't free stream.item. It is deleted in ActiveOutboundItem.reset(), Sessioin.free() or PriorityQueue
96 
97     package void initialize(int stream_id,
98 							StreamFlags flags,
99 							StreamState initial_state,
100 							int weight,
101 							StreamRoots roots,
102 							int remote_initial_window_size,
103 					        int local_initial_window_size,
104 					        void *stream_user_data) 
105 	{
106         m_id = stream_id;
107         m_flags = flags;
108         m_state = initial_state;
109         m_weight = weight;
110         m_effective_weight = m_weight;
111 		m_roots = roots;
112 		m_remote_window_size = remote_initial_window_size;
113 		m_local_window_size = local_initial_window_size;
114 		m_stream_user_data = stream_user_data;
115     }
116     
    /*
     * Disallows further receptions, further transmissions, or both.
     *
     * |flag| is a bitwise OR of one or more ShutdownFlag values.  The bits
     * are OR-ed into m_shut_flags; bits that were already set stay set.
     */
    void shutdown(ShutdownFlag flag)
	{
        m_shut_flags |= flag;
    }
125 
126     /*
127      * Computes distributed weight of a stream of the |weight| under the
128      * $(D Stream) if $(D Stream) is removed from a dependency tree.  The result
129      * is computed using m_weight rather than m_effective_weight.
130      */
131     int distributedWeight(int weight) 
132 	{
133         weight = m_weight * weight / m_sum_dep_weight;
134         
135         return max(1, weight);
136     }
137     
138     /*
139 	 * Computes effective weight of a stream of the |weight| under the
140 	 * $(D Stream).  The result is computed using m_effective_weight
141 	 * rather than m_weight.  This function is used to determine
142 	 * weight in dependency tree.
143 	 */
144     int distributedEffectiveWeight(int weight) {
145         if (m_sum_norest_weight == 0)
146             return m_effective_weight;        
147         weight = m_effective_weight * weight / m_sum_norest_weight;
148         
149         return max(1, weight);
150     }
151     
152 
153     
    /*
	 * Attaches |item| to this stream and then runs updateOnAttachItem(),
	 * which updates the dpri members in this stream's dependency tree.
	 *
	 * Preconditions (asserted): none of the DEFERRED_ALL flag bits are set
	 * and no item is currently attached.
	 */
    void attachItem(OutboundItem item, Session session) 
	{
        assert((m_flags & StreamFlags.DEFERRED_ALL) == 0);
        assert(!m_item);
        
        LOGF("stream: stream=%d attach item=%s", m_id, item);
        
        m_item = item;
        
		/* Must run after m_item is set: the update may queue m_item. */
		updateOnAttachItem(session);
    }
169     
    /*
	 * Detaches m_item, clears the DEFERRED_ALL flag bits and updates the
	 * dpri members in this stream's dependency tree.  This function does
	 * not free the detached item; the caller must do that.
	 */
    void detachItem(Session session) 
	{
        LOGF("stream: stream=%d detach item=%s", m_id, m_item);
        
        m_item = null;
        /* With no item attached there is nothing left to be deferred. */
        m_flags &= ~cast(int)StreamFlags.DEFERRED_ALL;
        
		updateDepOnDetachItem(session);
    }
184     
    /*
	 * Defers m_item.  Must not be called while m_item is null (asserted).
	 * |flags| is a bitwise OR of zero or more of
	 * StreamFlags.DEFERRED_USER and StreamFlags.DEFERRED_FLOW_CONTROL and
	 * states the reason for deferring; the bits are OR-ed into m_flags.
	 *
	 * The item stays attached.  The dependency tree is updated through the
	 * same routine that detachItem() uses.
	 */
    void deferItem(StreamFlags flags, Session session) 
	{
        assert(m_item);
        
        LOGF("stream: stream=%d defer item=%s cause=%02x", m_id, m_item, flags);
        
        m_flags |= flags;
        
		updateDepOnDetachItem(session);
    }
202     
203     /*
204 	 * Put back deferred data in this stream to active state.  The |flags|
205 	 * are one or more of bitwise OR of the following values:
206 	 * StreamFlags.DEFERRED_USER and
207 	 * StreamFlags.DEFERRED_FLOW_CONTROL and given masks are
208 	 * cleared if they are set.  So even if this function is called, if
209 	 * one of flag is still set, data does not become active.
210 	 */
211     void resumeDeferredItem(StreamFlags flag, Session session)
212 	{
213         assert(m_item);
214         
215         LOGF("stream: stream=%d resume item=%s flags=%02x", m_id, m_item, flags);
216         
217         m_flags &= ~cast(int)flags;
218         
219         if (m_flags & StreamFlags.DEFERRED_ALL) {
220             return;
221         }
222         
223 		updateOnAttachItem(session);
224     }
225     
226     /*
227 	 * Returns nonzero if item is deferred by whatever reason.
228 	 */
229     bool isItemDeferred() 
230 	{
231         return m_item && (m_flags & StreamFlags.DEFERRED_ALL);
232     }
233     
234     /*
235 	 * Returns nonzero if item is deferred by flow control.
236 	 */
237     bool isDeferredByFlowControl() 
238 	{
239         return m_item && (m_flags & StreamFlags.DEFERRED_FLOW_CONTROL);
240     }
241 
242     
    /*
	 * Adjusts the remote window size (m_remote_window_size) for a change of
	 * the initial window size from |old_initial_window_size| to
	 * |new_initial_window_size|.
	 *
	 * Returns true on success.  Returns false, leaving the window size
	 * untouched, when the adjusted value would fall outside the allowed
	 * range (see updateInitialWindowSize()).
	 */
    bool updateRemoteInitialWindowSize(int new_initial_window_size, int old_initial_window_size)
	{
		return updateInitialWindowSize(m_remote_window_size, new_initial_window_size, old_initial_window_size);
    }
255     
    /*
	 * Adjusts the local window size (m_local_window_size) for a change of
	 * the initial window size from |old_initial_window_size| to
	 * |new_initial_window_size|.
	 *
	 * Returns true on success.  Returns false, leaving the window size
	 * untouched, when the adjusted value would fall outside the allowed
	 * range (see updateInitialWindowSize()).
	 */
    bool updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size) 
	{
		return updateInitialWindowSize(m_local_window_size, new_initial_window_size, old_initial_window_size);
    }
268     
269     /*
270      * Call this function if promised stream $(D Stream) is replied with
271      * HEADERS.  This function changes the state of the $(D Stream) to
272      * OPENED.
273      */
274     void promiseFulfilled() {
275 		m_state = StreamState.OPENED;
276         m_flags &= ~cast(int)StreamFlags.PUSH;
277     }
278     
279     /*
280      * Returns the stream positioned in root of the dependency tree the
281      * $(D Stream) belongs to.
282      */
283     Stream getRoot() {
284 		Stream stream = this;
285         for (;;) {
286             if (stream.m_sib_prev) {
287                 stream = stream.m_sib_prev;
288                 
289                 continue;
290             }
291             
292             if (stream.m_dep_prev) {
293                 stream = stream.m_dep_prev;
294                 
295                 continue;
296             }
297             
298             break;
299         }
300         
301         return stream;
302     }
303     
304     /*
305      * Returns true if |target| is found in subtree of $(D Stream).
306      */
307     bool subtreeContains(Stream target) {
308         
309         if (this is target)
310             return true;
311         
312 		if (m_sib_next && m_sib_next.subtreeContains(target))
313             return true;
314         
315 		return m_dep_next?m_dep_next.subtreeContains(target):false;
316     }
317     
318     /*
319      * Makes the $(D Stream) depend on the |dep_stream|.  This dependency is
320      * exclusive.  All existing direct descendants of |dep_stream| become
321      * the descendants of the $(D Stream).  This function assumes
322      * |m_data| is null and no dpri members are changed in this
323      * dependency tree.
324      */
325     void insert(Stream stream) {
326         Stream si;
327         Stream root_stream;
328         
329         assert(!m_item);
330         
331 		LOGF("stream: dep_insert dep_stream(%s)=%d, stream(%s)=%d", this, m_id, stream, stream.m_id);
332         
333 		stream.m_sum_dep_weight = m_sum_dep_weight;
334 		m_sum_dep_weight = stream.m_weight;
335         
336 		if (m_dep_next) {
337 			for (si = m_dep_next; si; si = si.m_sib_next) {
338                 stream.m_num_substreams += si.m_num_substreams;
339             }
340             
341 			stream.m_dep_next = m_dep_next;
342             stream.m_dep_next.m_dep_prev = stream;
343         }
344         
345 		m_dep_next = stream;
346         stream.m_dep_prev = this;
347         
348 		root_stream = updateLength(1);
349         
350 		root_stream.updateSumNorestWeight();
351 		root_stream.updateEffectiveWeight();
352         
353         ++stream.m_roots.num_streams;
354     }
355      
356     /*
357      * Makes the $(D Stream) depend on the |dep_stream|.  This dependency is
358      * not exclusive.  This function assumes |m_data| is null and no
359      * dpri members are changed in this dependency tree.
360      */
361     void add(Stream stream) {
362         Stream root_stream;
363         
364         assert(!stream.m_item);
365         
366         LOGF("stream: dep_add dep_stream(%s=%d, stream(%s)=%d", this, m_id, stream, stream.m_id);
367         
368         root_stream = updateLength(1);
369         
370 		m_sum_dep_weight += stream.m_weight;
371         
372         if (!m_dep_next) {
373             linkDependency(stream);
374         } else {
375             insertLinkDependency(stream);
376         }
377         
378 		root_stream.updateSumNorestWeight();
379 		root_stream.updateEffectiveWeight();
380         
381         ++stream.m_roots.num_streams;
382     }
383     
    /*
     * Removes this stream from its dependency tree.  Its direct dependents
     * take its place: they are spliced into the sibling list at its
     * position, or become roots of their own trees when this stream was a
     * root.  Afterwards this stream has no tree links and m_roots.num_streams
     * has been decremented.
     *
     * NOTE(review): the original comment said this assumes |m_data| is null;
     * presumably that refers to m_item.  Nothing is asserted here.
     */
	void remove() {
        Stream prev, next, dep_prev, si, root_stream;
        int sum_dep_weight_delta;
        
        LOGF("stream: dep_remove stream(%s=%d", this, m_id);
        
        /* Distribute the weight of this stream to its direct dependents.
           The delta is what the parent's m_sum_dep_weight changes by: this
           stream's weight leaves, the dependents' new weights arrive. */
        sum_dep_weight_delta = -m_weight;
        
        for (si = m_dep_next; si; si = si.m_sib_next) {
            si.m_weight = distributedWeight(si.m_weight);
            
            sum_dep_weight_delta += si.m_weight;
        }
        
        /* The parent link is reached through the first sibling. */
        prev = firstSibling();
        
        dep_prev = prev.m_dep_prev;
        
        if (dep_prev) {
			/* updateLength() takes a size_t, so -1 is passed as a wrapped value.
			   NOTE(review): the decrement relies on modular arithmetic -- confirm the type of m_num_substreams. */
			root_stream = dep_prev.updateLength(-1);
            
            dep_prev.m_sum_dep_weight += sum_dep_weight_delta;
        }
        
        if (m_sib_prev) {
            unlinkSibling();
        } else if (m_dep_prev) {
            unlinkDependency();
        } else {
            m_roots.remove(this);
            
            /* This stream is the root of a tree.  Removing it makes each
                direct dependent the root of its own subtree. */
            
            for (si = m_dep_next; si;) {
                /* Remember the successor before the links are cleared. */
                next = si.m_sib_next;
                
                si.m_dep_prev = null;
                si.m_sib_prev = null;
                si.m_sib_next = null;
                
                /* The weight of this stream was already distributed to si above. */
                si.m_effective_weight = si.m_weight;
                
                si.m_roots.add(si);
                
                si = next;
            }
        }
        
        /* root_stream is only set when this stream had a parent. */
        if (root_stream) {
			root_stream.updateSumNorestWeight();
			root_stream.updateEffectiveWeight();
        }
        
        /* Reset this stream to the state of a stream outside any tree. */
        m_num_substreams = 1;
        m_sum_dep_weight = 0;
        
        m_dep_prev = null;
        m_dep_next = null;
        m_sib_prev = null;
        m_sib_next = null;
        
        --m_roots.num_streams;
    }
454     
    /*
     * Inserts the subtree rooted at |stream| directly below this stream as
     * an exclusive dependency: |stream| becomes the only direct dependent
     * of this stream, and the previous direct dependents of this stream are
     * appended to the dependents of |stream|.  Updates the dpri members and
     * weights in the resulting tree and queues streams that became TOP.
     */
    void insertSubtree(Stream stream, Session session) {
        Stream last_sib;
        Stream dep_next;
        Stream root_stream;
        size_t delta_substreams;
        
        LOGF("stream: dep_insert_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id);
        
		/* Captured before |stream| absorbs this stream's dependents below:
		   only the inserted subtree is new to this stream and its ancestors. */
		delta_substreams = stream.m_num_substreams;
        
		stream.updateSetRest();
        
        if (m_dep_next) {
            /* m_num_substreams includes this stream itself */
			stream.m_num_substreams += m_num_substreams - 1;
            
			stream.m_sum_dep_weight += m_sum_dep_weight;
			m_sum_dep_weight = stream.m_weight;
            
            dep_next = m_dep_next;
            
			if (dep_next) dep_next.updateSetRest();
            
            linkDependency(stream);
            
			if (stream.m_dep_next) {
				/* |stream| already has dependents: append the old ones after its last one. */
				last_sib = stream.m_dep_next.lastSibling();
                
				last_sib.linkSibling(dep_next);
                
                dep_next.m_dep_prev = null;
            } else {
                stream.linkDependency(dep_next);
            }
        } else {
            linkDependency(stream);
            
            assert(m_sum_dep_weight == 0);
			m_sum_dep_weight = stream.m_weight;
        }
        
        root_stream = updateLength(delta_substreams);
        
		root_stream.updateSetTop();
        
		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();
        
		root_stream.updateQueueTop(session);
    }
509     
510     
    /*
     * Adds the subtree rooted at |stream| as a direct dependent of this
     * stream without making the dependency exclusive: existing dependents
     * stay and |stream| is placed in front of them.  Updates the dpri
     * members and weights in the resulting tree and queues streams that
     * became TOP.
     */
    void addSubtree(Stream stream, Session session) 
	{
        Stream root_stream;
        
        LOGF("stream: dep_add_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id);
        
        stream.updateSetRest();
        
        if (m_dep_next) {
            m_sum_dep_weight += stream.m_weight;
            
            insertLinkDependency(stream);
        } else {
            linkDependency(stream);
            
            /* Without dependents the weight sum is expected to be zero. */
            assert(m_sum_dep_weight == 0);
			m_sum_dep_weight = stream.m_weight;
        }
        
        /* The whole added subtree is new to this stream and its ancestors. */
        root_stream = updateLength(stream.m_num_substreams);
        
		root_stream.updateSetTop();
        
		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();
        
		root_stream.updateQueueTop(session);
    }
543     
    /*
     * Detaches the subtree rooted at this stream from its dependency tree.
     * The dependents of this stream stay attached to it (m_dep_next is not
     * touched).  dpri values are not changed, and the effective weight of
     * the streams in the removed subtree is not updated; the weights of the
     * tree it is removed from are recomputed.
     */
    void removeSubtree() 
	{
        Stream prev, next, dep_prev, root_stream;
        
        LOGF("stream: dep_remove_subtree stream(%s=%d", this, m_id);
        
        if (m_sib_prev) {
            /* Not the first sibling: bridge previous and next sibling. */
            prev = m_sib_prev;
            
            prev.m_sib_next = m_sib_next;
            if (prev.m_sib_next) {
                prev.m_sib_next.m_sib_prev = prev;
            }
            
			/* The parent link is reached through the first sibling. */
			prev = prev.firstSibling();
            
            dep_prev = prev.m_dep_prev;
            
        } else if (m_dep_prev) {
            /* First sibling: the next sibling, if any, becomes the parent's first dependent. */
            dep_prev = m_dep_prev;
            next = m_sib_next;
            
            dep_prev.m_dep_next = next;
            
            if (next) {
                next.m_dep_prev = dep_prev;
                
                next.m_sib_prev = null;
            }
            
        } else {
			/* This stream is a root: just drop it from the root list. */
			m_roots.remove(this);
            
            dep_prev = null;
        }
        
        if (dep_prev) {
            dep_prev.m_sum_dep_weight -= m_weight;
            
			/* updateLength() takes a size_t, so the negated count is passed as a wrapped value.
			   NOTE(review): relies on modular arithmetic -- confirm the type of m_num_substreams. */
			root_stream = dep_prev.updateLength(-m_num_substreams);
            
			root_stream.updateSumNorestWeight();
			root_stream.updateEffectiveWeight();
        }
        
        m_sib_prev = null;
        m_sib_next = null;
        m_dep_prev = null;
    }
598     
    /*
     * Makes this stream a root: adds it to the root list, resets its
     * effective weight to its own weight, recomputes the dpri members and
     * weights of its subtree and queues streams that became TOP.
     */
    void makeRoot(Session session)
	{
        LOGF("stream: dep_make_root stream(%s=%d", this, m_id);
        
		m_roots.add(this);
        
        /* Demote TOP streams to REST first; updateSetTop() below promotes
           the ones that qualify within the new tree. */
        updateSetRest();
        
        m_effective_weight = m_weight;
        
        updateSetTop();
        
        updateSumNorestWeight();
        updateEffectiveWeight();
        
		updateQueueTop(session);
    }
620     
621     /*
622      * Makes the $(D Stream) as root and all existing root streams become
623      * direct children of $(D Stream).
624      */
625 	void makeTopmostRoot(Session session)
626     {
627         Stream first, si;
628         
629         LOGF("stream: ALL YOUR STREAM ARE BELONG TO US stream(%s=%d", this, m_id);
630         
631         first = m_roots.head;
632         
633         /* stream must not be include in m_roots.head list */
634         assert(first !is this);
635         
636         if (first) {
637             Stream prev;
638             
639             prev = first;
640             
641             LOGF("stream: root stream(%s=%d", first, first.m_id);
642             
643             m_sum_dep_weight += first.m_weight;
644             m_num_substreams += first.m_num_substreams;
645             
646             for (si = first.m_root_next; si; si = si.m_root_next) {
647                 
648                 assert(si !is this);
649                 
650                 LOGF("stream: root stream(%s=%d", si, si.m_id);
651                 
652                 m_sum_dep_weight += si.m_weight;
653                 m_num_substreams += si.m_num_substreams;
654                 
655 				prev.linkSibling(si);
656                 
657                 prev = si;
658             }
659             
660             if (m_dep_next) {
661                 Stream sib_next;
662                 
663                 sib_next = m_dep_next;
664                 
665                 sib_next.m_dep_prev = null;
666                 
667 				first.linkSibling(sib_next);
668 				linkDependency(prev);
669             } else {
670 				linkDependency(first);
671             }
672         }
673         
674         m_roots.removeAll();
675         
676 		makeRoot(session);
677     }
678     
679     /*
680      * Returns true if $(D Stream) is in any dependency tree.
681      */
682     bool inDepTree() {
683         return m_dep_prev || m_dep_next || m_sib_prev ||
684                m_sib_next || m_root_next || m_root_prev ||
685                m_roots.head is this;
686     }
687 
688 private:
689 
690 
691 	bool updateInitialWindowSize(ref int window_size, int new_initial_window_size, int old_initial_window_size)
692 	{
693 		long new_window_size = ( cast(long)window_size ) + new_initial_window_size - old_initial_window_size;
694 
695 		if (int.min > new_window_size || new_window_size > MAX_WINDOW_SIZE)
696 			return false;
697 
698 		window_size = cast(int) new_window_size;
699 
700 		return true;
701 	}
702 
	/*
	 * Pushes m_item onto one of the session's outbound queues and marks it
	 * as queued.  DATA items go to session.ob_da_pq; HEADERS items go to
	 * session.ob_ss_pq while the stream is in the RESERVED state and to
	 * session.ob_pq otherwise.  Any other frame type is a programming error.
	 *
	 * Preconditions (asserted): an item is attached and it is not queued.
	 */
	void pushItem(Session session) 
	{
		OutboundItem item;
		
		assert(m_item);
		assert(m_item.queued == 0);
		
		item = m_item;
		
		/* If the item is the one currently held in session.aob, don't push it to the queue.  Otherwise, we may push the same item twice. */
		if (session.aob.item is item)
			return;
		
		/* The item's weight is only ever lowered to the effective weight, never raised. */
		if (item.weight > m_effective_weight)
			item.weight = m_effective_weight;
		
		item.cycle = session.last_cycle;
		
		switch (item.frame.hd.type) {
			case FrameType.DATA:
				session.ob_da_pq.push(item);
				break;
			case FrameType.HEADERS:
				if (m_state == StreamState.RESERVED)
					session.ob_ss_pq.push(item);
				else
					session.ob_pq.push(item);
				break;
			default:
				/* should not reach here */
				assert(0);
		}
		
		item.queued = 1;
	}
738 
739 	int distributedTopEffectiveWeight(int weight) {
740 		if (m_sum_top_weight == 0)
741 			return m_effective_weight;
742 
743 		weight = m_effective_weight * weight / m_sum_top_weight;
744 
745 		return max(1, weight);
746 	}
747 	
	/*
	 * Updates m_effective_weight of descendant streams in the subtree of
	 * this stream.  The m_effective_weight of this stream itself is assumed
	 * to be correct already, and m_sum_norest_weight / m_sum_top_weight are
	 * expected to be up to date (see updateSumNorestWeight()).
	 *
	 * Nothing is done unless this stream's dpri is NO_ITEM and
	 * m_sum_norest_weight is non-zero.
	 */
	void updateEffectiveWeight()
	{
		Stream si;
		
		LOGF("stream: update_dep_effective_weight stream(%s=%d, weight=%d, sum_norest_weight=%d, sum_top_weight=%d",
				this, m_id, m_weight,
				m_sum_norest_weight, m_sum_top_weight);
		
		/* m_sum_norest_weight == 0 means there is no StreamDPRI.TOP under this stream */
		if (m_dpri != StreamDPRI.NO_ITEM ||
			m_sum_norest_weight == 0) {
			return;
		}
		
		/* If there is no direct descendant whose dpri is StreamDPRI.TOP, indirect descendants have
		 * the chance to send data, so recursively set weight for descendants. */
		if (m_sum_top_weight == 0) {
			for (si = m_dep_next; si; si = si.m_sib_next) {
				if (si.m_dpri != StreamDPRI.REST) {
					si.m_effective_weight =
						distributedEffectiveWeight(si.m_weight);
				}
				
				si.updateEffectiveWeight();
			}
			return;
		}
		
		/* If there is at least one direct descendant whose dpri is
		   StreamDPRI.TOP, we won't give a chance to indirect
		   descendants, since a closed or blocked stream's weight is
		   distributed among its siblings */
		for (si = m_dep_next; si; si = si.m_sib_next) {
			if (si.m_dpri == StreamDPRI.TOP) {
				si.m_effective_weight = distributedTopEffectiveWeight(si.m_weight);				
				LOGF("stream: stream=%d top eweight=%d", si.m_id, si.m_effective_weight);
				
				continue;
			}
			
			if (si.m_dpri == StreamDPRI.NO_ITEM) {
				LOGF("stream: stream=%d no_item, ignored", si.m_id);
				
				/* Streams under si may have been marked StreamDPRI.TOP earlier; make them StreamDPRI.REST again. */
				if (si.m_dep_next) si.m_dep_next.updateSetRest();
			} else {
				LOGF("stream: stream=%d rest, ignored", si.m_id);
			}
		}
	}
799 	
800 	void updateSetRest() 
801 	{
802 		LOGF("stream: stream=%d is rest", m_id);
803 		
804 		if (m_dpri == StreamDPRI.REST)
805 			return;
806 		
807 		if (m_dpri == StreamDPRI.TOP) 
808 		{
809 			m_dpri = StreamDPRI.REST;
810 			
811 			if (m_sib_next)
812 				m_sib_next.updateSetRest();
813 			
814 			return;
815 		}
816 
817 		if (m_sib_next)
818 			m_sib_next.updateSetRest();
819 		if (m_dep_next)
820 			m_dep_next.updateSetRest();
821 	}
822 	
823 	/*
824 	 * Performs dfs starting $(D Stream), search stream which can become
825 	 * StreamDPRI.TOP and set its dpri.
826 	 */
827 	void updateSetTop() 
828 	{
829 		Stream si;
830 		
831 		if (m_dpri == StreamDPRI.TOP)
832 			return;
833 		
834 		if (m_dpri == StreamDPRI.REST) 
835 		{
836 			LOGF("stream: stream=%d item is top", m_id);
837 			
838 			m_dpri = StreamDPRI.TOP;
839 			
840 			return;
841 		}
842 		
843 		for (si = m_dep_next; si; si = si.m_sib_next)
844 			si.updateSetTop();
845 
846 	}
847 	
848 	/*
849 	 * Performs dfs starting $(D Stream), and queue stream whose dpri is
850 	 * StreamDPRI.TOP and has not been queued yet.
851 	 */
852 	void updateQueueTop(Session session)
853 	{
854 		Stream si;
855 		
856 		if (m_dpri == StreamDPRI.REST) 
857 			return;
858 		
859 		if (m_dpri == StreamDPRI.TOP) {
860 			if (!m_item.queued) {
861 				LOGF("stream: stream=%d enqueue", m_id);
862 				pushItem(session);
863 			}
864 			
865 			return;
866 		}
867 		
868 		for (si = m_dep_next; si; si = si.m_sib_next)
869 			si.updateQueueTop(session);
870 		
871 
872 	}
873 	
	/*
	 * Recomputes m_sum_norest_weight and m_sum_top_weight for this stream
	 * and, recursively, for its descendants.  Both sums are reset to zero
	 * first.
	 *
	 * Returns true if this stream's dpri is StreamDPRI.TOP, false if it is
	 * StreamDPRI.REST.  Otherwise the direct dependents are visited and the
	 * result is true if the recursive call returned true for any of them,
	 * i.e. if some descendant is TOP.
	 *
	 * m_sum_norest_weight adds up the weight of every direct dependent for
	 * which the recursive call returned true.  m_sum_top_weight is simpler:
	 * it adds up the weight of the direct dependents whose own dpri is
	 * StreamDPRI.TOP.
	 */
	bool updateSumNorestWeight() 
	{
		Stream si;
		bool ret;
		
		m_sum_norest_weight = 0;
		m_sum_top_weight = 0;
		
		if (m_dpri == StreamDPRI.TOP) 
			return true;
		
		if (m_dpri == StreamDPRI.REST)
			return false;
		
		ret = false;
		
		for (si = m_dep_next; si; si = si.m_sib_next) {
			
			/* The recursive call also refreshes the sums stored in si. */
			if (si.updateSumNorestWeight()) {
				ret = true;
				m_sum_norest_weight += si.m_weight;
			}
			
			if (si.m_dpri == StreamDPRI.TOP)
				m_sum_top_weight += si.m_weight;
		}
		
		return ret;
	}
917 	
	/*
	 * Updates the dependency tree after an item became active on this
	 * stream: marks this stream REST, demotes TOP streams below it,
	 * then re-runs the TOP selection, the weight sums, the effective
	 * weights and the queueing from the root of the tree.
	 */
	void updateOnAttachItem(Session session) 
	{
		Stream root_stream;
		
		/* Start as REST; updateSetTop() below promotes it if it qualifies. */
		m_dpri = StreamDPRI.REST;
		
		if (m_dep_next) m_dep_next.updateSetRest();
		
		root_stream = getRoot();
		
		LOGF("root=%s, stream=%s", root_stream, this);
		
		root_stream.updateSetTop();
		
		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();
		
		root_stream.updateQueueTop(session);
	}
937 	
	/*
	 * Updates the dependency tree after this stream's item was detached or
	 * deferred: marks this stream NO_ITEM, then re-runs the TOP selection,
	 * the weight sums, the effective weights and the queueing from the root
	 * of the tree.
	 */
	void updateDepOnDetachItem(Session session) {
		Stream root_stream;
		
		m_dpri = StreamDPRI.NO_ITEM;

		root_stream = getRoot();
		
		root_stream.updateSetTop();

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();
		
		root_stream.updateQueueTop(session);
	}
952 
	/*
	 * Makes |stream| the first dependent of this stream by setting this
	 * stream's m_dep_next and |stream|'s m_dep_prev.  No other links or
	 * counters are touched.
	 */
	void linkDependency(Stream stream) {
		m_dep_next = stream;
		stream.m_dep_prev = this;
	}
957 	
	/*
	 * Makes |stream| the next sibling of this stream by setting this
	 * stream's m_sib_next and |stream|'s m_sib_prev.  No other links or
	 * counters are touched.
	 */
	void linkSibling(Stream stream) 
	{
		m_sib_next = stream;
		stream.m_sib_prev = this;
	}
963 	
	/*
	 * Puts |stream| in front of this stream's existing dependents: the
	 * current first dependent becomes the next sibling of |stream| and
	 * |stream| becomes the new first dependent.
	 *
	 * |stream| must not have a previous sibling (asserted).  This stream's
	 * m_dep_next is dereferenced without a check, so it must already have a
	 * dependent; the visible callers only call this when m_dep_next is set.
	 */
	void insertLinkDependency(Stream stream)
	{
		Stream sib_next;
		
		assert(!stream.m_sib_prev);
		
		sib_next = m_dep_next;
		
		stream.linkSibling(sib_next);
		
		/* Only the first dependent carries the parent link. */
		sib_next.m_dep_prev = null;
		
		linkDependency(stream);
	}
978 
979 	Stream firstSibling() 
980 	{
981 		Stream stream = this;
982 		for (; stream.m_sib_prev; stream = stream.m_sib_prev)
983 			continue;
984 		
985 		return stream;
986 	}
987 	
988 	Stream lastSibling()
989 	{
990 		Stream stream = this;
991 		for (; stream.m_sib_next; stream = stream.m_sib_next)
992 			continue;
993 
994 		return stream;
995 	}
996 	
997 	Stream updateLength(size_t delta) 
998 	{
999 		m_num_substreams += delta;
1000 
1001 		Stream stream = firstSibling();
1002 		
1003 		if (stream.m_dep_prev)
1004 			return stream.m_dep_prev.updateLength(delta);
1005 
1006 		return stream;
1007 	}
1008 
	/*
	 * Takes this stream out of its sibling list for the case where it has a
	 * previous sibling (asserted).  If this stream has dependents, they are
	 * spliced into the sibling list at its position; otherwise the previous
	 * and next siblings are joined directly.  This stream's own link
	 * members are not cleared here.
	 */
	void unlinkSibling() {
		Stream prev, next, dep_next;
		
		prev = m_sib_prev;
		dep_next = m_dep_next;
		
		assert(prev);
		
		if (dep_next) {
			/*
             *  prev--stream(--sib_next--...)
             *         |
             *        dep_next
             */
			/* dep_next becomes an ordinary sibling, so it loses its parent link. */
			dep_next.m_dep_prev = null;
			
			prev.linkSibling(dep_next);
			
			if (m_sib_next) {
				/* The rest of the sibling list follows the last spliced-in dependent. */
				dep_next.lastSibling().linkSibling(m_sib_next);
			}
		} else {
			/*
             *  prev--stream(--sib_next--...)
             */
			next = m_sib_next;
			
			prev.m_sib_next = next;
			
			if (next) {
				next.m_sib_prev = prev;
			}
		}
	}
1043 	
	/*
	 * Takes this stream out of the tree for the case where it is the first
	 * dependent of its parent, i.e. m_dep_prev is set (asserted).  The
	 * parent's first dependent becomes this stream's first dependent if it
	 * has one, else its next sibling, else nothing.  This stream's own link
	 * members are not cleared here.
	 */
	void unlinkDependency() {
		Stream prev, next, dep_next;
		
		prev = m_dep_prev;
		dep_next = m_dep_next;
		
		assert(prev);
		
		if (dep_next) {
			/*
             * prev
             *   |
             * stream(--sib_next--...)
             *   |
             * dep_next
             */
			prev.linkDependency(dep_next);
			
			if (m_sib_next) {
				/* This stream's siblings follow the last of its former dependents. */
				dep_next.lastSibling().linkSibling(m_sib_next);
			}
		} else if (m_sib_next) {
			/*
             * prev
             *   |
             * stream--sib_next
             */
			next = m_sib_next;
			
			/* next becomes the first dependent, so it has no previous sibling. */
			next.m_sib_prev = null;
			
			prev.linkDependency(next);
		} else {
			prev.m_dep_next = null;
		}
	}
1080 
1081 package:
1082 	/*
1083 	 * This function is called when trailer header (for both request and
1084 	 * response) is received.  This function performs validation and
1085 	 * returns true if it succeeds, or false.
1086 	 */
1087 	bool validateTrailerHeaders(in Frame frame) const
1088 	{
1089 		if ((frame.hd.flags & FrameFlags.END_STREAM) == 0)
1090 			return false;
1091 		
1092 		return true;
1093 	}
1094 	
1095 	/*
1096 	 * This function is called when END_STREAM flag is seen in incoming
1097 	 * frame.  This function performs validation and returns true if it
1098 	 * succeeds, or false.
1099 	 */
1100 	bool validateRemoteEndStream() const
1101 	{
1102 		if (m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) 
1103 			return false;
1104 		
1105 		if (m_content_length != -1 && m_content_length != m_recv_content_length)
1106 			return false;
1107 
1108 		return true;
1109 	}
1110 	
1111 	/*
1112 	 * This function is called when chunk of data is received.  This
1113 	 * function also performs validation and returns true if it succeeds, or false.
1114 	 */
1115 	bool onDataChunk(size_t n)
1116 	{
1117 		m_recv_content_length += n;
1118 
1119 		if ((m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) ||
1120 			(m_content_length != -1 && m_recv_content_length > m_content_length))
1121 		{
1122 			return false;
1123 		}
1124 		
1125 		return true;
1126 	}
1127 	
1128 	/*
1129 	 * This function inspects header field in |frame| and records its
1130 	 * method in stream.http_flags.  If frame.hd.type is neither
1131 	 * FrameType.HEADERS nor FrameType.PUSH_PROMISE, this function does
1132 	 * nothing.
1133 	 */
1134 	void setRequestMethod(Frame frame)
1135 	{
1136 		HeaderField[] hfa;
1137 		size_t i;
1138 		
1139 		with(FrameType) switch (frame.hd.type) {
1140 			case HEADERS:
1141 				hfa = frame.headers.hfa;
1142 				break;
1143 			case PUSH_PROMISE:
1144 				hfa = frame.push_promise.hfa;
1145 				break;
1146 			default:
1147 				return;
1148 		}
1149 		
1150 		/* TODO we should do this strictly. */
1151 		foreach(ref hf; hfa) {
1152 			import libhttp2.helpers : parseToken;
1153 			if (parseToken(hf.name) != Token._METHOD) {
1154 				continue;
1155 			}
1156 			if (hf.value == "CONNECT") {
1157 				m_http_flags |= HTTPFlags.METH_CONNECT;
1158 				return;
1159 			}
1160 			if (hf.value == "HEAD") {
1161 				m_http_flags |= HTTPFlags.METH_HEAD;
1162 				return;
1163 			}
1164 			return;
1165 		}
1166 	}
1167 
1168 	/*
1169 	 * This function is called when request header is received.  
1170 	 * This function performs validation and returns true if it succeeds, or false.
1171  	 */
1172 	bool onRequestHeaders(Frame frame) 
1173 	{
1174 		if (m_http_flags & HTTPFlags.METH_CONNECT) 
1175 		{
1176 			if ((m_http_flags & HTTPFlags._AUTHORITY) == 0) 
1177 				return false;
1178 			
1179 			m_content_length = -1;
1180 
1181 		} else {
1182 			if ((m_http_flags & HTTPFlags.REQ_HEADERS) != HTTPFlags.REQ_HEADERS ||
1183 				(m_http_flags & (HTTPFlags._AUTHORITY | HTTPFlags.HOST)) == 0) 
1184 			{
1185 				return false;
1186 			}
1187 			if (!checkPath())
1188 				return false;
1189 		}
1190 		
1191 		if (frame.hd.type == FrameType.PUSH_PROMISE) {
1192 			/* we are going to reuse data fields for upcoming response. Clear them now, except for method flags. */
1193 			m_http_flags &= HTTPFlags.METH_ALL;
1194 			m_content_length = -1;
1195 		}
1196 		
1197 		return true;
1198 	}
1199 
1200 	/*
1201  	 * This function is called when response header is received.  This
1202  	 * function performs validation and returns true if it succeeds, or false.
1203  	 */
1204 	bool onResponseHeaders() {
1205 		if ((m_http_flags & HTTPFlags._STATUS) == 0)
1206 			return false;
1207 		
1208 		if (m_status_code / 100 == 1)
1209 		{
1210 			/* non-final response */
1211 			m_http_flags = (m_http_flags & HTTPFlags.METH_ALL) | HTTPFlags.EXPECT_FINAL_RESPONSE;
1212 			m_content_length = -1;
1213 			m_status_code = -1;
1214 			return true;
1215 		}
1216 		
1217 		m_http_flags &= ~HTTPFlags.EXPECT_FINAL_RESPONSE;
1218 		bool has_response_body = (m_http_flags & HTTPFlags.METH_HEAD) == 0 && m_status_code / 100 != 1 && m_status_code != 304 && m_status_code != 204;
1219 		if (!has_response_body)
1220 			m_content_length = 0;
1221 		else if (m_http_flags & HTTPFlags.METH_CONNECT)
1222 			m_content_length = -1;
1223 		return true;
1224 	}
1225 
1226 	/* For "http" or "https" URIs, OPTIONS request may have "*" in :path
1227 	   header field to represent system-wide OPTIONS request.  Otherwise,
1228 	   :path header field value must start with "/".  This function must
1229 	   be called after ":method" header field was received.  This function
1230 	   returns nonzero if path is valid.*/
1231 	bool checkPath() {
1232 		return (httpFlags & HTTPFlags.SCHEME_HTTP) == 0 ||
1233 				((httpFlags & HTTPFlags.PATH_REGULAR)   ||
1234 				((httpFlags & HTTPFlags.METH_OPTIONS)   &&
1235 				(httpFlags & HTTPFlags.PATH_ASTERISK)));
1236 	}
1237 
1238 private:
1239 
1240 
1241     /// Stream ID
1242     int m_id;
1243 
1244     /// Pointers to form dependency tree.  If multiple streams depend on a stream, only one stream (left most) has non-null dep_prev 
1245     /// which points to the stream it depends on. The remaining streams are linked using sib_prev and sib_next.  
1246     /// The stream which has non-null dep_prev always null sib_prev.  The right most stream has null sib_next.  If this stream is
1247     /// a root of dependency tree, dep_prev and sib_prev are null.
1248     Stream m_dep_prev, m_dep_next;
1249     Stream m_sib_prev, m_sib_next;
1250 
1251     /// pointers to track dependency tree root streams.  This is doubly-linked list and first element is pointed by roots.head.
1252     Stream m_root_prev, m_root_next;
1253     /* When stream is kept after closure, it may be kept in doubly
1254      linked list pointed by Session.closed_stream_head.
1255      closed_next points to the next stream object if it is the element
1256      of the list. */
1257     Stream m_closed_prev, m_closed_next;
1258 
1259     /// pointer to roots, which tracks dependency tree roots
1260     StreamRoots m_roots;
1261 
1262     /// The arbitrary data provided by user for this stream.
1263     void *m_stream_user_data;
1264 
1265     /// Item to send
1266     OutboundItem m_item;
1267 
1268     /// categorized priority of this stream.  Only stream bearing $(D TOP) can send item.
1269 	StreamDPRI m_dpri = StreamDPRI.NO_ITEM;
1270 
1271     /// the number of streams in subtree 
1272 	int m_num_substreams = 1;
1273 
1274     /// Current remote window size. This value is computed against the current initial window size of remote endpoint. 
1275     int m_remote_window_size;
1276 
1277     /// Keep track of the number of bytes received without WINDOW_UPDATE.
1278     /// This could be negative after submitting negative value to WINDOW_UPDATE
1279     int m_recv_window_size;
1280 
1281     /// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE.  
1282     /// This is only used when auto WINDOW_UPDATE is turned off.
1283     int m_consumed_size;
1284 
1285     /// The amount of recv_window_size cut using submitting negative value to WINDOW_UPDATE
1286     int m_recv_reduction;
1287 
1288     /// window size for local flow control. It is initially set to INITIAL_WINDOW_SIZE and could be increased/decreased by
1289     /// submitting WINDOW_UPDATE. See submit_window_update().
1290     int m_local_window_size;
1291 
1292     /// weight of this stream 
1293     int m_weight;
1294 
1295     /// effective weight of this stream in belonging dependency tree
1296     int m_effective_weight;
1297 
1298     /// sum of weight (not effective_weight) of direct descendants
1299     int m_sum_dep_weight;
1300 
1301     /// sum of weight of direct descendants which have at least one descendant with dpri == $(D StreamDPRI.TOP).  We use this value to calculate effective weight.
1302     int m_sum_norest_weight;
1303 
1304     /// sum of weight of direct descendants whose dpri value is $(D StreamDPRI.TOP)
1305     int m_sum_top_weight;
1306 
1307     StreamState m_state;
1308 
1309     /// This is bitwise-OR of 0 or more of StreamFlags.
1310     StreamFlags m_flags;
1311 
1312     /// Bitwise OR of zero or more ShutdownFlag values
1313 	ShutdownFlag m_shut_flags = ShutdownFlag.NONE;
1314 
1315     /// Content-Length of request/response body. -1 if unknown.
1316 	long m_content_length = -1;
1317 
1318     /// Received body so far 
1319     long m_recv_content_length;
1320 
1321     /// status code from remote server
1322 	short m_status_code = -1;
1323 
1324     /// Bitwise OR of zero or more HTTPFlags values 
1325 	HTTPFlags m_http_flags = HTTPFlags.NONE;
1326 
1327 package: // used by Session
1328 	@property int id() { return m_id; }
1329 	@property StreamDPRI dpri() { return m_dpri; }
1330 	@property StreamState state() { return m_state; }
1331 	@property void state(StreamState state) { m_state = state; }
1332 	@property OutboundItem item() { return m_item; }
1333 	@property int effectiveWeight() { return m_effective_weight; }
1334 	@property int remoteWindowSize() { return m_remote_window_size; }
1335 	@property void remoteWindowSize(int rws) { m_remote_window_size = rws; }
1336 	@property ref int localWindowSize() { return m_local_window_size; }
1337 	@property ref int recvWindowSize() { return m_recv_window_size; }
1338 	@property ref int recvReduction() { return m_recv_reduction; }
1339 	@property void recvWindowSize(int sz) { m_recv_window_size = sz; }
1340 	@property ref int consumedSize() { return m_consumed_size; }
1341 	@property void consumedSize(int sz) { m_consumed_size = sz; }
1342 	@property void* userData() { return m_stream_user_data; }
1343 	@property void userData(void* ptr) { m_stream_user_data = ptr; }
1344 	@property ShutdownFlag shutFlags() { return m_shut_flags; }
1345 	@property void shutFlags(ShutdownFlag sf) { m_shut_flags = sf; }
1346 	@property HTTPFlags httpFlags() { return m_http_flags; }
1347 	@property void httpFlags(HTTPFlags flags) { m_http_flags = flags; }
1348 	@property StreamFlags flags() { return m_flags; }
1349 	@property void flags(StreamFlags f) { m_flags = f; }
1350 	@property void weight(int w) { m_weight = w; }
1351 	@property int weight() { return m_weight; }
1352 	@property Stream closedPrev() { return m_closed_prev; }
1353 	@property Stream closedNext() { return m_closed_next; } 
1354 	@property void closedPrev(Stream s) { m_closed_prev = s; }
1355 	@property void closedNext(Stream s) { m_closed_next = s; } 
1356 	@property long contentLength() { return m_content_length; }
1357 	@property short statusCode() { return m_status_code; }
1358 	@property void statusCode(short status) { m_status_code = status; }
1359 	@property void contentLength(long len) { m_content_length = len; }
1360 	// tests
1361 	@property int subStreams() { return m_num_substreams; }
1362 	@property int sumDepWeight() { return m_sum_dep_weight; }
1363 	@property int sumNorestWeight() { return m_sum_norest_weight; }
1364 	@property Stream rootNext() { return m_root_next; } 
1365 	@property Stream depPrev() { return m_dep_prev; } 
1366 	@property Stream depNext() { return m_dep_next; } 
1367 	@property Stream sibPrev() { return m_sib_prev; }
1368 	@property Stream sibNext() { return m_sib_next; } 
1369 
1370 }