/**
 * Stream
 *
 * Copyright:
 * (C) 2012-2015 Tatsuhiro Tsujikawa
 * (C) 2014-2015 Etienne Cimon
 *
 * License:
 * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project.
 * Consult the provided LICENSE.md file for details
 */
module libhttp2.stream;

import libhttp2.constants;
import libhttp2.types;
import libhttp2.frame;
import libhttp2.session;
import std.algorithm : max;

const MAX_DEP_TREE_LENGTH = 100;

/**
 * Doubly-linked list of the streams that are roots of a dependency tree.
 * The list is threaded through Stream.m_root_prev / Stream.m_root_next and
 * `head` points to its first element.
 */
align(8)
final class StreamRoots
{
	void free() { }

	/// Pushes |stream| at the front of the root list.
	void add(Stream stream) {
		if (head) {
			stream.m_root_next = head;
			head.m_root_prev = stream;
		}

		head = stream;
	}

	/// Unlinks |stream| from the root list and clears its root pointers.
	void remove(Stream stream)
	{
		Stream root_prev, root_next;

		root_prev = stream.m_root_prev;
		root_next = stream.m_root_next;

		if (root_prev) {
			root_prev.m_root_next = root_next;

			if (root_next) {
				root_next.m_root_prev = root_prev;
			}
		} else {
			/* |stream| was the head of the list */
			if (root_next) {
				root_next.m_root_prev = null;
			}

			head = root_next;
		}

		stream.m_root_prev = null;
		stream.m_root_next = null;
	}

	/// Empties the root list, clearing the root pointers of every member.
	void removeAll() {
		Stream si, next;

		for (si = head; si;) {
			next = si.m_root_next;

			si.m_root_prev = null;
			si.m_root_next = null;

			si = next;
		}

		head = null;
	}

	Stream head;
	int num_streams;
}

align(8)
final class Stream {

	this(int stream_id,
		StreamFlags flags,
		StreamState initial_state,
		int weight,
		StreamRoots roots,
		int remote_initial_window_size,
		int local_initial_window_size,
		void *stream_user_data)
	{
		initialize(stream_id, flags, initial_state, weight, roots, remote_initial_window_size, local_initial_window_size, stream_user_data);
	}

	/// We don't free stream.item. It is deleted in ActiveOutboundItem.reset(), Session.free() or PriorityQueue
	void free() { userData = null; }

	package void initialize(int stream_id,
		StreamFlags flags,
		StreamState initial_state,
		int weight,
		StreamRoots roots,
		int remote_initial_window_size,
		int local_initial_window_size,
		void *stream_user_data)
	{
		m_id = stream_id;
		m_flags = flags;
		m_state = initial_state;
		m_weight = weight;
		m_effective_weight = m_weight;
		m_roots = roots;
		m_remote_window_size = remote_initial_window_size;
		m_local_window_size = local_initial_window_size;
		m_stream_user_data = stream_user_data;
	}

	/*
	 * Disallow either further receptions or transmissions, or both.
	 * |flag| is bitwise OR of one or more of ShutdownFlag.
	 */
	void shutdown(ShutdownFlag flag)
	{
		m_shut_flags |= flag;
	}

	/*
	 * Computes distributed weight of a stream of the |weight| under the
	 * $(D Stream) if $(D Stream) is removed from a dependency tree. The result
	 * is computed using m_weight rather than m_effective_weight.
	 *
	 * Divides by m_sum_dep_weight, so it must only be called while this
	 * stream has at least one direct descendant accounted for in that sum.
	 */
	int distributedWeight(int weight)
	{
		weight = m_weight * weight / m_sum_dep_weight;

		return max(1, weight);
	}

	/*
	 * Computes effective weight of a stream of the |weight| under the
	 * $(D Stream). The result is computed using m_effective_weight
	 * rather than m_weight. This function is used to determine
	 * weight in dependency tree.
	 */
	int distributedEffectiveWeight(int weight) {
		if (m_sum_norest_weight == 0)
			return m_effective_weight;

		weight = m_effective_weight * weight / m_sum_norest_weight;

		return max(1, weight);
	}

	/*
	 * Attaches |item| to $(D Stream). Updates dpri members in this
	 * dependency tree.
	 */
	void attachItem(OutboundItem item, Session session)
	{
		assert((m_flags & StreamFlags.DEFERRED_ALL) == 0);
		assert(!m_item);

		LOGF("stream: stream=%d attach item=%s", m_id, item);

		m_item = item;

		updateOnAttachItem(session);
	}

	/*
	 * Detaches |m_item|. Updates dpri members in this dependency
	 * tree. This function does not free |m_item|. The caller must
	 * free it.
	 */
	void detachItem(Session session)
	{
		LOGF("stream: stream=%d detach item=%s", m_id, m_item);

		m_item = null;
		m_flags &= ~cast(int)StreamFlags.DEFERRED_ALL;

		updateDepOnDetachItem(session);
	}

