1 /** 2 * Stream 3 * 4 * Copyright: 5 * (C) 2012-2015 Tatsuhiro Tsujikawa 6 * (C) 2014-2015 Etienne Cimon 7 * 8 * License: 9 * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project. 10 * Consult the provided LICENSE.md file for details 11 */ 12 module libhttp2.stream; 13 14 import libhttp2.constants; 15 import libhttp2.types; 16 import libhttp2.frame; 17 import libhttp2.session; 18 import std.algorithm : max; 19 20 const MAX_DEP_TREE_LENGTH = 100; 21 22 class StreamRoots 23 { 24 void free() { } 25 26 void add(Stream stream) { 27 if (head) { 28 stream.m_root_next = head; 29 head.m_root_prev = stream; 30 } 31 32 head = stream; 33 } 34 35 void remove(Stream stream) 36 { 37 Stream root_prev, root_next; 38 39 root_prev = stream.m_root_prev; 40 root_next = stream.m_root_next; 41 42 if (root_prev) { 43 root_prev.m_root_next = root_next; 44 45 if (root_next) { 46 root_next.m_root_prev = root_prev; 47 } 48 } else { 49 if (root_next) { 50 root_next.m_root_prev = null; 51 } 52 53 head = root_next; 54 } 55 56 stream.m_root_prev = null; 57 stream.m_root_next = null; 58 } 59 60 void removeAll() { 61 Stream si, next; 62 63 for (si = head; si;) { 64 next = si.m_root_next; 65 66 si.m_root_prev = null; 67 si.m_root_next = null; 68 69 si = next; 70 } 71 72 head = null; 73 } 74 75 Stream head; 76 int num_streams; 77 } 78 79 class Stream { 80 81 this(int stream_id, 82 StreamFlags flags, 83 StreamState initial_state, 84 int weight, 85 StreamRoots roots, 86 int remote_initial_window_size, 87 int local_initial_window_size, 88 void *stream_user_data) 89 { 90 initialize(stream_id, flags, initial_state, weight, roots, remote_initial_window_size, local_initial_window_size, stream_user_data); 91 } 92 93 void free() { userData = null; } // We don't free stream.item. It is deleted in ActiveOutboundItem.reset(), Sessioin.free() or PriorityQueue 94 95 package void initialize(int stream_id, 96 StreamFlags flags, 97 StreamState initial_state, 98 int weight, 99 StreamRoots roots, 100 int remote_initial_window_size, 101 int local_initial_window_size, 102 void *stream_user_data) 103 { 104 m_id = stream_id; 105 m_flags = flags; 106 m_state = initial_state; 107 m_weight = weight; 108 m_effective_weight = m_weight; 109 m_roots = roots; 110 m_remote_window_size = remote_initial_window_size; 111 m_local_window_size = local_initial_window_size; 112 m_stream_user_data = stream_user_data; 113 } 114 115 /* 116 * Disallow either further receptions or transmissions, or both. 117 * |flag| is bitwise OR of one or more of ShutdownFlag. 118 */ 119 void shutdown(ShutdownFlag flag) 120 { 121 m_shut_flags |= flag; 122 } 123 124 /* 125 * Computes distributed weight of a stream of the |weight| under the 126 * $(D Stream) if $(D Stream) is removed from a dependency tree. The result 127 * is computed using m_weight rather than m_effective_weight. 128 */ 129 int distributedWeight(int weight) 130 { 131 weight = m_weight * weight / m_sum_dep_weight; 132 133 return max(1, weight); 134 } 135 136 /* 137 * Computes effective weight of a stream of the |weight| under the 138 * $(D Stream). The result is computed using m_effective_weight 139 * rather than m_weight. This function is used to determine 140 * weight in dependency tree. 141 */ 142 int distributedEffectiveWeight(int weight) { 143 if (m_sum_norest_weight == 0) 144 return m_effective_weight; 145 weight = m_effective_weight * weight / m_sum_norest_weight; 146 147 return max(1, weight); 148 } 149 150 151 152 /* 153 * Attaches |item| to $(D Stream). Updates dpri members in this 154 * dependency tree. 155 */ 156 void attachItem(OutboundItem item, Session session) 157 { 158 assert((m_flags & StreamFlags.DEFERRED_ALL) == 0); 159 assert(!m_item); 160 161 LOGF("stream: stream=%d attach item=%s", m_id, item); 162 163 m_item = item; 164 165 updateOnAttachItem(session); 166 } 167 168 /* 169 * Detaches |m_item|. Updates dpri members in this dependency 170 * tree. This function does not free |m_item|. The caller must 171 * free it. 172 */ 173 void detachItem(Session session) 174 { 175 LOGF("stream: stream=%d detach item=%s", m_id, m_item); 176 177 m_item = null; 178 m_flags &= ~StreamFlags.DEFERRED_ALL; 179 180 updateDepOnDetachItem(session); 181 } 182 183 /* 184 * Defer |m_item|. We won't call this function in the situation 185 * where |m_item| is null. The |flags| is bitwise OR of zero or 186 * more of StreamFlags.DEFERRED_USER and 187 * StreamFlags.DEFERRED_FLOW_CONTROL. The |flags| indicates 188 * the reason of this action. 189 */ 190 void deferItem(StreamFlags flags, Session session) 191 { 192 assert(m_item); 193 194 LOGF("stream: stream=%d defer item=%s cause=%02x", m_id, m_item, flags); 195 196 m_flags |= flags; 197 198 updateDepOnDetachItem(session); 199 } 200 201 /* 202 * Put back deferred data in this stream to active state. The |flags| 203 * are one or more of bitwise OR of the following values: 204 * StreamFlags.DEFERRED_USER and 205 * StreamFlags.DEFERRED_FLOW_CONTROL and given masks are 206 * cleared if they are set. So even if this function is called, if 207 * one of flag is still set, data does not become active. 208 */ 209 void resumeDeferredItem(StreamFlags flag, Session session) 210 { 211 assert(m_item); 212 213 LOGF("stream: stream=%d resume item=%s flags=%02x", m_id, m_item, flags); 214 215 m_flags &= ~flags; 216 217 if (m_flags & StreamFlags.DEFERRED_ALL) { 218 return; 219 } 220 221 updateOnAttachItem(session); 222 } 223 224 /* 225 * Returns nonzero if item is deferred by whatever reason. 226 */ 227 bool isItemDeferred() 228 { 229 return m_item && (m_flags & StreamFlags.DEFERRED_ALL); 230 } 231 232 /* 233 * Returns nonzero if item is deferred by flow control. 234 */ 235 bool isDeferredByFlowControl() 236 { 237 return m_item && (m_flags & StreamFlags.DEFERRED_FLOW_CONTROL); 238 } 239 240 241 /* 242 * Updates the remote window size with the new value 243 * |new_initial_window_size|. The |old_initial_window_size| is used to 244 * calculate the current window size. 245 * 246 * This function returns true if it succeeds or false. The failure is due to 247 * overflow. 248 */ 249 bool updateRemoteInitialWindowSize(int new_initial_window_size, int old_initial_window_size) 250 { 251 return updateInitialWindowSize(m_remote_window_size, new_initial_window_size, old_initial_window_size); 252 } 253 254 /* 255 * Updates the local window size with the new value 256 * |new_initial_window_size|. The |old_initial_window_size| is used to 257 * calculate the current window size. 258 * 259 * This function returns true if it succeeds or false. The failure is due to 260 * overflow. 261 */ 262 bool updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size) 263 { 264 return updateInitialWindowSize(m_local_window_size, new_initial_window_size, old_initial_window_size); 265 } 266 267 /* 268 * Call this function if promised stream $(D Stream) is replied with 269 * HEADERS. This function changes the state of the $(D Stream) to 270 * OPENED. 271 */ 272 void promiseFulfilled() { 273 m_state = StreamState.OPENED; 274 m_flags &= ~StreamFlags.PUSH; 275 } 276 277 /* 278 * Returns the stream positioned in root of the dependency tree the 279 * $(D Stream) belongs to. 280 */ 281 Stream getRoot() { 282 Stream stream = this; 283 for (;;) { 284 if (stream.m_sib_prev) { 285 stream = stream.m_sib_prev; 286 287 continue; 288 } 289 290 if (stream.m_dep_prev) { 291 stream = stream.m_dep_prev; 292 293 continue; 294 } 295 296 break; 297 } 298 299 return stream; 300 } 301 302 /* 303 * Returns true if |target| is found in subtree of $(D Stream). 304 */ 305 bool subtreeContains(Stream target) { 306 307 if (this is target) 308 return true; 309 310 if (m_sib_next && m_sib_next.subtreeContains(target)) 311 return true; 312 313 return m_dep_next?m_dep_next.subtreeContains(target):false; 314 } 315 316 /* 317 * Makes the $(D Stream) depend on the |dep_stream|. This dependency is 318 * exclusive. All existing direct descendants of |dep_stream| become 319 * the descendants of the $(D Stream). This function assumes 320 * |m_data| is null and no dpri members are changed in this 321 * dependency tree. 322 */ 323 void insert(Stream stream) { 324 Stream si; 325 Stream root_stream; 326 327 assert(!m_item); 328 329 LOGF("stream: dep_insert dep_stream(%s)=%d, stream(%s)=%d", this, m_id, stream, stream.m_id); 330 331 stream.m_sum_dep_weight = m_sum_dep_weight; 332 m_sum_dep_weight = stream.m_weight; 333 334 if (m_dep_next) { 335 for (si = m_dep_next; si; si = si.m_sib_next) { 336 stream.m_num_substreams += si.m_num_substreams; 337 } 338 339 stream.m_dep_next = m_dep_next; 340 stream.m_dep_next.m_dep_prev = stream; 341 } 342 343 m_dep_next = stream; 344 stream.m_dep_prev = this; 345 346 root_stream = updateLength(1); 347 348 root_stream.updateSumNorestWeight(); 349 root_stream.updateEffectiveWeight(); 350 351 ++stream.m_roots.num_streams; 352 } 353 354 /* 355 * Makes the $(D Stream) depend on the |dep_stream|. This dependency is 356 * not exclusive. This function assumes |m_data| is null and no 357 * dpri members are changed in this dependency tree. 358 */ 359 void add(Stream stream) { 360 Stream root_stream; 361 362 assert(!stream.m_item); 363 364 LOGF("stream: dep_add dep_stream(%s=%d, stream(%s)=%d", this, m_id, stream, stream.m_id); 365 366 root_stream = updateLength(1); 367 368 m_sum_dep_weight += stream.m_weight; 369 370 if (!m_dep_next) { 371 linkDependency(stream); 372 } else { 373 insertLinkDependency(stream); 374 } 375 376 root_stream.updateSumNorestWeight(); 377 root_stream.updateEffectiveWeight(); 378 379 ++stream.m_roots.num_streams; 380 } 381 382 /* 383 * Removes the $(D Stream) from the current dependency tree. This 384 * function assumes |m_data| is null. 385 */ 386 void remove() { 387 Stream prev, next, dep_prev, si, root_stream; 388 int sum_dep_weight_delta; 389 390 LOGF("stream: dep_remove stream(%s=%d", this, m_id); 391 392 /* Distribute weight of $(D Stream) to direct descendants */ 393 sum_dep_weight_delta = -m_weight; 394 395 for (si = m_dep_next; si; si = si.m_sib_next) { 396 si.m_weight = distributedWeight(si.m_weight); 397 398 sum_dep_weight_delta += si.m_weight; 399 } 400 401 prev = firstSibling(); 402 403 dep_prev = prev.m_dep_prev; 404 405 if (dep_prev) { 406 root_stream = dep_prev.updateLength(-1); 407 408 dep_prev.m_sum_dep_weight += sum_dep_weight_delta; 409 } 410 411 if (m_sib_prev) { 412 unlinkSibling(); 413 } else if (m_dep_prev) { 414 unlinkDependency(); 415 } else { 416 m_roots.remove(this); 417 418 /* stream is a root of tree. Removing stream makes its 419 descendants a root of its own subtree. */ 420 421 for (si = m_dep_next; si;) { 422 next = si.m_sib_next; 423 424 si.m_dep_prev = null; 425 si.m_sib_prev = null; 426 si.m_sib_next = null; 427 428 /* We already distributed weight of $(D Stream) to this. */ 429 si.m_effective_weight = si.m_weight; 430 431 si.m_roots.add(si); 432 433 si = next; 434 } 435 } 436 437 if (root_stream) { 438 root_stream.updateSumNorestWeight(); 439 root_stream.updateEffectiveWeight(); 440 } 441 442 m_num_substreams = 1; 443 m_sum_dep_weight = 0; 444 445 m_dep_prev = null; 446 m_dep_next = null; 447 m_sib_prev = null; 448 m_sib_next = null; 449 450 --m_roots.num_streams; 451 } 452 453 /* 454 * Makes the $(D Stream) depend on the |dep_stream|. This dependency is 455 * exclusive. Updates dpri members in this dependency tree. 456 */ 457 void insertSubtree(Stream stream, Session session) { 458 Stream last_sib; 459 Stream dep_next; 460 Stream root_stream; 461 size_t delta_substreams; 462 463 LOGF("stream: dep_insert_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id); 464 465 delta_substreams = stream.m_num_substreams; 466 467 stream.updateSetRest(); 468 469 if (m_dep_next) { 470 /* m_num_substreams includes dep_stream itself */ 471 stream.m_num_substreams += m_num_substreams - 1; 472 473 stream.m_sum_dep_weight += m_sum_dep_weight; 474 m_sum_dep_weight = stream.m_weight; 475 476 dep_next = m_dep_next; 477 478 if (dep_next) dep_next.updateSetRest(); 479 480 linkDependency(stream); 481 482 if (stream.m_dep_next) { 483 last_sib = stream.m_dep_next.lastSibling(); 484 485 last_sib.linkSibling(dep_next); 486 487 dep_next.m_dep_prev = null; 488 } else { 489 stream.linkDependency(dep_next); 490 } 491 } else { 492 linkDependency(stream); 493 494 assert(m_sum_dep_weight == 0); 495 m_sum_dep_weight = stream.m_weight; 496 } 497 498 root_stream = updateLength(delta_substreams); 499 500 root_stream.updateSetTop(); 501 502 root_stream.updateSumNorestWeight(); 503 root_stream.updateEffectiveWeight(); 504 505 root_stream.updateQueueTop(session); 506 } 507 508 509 /* 510 * Makes the $(D Stream) depend on the |dep_stream|. This dependency is 511 * not exclusive. Updates dpri members in this dependency tree. 512 */ 513 void addSubtree(Stream stream, Session session) 514 { 515 Stream root_stream; 516 517 LOGF("stream: dep_add_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id); 518 519 stream.updateSetRest(); 520 521 if (m_dep_next) { 522 m_sum_dep_weight += stream.m_weight; 523 524 insertLinkDependency(stream); 525 } else { 526 linkDependency(stream); 527 528 assert(m_sum_dep_weight == 0); 529 m_sum_dep_weight = stream.m_weight; 530 } 531 532 root_stream = updateLength(stream.m_num_substreams); 533 534 root_stream.updateSetTop(); 535 536 root_stream.updateSumNorestWeight(); 537 root_stream.updateEffectiveWeight(); 538 539 root_stream.updateQueueTop(session); 540 } 541 542 /* 543 * Removes subtree whose root stream is $(D Stream). Removing subtree 544 * does not change dpri values. The effective_weight of streams in 545 * removed subtree is not updated. 546 */ 547 void removeSubtree() 548 { 549 Stream prev, next, dep_prev, root_stream; 550 551 LOGF("stream: dep_remove_subtree stream(%s=%d", this, m_id); 552 553 if (m_sib_prev) { 554 prev = m_sib_prev; 555 556 prev.m_sib_next = m_sib_next; 557 if (prev.m_sib_next) { 558 prev.m_sib_next.m_sib_prev = prev; 559 } 560 561 prev = prev.firstSibling(); 562 563 dep_prev = prev.m_dep_prev; 564 565 } else if (m_dep_prev) { 566 dep_prev = m_dep_prev; 567 next = m_sib_next; 568 569 dep_prev.m_dep_next = next; 570 571 if (next) { 572 next.m_dep_prev = dep_prev; 573 574 next.m_sib_prev = null; 575 } 576 577 } else { 578 m_roots.remove(this); 579 580 dep_prev = null; 581 } 582 583 if (dep_prev) { 584 dep_prev.m_sum_dep_weight -= m_weight; 585 586 root_stream = dep_prev.updateLength(-m_num_substreams); 587 588 root_stream.updateSumNorestWeight(); 589 root_stream.updateEffectiveWeight(); 590 } 591 592 m_sib_prev = null; 593 m_sib_next = null; 594 m_dep_prev = null; 595 } 596 597 /* 598 * Makes the $(D Stream) as root. Updates dpri members in this 599 * dependency tree. 600 */ 601 void makeRoot(Session session) 602 { 603 LOGF("stream: dep_make_root stream(%s=%d", this, m_id); 604 605 m_roots.add(this); 606 607 updateSetRest(); 608 609 m_effective_weight = m_weight; 610 611 updateSetTop(); 612 613 updateSumNorestWeight(); 614 updateEffectiveWeight(); 615 616 updateQueueTop(session); 617 } 618 619 /* 620 * Makes the $(D Stream) as root and all existing root streams become 621 * direct children of $(D Stream). 622 */ 623 void makeTopmostRoot(Session session) 624 { 625 Stream first, si; 626 627 LOGF("stream: ALL YOUR STREAM ARE BELONG TO US stream(%s=%d", this, m_id); 628 629 first = m_roots.head; 630 631 /* stream must not be include in m_roots.head list */ 632 assert(first !is this); 633 634 if (first) { 635 Stream prev; 636 637 prev = first; 638 639 LOGF("stream: root stream(%s=%d", first, first.m_id); 640 641 m_sum_dep_weight += first.m_weight; 642 m_num_substreams += first.m_num_substreams; 643 644 for (si = first.m_root_next; si; si = si.m_root_next) { 645 646 assert(si !is this); 647 648 LOGF("stream: root stream(%s=%d", si, si.m_id); 649 650 m_sum_dep_weight += si.m_weight; 651 m_num_substreams += si.m_num_substreams; 652 653 prev.linkSibling(si); 654 655 prev = si; 656 } 657 658 if (m_dep_next) { 659 Stream sib_next; 660 661 sib_next = m_dep_next; 662 663 sib_next.m_dep_prev = null; 664 665 first.linkSibling(sib_next); 666 linkDependency(prev); 667 } else { 668 linkDependency(first); 669 } 670 } 671 672 m_roots.removeAll(); 673 674 makeRoot(session); 675 } 676 677 /* 678 * Returns true if $(D Stream) is in any dependency tree. 679 */ 680 bool inDepTree() { 681 return m_dep_prev || m_dep_next || m_sib_prev || 682 m_sib_next || m_root_next || m_root_prev || 683 m_roots.head is this; 684 } 685 686 private: 687 688 689 bool updateInitialWindowSize(ref int window_size, int new_initial_window_size, int old_initial_window_size) 690 { 691 long new_window_size = ( cast(long)window_size ) + new_initial_window_size - old_initial_window_size; 692 693 if (int.min > new_window_size || new_window_size > MAX_WINDOW_SIZE) 694 return false; 695 696 window_size = cast(int) new_window_size; 697 698 return true; 699 } 700 701 void pushItem(Session session) 702 { 703 OutboundItem item; 704 705 assert(m_item); 706 assert(m_item.queued == 0); 707 708 item = m_item; 709 710 /* If item is now sent, don't push it to the queue. Otherwise, we may push same item twice. */ 711 if (session.aob.item is item) 712 return; 713 714 if (item.weight > m_effective_weight) 715 item.weight = m_effective_weight; 716 717 item.cycle = session.last_cycle; 718 719 switch (item.frame.hd.type) { 720 case FrameType.DATA: 721 session.ob_da_pq.push(item); 722 break; 723 case FrameType.HEADERS: 724 if (m_state == StreamState.RESERVED) 725 session.ob_ss_pq.push(item); 726 else 727 session.ob_pq.push(item); 728 break; 729 default: 730 /* should not reach here */ 731 assert(0); 732 } 733 734 item.queued = 1; 735 } 736 737 int distributedTopEffectiveWeight(int weight) { 738 if (m_sum_top_weight == 0) 739 return m_effective_weight; 740 741 weight = m_effective_weight * weight / m_sum_top_weight; 742 743 return max(1, weight); 744 } 745 746 /* Updates effective_weight of descendant streams in subtree of $(D Stream). We assume that m_effective_weight is already set right. */ 747 void updateEffectiveWeight() 748 { 749 Stream si; 750 751 LOGF("stream: update_dep_effective_weight " 752 "stream(%s=%d, weight=%d, sum_norest_weight=%d, " 753 "sum_top_weight=%d", 754 this, m_id, m_weight, 755 m_sum_norest_weight, m_sum_top_weight); 756 757 /* m_sum_norest_weight == 0 means there is no StreamDPRI.TOP under stream */ 758 if (m_dpri != StreamDPRI.NO_ITEM || 759 m_sum_norest_weight == 0) { 760 return; 761 } 762 763 /* If there is no direct descendant whose dpri is StreamDPRI.TOP, indirect descendants have 764 * the chance to send data, so recursively set weight for descendants. */ 765 if (m_sum_top_weight == 0) { 766 for (si = m_dep_next; si; si = si.m_sib_next) { 767 if (si.m_dpri != StreamDPRI.REST) { 768 si.m_effective_weight = 769 distributedEffectiveWeight(si.m_weight); 770 } 771 772 si.updateEffectiveWeight(); 773 } 774 return; 775 } 776 777 /* If there is at least one direct descendant whose dpri is 778 StreamDPRI.TOP, we won't give a chance to indirect 779 descendants, since closed or blocked stream's weight is 780 distributed among its siblings */ 781 for (si = m_dep_next; si; si = si.m_sib_next) { 782 if (si.m_dpri == StreamDPRI.TOP) { 783 si.m_effective_weight = distributedTopEffectiveWeight(si.m_weight); 784 LOGF("stream: stream=%d top eweight=%d", si.m_id, si.m_effective_weight); 785 786 continue; 787 } 788 789 if (si.m_dpri == StreamDPRI.NO_ITEM) { 790 LOGF("stream: stream=%d no_item, ignored", si.m_id); 791 792 /* Since we marked StreamDPRI.TOP under si, we make them StreamDPRI.REST again. */ 793 if (si.m_dep_next) si.m_dep_next.updateSetRest(); 794 } else { 795 LOGF("stream: stream=%d rest, ignored", si.m_id); 796 } 797 } 798 } 799 800 void updateSetRest() 801 { 802 LOGF("stream: stream=%d is rest", m_id); 803 804 if (m_dpri == StreamDPRI.REST) 805 return; 806 807 if (m_dpri == StreamDPRI.TOP) 808 { 809 m_dpri = StreamDPRI.REST; 810 811 if (m_sib_next) 812 m_sib_next.updateSetRest(); 813 814 return; 815 } 816 817 if (m_sib_next) 818 m_sib_next.updateSetRest(); 819 if (m_dep_next) 820 m_dep_next.updateSetRest(); 821 } 822 823 /* 824 * Performs dfs starting $(D Stream), search stream which can become 825 * StreamDPRI.TOP and set its dpri. 826 */ 827 void updateSetTop() 828 { 829 Stream si; 830 831 if (m_dpri == StreamDPRI.TOP) 832 return; 833 834 if (m_dpri == StreamDPRI.REST) 835 { 836 LOGF("stream: stream=%d item is top", m_id); 837 838 m_dpri = StreamDPRI.TOP; 839 840 return; 841 } 842 843 for (si = m_dep_next; si; si = si.m_sib_next) 844 si.updateSetTop(); 845 846 } 847 848 /* 849 * Performs dfs starting $(D Stream), and queue stream whose dpri is 850 * StreamDPRI.TOP and has not been queued yet. 851 */ 852 void updateQueueTop(Session session) 853 { 854 Stream si; 855 856 if (m_dpri == StreamDPRI.REST) 857 return; 858 859 if (m_dpri == StreamDPRI.TOP) { 860 if (!m_item.queued) { 861 LOGF("stream: stream=%d enqueue", m_id); 862 pushItem(session); 863 } 864 865 return; 866 } 867 868 for (si = m_dep_next; si; si = si.m_sib_next) 869 si.updateQueueTop(session); 870 871 872 } 873 874 /* 875 * Updates m_sum_norest_weight and m_sum_top_weight 876 * recursively. We have to gather effective sum of weight of 877 * descendants. If m_dpri == StreamDPRI.NO_ITEM, we 878 * have to go deeper and check that any of its descendants has dpri 879 * value of StreamDPRI.TOP. If so, we have to add weight of 880 * its direct descendants to m_sum_norest_weight. To make this 881 * work, this function returns true if any of its descendants has dpri 882 * value of StreamDPRI.TOP, otherwise false. 883 * 884 * Calculating m_sum_top-weight is very simple compared to 885 * m_sum_norest_weight. It just adds up the weight of direct 886 * descendants whose dpri is StreamDPRI.TOP. 887 */ 888 bool updateSumNorestWeight() 889 { 890 Stream si; 891 bool ret; 892 893 m_sum_norest_weight = 0; 894 m_sum_top_weight = 0; 895 896 if (m_dpri == StreamDPRI.TOP) 897 return true; 898 899 if (m_dpri == StreamDPRI.REST) 900 return false; 901 902 ret = false; 903 904 for (si = m_dep_next; si; si = si.m_sib_next) { 905 906 if (si.updateSumNorestWeight()) { 907 ret = true; 908 m_sum_norest_weight += si.m_weight; 909 } 910 911 if (si.m_dpri == StreamDPRI.TOP) 912 m_sum_top_weight += si.m_weight; 913 } 914 915 return ret; 916 } 917 918 void updateOnAttachItem(Session session) 919 { 920 Stream root_stream; 921 922 m_dpri = StreamDPRI.REST; 923 924 if (m_dep_next) m_dep_next.updateSetRest(); 925 926 root_stream = getRoot(); 927 928 LOGF("root=%s, stream=%s", root_stream, this); 929 930 root_stream.updateSetTop(); 931 932 root_stream.updateSumNorestWeight(); 933 root_stream.updateEffectiveWeight(); 934 935 root_stream.updateQueueTop(session); 936 } 937 938 void updateDepOnDetachItem(Session session) { 939 Stream root_stream; 940 941 m_dpri = StreamDPRI.NO_ITEM; 942 943 root_stream = getRoot(); 944 945 root_stream.updateSetTop(); 946 947 root_stream.updateSumNorestWeight(); 948 root_stream.updateEffectiveWeight(); 949 950 root_stream.updateQueueTop(session); 951 } 952 953 void linkDependency(Stream stream) { 954 m_dep_next = stream; 955 stream.m_dep_prev = this; 956 } 957 958 void linkSibling(Stream stream) 959 { 960 m_sib_next = stream; 961 stream.m_sib_prev = this; 962 } 963 964 void insertLinkDependency(Stream stream) 965 { 966 Stream sib_next; 967 968 assert(!stream.m_sib_prev); 969 970 sib_next = m_dep_next; 971 972 stream.linkSibling(sib_next); 973 974 sib_next.m_dep_prev = null; 975 976 linkDependency(stream); 977 } 978 979 Stream firstSibling() 980 { 981 Stream stream = this; 982 for (; stream.m_sib_prev; stream = stream.m_sib_prev) 983 continue; 984 985 return stream; 986 } 987 988 Stream lastSibling() 989 { 990 Stream stream = this; 991 for (; stream.m_sib_next; stream = stream.m_sib_next) 992 continue; 993 994 return stream; 995 } 996 997 Stream updateLength(size_t delta) 998 { 999 m_num_substreams += delta; 1000 1001 Stream stream = firstSibling(); 1002 1003 if (stream.m_dep_prev) 1004 return stream.m_dep_prev.updateLength(delta); 1005 1006 return stream; 1007 } 1008 1009 void unlinkSibling() { 1010 Stream prev, next, dep_next; 1011 1012 prev = m_sib_prev; 1013 dep_next = m_dep_next; 1014 1015 assert(prev); 1016 1017 if (dep_next) { 1018 /* 1019 * prev--stream(--sib_next--...) 1020 * | 1021 * dep_next 1022 */ 1023 dep_next.m_dep_prev = null; 1024 1025 prev.linkSibling(dep_next); 1026 1027 if (m_sib_next) { 1028 dep_next.lastSibling().linkSibling(m_sib_next); 1029 } 1030 } else { 1031 /* 1032 * prev--stream(--sib_next--...) 1033 */ 1034 next = m_sib_next; 1035 1036 prev.m_sib_next = next; 1037 1038 if (next) { 1039 next.m_sib_prev = prev; 1040 } 1041 } 1042 } 1043 1044 void unlinkDependency() { 1045 Stream prev, next, dep_next; 1046 1047 prev = m_dep_prev; 1048 dep_next = m_dep_next; 1049 1050 assert(prev); 1051 1052 if (dep_next) { 1053 /* 1054 * prev 1055 * | 1056 * stream(--sib_next--...) 1057 * | 1058 * dep_next 1059 */ 1060 prev.linkDependency(dep_next); 1061 1062 if (m_sib_next) { 1063 dep_next.lastSibling().linkSibling(m_sib_next); 1064 } 1065 } else if (m_sib_next) { 1066 /* 1067 * prev 1068 * | 1069 * stream--sib_next 1070 */ 1071 next = m_sib_next; 1072 1073 next.m_sib_prev = null; 1074 1075 prev.linkDependency(next); 1076 } else { 1077 prev.m_dep_next = null; 1078 } 1079 } 1080 1081 package: 1082 /* 1083 * This function is called when trailer header (for both request and 1084 * response) is received. This function performs validation and 1085 * returns true if it succeeds, or false. 1086 */ 1087 bool validateTrailerHeaders(in Frame frame) const 1088 { 1089 if ((frame.hd.flags & FrameFlags.END_STREAM) == 0) 1090 return false; 1091 1092 return true; 1093 } 1094 1095 /* 1096 * This function is called when END_STREAM flag is seen in incoming 1097 * frame. This function performs validation and returns true if it 1098 * succeeds, or false. 1099 */ 1100 bool validateRemoteEndStream() const 1101 { 1102 if (m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) 1103 return false; 1104 1105 if (m_content_length != -1 && m_content_length != m_recv_content_length) 1106 return false; 1107 1108 return true; 1109 } 1110 1111 /* 1112 * This function is called when chunk of data is received. This 1113 * function also performs validation and returns true if it succeeds, or false. 1114 */ 1115 bool onDataChunk(size_t n) 1116 { 1117 m_recv_content_length += n; 1118 1119 if ((m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) || 1120 (m_content_length != -1 && m_recv_content_length > m_content_length)) 1121 { 1122 return false; 1123 } 1124 1125 return true; 1126 } 1127 1128 /* 1129 * This function inspects header field in |frame| and records its 1130 * method in stream.http_flags. If frame.hd.type is neither 1131 * FrameType.HEADERS nor FrameType.PUSH_PROMISE, this function does 1132 * nothing. 1133 */ 1134 void setRequestMethod(Frame frame) 1135 { 1136 HeaderField[] hfa; 1137 size_t i; 1138 1139 with(FrameType) switch (frame.hd.type) { 1140 case HEADERS: 1141 hfa = frame.headers.hfa; 1142 break; 1143 case PUSH_PROMISE: 1144 hfa = frame.push_promise.hfa; 1145 break; 1146 default: 1147 return; 1148 } 1149 1150 /* TODO we should do this strictly. */ 1151 foreach(ref hf; hfa) { 1152 import libhttp2.helpers : parseToken; 1153 if (parseToken(hf.name) != Token._METHOD) { 1154 continue; 1155 } 1156 if (hf.value == "CONNECT") { 1157 m_http_flags |= HTTPFlags.METH_CONNECT; 1158 return; 1159 } 1160 if (hf.value == "HEAD") { 1161 m_http_flags |= HTTPFlags.METH_HEAD; 1162 return; 1163 } 1164 return; 1165 } 1166 } 1167 1168 /* 1169 * This function is called when request header is received. 1170 * This function performs validation and returns true if it succeeds, or false. 1171 */ 1172 bool onRequestHeaders(Frame frame) 1173 { 1174 if (m_http_flags & HTTPFlags.METH_CONNECT) 1175 { 1176 if ((m_http_flags & HTTPFlags._AUTHORITY) == 0) 1177 return false; 1178 1179 m_content_length = -1; 1180 1181 } else { 1182 if ((m_http_flags & HTTPFlags.REQ_HEADERS) != HTTPFlags.REQ_HEADERS || 1183 (m_http_flags & (HTTPFlags._AUTHORITY | HTTPFlags.HOST)) == 0) 1184 { 1185 return false; 1186 } 1187 if (!checkPath()) 1188 return false; 1189 } 1190 1191 if (frame.hd.type == FrameType.PUSH_PROMISE) { 1192 /* we are going to reuse data fields for upcoming response. Clear them now, except for method flags. */ 1193 m_http_flags &= HTTPFlags.METH_ALL; 1194 m_content_length = -1; 1195 } 1196 1197 return true; 1198 } 1199 1200 /* 1201 * This function is called when response header is received. This 1202 * function performs validation and returns true if it succeeds, or false. 1203 */ 1204 bool onResponseHeaders() { 1205 if ((m_http_flags & HTTPFlags._STATUS) == 0) 1206 return false; 1207 1208 if (m_status_code / 100 == 1) 1209 { 1210 /* non-final response */ 1211 m_http_flags = (m_http_flags & HTTPFlags.METH_ALL) | HTTPFlags.EXPECT_FINAL_RESPONSE; 1212 m_content_length = -1; 1213 m_status_code = -1; 1214 return true; 1215 } 1216 1217 m_http_flags &= ~HTTPFlags.EXPECT_FINAL_RESPONSE; 1218 bool has_response_body = (m_http_flags & HTTPFlags.METH_HEAD) == 0 && m_status_code / 100 != 1 && m_status_code != 304 && m_status_code != 204; 1219 if (!has_response_body) 1220 m_content_length = 0; 1221 else if (m_http_flags & HTTPFlags.METH_CONNECT) 1222 m_content_length = -1; 1223 return true; 1224 } 1225 1226 /* For "http" or "https" URIs, OPTIONS request may have "*" in :path 1227 header field to represent system-wide OPTIONS request. Otherwise, 1228 :path header field value must start with "/". This function must 1229 be called after ":method" header field was received. This function 1230 returns nonzero if path is valid.*/ 1231 bool checkPath() { 1232 return (httpFlags & HTTPFlags.SCHEME_HTTP) == 0 || 1233 ((httpFlags & HTTPFlags.PATH_REGULAR) || 1234 ((httpFlags & HTTPFlags.METH_OPTIONS) && 1235 (httpFlags & HTTPFlags.PATH_ASTERISK))); 1236 } 1237 1238 private: 1239 1240 1241 /// Stream ID 1242 int m_id; 1243 1244 /// Pointers to form dependency tree. If multiple streams depend on a stream, only one stream (left most) has non-null dep_prev 1245 /// which points to the stream it depends on. The remaining streams are linked using sib_prev and sib_next. 1246 /// The stream which has non-null dep_prev always null sib_prev. The right most stream has null sib_next. If this stream is 1247 /// a root of dependency tree, dep_prev and sib_prev are null. 1248 Stream m_dep_prev, m_dep_next; 1249 Stream m_sib_prev, m_sib_next; 1250 1251 /// pointers to track dependency tree root streams. This is doubly-linked list and first element is pointed by roots.head. 1252 Stream m_root_prev, m_root_next; 1253 /* When stream is kept after closure, it may be kept in doubly 1254 linked list pointed by Session.closed_stream_head. 1255 closed_next points to the next stream object if it is the element 1256 of the list. */ 1257 Stream m_closed_prev, m_closed_next; 1258 1259 /// pointer to roots, which tracks dependency tree roots 1260 StreamRoots m_roots; 1261 1262 /// The arbitrary data provided by user for this stream. 1263 void *m_stream_user_data; 1264 1265 /// Item to send 1266 OutboundItem m_item; 1267 1268 /// categorized priority of this stream. Only stream bearing $(D TOP) can send item. 1269 StreamDPRI m_dpri = StreamDPRI.NO_ITEM; 1270 1271 /// the number of streams in subtree 1272 int m_num_substreams = 1; 1273 1274 /// Current remote window size. This value is computed against the current initial window size of remote endpoint. 1275 int m_remote_window_size; 1276 1277 /// Keep track of the number of bytes received without WINDOW_UPDATE. 1278 /// This could be negative after submitting negative value to WINDOW_UPDATE 1279 int m_recv_window_size; 1280 1281 /// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE. 1282 /// This is only used when auto WINDOW_UPDATE is turned off. 1283 int m_consumed_size; 1284 1285 /// The amount of recv_window_size cut using submitting negative value to WINDOW_UPDATE 1286 int m_recv_reduction; 1287 1288 /// window size for local flow control. It is initially set to INITIAL_WINDOW_SIZE and could be increased/decreased by 1289 /// submitting WINDOW_UPDATE. See submit_window_update(). 1290 int m_local_window_size; 1291 1292 /// weight of this stream 1293 int m_weight; 1294 1295 /// effective weight of this stream in belonging dependency tree 1296 int m_effective_weight; 1297 1298 /// sum of weight (not effective_weight) of direct descendants 1299 int m_sum_dep_weight; 1300 1301 /// sum of weight of direct descendants which have at least one descendant with dpri == $(D StreamDPRI.TOP). We use this value to calculate effective weight. 1302 int m_sum_norest_weight; 1303 1304 /// sum of weight of direct descendants whose dpri value is $(D StreamDPRI.TOP) 1305 int m_sum_top_weight; 1306 1307 StreamState m_state; 1308 1309 /// This is bitwise-OR of 0 or more of StreamFlags. 1310 StreamFlags m_flags; 1311 1312 /// Bitwise OR of zero or more ShutdownFlag values 1313 ShutdownFlag m_shut_flags = ShutdownFlag.NONE; 1314 1315 /// Content-Length of request/response body. -1 if unknown. 1316 long m_content_length = -1; 1317 1318 /// Received body so far 1319 long m_recv_content_length; 1320 1321 /// status code from remote server 1322 short m_status_code = -1; 1323 1324 /// Bitwise OR of zero or more HTTPFlags values 1325 HTTPFlags m_http_flags = HTTPFlags.NONE; 1326 1327 package: // used by Session 1328 @property int id() { return m_id; } 1329 @property StreamDPRI dpri() { return m_dpri; } 1330 @property StreamState state() { return m_state; } 1331 @property void state(StreamState state) { m_state = state; } 1332 @property OutboundItem item() { return m_item; } 1333 @property int effectiveWeight() { return m_effective_weight; } 1334 @property int remoteWindowSize() { return m_remote_window_size; } 1335 @property void remoteWindowSize(int rws) { m_remote_window_size = rws; } 1336 @property ref int localWindowSize() { return m_local_window_size; } 1337 @property ref int recvWindowSize() { return m_recv_window_size; } 1338 @property ref int recvReduction() { return m_recv_reduction; } 1339 @property void recvWindowSize(int sz) { m_recv_window_size = sz; } 1340 @property ref int consumedSize() { return m_consumed_size; } 1341 @property void consumedSize(int sz) { m_consumed_size = sz; } 1342 @property void* userData() { return m_stream_user_data; } 1343 @property void userData(void* ptr) { m_stream_user_data = ptr; } 1344 @property ShutdownFlag shutFlags() { return m_shut_flags; } 1345 @property void shutFlags(ShutdownFlag sf) { m_shut_flags = sf; } 1346 @property HTTPFlags httpFlags() { return m_http_flags; } 1347 @property void httpFlags(HTTPFlags flags) { m_http_flags = flags; } 1348 @property StreamFlags flags() { return m_flags; } 1349 @property void flags(StreamFlags f) { m_flags = f; } 1350 @property void weight(int w) { m_weight = w; } 1351 @property int weight() { return m_weight; } 1352 @property Stream closedPrev() { return m_closed_prev; } 1353 @property Stream closedNext() { return m_closed_next; } 1354 @property void closedPrev(Stream s) { m_closed_prev = s; } 1355 @property void closedNext(Stream s) { m_closed_next = s; } 1356 @property long contentLength() { return m_content_length; } 1357 @property short statusCode() { return m_status_code; } 1358 @property void statusCode(short status) { m_status_code = status; } 1359 @property void contentLength(long len) { m_content_length = len; } 1360 // tests 1361 @property int subStreams() { return m_num_substreams; } 1362 @property int sumDepWeight() { return m_sum_dep_weight; } 1363 @property int sumNorestWeight() { return m_sum_norest_weight; } 1364 @property Stream rootNext() { return m_root_next; } 1365 @property Stream depPrev() { return m_dep_prev; } 1366 @property Stream depNext() { return m_dep_next; } 1367 @property Stream sibPrev() { return m_sib_prev; } 1368 @property Stream sibNext() { return m_sib_next; } 1369 1370 }