	/*
	 * Defer |m_item|. We won't call this function in the situation
	 * where |m_item| is null. The |flags| is bitwise OR of zero or
	 * more of StreamFlags.DEFERRED_USER and
	 * StreamFlags.DEFERRED_FLOW_CONTROL. The |flags| indicates
	 * the reason of this action.
	 */
	void deferItem(StreamFlags flags, Session session)
	{
		assert(m_item);

		LOGF("stream: stream=%d defer item=%s cause=%02x", m_id, m_item, flags);

		m_flags |= flags;

		updateDepOnDetachItem(session);
	}

	/*
	 * Put back deferred data in this stream to active state. The |flag|
	 * is one or more of bitwise OR of the following values:
	 * StreamFlags.DEFERRED_USER and
	 * StreamFlags.DEFERRED_FLOW_CONTROL and given masks are
	 * cleared if they are set. So even if this function is called, if
	 * one of flag is still set, data does not become active.
	 */
	void resumeDeferredItem(StreamFlags flag, Session session)
	{
		assert(m_item);

		/* Use the |flag| parameter here: the bare identifier `flags` would
		 * resolve to the `flags` property (m_flags) and wipe every flag. */
		LOGF("stream: stream=%d resume item=%s flags=%02x", m_id, m_item, flag);

		m_flags &= ~cast(int)flag;

		if (m_flags & StreamFlags.DEFERRED_ALL) {
			return;
		}

		updateOnAttachItem(session);
	}

	/*
	 * Returns true if item is deferred by whatever reason.
	 */
	bool isItemDeferred()
	{
		return m_item && (m_flags & StreamFlags.DEFERRED_ALL);
	}

	/*
	 * Returns true if item is deferred by flow control.
	 */
	bool isDeferredByFlowControl()
	{
		return m_item && (m_flags & StreamFlags.DEFERRED_FLOW_CONTROL);
	}

	/*
	 * Updates the remote window size with the new value
	 * |new_initial_window_size|. The |old_initial_window_size| is used to
	 * calculate the current window size.
	 *
	 * This function returns true if it succeeds or false. The failure is due to
	 * overflow.
	 */
	bool updateRemoteInitialWindowSize(int new_initial_window_size, int old_initial_window_size)
	{
		return updateInitialWindowSize(m_remote_window_size, new_initial_window_size, old_initial_window_size);
	}

	/*
	 * Updates the local window size with the new value
	 * |new_initial_window_size|. The |old_initial_window_size| is used to
	 * calculate the current window size.
	 *
	 * This function returns true if it succeeds or false. The failure is due to
	 * overflow.
	 */
	bool updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size)
	{
		return updateInitialWindowSize(m_local_window_size, new_initial_window_size, old_initial_window_size);
	}

	/*
	 * Call this function if promised stream $(D Stream) is replied with
	 * HEADERS. This function changes the state of the $(D Stream) to
	 * OPENED.
	 */
	void promiseFulfilled() {
		m_state = StreamState.OPENED;
		m_flags &= ~cast(int)StreamFlags.PUSH;
	}

	/*
	 * Returns the stream positioned in root of the dependency tree the
	 * $(D Stream) belongs to.
	 */
	Stream getRoot() {
		Stream stream = this;

		for (;;) {
			/* walk to the left-most sibling, then up to the parent */
			if (stream.m_sib_prev) {
				stream = stream.m_sib_prev;

				continue;
			}

			if (stream.m_dep_prev) {
				stream = stream.m_dep_prev;

				continue;
			}

			break;
		}

		return stream;
	}

	/*
	 * Returns true if |target| is found in subtree of $(D Stream).
	 * Note that the search also follows m_sib_next of this stream.
	 */
	bool subtreeContains(Stream target) {

		if (this is target)
			return true;

		if (m_sib_next && m_sib_next.subtreeContains(target))
			return true;

		return m_dep_next ? m_dep_next.subtreeContains(target) : false;
	}

	/*
	 * Makes |stream| depend on this $(D Stream). This dependency is
	 * exclusive. All existing direct descendants of this $(D Stream) become
	 * the descendants of |stream|. This function assumes
	 * |stream.m_item| is null and no dpri members are changed in this
	 * dependency tree.
	 */
	void insert(Stream stream) {
		Stream si;
		Stream root_stream;

		/* The inserted stream (not the parent) must carry no item; this
		 * mirrors the precondition checked in add(). */
		assert(!stream.m_item);

		LOGF("stream: dep_insert dep_stream(%s)=%d, stream(%s)=%d", this, m_id, stream, stream.m_id);

		stream.m_sum_dep_weight = m_sum_dep_weight;
		m_sum_dep_weight = stream.m_weight;

		if (m_dep_next) {
			for (si = m_dep_next; si; si = si.m_sib_next) {
				stream.m_num_substreams += si.m_num_substreams;
			}

			stream.m_dep_next = m_dep_next;
			stream.m_dep_next.m_dep_prev = stream;
		}

		m_dep_next = stream;
		stream.m_dep_prev = this;

		root_stream = updateLength(1);

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();

		++stream.m_roots.num_streams;
	}

	/*
	 * Makes |stream| depend on this $(D Stream). This dependency is
	 * not exclusive. This function assumes |stream.m_item| is null and no
	 * dpri members are changed in this dependency tree.
	 */
	void add(Stream stream) {
		Stream root_stream;

		assert(!stream.m_item);

		LOGF("stream: dep_add dep_stream(%s=%d, stream(%s)=%d", this, m_id, stream, stream.m_id);

		root_stream = updateLength(1);

		m_sum_dep_weight += stream.m_weight;

		if (!m_dep_next) {
			linkDependency(stream);
		} else {
			insertLinkDependency(stream);
		}

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();

		++stream.m_roots.num_streams;
	}

	/*
	 * Removes the $(D Stream) from the current dependency tree. Its direct
	 * descendants take its place and receive a share of its weight.
	 * The original contract states that this stream carries no item.
	 */
	void remove() {
		Stream prev, next, dep_prev, si, root_stream;
		int sum_dep_weight_delta;

		LOGF("stream: dep_remove stream(%s=%d", this, m_id);

		/* Distribute weight of $(D Stream) to direct descendants */
		sum_dep_weight_delta = -m_weight;

		for (si = m_dep_next; si; si = si.m_sib_next) {
			si.m_weight = distributedWeight(si.m_weight);

			sum_dep_weight_delta += si.m_weight;
		}

		prev = firstSibling();

		dep_prev = prev.m_dep_prev;

		if (dep_prev) {
			root_stream = dep_prev.updateLength(-1);

			dep_prev.m_sum_dep_weight += sum_dep_weight_delta;
		}

		if (m_sib_prev) {
			unlinkSibling();
		} else if (m_dep_prev) {
			unlinkDependency();
		} else {
			m_roots.remove(this);

			/* stream is a root of tree. Removing stream makes its
			   descendants a root of its own subtree. */

			for (si = m_dep_next; si;) {
				next = si.m_sib_next;

				si.m_dep_prev = null;
				si.m_sib_prev = null;
				si.m_sib_next = null;

				/* We already distributed weight of $(D Stream) to this. */
				si.m_effective_weight = si.m_weight;

				si.m_roots.add(si);

				si = next;
			}
		}

		if (root_stream) {
			root_stream.updateSumNorestWeight();
			root_stream.updateEffectiveWeight();
		}

		m_num_substreams = 1;
		m_sum_dep_weight = 0;

		m_dep_prev = null;
		m_dep_next = null;
		m_sib_prev = null;
		m_sib_next = null;

		--m_roots.num_streams;
	}

	/*
	 * Makes the subtree rooted at |stream| depend on this $(D Stream). This
	 * dependency is exclusive. Updates dpri members in this dependency tree.
	 */
	void insertSubtree(Stream stream, Session session) {
		Stream last_sib;
		Stream dep_next;
		Stream root_stream;
		int delta_substreams;

		LOGF("stream: dep_insert_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id);

		delta_substreams = stream.m_num_substreams;

		stream.updateSetRest();

		if (m_dep_next) {
			/* m_num_substreams includes dep_stream itself */
			stream.m_num_substreams += m_num_substreams - 1;

			stream.m_sum_dep_weight += m_sum_dep_weight;
			m_sum_dep_weight = stream.m_weight;

			dep_next = m_dep_next;

			dep_next.updateSetRest();

			linkDependency(stream);

			if (stream.m_dep_next) {
				/* append our former children after |stream|'s own children */
				last_sib = stream.m_dep_next.lastSibling();

				last_sib.linkSibling(dep_next);

				dep_next.m_dep_prev = null;
			} else {
				stream.linkDependency(dep_next);
			}
		} else {
			linkDependency(stream);

			assert(m_sum_dep_weight == 0);
			m_sum_dep_weight = stream.m_weight;
		}

		root_stream = updateLength(delta_substreams);

		root_stream.updateSetTop();

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();

		root_stream.updateQueueTop(session);
	}

	/*
	 * Makes the subtree rooted at |stream| depend on this $(D Stream). This
	 * dependency is not exclusive. Updates dpri members in this dependency tree.
	 */
	void addSubtree(Stream stream, Session session)
	{
		Stream root_stream;

		LOGF("stream: dep_add_subtree dep_stream(%s=%d stream(%s)=%d", this, m_id, stream, stream.m_id);

		stream.updateSetRest();

		if (m_dep_next) {
			m_sum_dep_weight += stream.m_weight;

			insertLinkDependency(stream);
		} else {
			linkDependency(stream);

			assert(m_sum_dep_weight == 0);
			m_sum_dep_weight = stream.m_weight;
		}

		root_stream = updateLength(stream.m_num_substreams);

		root_stream.updateSetTop();

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();

		root_stream.updateQueueTop(session);
	}

	/*
	 * Removes subtree whose root stream is $(D Stream). Removing subtree
	 * does not change dpri values. The effective_weight of streams in
	 * removed subtree is not updated.
	 */
	void removeSubtree()
	{
		Stream prev, next, dep_prev, root_stream;

		LOGF("stream: dep_remove_subtree stream(%s=%d", this, m_id);

		if (m_sib_prev) {
			prev = m_sib_prev;

			prev.m_sib_next = m_sib_next;
			if (prev.m_sib_next) {
				prev.m_sib_next.m_sib_prev = prev;
			}

			prev = prev.firstSibling();

			dep_prev = prev.m_dep_prev;

		} else if (m_dep_prev) {
			dep_prev = m_dep_prev;
			next = m_sib_next;

			dep_prev.m_dep_next = next;

			if (next) {
				next.m_dep_prev = dep_prev;

				next.m_sib_prev = null;
			}

		} else {
			m_roots.remove(this);

			dep_prev = null;
		}

		if (dep_prev) {
			dep_prev.m_sum_dep_weight -= m_weight;

			root_stream = dep_prev.updateLength(-m_num_substreams);

			root_stream.updateSumNorestWeight();
			root_stream.updateEffectiveWeight();
		}

		m_sib_prev = null;
		m_sib_next = null;
		m_dep_prev = null;
	}

	/*
	 * Makes the $(D Stream) as root. Updates dpri members in this
	 * dependency tree.
	 */
	void makeRoot(Session session)
	{
		LOGF("stream: dep_make_root stream(%s=%d", this, m_id);

		m_roots.add(this);

		updateSetRest();

		m_effective_weight = m_weight;

		updateSetTop();

		updateSumNorestWeight();
		updateEffectiveWeight();

		updateQueueTop(session);
	}

	/*
	 * Makes the $(D Stream) as root and all existing root streams become
	 * direct children of $(D Stream).
	 */
	void makeTopmostRoot(Session session)
	{
		Stream first, si;

		LOGF("stream: ALL YOUR STREAM ARE BELONG TO US stream(%s=%d", this, m_id);

		first = m_roots.head;

		/* stream must not be included in m_roots.head list */
		assert(first !is this);

		if (first) {
			Stream prev;

			prev = first;

			LOGF("stream: root stream(%s=%d", first, first.m_id);

			m_sum_dep_weight += first.m_weight;
			m_num_substreams += first.m_num_substreams;

			/* Chain the former roots as siblings: first -> ... -> prev (last) */
			for (si = first.m_root_next; si; si = si.m_root_next) {

				assert(si !is this);

				LOGF("stream: root stream(%s=%d", si, si.m_id);

				m_sum_dep_weight += si.m_weight;
				m_num_substreams += si.m_num_substreams;

				prev.linkSibling(si);

				prev = si;
			}

			if (m_dep_next) {
				/* Append the existing children after the LAST former root.
				 * Linking them after |first| would overwrite first.m_sib_next
				 * and cut the chain built above whenever there is more than
				 * one root. */
				Stream sib_next;

				sib_next = m_dep_next;

				sib_next.m_dep_prev = null;

				prev.linkSibling(sib_next);
			}

			/* |first| has no m_sib_prev, so it is the left-most child and the
			 * only one allowed to carry m_dep_prev. */
			linkDependency(first);
		}

		m_roots.removeAll();

		makeRoot(session);
	}

	/*
	 * Returns true if $(D Stream) is in any dependency tree.
	 */
	bool inDepTree() {
		return m_dep_prev || m_dep_next || m_sib_prev ||
			m_sib_next || m_root_next || m_root_prev ||
			m_roots.head is this;
	}

private:

	/*
	 * Adjusts |window_size| by the difference between the new and the old
	 * initial window size. Returns false, leaving |window_size| untouched,
	 * if the result would fall outside [int.min, MAX_WINDOW_SIZE].
	 */
	bool updateInitialWindowSize(ref int window_size, int new_initial_window_size, int old_initial_window_size)
	{
		/* compute in 64 bits so the range check itself cannot overflow */
		long new_window_size = ( cast(long)window_size ) + new_initial_window_size - old_initial_window_size;

		if (int.min > new_window_size || new_window_size > MAX_WINDOW_SIZE)
			return false;

		window_size = cast(int) new_window_size;

		return true;
	}

	/*
	 * Pushes |m_item| to the session queue matching its frame type, unless
	 * it is the item currently being sent.
	 */
	void pushItem(Session session)
	{
		OutboundItem item;

		assert(m_item);
		assert(m_item.queued == 0);

		item = m_item;

		/* If item is now sent, don't push it to the queue. Otherwise, we may push same item twice. */
		if (session.aob.item is item)
			return;

		if (item.weight > m_effective_weight)
			item.weight = m_effective_weight;

		item.cycle = session.last_cycle;

		switch (item.frame.hd.type) {
			case FrameType.DATA:
				session.ob_da_pq.push(item);
				break;
			case FrameType.HEADERS:
				if (m_state == StreamState.RESERVED)
					session.ob_ss_pq.push(item);
				else
					session.ob_pq.push(item);
				break;
			default:
				/* should not reach here */
				assert(0);
		}

		item.queued = 1;
	}

	/*
	 * Like distributedEffectiveWeight(), but the share is computed against
	 * m_sum_top_weight.
	 */
	int distributedTopEffectiveWeight(int weight) {
		if (m_sum_top_weight == 0)
			return m_effective_weight;

		weight = m_effective_weight * weight / m_sum_top_weight;

		return max(1, weight);
	}

	/* Updates effective_weight of descendant streams in subtree of $(D Stream). We assume that m_effective_weight is already set right. */
	void updateEffectiveWeight()
	{
		Stream si;

		LOGF("stream: update_dep_effective_weight stream(%s=%d, weight=%d, sum_norest_weight=%d, sum_top_weight=%d",
			this, m_id, m_weight,
			m_sum_norest_weight, m_sum_top_weight);

		/* m_sum_norest_weight == 0 means there is no StreamDPRI.TOP under stream */
		if (m_dpri != StreamDPRI.NO_ITEM ||
			m_sum_norest_weight == 0) {
			return;
		}

		/* If there is no direct descendant whose dpri is StreamDPRI.TOP, indirect descendants have
		 * the chance to send data, so recursively set weight for descendants. */
		if (m_sum_top_weight == 0) {
			for (si = m_dep_next; si; si = si.m_sib_next) {
				if (si.m_dpri != StreamDPRI.REST) {
					si.m_effective_weight =
						distributedEffectiveWeight(si.m_weight);
				}

				si.updateEffectiveWeight();
			}
			return;
		}

		/* If there is at least one direct descendant whose dpri is
		   StreamDPRI.TOP, we won't give a chance to indirect
		   descendants, since closed or blocked stream's weight is
		   distributed among its siblings */
		for (si = m_dep_next; si; si = si.m_sib_next) {
			if (si.m_dpri == StreamDPRI.TOP) {
				si.m_effective_weight = distributedTopEffectiveWeight(si.m_weight);
				LOGF("stream: stream=%d top eweight=%d", si.m_id, si.m_effective_weight);

				continue;
			}

			if (si.m_dpri == StreamDPRI.NO_ITEM) {
				LOGF("stream: stream=%d no_item, ignored", si.m_id);

				/* Since we marked StreamDPRI.TOP under si, we make them StreamDPRI.REST again. */
				if (si.m_dep_next) si.m_dep_next.updateSetRest();
			} else {
				LOGF("stream: stream=%d rest, ignored", si.m_id);
			}
		}
	}

	/*
	 * Demotes StreamDPRI.TOP streams to StreamDPRI.REST, starting at
	 * $(D Stream) and continuing through its following siblings and, below
	 * StreamDPRI.NO_ITEM streams, through their descendants.
	 */
	void updateSetRest()
	{
		LOGF("stream: stream=%d is rest", m_id);

		if (m_dpri == StreamDPRI.REST)
			return;

		if (m_dpri == StreamDPRI.TOP)
		{
			m_dpri = StreamDPRI.REST;

			if (m_sib_next)
				m_sib_next.updateSetRest();

			return;
		}

		if (m_sib_next)
			m_sib_next.updateSetRest();
		if (m_dep_next)
			m_dep_next.updateSetRest();
	}

	/*
	 * Performs dfs starting $(D Stream), search stream which can become
	 * StreamDPRI.TOP and set its dpri.
	 */
	void updateSetTop()
	{
		Stream si;

		if (m_dpri == StreamDPRI.TOP)
			return;

		if (m_dpri == StreamDPRI.REST)
		{
			LOGF("stream: stream=%d item is top", m_id);

			m_dpri = StreamDPRI.TOP;

			return;
		}

		for (si = m_dep_next; si; si = si.m_sib_next)
			si.updateSetTop();
	}

	/*
	 * Performs dfs starting $(D Stream), and queue stream whose dpri is
	 * StreamDPRI.TOP and has not been queued yet.
	 */
	void updateQueueTop(Session session)
	{
		Stream si;

		if (m_dpri == StreamDPRI.REST)
			return;

		if (m_dpri == StreamDPRI.TOP) {
			if (!m_item.queued) {
				LOGF("stream: stream=%d enqueue", m_id);
				pushItem(session);
			}

			return;
		}

		for (si = m_dep_next; si; si = si.m_sib_next)
			si.updateQueueTop(session);
	}

	/*
	 * Updates m_sum_norest_weight and m_sum_top_weight
	 * recursively. We have to gather effective sum of weight of
	 * descendants. If m_dpri == StreamDPRI.NO_ITEM, we
	 * have to go deeper and check that any of its descendants has dpri
	 * value of StreamDPRI.TOP. If so, we have to add weight of
	 * its direct descendants to m_sum_norest_weight. To make this
	 * work, this function returns true if any of its descendants has dpri
	 * value of StreamDPRI.TOP, otherwise false.
	 *
	 * Calculating m_sum_top_weight is very simple compared to
	 * m_sum_norest_weight. It just adds up the weight of direct
	 * descendants whose dpri is StreamDPRI.TOP.
	 */
	bool updateSumNorestWeight()
	{
		Stream si;
		bool ret;

		m_sum_norest_weight = 0;
		m_sum_top_weight = 0;

		if (m_dpri == StreamDPRI.TOP)
			return true;

		if (m_dpri == StreamDPRI.REST)
			return false;

		ret = false;

		for (si = m_dep_next; si; si = si.m_sib_next) {

			if (si.updateSumNorestWeight()) {
				ret = true;
				m_sum_norest_weight += si.m_weight;
			}

			if (si.m_dpri == StreamDPRI.TOP)
				m_sum_top_weight += si.m_weight;
		}

		return ret;
	}

	/*
	 * Marks this stream as having a sendable item and recomputes dpri,
	 * weight sums and effective weights from the root of its tree, then
	 * queues the streams that became StreamDPRI.TOP.
	 */
	void updateOnAttachItem(Session session)
	{
		Stream root_stream;

		m_dpri = StreamDPRI.REST;

		if (m_dep_next) m_dep_next.updateSetRest();

		root_stream = getRoot();

		LOGF("root=%s, stream=%s", root_stream, this);

		root_stream.updateSetTop();

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();

		root_stream.updateQueueTop(session);
	}

	/*
	 * Marks this stream as having no sendable item and recomputes dpri,
	 * weight sums and effective weights from the root of its tree, then
	 * queues the streams that became StreamDPRI.TOP.
	 */
	void updateDepOnDetachItem(Session session) {
		Stream root_stream;

		m_dpri = StreamDPRI.NO_ITEM;

		root_stream = getRoot();

		root_stream.updateSetTop();

		root_stream.updateSumNorestWeight();
		root_stream.updateEffectiveWeight();

		root_stream.updateQueueTop(session);
	}

	/// Makes |stream| the first (left-most) child of this stream.
	void linkDependency(Stream stream) {
		m_dep_next = stream;
		stream.m_dep_prev = this;
	}

	/// Makes |stream| the next sibling of this stream.
	void linkSibling(Stream stream)
	{
		m_sib_next = stream;
		stream.m_sib_prev = this;
	}

	/// Inserts |stream| as the new left-most child, in front of the current first child.
	/// Requires that this stream already has a child.
	void insertLinkDependency(Stream stream)
	{
		Stream sib_next;

		assert(!stream.m_sib_prev);

		sib_next = m_dep_next;

		stream.linkSibling(sib_next);

		sib_next.m_dep_prev = null;

		linkDependency(stream);
	}

	/// Returns the left-most stream of the sibling list this stream is in.
	Stream firstSibling()
	{
		Stream stream = this;
		for (; stream.m_sib_prev; stream = stream.m_sib_prev)
			continue;

		return stream;
	}

	/// Returns the right-most stream of the sibling list this stream is in.
	Stream lastSibling()
	{
		Stream stream = this;
		for (; stream.m_sib_next; stream = stream.m_sib_next)
			continue;

		return stream;
	}

	/*
	 * Adds the (possibly negative) |delta| to m_num_substreams of this
	 * stream and of every ancestor. Returns the root of the tree.
	 */
	Stream updateLength(int delta)
	{
		m_num_substreams += delta;

		Stream stream = firstSibling();

		if (stream.m_dep_prev)
			return stream.m_dep_prev.updateLength(delta);

		return stream;
	}

	/*
	 * Unlinks this stream, which must have a previous sibling, from its
	 * sibling list. Its children, if any, take its place in the list.
	 */
	void unlinkSibling() {
		Stream prev, next, dep_next;

		prev = m_sib_prev;
		dep_next = m_dep_next;

		assert(prev);

		if (dep_next) {
			/*
			 *  prev--stream(--sib_next--...)
			 *         |
			 *        dep_next
			 */
			dep_next.m_dep_prev = null;

			prev.linkSibling(dep_next);

			if (m_sib_next) {
				dep_next.lastSibling().linkSibling(m_sib_next);
			}
		} else {
			/*
			 *  prev--stream(--sib_next--...)
			 */
			next = m_sib_next;

			prev.m_sib_next = next;

			if (next) {
				next.m_sib_prev = prev;
			}
		}
	}

	/*
	 * Unlinks this stream, which must be the left-most child of its parent,
	 * from the parent. Its children, if any, take its place.
	 */
	void unlinkDependency() {
		Stream prev, next, dep_next;

		prev = m_dep_prev;
		dep_next = m_dep_next;

		assert(prev);

		if (dep_next) {
			/*
			 * prev
			 *   |
			 * stream(--sib_next--...)
			 *   |
			 * dep_next
			 */
			prev.linkDependency(dep_next);

			if (m_sib_next) {
				dep_next.lastSibling().linkSibling(m_sib_next);
			}
		} else if (m_sib_next) {
			/*
			 * prev
			 *   |
			 * stream--sib_next
			 */
			next = m_sib_next;

			next.m_sib_prev = null;

			prev.linkDependency(next);
		} else {
			prev.m_dep_next = null;
		}
	}

package:
	/*
	 * This function is called when trailer header (for both request and
	 * response) is received. This function performs validation and
	 * returns true if it succeeds, or false.
	 */
	bool validateTrailerHeaders(in Frame frame) const
	{
		if ((frame.hd.flags & FrameFlags.END_STREAM) == 0)
			return false;

		return true;
	}

	/*
	 * This function is called when END_STREAM flag is seen in incoming
	 * frame. This function performs validation and returns true if it
	 * succeeds, or false.
	 */
	bool validateRemoteEndStream() const
	{
		if (m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE)
			return false;

		if (m_content_length != -1 && m_content_length != m_recv_content_length)
			return false;

		return true;
	}

	/*
	 * This function is called when chunk of data is received. This
	 * function also performs validation and returns true if it succeeds, or false.
	 */
	bool onDataChunk(size_t n)
	{
		m_recv_content_length += n;

		if ((m_http_flags & HTTPFlags.EXPECT_FINAL_RESPONSE) ||
			(m_content_length != -1 && m_recv_content_length > m_content_length))
		{
			return false;
		}

		return true;
	}

	/*
	 * This function inspects header field in |frame| and records its
	 * method in stream.http_flags. If frame.hd.type is neither
	 * FrameType.HEADERS nor FrameType.PUSH_PROMISE, this function does
	 * nothing.
	 */
	void setRequestMethod(Frame frame)
	{
		import libhttp2.helpers : parseToken;

		HeaderField[] hfa;

		with(FrameType) switch (frame.hd.type) {
			case HEADERS:
				hfa = frame.headers.hfa;
				break;
			case PUSH_PROMISE:
				hfa = frame.push_promise.hfa;
				break;
			default:
				return;
		}

		/* TODO we should do this strictly. */
		foreach(ref hf; hfa) {
			if (parseToken(hf.name) != Token._METHOD) {
				continue;
			}
			if (hf.value == "CONNECT") {
				m_http_flags |= HTTPFlags.METH_CONNECT;
				return;
			}
			if (hf.value == "HEAD") {
				m_http_flags |= HTTPFlags.METH_HEAD;
				return;
			}
			/* only the first method header field is considered */
			return;
		}
	}

	/*
	 * This function is called when request header is received.
	 * This function performs validation and returns true if it succeeds, or false.
	 */
	bool onRequestHeaders(Frame frame)
	{
		if (m_http_flags & HTTPFlags.METH_CONNECT)
		{
			if ((m_http_flags & HTTPFlags._AUTHORITY) == 0)
				return false;

			m_content_length = -1;

		} else {
			if ((m_http_flags & HTTPFlags.REQ_HEADERS) != HTTPFlags.REQ_HEADERS ||
				(m_http_flags & (HTTPFlags._AUTHORITY | HTTPFlags.HOST)) == 0)
			{
				return false;
			}
			if (!checkPath())
				return false;
		}

		if (frame.hd.type == FrameType.PUSH_PROMISE) {
			/* we are going to reuse data fields for upcoming response. Clear them now, except for method flags. */
			m_http_flags &= HTTPFlags.METH_ALL;
			m_content_length = -1;
		}

		return true;
	}

	/*
	 * This function is called when response header is received. This
	 * function performs validation and returns true if it succeeds, or false.
	 */
	bool onResponseHeaders() {
		if ((m_http_flags & HTTPFlags._STATUS) == 0)
			return false;

		if (m_status_code / 100 == 1)
		{
			/* non-final response */
			m_http_flags = (m_http_flags & HTTPFlags.METH_ALL) | HTTPFlags.EXPECT_FINAL_RESPONSE;
			m_content_length = -1;
			m_status_code = -1;
			return true;
		}

		m_http_flags &= ~HTTPFlags.EXPECT_FINAL_RESPONSE;
		bool has_response_body = (m_http_flags & HTTPFlags.METH_HEAD) == 0 && m_status_code / 100 != 1 && m_status_code != 304 && m_status_code != 204;
		if (!has_response_body)
			m_content_length = 0;
		else if (m_http_flags & HTTPFlags.METH_CONNECT)
			m_content_length = -1;
		return true;
	}

	/* For "http" or "https" URIs, OPTIONS request may have "*" in :path
	   header field to represent system-wide OPTIONS request. Otherwise,
	   :path header field value must start with "/". This function must
	   be called after ":method" header field was received. This function
	   returns true if path is valid. */
	bool checkPath() {
		return (httpFlags & HTTPFlags.SCHEME_HTTP) == 0 ||
			((httpFlags & HTTPFlags.PATH_REGULAR) ||
				((httpFlags & HTTPFlags.METH_OPTIONS) &&
					(httpFlags & HTTPFlags.PATH_ASTERISK)));
	}

private:

	/// Stream ID
	int m_id;

	/// Pointers to form dependency tree. If multiple streams depend on a stream, only one stream (left most) has non-null dep_prev
	/// which points to the stream it depends on. The remaining streams are linked using sib_prev and sib_next.
	/// The stream which has non-null dep_prev always has null sib_prev. The right most stream has null sib_next. If this stream is
	/// a root of dependency tree, dep_prev and sib_prev are null.
	Stream m_dep_prev, m_dep_next;
	Stream m_sib_prev, m_sib_next;

	/// pointers to track dependency tree root streams. This is doubly-linked list and first element is pointed by roots.head.
	Stream m_root_prev, m_root_next;

	/* When stream is kept after closure, it may be kept in doubly
	   linked list pointed by Session.closed_stream_head.
	   closed_next points to the next stream object if it is the element
	   of the list. */
	Stream m_closed_prev, m_closed_next;

	/// pointer to roots, which tracks dependency tree roots
	StreamRoots m_roots;

	/// The arbitrary data provided by user for this stream.
	void *m_stream_user_data;

	/// Item to send
	OutboundItem m_item;

	/// categorized priority of this stream. Only stream bearing $(D TOP) can send item.
	StreamDPRI m_dpri = StreamDPRI.NO_ITEM;

	/// the number of streams in subtree
	int m_num_substreams = 1;

	/// Current remote window size. This value is computed against the current initial window size of remote endpoint.
	int m_remote_window_size;

	/// Keep track of the number of bytes received without WINDOW_UPDATE.
	/// This could be negative after submitting negative value to WINDOW_UPDATE
	int m_recv_window_size;

	/// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE.
	/// This is only used when auto WINDOW_UPDATE is turned off.
	int m_consumed_size;

	/// The amount of recv_window_size cut using submitting negative value to WINDOW_UPDATE
	int m_recv_reduction;

	/// window size for local flow control. It is initially set to INITIAL_WINDOW_SIZE and could be increased/decreased by
	/// submitting WINDOW_UPDATE. See submit_window_update().
	int m_local_window_size;

	/// weight of this stream
	int m_weight;

	/// effective weight of this stream in belonging dependency tree
	int m_effective_weight;

	/// sum of weight (not effective_weight) of direct descendants
	int m_sum_dep_weight;

	/// sum of weight of direct descendants which have at least one descendant with dpri == $(D StreamDPRI.TOP). We use this value to calculate effective weight.
	int m_sum_norest_weight;

	/// sum of weight of direct descendants whose dpri value is $(D StreamDPRI.TOP)
	int m_sum_top_weight;

	StreamState m_state;

	/// This is bitwise-OR of 0 or more of StreamFlags.
	StreamFlags m_flags;

	/// Bitwise OR of zero or more ShutdownFlag values
	ShutdownFlag m_shut_flags = ShutdownFlag.NONE;

	/// Content-Length of request/response body. -1 if unknown.
	long m_content_length = -1;

	/// Received body so far
	long m_recv_content_length;

	/// status code from remote server
	short m_status_code = -1;

	/// Bitwise OR of zero or more HTTPFlags values
	HTTPFlags m_http_flags = HTTPFlags.NONE;

package: // used by Session
	@property int id() { return m_id; }
	@property StreamDPRI dpri() { return m_dpri; }
	@property StreamState state() { return m_state; }
	@property void state(StreamState state) { m_state = state; }
	@property OutboundItem item() { return m_item; }
	@property int effectiveWeight() { return m_effective_weight; }
	@property int remoteWindowSize() { return m_remote_window_size; }
	@property void remoteWindowSize(int rws) { m_remote_window_size = rws; }
	@property ref int localWindowSize() { return m_local_window_size; }
	@property ref int recvWindowSize() { return m_recv_window_size; }
	@property ref int recvReduction() { return m_recv_reduction; }
	@property void recvWindowSize(int sz) { m_recv_window_size = sz; }
	@property ref int consumedSize() { return m_consumed_size; }
	@property void consumedSize(int sz) { m_consumed_size = sz; }
	@property void* userData() { return m_stream_user_data; }
	@property void userData(void* ptr) { m_stream_user_data = ptr; }
	@property ShutdownFlag shutFlags() { return m_shut_flags; }
	@property void shutFlags(ShutdownFlag sf) { m_shut_flags = sf; }
	@property HTTPFlags httpFlags() { return m_http_flags; }
	@property void httpFlags(HTTPFlags flags) { m_http_flags = flags; }
	@property StreamFlags flags() { return m_flags; }
	@property void flags(StreamFlags f) { m_flags = f; }
	@property void weight(int w) { m_weight = w; }
	@property int weight() { return m_weight; }
	@property Stream closedPrev() { return m_closed_prev; }
	@property Stream closedNext() { return m_closed_next; }
	@property void closedPrev(Stream s) { m_closed_prev = s; }
	@property void closedNext(Stream s) { m_closed_next = s; }
	@property long contentLength() { return m_content_length; }
	@property short statusCode() { return m_status_code; }
	@property void statusCode(short status) { m_status_code = status; }
	@property void contentLength(long len) { m_content_length = len; }
	// tests
	@property int subStreams() { return m_num_substreams; }
	@property int sumDepWeight() { return m_sum_dep_weight; }
	@property int sumNorestWeight() { return m_sum_norest_weight; }
	@property Stream rootNext() { return m_root_next; }
	@property Stream depPrev() { return m_dep_prev; }
	@property Stream depNext() { return m_dep_next; }
	@property Stream sibPrev() { return m_sib_prev; }
	@property Stream sibNext() { return m_sib_next; }

}