/**
 * Session
 * 
 * Copyright:
 * (C) 2012-2015 Tatsuhiro Tsujikawa
 * (C) 2014-2015 Etienne Cimon
 *
 * License: 
 * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project. 
 * Consult the provided LICENSE.md file for details
 */
module libhttp2.session;

import libhttp2.constants;
import libhttp2.types;
import libhttp2.frame;
import libhttp2.stream;
import libhttp2.connector;
import libhttp2.deflater;
import libhttp2.inflater;
import libhttp2.buffers;
import libhttp2.priority_queue;
import libhttp2.helpers;
import libhttp2.huffman;
import core.exception : RangeError;

import memutils.circularbuffer;
import memutils.vector;
import memutils.hashmap;

import std.algorithm : min, max;

/// Bit flags OR-ed into the session's `opt_flags` by the Session constructor.
enum OptionsMask {
	NONE                  = 0x00,
	NO_AUTO_WINDOW_UPDATE = 0x01,
	RECV_CLIENT_PREFACE   = 0x02,
	NO_HTTP_MESSAGING     = 0x04,
}

/// State of the active outbound item; POP_ITEM is both the initial and the reset state.
enum OutboundState {
	POP_ITEM,
	SEND_DATA,
	SEND_NO_COPY
}

/// The outbound item currently being transmitted together with its frame buffers.
struct ActiveOutboundItem {
	OutboundItem item;
	Buffers framebufs;
	OutboundState state = OutboundState.POP_ITEM;

	/**
	 * Releases the current item (when there is one), resets the frame
	 * buffers and puts the state back to POP_ITEM.
	 *
	 * `framebufs` is used unconditionally, so it must have been allocated
	 * before this is called.
	 */
	void reset()
	{
		LOGF("send: reset http2_active_outbound_item");
		LOGF("send: aob.item = %s", item);

		if (item) {
			item.free();
			Mem.free(item);
			item = null;
		}

		framebufs.reset();
		state = OutboundState.POP_ITEM;
	}
}

/// Internal state when receiving incoming frame
enum InboundState : ubyte {
	/* Receiving frame header */
	READ_CLIENT_PREFACE,
	READ_FIRST_SETTINGS,
	READ_HEAD,
	READ_NBYTE,
	READ_HEADER_BLOCK,
	IGN_HEADER_BLOCK,
	IGN_PAYLOAD,
	FRAME_SIZE_ERROR,
	READ_SETTINGS,
	READ_GOAWAY_DEBUG,
	EXPECT_CONTINUATION,
	IGN_CONTINUATION,
	READ_PAD_DATA,
	READ_DATA,
	IGN_DATA,
	IGN_ALL,
}

/// Parsing state for the frame that is currently being received.
struct InboundFrame {
	Frame frame;

	/* The received SETTINGS entries. The protocol says that we only care
	   about the defined settings IDs.
If an unknown ID is received, it is ignored.
	   The last entry holds the minimum header table size when the same
	   setting is seen multiple times. */
	Setting[INBOUND_NUM_IV] iva;

	/// Buffer view over the small buffer, raw_sbuf
	Buffer sbuf;

	/// Buffer view over the large buffer, raw_lbuf
	Buffer lbuf;

	/// Large buffer, allocated on demand
	ubyte[] raw_lbuf;

	/* Number of entries filled in |iva| */
	size_t niv;

	/* Number of bytes still to be received for the current frame */
	size_t payloadleft;

	/* Padding length for the current frame */
	size_t padlen;

	InboundState state;

	/* Small buffer. Currently the largest contiguous chunk to buffer
	   is the frame header. Parts of the payload are buffered too, but
	   they are smaller than the frame header. */
	ubyte[FRAME_HDLEN] raw_sbuf;

	/// Returns how many of the bytes in [input, last) belong to the current
	/// frame: the smaller of the available byte count and `payloadleft`.
	size_t readLength(const ubyte* input, const ubyte* last)
	{
		const size_t available = cast(size_t)(last - input);
		return min(available, payloadleft);
	}

	/*
	 * Resets sbuf and advances its mark pointer by |left| bytes, i.e.
	 * arms sbuf to collect the next |left| bytes.
	 */
	void setMark(size_t left)
	{
		sbuf.reset();
		sbuf.mark += left;
	}

	/*
	 * Copies bytes from [input, last) to the end of sbuf, limited to
	 * sbuf.markAvailable, and advances sbuf.last accordingly.
	 * Returns the number of bytes copied.
	 */
	size_t read(in ubyte* input, in ubyte* last)
	{
		import core.stdc.string : memcpy;

		const size_t readlen = min(last - input, sbuf.markAvailable);

		memcpy(sbuf.last, input, readlen);
		sbuf.last += readlen;

		return readlen;
	}

	/*
	 * Unpacks the SETTINGS entry held in sbuf.
*/
	void unpackSetting()
	{
		Setting _iv;
		_iv.unpack(sbuf[]);

		/* Only the settings IDs listed here are stored; anything else is dropped. */
		with(Setting) switch (_iv.id) {
			case HEADER_TABLE_SIZE:
			case ENABLE_PUSH:
			case MAX_CONCURRENT_STREAMS:
			case INITIAL_WINDOW_SIZE:
			case MAX_FRAME_SIZE:
			case MAX_HEADER_LIST_SIZE:
				break;
			default:
				LOGF("recv: ignore unknown settings id=0x%02x", _iv.id);
				return;
		}

		/* Overwrite an earlier entry with the same ID, otherwise append. */
		size_t i;
		for (i = 0; i < niv; ++i) {
			if (iva[i].id == _iv.id) {
				iva[i] = _iv;
				break;
			}
		}

		if (i == niv) {
			iva[niv] = _iv;
			niv++;
		}

		/* The last slot tracks the smallest HEADER_TABLE_SIZE seen so far. */
		if (_iv.id == Setting.HEADER_TABLE_SIZE && _iv.value < iva[INBOUND_NUM_IV - 1].value)
		{
			iva[INBOUND_NUM_IV - 1] = _iv;
		}
	}
private:
	/*
	 * Checks the PADDED flag and arms sbuf to read the Pad Length byte.
	 * Returns 1 if the frame is padded, ErrorCode.OK if it is not, and -1
	 * if the PADDED flag is set on a frame whose length is below 1.
	 */
	int handlePad()
	{
		if (frame.hd.flags & FrameFlags.PADDED) {
			if (frame.hd.length < 1) {
				return -1;
			}
			setMark(1);
			return 1;
		}
		LOGF("recv: no padding in payload");
		return ErrorCode.OK;
	}

	/*
	 * Computes the padding length from the Pad Length byte at sbuf.pos[0].
	 * The result includes the Pad Length field itself (value + 1). On
	 * success it is stored in |padlen| and returned; -1 is returned when
	 * the padding exceeds the remaining payload.
	 */
	int computePad()
	{
		/* 1 for Pad Length field */
		int _padlen = sbuf.pos[0] + 1;

		/* Log the value just computed; the member |padlen| still holds the
		   previous value at this point. */
		LOGF("recv: padlen=%d", _padlen);

		/* We cannot use iframe.frame.hd.length because of CONTINUATION */
		if (_padlen - 1 > payloadleft) {
			return -1;
		}

		padlen = _padlen;

		return _padlen;
	}

	/*
	 * This function returns the effective payload length in the data of
	 * length |readlen| when the remaining payload is |payloadleft|. The
	 * |payloadleft| does not include |readlen|. If padding was started
	 * strictly before this data chunk, this function returns -1.
*/
	int effectiveReadLength(size_t _payloadleft, size_t readlen)
	{
		const size_t trailing = frame.trailPadlen(padlen);

		/* The trailing padding lies entirely in the bytes still to come. */
		if (trailing <= _payloadleft)
			return cast(int)readlen;

		/* Number of padding bytes that overlap the end of this chunk. */
		const size_t overlap = trailing - _payloadleft;

		if (readlen < overlap)
			return -1;

		return cast(int)(readlen - overlap);
	}

	/*
	 * Releases the payload of the current frame and returns the struct to
	 * its initial state: READ_HEAD, sbuf armed for one frame header, no
	 * large buffer, no collected settings.
	 */
	void reset()
	{
		/* Somewhat risky: when called from the Session constructor we rely on
		   frame.hd.type being 0, which takes the default branch so that
		   nothing is freed. */
		with (FrameType) switch (frame.hd.type) {
			case HEADERS:
				frame.headers.free();
				break;
			case PRIORITY:
				frame.priority.free();
				break;
			case RST_STREAM:
				frame.rst_stream.free();
				break;
			case SETTINGS:
				frame.settings.free();
				break;
			case PUSH_PROMISE:
				frame.push_promise.free();
				break;
			case PING:
				frame.ping.free();
				break;
			case GOAWAY:
				frame.goaway.free();
				break;
			case WINDOW_UPDATE:
				frame.window_update.free();
				break;
			default:
				break;
		}

		destroy(frame);

		state = InboundState.READ_HEAD;

		/* Point sbuf at the small buffer and expect a full frame header. */
		sbuf = Buffer(raw_sbuf.ptr[0 .. raw_sbuf.sizeof]);
		sbuf.mark += FRAME_HDLEN;

		lbuf.free();
		lbuf = Buffer();

		destroy(iva);
		payloadleft = 0;
		padlen = 0;

		/* The last slot tracks the minimum HEADER_TABLE_SIZE; start at the maximum. */
		iva[INBOUND_NUM_IV - 1].id = Setting.HEADER_TABLE_SIZE;
		iva[INBOUND_NUM_IV - 1].value = uint.max;
		niv = 0;
	}
}

/// Settings values of one endpoint, initialised to the defaults listed below.
struct SettingsStorage {
	uint header_table_size      = HD_DEFAULT_MAX_BUFFER_SIZE;
	uint enable_push            = 1;
	uint max_concurrent_streams = INITIAL_MAX_CONCURRENT_STREAMS;
	uint initial_window_size    = INITIAL_WINDOW_SIZE;
	uint max_frame_size         = MAX_FRAME_SIZE_MIN;
	uint max_header_list_size   = uint.max;
}

/// Bit flags describing the GOAWAY state of a session.
enum GoAwayFlags {
	NONE = 0,
	/* Flag means that connection should be terminated after sending GOAWAY.
*/
	TERM_ON_SEND = 0x1,
	/* Flag means GOAWAY to terminate session has been sent */
	TERM_SENT = 0x2,
	/* Flag means GOAWAY was sent */
	SENT = 0x4,
	/* Flag means GOAWAY was received */
	RECV = 0x8,
}

/// Readable values for the `server` argument of the Session constructor.
enum {
	CLIENT = false,
	SERVER = true
}

align(8)
final class Session {
	/// Releases the session resources unless free() already ran or the
	/// constructor never completed (connector is assigned late in the constructor).
	~this() {
		if (connector !is null) free();
	}

	/**
	 * Creates a client or server session.
	 *
	 * Params:
	 *  server = `true` (SERVER) for a server session, `false` (CLIENT) otherwise
	 *  callbacks = connector stored in `connector`
	 *  options = optional settings; only fields whose bit is set in
	 *            `opt_set_mask` are taken into account
	 *
	 * Every resource acquired here is released again through a
	 * scope(failure) guard if a later step throws.
	 *
	 * NOTE(review): the destructor skips free() when `connector` is null, so
	 * passing a null `callbacks` presumably leaks the resources allocated
	 * here — confirm with callers.
	 */
	this(bool server, Connector callbacks, in Options options = Options.init)
	{
		if (server) {
			is_server = true;
			next_stream_id = 2; // server-initiated stream IDs are even
		}
		else
			next_stream_id = 1; // client-initiated stream IDs are odd

		roots = Mem.alloc!StreamRoots();
		scope(failure) Mem.free(roots);

		hd_inflater = Inflater(true);
		scope(failure) hd_inflater.free();

		hd_deflater = Deflater(DEFAULT_MAX_DEFLATE_BUFFER_SIZE);
		scope(failure) hd_deflater.free();

		ob_pq = PriorityQueue(128);
		scope(failure) ob_pq.free();

		ob_ss_pq = PriorityQueue(128);
		scope(failure) ob_ss_pq.free();

		ob_da_pq = PriorityQueue(128);
		scope(failure) ob_da_pq.free();

		/* 1 for Pad Field. */
		aob.framebufs = Mem.alloc!Buffers(FRAMEBUF_CHUNKLEN, FRAMEBUF_MAX_NUM, 1, FRAME_HDLEN + 1);
		scope(failure) { aob.framebufs.free(); Mem.free(aob.framebufs); }

		aob.reset();

		if (options != Options.init) {
			if ((options.opt_set_mask & OptionFlags.NO_AUTO_WINDOW_UPDATE) && options.no_auto_window_update)
			{
				opt_flags |= OptionsMask.NO_AUTO_WINDOW_UPDATE;
			}

			if (options.opt_set_mask & OptionFlags.PEER_MAX_CONCURRENT_STREAMS)
			{
				remote_settings.max_concurrent_streams = options.peer_max_concurrent_streams;
			}

			if ((options.opt_set_mask & OptionFlags.RECV_CLIENT_PREFACE) && options.recv_client_preface)
			{
				opt_flags |= OptionsMask.RECV_CLIENT_PREFACE;
			}

			if ((options.opt_set_mask & OptionFlags.NO_HTTP_MESSAGING) && options.no_http_messaging)
			{
				opt_flags |= OptionsMask.NO_HTTP_MESSAGING;
			}
		}

		connector = callbacks;

		iframe.reset();

		/* A server that was asked to verify the client preface starts by
		   reading it; otherwise the first-SETTINGS check applies when it is
		   compiled in. */
		if (is_server && (opt_flags & OptionsMask.RECV_CLIENT_PREFACE))
		{
			iframe.state = InboundState.READ_CLIENT_PREFACE;
			iframe.payloadleft = CLIENT_CONNECTION_PREFACE.length;
		} else static if (ENABLE_FIRST_SETTING_CHECK)
		{
			iframe.state = InboundState.READ_FIRST_SETTINGS;
		}
	}

	/**
	 * Frees the resources allocated for this $(D Session).
	 *
	 * The destructor calls this while `connector` is non-null. `connector`
	 * is cleared at the end, so the destructor will not free a second time
	 * after an explicit call.
	 */
	void free() {
		if (inflight_iva) {
			Mem.free(inflight_iva);
			inflight_iva = null;
		}
		roots.free();
		Mem.free(roots);
		freeAllStreams();
		iframe.reset();
		ob_pq.free();
		ob_ss_pq.free();
		ob_da_pq.free();
		aob.reset();
		hd_deflater.free();
		hd_inflater.free();
		/* Test the reference before using it: the check used to come after
		   the free() call, too late to protect anything. */
		if (aob.framebufs) {
			aob.framebufs.free();
			Mem.free(aob.framebufs);
			aob.framebufs = null;
		}
		destroy(streams);
		connector = null;
	}

	/**
	 * Sends pending frames to the remote peer.
	 *
	 * This function retrieves the highest prioritized frame from the
	 * outbound queue and sends it to the remote peer.
It does this as many times as possible, until the user callback
	 * $(D Connector.write) returns $(D ErrorCode.WOULDBLOCK) or the outbound
	 * queue becomes empty.
	 *
	 * This function calls several $(D Connector) functions which are passed
	 * when initializing the $(D Session). Here is the simple time chart
	 * which tells when each callback is invoked:
	 *
	 * 1. Get the next frame to be sent from a priority sorted outbound queue.
	 *
	 * 2. Prepare transmission of the frame.
	 *
	 * 3. $(D Connector.onFrameFailure) may be invoked if the control frame cannot
	 *    be sent because some preconditions are not met (e.g., request HEADERS
	 *    cannot be sent after GOAWAY). This then aborts the following steps.
	 *
	 * 4. $(D Connector.selectPaddingLength) is invoked if the frame is HEADERS,
	 *    PUSH_PROMISE or DATA.
	 *
	 * 5. If the frame is request HEADERS, the stream is opened here.
	 *
	 * 6. $(D Connector.onFrameReady) is invoked.
	 *
	 * 7. $(D Connector.write) is invoked one or more times to send the frame.
	 *
	 * 8. $(D Connector.onFrameSent) is invoked after all data is transmitted.
	 *
	 * 9. $(D Connector.onStreamExit) may be invoked if the transmission of the frame
	 *    triggers closure of the stream; the stream is destroyed afterwards.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * $(D ErrorCode.CALLBACK_FAILURE)
	 *     The callback function failed.
	 */
	ErrorCode send() {
		Buffers bufs = aob.framebufs;
		ubyte[] chunk;

		for (;;) {
			const ErrorCode rv = memSendInternal(chunk, false);
			if (rv < 0)
				return rv;
			if (chunk.length == 0)
				return ErrorCode.OK;

			int written;
			try {
				written = connector.write(chunk);
			} catch (Exception e) {
				return ErrorCode.CALLBACK_FAILURE;
			}

			if (written < 0) {
				if (cast(ErrorCode) written != ErrorCode.WOULDBLOCK)
					return ErrorCode.CALLBACK_FAILURE;

				/* Transmission canceled. Rewind the offset by the whole chunk */
				bufs.cur.buf.pos -= chunk.length;
				return ErrorCode.OK;
			}

			/* Rewind the offset by the number of bytes that were not sent */
			bufs.cur.buf.pos -= (chunk.length - written);
		}

		assert(false);
	}

	/**
	 * Returns the serialized data to send.
	 *
	 * This function behaves like `send()` except that it
	 * does not use $(D Connector.write) to transmit data.
	 * Instead, it assigns the serialized data to the given $(D ubyte[])
	 * |data_arr|. The other callbacks are called in the same way as they are
	 * in `send()`.
	 *
	 * This function may not return all serialized data in one invocation.
	 * To get all data, call this function repeatedly until it returns an
	 * array of 0 length or one of the negative error codes.
	 *
	 * The assigned |data_arr| is valid until the next call of
	 * `memSend()` or `send()`.
	 *
	 * The caller must send all data before sending the next chunk of
	 * data.
	 *
	 * This function returns an error code on failure or 0 on success.
	 */
	ErrorCode memSend(ref ubyte[] data_arr)
	{
		const ErrorCode serialized = memSendInternal(data_arr, true);
		if (serialized < 0)
			return serialized;

		/* afterFrameSent has to run here to handle stream closure upon
		   transmission of frames. Otherwise END_STREAM may reach the client
		   before memSend is called again, and we may then see an excessive
		   number of incoming streams. */
		const ErrorCode finished = afterFrameSent();
		if (finished < 0) {
			/* FATAL */
			assert(isFatal(finished));
			return finished;
		}

		return ErrorCode.OK;
	}

	/**
	 * Receives frames from the remote peer.
	 *
	 * This function receives as many frames as possible until the user
	 * callback $(D Connector.read) returns $(D ErrorCode.WOULDBLOCK).
	 * This function calls several $(D Connector) functions which are passed
	 * when initializing the $(D Session).
*
	 * Here is the simple time chart which tells when each callback is invoked:
	 *
	 * 1. $(D Connector.read) is invoked one or more times to receive the frame header.
	 *
	 * 2. $(D Connector.onFrameHeader) is invoked after the frame header is received.
	 *
	 * 3. If the frame is a DATA frame:
	 *
	 *    1. $(D Connector.read) is invoked one or more times to receive the DATA payload.
	 *
	 *    2. $(D Connector.onDataChunk) is invoked alternately with $(D Connector.read)
	 *       for each chunk of data.
	 *
	 *    3. $(D Connector.onFrame) may be invoked if one DATA frame is completely received.
	 *
	 *    4. $(D Connector.onStreamExit) may be invoked if the reception of the frame triggers
	 *       closure of the stream.
	 *
	 * 4. If the frame is a control frame:
	 *
	 *    1. $(D Connector.read) is invoked one or more times to receive the whole frame.
	 *
	 *    2. If the received frame is valid, then the following actions are
	 *       taken.
	 *       - If the frame is either HEADERS or PUSH_PROMISE:
	 *         - $(D Connector.onHeaders) is invoked first.
	 *         - $(D Connector.onHeaderField) is invoked for each header field.
	 *         - $(D Connector.onFrame) is invoked after all header fields.
	 *       - For other frames:
	 *         - $(D Connector.onFrame) is invoked.
	 *       - $(D Connector.onStreamExit) may be invoked if the reception of the frame
	 *         triggers the closure of the stream.
	 *
	 *    3. $(D Connector.onInvalidFrame) may be invoked if the received frame is unpacked
	 *       but is interpreted as invalid.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * $(D ErrorCode.EOF)
	 *     The remote peer did shutdown on the connection.
	 * $(D ErrorCode.CALLBACK_FAILURE)
	 *     The callback function failed.
	 * $(D ErrorCode.BAD_PREFACE)
	 *     Invalid client preface was detected. This error is only returned
	 *     when $(D Session) was configured as server and
	 *     `setRecvClientPreface()` is used.
	 */
	ErrorCode recv() {
		ubyte[INBOUND_BUFFER_LENGTH] buf;

		for (;;) {
			const int nread = callRead(buf.ptr[0 .. buf.sizeof]);

			if (nread > 0) {
				// feed the received bytes to the frame parser
				const int consumed = memRecv(buf[0 .. nread]);
				if (consumed < 0)
					return cast(ErrorCode)consumed;

				/* the whole chunk is expected to be consumed */
				assert(consumed == nread);
				continue;
			}

			if (nread == 0 || nread == ErrorCode.WOULDBLOCK)
				return ErrorCode.OK;

			if (nread == ErrorCode.EOF)
				return ErrorCode.EOF;

			/* any other value is negative at this point */
			return ErrorCode.CALLBACK_FAILURE;
		}
	}

	/**
	 * Processes data |input| as an input from the remote endpoint. The
	 * length of |input| is the number of bytes to process.
	 *
	 * This function behaves like $(D Session.recv) except that it
	 * does not use $(D Connector.read) to receive data; the
	 * |input| is the only data for the invocation of this function. If all
	 * bytes are processed, this function returns. The other connector
	 * callbacks are called in the same way as they are in $(D Session.recv).
	 *
	 * In the current implementation, this function always tries to
	 * process all input data unless either an error occurs or
	 * $(D ErrorCode.PAUSE) is returned from $(D Connector.onHeaderField) or
	 * $(D Connector.onDataChunk). If $(D ErrorCode.PAUSE) is used,
	 * the return value includes the number of bytes which was used to
	 * produce the data or frame for the callback.
	 *
	 * This function returns the number of processed bytes, or one of the
	 * following negative error codes:
	 *
	 * $(D ErrorCode.CALLBACK_FAILURE)
	 *     The callback function failed.
	 * $(D ErrorCode.BAD_PREFACE)
	 *     Invalid client preface was detected. This error is only returned
	 *     when $(D Session) was configured as server and
	 *     `setRecvClientPreface()` is used.
647 */ 648 int memRecv(in ubyte[] input) 649 { 650 const(ubyte)* pos = input.ptr; 651 const ubyte* first = input.ptr; 652 const ubyte* last = input.ptr + input.length; 653 size_t readlen; 654 int padlen; 655 ErrorCode rv; 656 bool busy; 657 FrameHeader cont_hd; 658 Stream stream; 659 size_t pri_fieldlen; 660 661 LOGF("recv: connection recv_window_size=%d, local_window=%d", recv_window_size, local_window_size); 662 663 for (;;) { 664 with(InboundState) final switch (iframe.state) { 665 case READ_CLIENT_PREFACE: 666 readlen = min(input.length, iframe.payloadleft); 667 668 if (CLIENT_CONNECTION_PREFACE[$ - iframe.payloadleft .. $ - iframe.payloadleft + readlen] != pos[0 .. readlen]) 669 { 670 return ErrorCode.BAD_PREFACE; 671 } 672 673 iframe.payloadleft -= readlen; 674 pos += readlen; 675 676 if (iframe.payloadleft == 0) { 677 iframe.reset(); 678 iframe.state = READ_FIRST_SETTINGS; 679 } 680 681 break; 682 case READ_FIRST_SETTINGS: 683 LOGF("recv: [READ_FIRST_SETTINGS]"); 684 685 readlen = iframe.read(pos, last); 686 pos += readlen; 687 688 if (iframe.sbuf.markAvailable) { 689 return cast(int)(pos - first); 690 } 691 692 if (iframe.sbuf.pos[3] != FrameType.SETTINGS || (iframe.sbuf.pos[4] & FrameFlags.ACK)) 693 { 694 695 iframe.state = IGN_ALL; 696 697 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "SETTINGS expected"); 698 699 if (isFatal(rv)) { 700 return rv; 701 } 702 703 return cast(int)input.length; 704 } 705 706 iframe.state = READ_HEAD; 707 708 goto case READ_HEAD; 709 case READ_HEAD: { 710 bool on_frame_header_called; 711 712 LOGF("recv: [READ_HEAD]"); 713 714 readlen = iframe.read(pos, last); 715 pos += readlen; 716 717 if (iframe.sbuf.markAvailable) { 718 return cast(int)(pos - first); 719 } 720 721 iframe.frame.hd.unpack(iframe.sbuf[]); 722 iframe.payloadleft = iframe.frame.hd.length; 723 724 LOGF("recv: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", 725 iframe.frame.hd.length, iframe.frame.hd.type, iframe.frame.hd.flags, 
iframe.frame.hd.stream_id); 726 727 if (iframe.frame.hd.length > local_settings.max_frame_size) { 728 LOGF("recv: length is too large %d > %u", iframe.frame.hd.length, local_settings.max_frame_size); 729 730 busy = true; 731 732 iframe.state = IGN_PAYLOAD; 733 734 rv = terminateSessionWithReason(FrameError.FRAME_SIZE_ERROR, "too large frame size"); 735 736 if (isFatal(rv)) { 737 return rv; 738 } 739 740 break; 741 } 742 743 switch (iframe.frame.hd.type) { 744 case FrameType.DATA: { 745 LOGF("recv: DATA"); 746 747 iframe.frame.hd.flags &= (FrameFlags.END_STREAM | FrameFlags.PADDED); 748 /* Check stream is open. If it is not open or closing, ignore payload. */ 749 busy = true; 750 751 rv = onDataFailFast(); 752 if (rv == ErrorCode.IGN_PAYLOAD) { 753 LOGF("recv: DATA not allowed stream_id=%d", iframe.frame.hd.stream_id); 754 iframe.state = IGN_DATA; 755 break; 756 } 757 758 if (isFatal(rv)) { 759 return rv; 760 } 761 762 rv = cast(ErrorCode)iframe.handlePad(); 763 if (rv < 0) { 764 iframe.state = IGN_DATA; 765 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "DATA: insufficient padding space"); 766 767 if (isFatal(rv)) { 768 return rv; 769 } 770 break; 771 } 772 773 if (rv == 1) { 774 iframe.state = READ_PAD_DATA; 775 break; 776 } 777 778 iframe.state = READ_DATA; 779 break; 780 } 781 case FrameType.HEADERS: 782 783 LOGF("recv: HEADERS"); 784 785 iframe.frame.hd.flags &= (FrameFlags.END_STREAM | FrameFlags.END_HEADERS | FrameFlags.PADDED | FrameFlags.PRIORITY); 786 787 rv = cast(ErrorCode)iframe.handlePad(); 788 if (rv < 0) { 789 busy = true; 790 791 iframe.state = IGN_PAYLOAD; 792 793 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "HEADERS: insufficient padding space"); 794 if (isFatal(rv)) { 795 return rv; 796 } 797 break; 798 } 799 800 if (rv == 1) { 801 iframe.state = READ_NBYTE; 802 break; 803 } 804 805 pri_fieldlen = priorityLength(iframe.frame.hd.flags); 806 807 if (pri_fieldlen > 0) { 808 if (iframe.payloadleft < pri_fieldlen) { 809 busy 
= true; 810 iframe.state = FRAME_SIZE_ERROR; 811 break; 812 } 813 814 iframe.state = READ_NBYTE; 815 816 iframe.setMark(pri_fieldlen); 817 break; 818 } 819 820 /* Call onFrameHeader here because processHeadersFrame() may call onHeaders callback */ 821 bool ok = callOnFrameHeader(iframe.frame.hd); 822 823 if (!ok) { 824 return ErrorCode.CALLBACK_FAILURE; 825 } 826 827 on_frame_header_called = true; 828 829 rv = processHeadersFrame(); 830 831 if (isFatal(rv)) { 832 return rv; 833 } 834 835 busy = true; 836 837 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 838 iframe.state = IGN_HEADER_BLOCK; 839 break; 840 } 841 842 iframe.state = READ_HEADER_BLOCK; 843 844 break; 845 case FrameType.PRIORITY: 846 LOGF("recv: PRIORITY"); 847 848 iframe.frame.hd.flags = FrameFlags.NONE; 849 850 if (iframe.payloadleft != PRIORITY_SPECLEN) { 851 busy = true; 852 853 iframe.state = FRAME_SIZE_ERROR; 854 855 break; 856 } 857 858 iframe.state = READ_NBYTE; 859 860 iframe.setMark(PRIORITY_SPECLEN); 861 862 break; 863 case FrameType.RST_STREAM: 864 case FrameType.WINDOW_UPDATE: 865 static if (DEBUG) { 866 switch (iframe.frame.hd.type) { 867 case FrameType.RST_STREAM: 868 LOGF("recv: RST_STREAM"); 869 break; 870 case FrameType.WINDOW_UPDATE: 871 LOGF("recv: WINDOW_UPDATE"); 872 break; 873 default: break; 874 } 875 } 876 877 iframe.frame.hd.flags = FrameFlags.NONE; 878 879 if (iframe.payloadleft != 4) { 880 busy = true; 881 iframe.state = FRAME_SIZE_ERROR; 882 break; 883 } 884 885 iframe.state = READ_NBYTE; 886 887 iframe.setMark(4); 888 889 break; 890 case FrameType.SETTINGS: 891 LOGF("recv: SETTINGS"); 892 893 iframe.frame.hd.flags &= FrameFlags.ACK; 894 895 if ((iframe.frame.hd.length % FRAME_SETTINGS_ENTRY_LENGTH) || 896 ((iframe.frame.hd.flags & FrameFlags.ACK) && iframe.payloadleft > 0)) { 897 busy = true; 898 iframe.state = FRAME_SIZE_ERROR; 899 break; 900 } 901 902 iframe.state = READ_SETTINGS; 903 904 if (iframe.payloadleft) { 905 iframe.setMark(FRAME_SETTINGS_ENTRY_LENGTH); 906 break; 907 } 
908 909 busy = true; 910 911 iframe.setMark(0); 912 913 break; 914 case FrameType.PUSH_PROMISE: 915 LOGF("recv: PUSH_PROMISE"); 916 917 iframe.frame.hd.flags &= (FrameFlags.END_HEADERS | FrameFlags.PADDED); 918 919 rv = cast(ErrorCode)iframe.handlePad(); 920 if (rv < 0) { 921 busy = true; 922 iframe.state = IGN_PAYLOAD; 923 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: insufficient padding space"); 924 if (isFatal(rv)) { 925 return rv; 926 } 927 break; 928 } 929 930 if (rv == 1) { 931 iframe.state = READ_NBYTE; 932 break; 933 } 934 935 if (iframe.payloadleft < 4) { 936 busy = true; 937 iframe.state = FRAME_SIZE_ERROR; 938 break; 939 } 940 941 iframe.state = READ_NBYTE; 942 943 iframe.setMark(4); 944 945 break; 946 case FrameType.PING: 947 LOGF("recv: PING"); 948 949 iframe.frame.hd.flags &= FrameFlags.ACK; 950 951 if (iframe.payloadleft != 8) { 952 busy = true; 953 iframe.state = FRAME_SIZE_ERROR; 954 break; 955 } 956 957 iframe.state = READ_NBYTE; 958 iframe.setMark(8); 959 960 break; 961 case FrameType.GOAWAY: 962 LOGF("recv: GOAWAY"); 963 964 iframe.frame.hd.flags = FrameFlags.NONE; 965 966 if (iframe.payloadleft < 8) { 967 busy = true; 968 iframe.state = FRAME_SIZE_ERROR; 969 break; 970 } 971 972 iframe.state = READ_NBYTE; 973 iframe.setMark(8); 974 975 break; 976 case FrameType.CONTINUATION: 977 LOGF("recv: unexpected CONTINUATION"); 978 979 /* Receiving CONTINUATION in this state are subject to connection error of type PROTOCOL_ERROR */ 980 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "CONTINUATION: unexpected"); 981 if (isFatal(rv)) 982 { 983 return rv; 984 } 985 986 busy = true; 987 988 iframe.state = IGN_PAYLOAD; 989 990 break; 991 default: 992 LOGF("recv: unknown frame"); 993 994 /* Silently ignore unknown frame type. 
*/ 995 996 busy = true; 997 998 iframe.state = IGN_PAYLOAD; 999 1000 break; 1001 } 1002 1003 if (!on_frame_header_called) { 1004 switch (iframe.state) { 1005 case IGN_HEADER_BLOCK: 1006 case IGN_PAYLOAD: 1007 case FRAME_SIZE_ERROR: 1008 case IGN_DATA: 1009 break; 1010 default: 1011 bool ok = callOnFrameHeader(iframe.frame.hd); 1012 1013 if (!ok) { 1014 return ErrorCode.CALLBACK_FAILURE; 1015 } 1016 } 1017 } 1018 1019 break; 1020 } 1021 case READ_NBYTE: 1022 LOGF("recv: [READ_NBYTE]"); 1023 1024 readlen = iframe.read(pos, last); 1025 pos += readlen; 1026 iframe.payloadleft -= readlen; 1027 1028 LOGF("recv: readlen=%d, payloadleft=%d, left=%d, type=%s", readlen, iframe.payloadleft, iframe.sbuf.markAvailable, iframe.frame.hd.type); 1029 1030 if (iframe.sbuf.markAvailable) { 1031 return cast(int)(pos - first); 1032 } 1033 1034 switch (iframe.frame.hd.type) { 1035 case FrameType.HEADERS: 1036 if (iframe.padlen == 0 && (iframe.frame.hd.flags & FrameFlags.PADDED)) { 1037 padlen = iframe.computePad(); 1038 if (padlen < 0) { 1039 busy = true; 1040 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "HEADERS: invalid padding"); 1041 if (isFatal(rv)) { 1042 return rv; 1043 } 1044 iframe.state = IGN_PAYLOAD; 1045 break; 1046 } 1047 iframe.frame.headers.padlen = padlen; 1048 1049 pri_fieldlen = priorityLength(iframe.frame.hd.flags); 1050 if (pri_fieldlen > 0) { 1051 if (iframe.payloadleft < pri_fieldlen) { 1052 busy = true; 1053 iframe.state = FRAME_SIZE_ERROR; 1054 break; 1055 } 1056 iframe.state = READ_NBYTE; 1057 iframe.setMark(pri_fieldlen); 1058 break; 1059 } else { 1060 /* Truncate buffers used for padding spec */ 1061 iframe.setMark(0); 1062 } 1063 } 1064 1065 rv = processHeadersFrame(); 1066 if (isFatal(rv)) { 1067 return rv; 1068 } 1069 1070 busy = true; 1071 1072 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 1073 iframe.state = IGN_HEADER_BLOCK; 1074 break; 1075 } 1076 1077 iframe.state = READ_HEADER_BLOCK; 1078 1079 break; 1080 case FrameType.PRIORITY: 1081 rv = 
processPriorityFrame(); 1082 if (isFatal(rv)) { 1083 return rv; 1084 } 1085 1086 iframe.reset(); 1087 1088 break; 1089 case FrameType.RST_STREAM: 1090 rv = processRstStreamFrame(); 1091 if (isFatal(rv)) { 1092 return rv; 1093 } 1094 1095 iframe.reset(); 1096 1097 break; 1098 case FrameType.PUSH_PROMISE: 1099 if (iframe.padlen == 0 && (iframe.frame.hd.flags & FrameFlags.PADDED)) { 1100 padlen = iframe.computePad(); 1101 if (padlen < 0) { 1102 busy = true; 1103 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid padding"); 1104 if (isFatal(rv)) { 1105 return rv; 1106 } 1107 iframe.state = IGN_PAYLOAD; 1108 break; 1109 } 1110 1111 iframe.frame.push_promise.padlen = padlen; 1112 1113 if (iframe.payloadleft < 4) { 1114 busy = true; 1115 iframe.state = FRAME_SIZE_ERROR; 1116 break; 1117 } 1118 1119 iframe.state = READ_NBYTE; 1120 1121 iframe.setMark(4); 1122 1123 break; 1124 } 1125 1126 rv = processPushPromiseFrame(); 1127 if (isFatal(rv)) { 1128 return rv; 1129 } 1130 1131 busy = true; 1132 1133 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 1134 iframe.state = IGN_HEADER_BLOCK; 1135 break; 1136 } 1137 1138 iframe.state = READ_HEADER_BLOCK; 1139 1140 break; 1141 case FrameType.PING: 1142 rv = processPingFrame(); 1143 if (isFatal(rv)) { 1144 return rv; 1145 } 1146 1147 iframe.reset(); 1148 1149 break; 1150 case FrameType.GOAWAY: { 1151 size_t debuglen; 1152 1153 /* 8 is Last-stream-ID + Error Code */ 1154 debuglen = iframe.frame.hd.length - 8; 1155 1156 if (debuglen > 0) { 1157 iframe.raw_lbuf = Mem.alloc!(ubyte[])(debuglen); 1158 iframe.lbuf = Buffer(iframe.raw_lbuf); 1159 } 1160 1161 busy = true; 1162 1163 iframe.state = READ_GOAWAY_DEBUG; 1164 1165 break; 1166 } 1167 case FrameType.WINDOW_UPDATE: 1168 rv = processWindowUpdateFrame(); 1169 if (isFatal(rv)) { 1170 return rv; 1171 } 1172 1173 iframe.reset(); 1174 1175 break; 1176 default: 1177 /* This is unknown frame */ 1178 iframe.reset(); 1179 1180 break; 1181 } 1182 break; 1183 case 
READ_HEADER_BLOCK: 1184 case IGN_HEADER_BLOCK: { 1185 int data_readlen; 1186 static if (DEBUG) { 1187 if (iframe.state == READ_HEADER_BLOCK) { 1188 LOGF("recv: [READ_HEADER_BLOCK]"); 1189 } else { 1190 LOGF("recv: [IGN_HEADER_BLOCK]"); 1191 } 1192 } 1193 1194 readlen = iframe.readLength(pos, last); 1195 1196 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft - readlen); 1197 1198 data_readlen = iframe.effectiveReadLength(iframe.payloadleft - readlen, readlen); 1199 1200 if (data_readlen >= 0) { 1201 size_t trail_padlen; 1202 size_t hd_proclen = 0; 1203 trail_padlen = iframe.frame.trailPadlen(iframe.padlen); 1204 LOGF("recv: block final=%d", (iframe.frame.hd.flags & FrameFlags.END_HEADERS) && iframe.payloadleft - data_readlen == trail_padlen); 1205 1206 rv = inflateHeaderBlock(iframe.frame, hd_proclen, cast(ubyte[])pos[0 .. data_readlen], 1207 (iframe.frame.hd.flags & FrameFlags.END_HEADERS) && iframe.payloadleft - data_readlen == trail_padlen, 1208 iframe.state == READ_HEADER_BLOCK); 1209 1210 if (isFatal(rv)) { 1211 return rv; 1212 } 1213 1214 if (rv == ErrorCode.PAUSE) { 1215 pos += hd_proclen; 1216 iframe.payloadleft -= hd_proclen; 1217 1218 return cast(int)(pos - first); 1219 } 1220 1221 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 1222 /* The application says no more headers. We decompress the 1223 rest of the header block but not invoke on_header_callback 1224 and on_frame_recv_callback. 
*/ 1225 pos += hd_proclen; 1226 iframe.payloadleft -= hd_proclen; 1227 1228 addRstStream(iframe.frame.hd.stream_id, FrameError.INTERNAL_ERROR); 1229 busy = true; 1230 iframe.state = IGN_HEADER_BLOCK; 1231 break; 1232 } 1233 1234 pos += readlen; 1235 iframe.payloadleft -= readlen; 1236 1237 if (rv == ErrorCode.HEADER_COMP) { 1238 /* GOAWAY is already issued */ 1239 if (iframe.payloadleft == 0) { 1240 iframe.reset(); 1241 } else { 1242 busy = true; 1243 iframe.state = IGN_PAYLOAD; 1244 } 1245 break; 1246 } 1247 } else { 1248 pos += readlen; 1249 iframe.payloadleft -= readlen; 1250 } 1251 1252 if (iframe.payloadleft) { 1253 break; 1254 } 1255 1256 if ((iframe.frame.hd.flags & FrameFlags.END_HEADERS) == 0) { 1257 1258 iframe.setMark(FRAME_HDLEN); 1259 1260 iframe.padlen = 0; 1261 1262 if (iframe.state == READ_HEADER_BLOCK) 1263 iframe.state = EXPECT_CONTINUATION; 1264 else 1265 iframe.state = IGN_CONTINUATION; 1266 1267 } else { 1268 if (iframe.state == READ_HEADER_BLOCK) { 1269 rv = afterHeaderBlockReceived(); 1270 if (isFatal(rv)) { 1271 return rv; 1272 } 1273 } 1274 iframe.reset(); 1275 } 1276 break; 1277 } 1278 case IGN_PAYLOAD: 1279 LOGF("recv: [IGN_PAYLOAD]"); 1280 1281 readlen = iframe.readLength(pos, last); 1282 iframe.payloadleft -= readlen; 1283 pos += readlen; 1284 1285 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1286 1287 if (iframe.payloadleft) { 1288 break; 1289 } 1290 1291 switch (iframe.frame.hd.type) { 1292 case FrameType.HEADERS: 1293 case FrameType.PUSH_PROMISE: 1294 case FrameType.CONTINUATION: 1295 /* Mark inflater bad so that we won't perform further decoding */ 1296 hd_inflater.ctx.bad = 1; 1297 break; 1298 default: 1299 break; 1300 } 1301 1302 iframe.reset(); 1303 1304 break; 1305 case FRAME_SIZE_ERROR: 1306 LOGF("recv: [FRAME_SIZE_ERROR]"); 1307 1308 rv = terminateSession(FrameError.FRAME_SIZE_ERROR); 1309 if (isFatal(rv)) { 1310 return rv; 1311 } 1312 1313 busy = true; 1314 1315 iframe.state = IGN_PAYLOAD; 1316 1317 
break; 1318 case READ_SETTINGS: 1319 LOGF("recv: [READ_SETTINGS]"); 1320 1321 readlen = iframe.read(pos, last); 1322 iframe.payloadleft -= readlen; 1323 pos += readlen; 1324 1325 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1326 1327 if (iframe.sbuf.markAvailable) { 1328 break; 1329 } 1330 1331 if (readlen > 0) 1332 iframe.unpackSetting(); 1333 1334 if (iframe.payloadleft) { 1335 iframe.setMark(FRAME_SETTINGS_ENTRY_LENGTH); 1336 break; 1337 } 1338 1339 rv = processSettingsFrame(); 1340 1341 if (isFatal(rv)) { 1342 return rv; 1343 } 1344 1345 iframe.reset(); 1346 1347 break; 1348 case READ_GOAWAY_DEBUG: 1349 LOGF("recv: [READ_GOAWAY_DEBUG]"); 1350 1351 readlen = iframe.readLength(pos, last); 1352 1353 iframe.lbuf.last[0 .. readlen] = pos[0 .. readlen]; 1354 iframe.lbuf.last += readlen; 1355 1356 iframe.payloadleft -= readlen; 1357 pos += readlen; 1358 1359 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1360 1361 if (iframe.payloadleft) { 1362 assert(iframe.lbuf.available > 0); 1363 1364 break; 1365 } 1366 1367 rv = processGoAwayFrame(); 1368 1369 if (isFatal(rv)) { 1370 return rv; 1371 } 1372 1373 iframe.reset(); 1374 1375 break; 1376 case EXPECT_CONTINUATION: 1377 case IGN_CONTINUATION: 1378 static if (DEBUG) { 1379 if (iframe.state == EXPECT_CONTINUATION) { 1380 LOGF("recv: [EXPECT_CONTINUATION]"); 1381 } else { 1382 LOGF("recv: [IGN_CONTINUATION]"); 1383 } 1384 } 1385 1386 readlen = iframe.read(pos, last); 1387 pos += readlen; 1388 1389 if (iframe.sbuf.markAvailable) { 1390 return cast(int)(pos - first); 1391 } 1392 1393 cont_hd.unpack(iframe.sbuf.pos); 1394 iframe.payloadleft = cont_hd.length; 1395 1396 LOGF("recv: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", cont_hd.length, cont_hd.type, cont_hd.flags, cont_hd.stream_id); 1397 1398 if (cont_hd.type != FrameType.CONTINUATION || 1399 cont_hd.stream_id != iframe.frame.hd.stream_id) { 1400 LOGF("recv: expected stream_id=%d, type=%d, but got stream_id=%d, 
type=%d", 1401 iframe.frame.hd.stream_id, FrameType.CONTINUATION, cont_hd.stream_id, cont_hd.type); 1402 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "unexpected non-CONTINUATION frame or stream_id is invalid"); 1403 if (isFatal(rv)) { 1404 return rv; 1405 } 1406 1407 busy = true; 1408 1409 iframe.state = IGN_PAYLOAD; 1410 1411 break; 1412 } 1413 1414 /* CONTINUATION won't bear FrameFlags.PADDED flag */ 1415 iframe.frame.hd.flags |= cont_hd.flags & FrameFlags.END_HEADERS; 1416 iframe.frame.hd.length += cont_hd.length; 1417 1418 busy = true; 1419 1420 if (iframe.state == EXPECT_CONTINUATION) { 1421 iframe.state = READ_HEADER_BLOCK; 1422 1423 bool ok = callOnFrameHeader(cont_hd); 1424 1425 if (!ok) { 1426 return ErrorCode.CALLBACK_FAILURE; 1427 } 1428 } else { 1429 iframe.state = IGN_HEADER_BLOCK; 1430 } 1431 1432 break; 1433 case READ_PAD_DATA: 1434 LOGF("recv: [READ_PAD_DATA]"); 1435 1436 readlen = iframe.read(pos, last); 1437 pos += readlen; 1438 iframe.payloadleft -= readlen; 1439 1440 LOGF("recv: readlen=%d, payloadleft=%d, left=%d", readlen, iframe.payloadleft, iframe.sbuf.markAvailable); 1441 1442 if (iframe.sbuf.markAvailable) { 1443 return cast(int)(pos - first); 1444 } 1445 1446 /* Pad Length field is subject to flow control */ 1447 rv = updateRecvConnectionWindowSize(readlen); 1448 if (isFatal(rv)) { 1449 return rv; 1450 } 1451 1452 /* Pad Length field is consumed immediately */ 1453 rv = consume(iframe.frame.hd.stream_id, readlen); 1454 1455 if (isFatal(rv)) { 1456 return rv; 1457 } 1458 1459 stream = getStream(iframe.frame.hd.stream_id); 1460 if (stream) 1461 updateRecvStreamWindowSize(stream, readlen, iframe.payloadleft || (iframe.frame.hd.flags & FrameFlags.END_STREAM) == 0); 1462 1463 busy = true; 1464 1465 padlen = iframe.computePad(); 1466 if (padlen < 0) { 1467 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "DATA: invalid padding"); 1468 if (isFatal(rv)) { 1469 return rv; 1470 } 1471 iframe.state = IGN_DATA; 1472 break; 
1473 } 1474 1475 iframe.frame.data.padlen = padlen; 1476 1477 iframe.state = READ_DATA; 1478 1479 break; 1480 case READ_DATA: 1481 LOGF("recv: [READ_DATA]"); 1482 1483 readlen = iframe.readLength(pos, last); 1484 iframe.payloadleft -= readlen; 1485 pos += readlen; 1486 1487 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1488 1489 if (readlen > 0) { 1490 int data_readlen; 1491 1492 rv = updateRecvConnectionWindowSize(readlen); 1493 if (isFatal(rv)) { 1494 return rv; 1495 } 1496 1497 stream = getStream(iframe.frame.hd.stream_id); 1498 if (stream) 1499 updateRecvStreamWindowSize(stream, readlen, iframe.payloadleft || (iframe.frame.hd.flags & FrameFlags.END_STREAM) == 0); 1500 1501 data_readlen = iframe.effectiveReadLength(iframe.payloadleft, readlen); 1502 1503 padlen = cast(int)(readlen - data_readlen); 1504 1505 if (padlen > 0) { 1506 /* Padding is considered as "consumed" immediately */ 1507 rv = consume(iframe.frame.hd.stream_id, padlen); 1508 1509 if (isFatal(rv)) { 1510 return rv; 1511 } 1512 } 1513 1514 LOGF("recv: data_readlen=%d", data_readlen); 1515 1516 if (stream && data_readlen > 0) { 1517 if (isHTTPMessagingEnabled()) { 1518 if (!stream.onDataChunk(data_readlen)) { 1519 addRstStream(iframe.frame.hd.stream_id, FrameError.PROTOCOL_ERROR); 1520 busy = true; 1521 iframe.state = IGN_DATA; 1522 break; 1523 } 1524 } 1525 1526 ubyte[] data_nopad = cast(ubyte[])(pos - readlen)[0 .. 
data_readlen]; 1527 FrameFlags flags = iframe.frame.hd.flags; 1528 int stream_id = iframe.frame.hd.stream_id; 1529 bool pause; 1530 bool ok; 1531 try ok = connector.onDataChunk(flags, stream_id, data_nopad, pause); 1532 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 1533 1534 if (pause) { 1535 return cast(int)(pos - first); 1536 } 1537 1538 if (!ok) { 1539 return ErrorCode.CALLBACK_FAILURE; 1540 } 1541 1542 } 1543 } 1544 1545 if (iframe.payloadleft) { 1546 break; 1547 } 1548 1549 rv = processDataFrame(); 1550 if (isFatal(rv)) { 1551 return rv; 1552 } 1553 1554 iframe.reset(); 1555 1556 break; 1557 case IGN_DATA: 1558 LOGF("recv: [IGN_DATA]"); 1559 1560 readlen = iframe.readLength(pos, last); 1561 iframe.payloadleft -= readlen; 1562 pos += readlen; 1563 1564 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1565 1566 if (readlen > 0) { 1567 /* Update connection-level flow control window for ignored DATA frame too */ 1568 rv = updateRecvConnectionWindowSize(readlen); 1569 if (isFatal(rv)) { 1570 return rv; 1571 } 1572 1573 if (opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) { 1574 1575 /* Ignored DATA is considered as "consumed" immediately. */ 1576 rv = updateConnectionConsumedSize(readlen); 1577 1578 if (isFatal(rv)) { 1579 return rv; 1580 } 1581 } 1582 } 1583 1584 if (iframe.payloadleft) { 1585 break; 1586 } 1587 1588 iframe.reset(); 1589 1590 break; 1591 case IGN_ALL: 1592 return cast(int)input.length; 1593 } 1594 1595 if (!busy && pos == last) { 1596 break; 1597 } 1598 1599 busy = false; 1600 } 1601 1602 assert(pos == last); 1603 1604 return cast(int)(pos - first); 1605 } 1606 1607 /** 1608 * Puts back previously deferred DATA frame in the stream |stream_id| 1609 * to the outbound queue. 1610 * 1611 * This function returns 0 if it succeeds, or one of the following 1612 * negative error codes: 1613 * 1614 * $(D ErrorCode.INVALID_ARGUMENT) 1615 * The stream does not exist; or no deferred data exist. 
*/
ErrorCode resumeData(int stream_id)
{
	Stream stream = getStream(stream_id);

	/* Nothing to resume: the stream is unknown or has no deferred item. */
	if (!stream || !stream.isItemDeferred())
		return ErrorCode.INVALID_ARGUMENT;

	stream.resumeDeferredItem(StreamFlags.DEFERRED_USER, this);
	return ErrorCode.OK;
}

/**
 * Returns true if $(D Session) wants to receive data from the
 * remote peer.
 *
 * If both `wantRead()` and `wantWrite()` return false, the application should
 * drop the connection.
 */
bool wantRead()
{
	/* If this flag is set, we don't want to read. The application should drop the connection. */
	if (goaway_flags & GoAwayFlags.TERM_SENT)
		return false;

	/* Unless termination GOAWAY is sent, we always want to read incoming
	   frames while streams are still active. */
	if (getNumActiveStreams() > 0)
		return true;

	/* If there are no active streams and GOAWAY has been sent or received, we are done with this session. */
	return (goaway_flags & (GoAwayFlags.SENT | GoAwayFlags.RECV)) == 0;
}

/**
 * Returns true if $(D Session) wants to send data to the remote
 * peer.
 *
 * If both `wantRead()` and `wantWrite()` return false, the application should
 * drop the connection.
 */
bool wantWrite()
{
	/* If this flag is set, we don't want to write any data. The application should drop the connection. */
	if (goaway_flags & GoAwayFlags.TERM_SENT)
		return false;

	/*
	 * Unless termination GOAWAY is sent, we want to write frames if
	 * there are pending ones. If the pending frame is request/push
	 * response HEADERS and the concurrent stream limit is reached, we
	 * don't want to write them. Pending DATA cannot be written while
	 * the connection-level remote window is exhausted.
	 */
	if (!aob.item && ob_pq.empty &&
		(ob_da_pq.empty || remote_window_size == 0) &&
		(ob_ss_pq.empty || isOutgoingConcurrentStreamsMax()))
	{
		return false;
	}

	/* Something is pending. A (non-terminating) GOAWAY only ends the
	   session once no stream is active any more, so keep writing while
	   streams are still alive -- this mirrors wantRead(). */
	if (getNumActiveStreams() > 0)
		return true;

	/* If there are no active streams and GOAWAY has been sent or received, we are done with this session. */
	return (goaway_flags & (GoAwayFlags.SENT | GoAwayFlags.RECV)) == 0;
}

/**
 * Returns stream_user_data for the stream |stream_id|. The
 * stream_user_data is provided by `submitRequest()`,
 * `submitHeaders()` or `setStreamUserData()`.
 * Unless it is set using `setStreamUserData()`, if the stream is
 * initiated by the remote endpoint, stream_user_data is always
 * `null`. If the stream does not exist, this function returns
 * `null`.
 */
void* getStreamUserData(int stream_id)
{
	Stream stream = getStream(stream_id);
	return stream ? stream.userData : null;
}

/**
 * Sets the |stream_user_data| to the stream denoted by the
 * |stream_id|. If a stream user data is already set to the stream,
 * it is replaced with the |stream_user_data|. It is valid to specify
 * `null` in the |stream_user_data|, which nullifies the associated
 * data pointer.
 *
 * It is valid to set the |stream_user_data| to the stream reserved by
 * PUSH_PROMISE frame.
 *
 * This function returns 0 if it succeeds, or one of following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The stream does not exist
 */
ErrorCode setStreamUserData(int stream_id, void* stream_user_data)
{
	Stream stream = getStream(stream_id);
	if (!stream)
		return ErrorCode.INVALID_ARGUMENT;

	stream.userData = stream_user_data;
	return ErrorCode.OK;
}

/**
 * Returns the number of frames in the outbound queue. This does not
 * include the deferred DATA frames.
*/
size_t getOutboundQueueSize()
{
	/* Sum of the three outbound priority queues. */
	immutable size_t queued = ob_pq.length + ob_ss_pq.length + ob_da_pq.length;
	return queued;
}

/**
 * Returns the number of DATA payload in bytes received without
 * WINDOW_UPDATE transmission for the stream |stream_id|. The local
 * (receive) window size can be adjusted by
 * $(D submitWindowUpdate). This function takes that into account
 * and returns the effective data length. In particular, if the
 * local window size is reduced by submitting a negative
 * window_size_increment with $(D submitWindowUpdate), this
 * function returns a number of bytes less than actually received.
 *
 * This function returns -1 if the stream does not exist.
 */
int getStreamEffectiveRecvDataLength(int stream_id)
{
	if (Stream stream = getStream(stream_id)) {
		/* A negative counter is reported as 0. */
		if (stream.recvWindowSize < 0)
			return 0;
		return stream.recvWindowSize;
	}

	return -1;
}

/**
 * Returns the local (receive) window size for the stream |stream_id|.
 * The local window size can be adjusted by
 * $(D submitWindowUpdate). This function takes that into account
 * and returns the effective window size.
 *
 * This function returns -1 if the stream does not exist.
 */
int getStreamEffectiveLocalWindowSize(int stream_id)
{
	if (Stream stream = getStream(stream_id))
		return stream.localWindowSize;

	return -1;
}

/**
 * Returns the number of DATA payload in bytes received without
 * WINDOW_UPDATE transmission for a connection. The local (receive)
 * window size can be adjusted by $(D submitWindowUpdate).
 * This function takes that into account and returns the effective data
 * length. In particular, if the local window size is reduced by
 * submitting a negative window_size_increment with
 * $(D submitWindowUpdate), this function returns a number
 * of bytes less than actually received.
 *
 * This function never fails; a negative counter is reported as 0.
*/
int getEffectiveRecvDataLength()
{
	if (recv_window_size < 0)
		return 0;

	return recv_window_size;
}

/**
 * Returns the local (receive) window size for a connection. The
 * local window size can be adjusted by
 * $(D submitWindowUpdate). This function takes that into account
 * and returns the effective window size.
 *
 * This function always succeeds.
 */
int getEffectiveLocalWindowSize()
{
	return local_window_size;
}

/**
 * Returns the remote window size for a given stream |stream_id|.
 *
 * This is the amount of flow-controlled payload (e.g., DATA) that the
 * local endpoint can send without stream level WINDOW_UPDATE. There
 * is also connection level flow control, so the effective size of
 * payload that the local endpoint can actually send is
 * min(getStreamRemoteWindowSize(), getRemoteWindowSize()).
 *
 * This function returns -1 if the stream does not exist.
 */
int getStreamRemoteWindowSize(int stream_id)
{
	if (Stream stream = getStream(stream_id)) {
		/* stream.remoteWindowSize can be negative when Setting.INITIAL_WINDOW_SIZE
		   is changed; clamp it to 0 for the caller. */
		return max(0, stream.remoteWindowSize);
	}

	return -1;
}

/**
 * Returns the remote window size for a connection.
 *
 * This function always succeeds.
 */
int getRemoteWindowSize()
{
	return remote_window_size;
}

/**
 * Returns 1 if local peer half closed the given stream |stream_id|.
 * Returns 0 if it did not. Returns -1 if no such stream exists.
 */
int getStreamLocalClose(int stream_id)
{
	if (Stream stream = getStream(stream_id)) {
		/* Write side shut down == half closed (local). */
		return (stream.shutFlags & ShutdownFlag.WR) != 0;
	}

	return -1;
}

/**
 * Returns 1 if remote peer half closed the given stream |stream_id|.
 * Returns 0 if it did not. Returns -1 if no such stream exists.
*/
int getStreamRemoteClose(int stream_id)
{
	if (Stream stream = getStream(stream_id)) {
		/* Read side shut down == half closed (remote). */
		return (stream.shutFlags & ShutdownFlag.RD) != 0;
	}

	return -1;
}

/**
 * Signals the session so that the connection should be terminated.
 *
 * The last stream ID is the minimum value between the stream ID of a
 * stream for which $(D Connector.onFrame) was called
 * most recently and the last stream ID we have sent to the peer
 * previously.
 *
 * The |error_code| is the error code of this GOAWAY frame. The
 * pre-defined error code is one of $(D FrameError).
 *
 * After the transmission, both `wantRead()` and
 * `wantWrite()` return false.
 *
 * This function should be called when the connection should be
 * terminated after sending GOAWAY. If the remaining streams should
 * be processed after GOAWAY, use `submitGoAway()` instead.
 */
ErrorCode terminateSession(FrameError error_code)
{
	/* Delegate to the three-argument overload, using the last processed
	   stream ID and no debug data. */
	auto last_id = last_proc_stream_id;
	return terminateSession(last_id, error_code, null);
}


/**
 * Signals the session so that the connection should be terminated.
 *
 * This function behaves like $(D Session.terminateSession),
 * but the last stream ID can be specified by the application for fine
 * grained control of stream. The HTTP/2 specification does not allow
 * last_stream_id to be increased. So the actual value sent as
 * last_stream_id is the minimum value between the given
 * |last_stream_id| and the last_stream_id we have previously sent to
 * the peer.
 *
 * The |last_stream_id| is peer's stream ID or 0. So if $(D Session) is
 * initialized as client, |last_stream_id| must be even or 0. If
 * $(D Session) is initialized as server, |last_stream_id| must be odd or
 * 0.
*
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |last_stream_id| is invalid.
 */
ErrorCode terminateSession(int last_stream_id, FrameError error_code) {
	/* Same as the three-argument overload, without debug data. */
	return terminateSession(last_stream_id, error_code, null);
}

/**
 * Returns the value of SETTINGS |id| notified by a remote endpoint.
 * The |id| must be one of values defined in $(D SettingsID).
 *
 * For any other |id| this function returns -1 converted to `uint`
 * (i.e. `uint.max`).
 */
uint getRemoteSettings(SettingsID id) {
	switch (id) {
		case Setting.HEADER_TABLE_SIZE:
			return remote_settings.header_table_size;
		case Setting.ENABLE_PUSH:
			return remote_settings.enable_push;
		case Setting.MAX_CONCURRENT_STREAMS:
			return remote_settings.max_concurrent_streams;
		case Setting.INITIAL_WINDOW_SIZE:
			return remote_settings.initial_window_size;
		case Setting.MAX_FRAME_SIZE:
			return remote_settings.max_frame_size;
		case Setting.MAX_HEADER_LIST_SIZE:
			return remote_settings.max_header_list_size;
		default:
			/* Unknown identifier: wraps around to uint.max. */
			return -1;
	}
}

/**
 * Tells the $(D Session) that next stream ID is |next_stream_id|. The
 * |next_stream_id| must be equal or greater than the value returned
 * by `getNextStreamID()`.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |next_stream_id| is negative, or strictly less than the value
 *     `getNextStreamID()` returns.
 */
ErrorCode setNextStreamID(int _next_stream_id)
{
	/* Compare the session's current counter against the *requested* ID:
	   stream IDs may only move forward. The negative check comes first so
	   the cast to uint below is safe. */
	if (_next_stream_id < 0 || next_stream_id > cast(uint)_next_stream_id) {
		return ErrorCode.INVALID_ARGUMENT;
	}

	next_stream_id = _next_stream_id;

	return ErrorCode.OK;
}

/**
 * Returns the next outgoing stream ID. Notice that return type is
 * uint.
* If we run out of stream ID for this session, this
 * function returns 1 << 31.
 */
uint getNextStreamID()
{
	return next_stream_id;
}

/**
 * Tells the $(D Session) that |size| bytes for a stream denoted by
 * |stream_id| were consumed by application and are ready to
 * WINDOW_UPDATE. The consumed bytes are counted towards both connection and
 * stream level WINDOW_UPDATE (see `consumeConnection` and `consumeStream`).
 * This function is intended to be used without
 * automatic window update (see $(D Options.setNoAutoWindowUpdate)).
 *
 * If the stream does not exist, only the connection level is updated.
 * Non-fatal results of the internal bookkeeping are reported as success.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 * $(D ErrorCode.INVALID_STATE)
 *     Automatic WINDOW_UPDATE is not disabled.
 */
ErrorCode consume(int stream_id, size_t size)
{
	if (stream_id == 0)
		return ErrorCode.INVALID_ARGUMENT;

	if ((opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) == 0)
		return ErrorCode.INVALID_STATE;

	/* Connection level first ... */
	ErrorCode conn_rv = updateConnectionConsumedSize(size);
	if (isFatal(conn_rv))
		return conn_rv;

	/* ... then stream level, if the stream is still around. */
	if (Stream stream = getStream(stream_id)) {
		ErrorCode stream_rv = updateStreamConsumedSize(stream, size);
		if (isFatal(stream_rv))
			return stream_rv;
	}

	return ErrorCode.OK;
}

/**
 * Like $(D consume), but this only tells library that
 * |size| bytes were consumed only for connection level. Note that
 * HTTP/2 maintains connection and stream level flow control windows
 * independently.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_STATE)
 *     Automatic WINDOW_UPDATE is not disabled.
*/
ErrorCode consumeConnection(size_t size)
{
	if ((opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) == 0)
		return ErrorCode.INVALID_STATE;

	/* Only fatal results are propagated; anything else counts as success. */
	ErrorCode result = updateConnectionConsumedSize(size);
	return isFatal(result) ? result : ErrorCode.OK;
}

/**
 * Like $(D consume), but this only tells library that
 * |size| bytes were consumed only for stream denoted by |stream_id|.
 * Note that HTTP/2 maintains connection and stream level flow control
 * windows independently.
 *
 * If the stream does not exist, this function does nothing and
 * reports success.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 * $(D ErrorCode.INVALID_STATE)
 *     Automatic WINDOW_UPDATE is not disabled.
 */
ErrorCode consumeStream(int stream_id, size_t size)
{
	if (stream_id == 0)
		return ErrorCode.INVALID_ARGUMENT;

	if ((opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) == 0)
		return ErrorCode.INVALID_STATE;

	if (Stream stream = getStream(stream_id)) {
		/* Only fatal results are propagated. */
		ErrorCode result = updateStreamConsumedSize(stream, size);
		if (isFatal(result))
			return result;
	}

	return ErrorCode.OK;
}

/**
 * Performs post-process of HTTP Upgrade request. This function can
 * be called from both client and server, but the behavior is very
 * different in each other.
 *
 * If called from client side, the |settings_payload| must be the
 * value sent in `HTTP2-Settings` header field and must be decoded
 * by base64url decoder. The |settings_payload| is unpacked and its
 * setting values will be submitted using $(D submitSettings).
 * This means that the client application code does not need to submit
 * SETTINGS by itself.
* The stream with stream ID=1 is opened and the
 * |stream_user_data| is used for its stream_user_data. The opened
 * stream becomes half-closed (local) state.
 *
 * If called from server side, the |settings_payload| must be the
 * value received in `HTTP2-Settings` header field and must be
 * decoded by base64url decoder. It is treated as if the SETTINGS
 * frame with that payload is received. Thus, callback functions for
 * the reception of SETTINGS frame will be invoked. The stream with
 * stream ID=1 is opened and the |stream_user_data| is attached to it
 * as well. The opened stream becomes half-closed (remote).
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |settings_payload| is badly formed.
 * $(D ErrorCode.PROTO)
 *     The stream ID 1 is already used or closed; or is not available.
 */
ErrorCode upgrade(in ubyte[] settings_payload, void* stream_user_data = null)
{
	Stream stream;
	Frame frame;
	Setting[] iva;
	ErrorCode rv;
	PrioritySpec pri_spec;

	/* Stream 1 must still be unused on whichever side we are. */
	if ((!is_server && next_stream_id != 1) || (is_server && last_recv_stream_id >= 1)) {
		return ErrorCode.PROTO;
	}

	/* The payload must be a whole number of SETTINGS entries. */
	if (settings_payload.length % FRAME_SETTINGS_ENTRY_LENGTH) {
		return ErrorCode.INVALID_ARGUMENT;
	}

	Settings.unpack(iva, settings_payload);

	if (is_server) {
		frame.hd = FrameHeader(cast(uint)settings_payload.length, FrameType.SETTINGS, FrameFlags.NONE, 0);
		frame.settings.iva = iva;
		rv = onSettings(frame, true /* No ACK */);
	} else {
		rv = submitSettings(this, iva);
	}

	/* The unpacked entries are only needed for the call above. Release
	   them before looking at rv so that the error path does not leak. */
	if (iva) Mem.free(iva);

	if (rv != 0)
		return rv;

	stream = openStream(1, StreamFlags.NONE, pri_spec, StreamState.OPENING, stream_user_data);

	if (is_server)
	{
		/* half closed (remote) */
		stream.shutdown(ShutdownFlag.RD);
		last_recv_stream_id = 1;
		last_proc_stream_id = 1;
	} else {
		/* half closed (local) */
		stream.shutdown(ShutdownFlag.WR);
		next_stream_id += 2;
	}

	return ErrorCode.OK;
}

/*
 * Returns true if |stream_id| is initiated by local endpoint:
 * even IDs belong to the server, odd IDs to the client. Stream ID 0
 * belongs to neither.
 */
bool isMyStreamId(int stream_id)
{
	if (stream_id == 0)
		return false;

	immutable bool odd = (stream_id & 0x1) != 0;
	return is_server ? !odd : odd;
}

/*
 * Adds |item| to the outbound queue in $(D Session). When this function
 * succeeds, it takes ownership of |item|. So caller must not free it
 * on success.
 *
 * Non-DATA frames are always accepted. DATA frames are attached to
 * their stream and can be rejected.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.STREAM_CLOSED
 *     Stream already closed (DATA frame only)
 *
 * ErrorCode.DATA_EXIST
 *     The stream already has an item attached (DATA frame only)
 */
ErrorCode addItem(OutboundItem item)
{
	/* TODO: Return error if stream is not found for the frame requiring stream presence. */
	Stream stream = getStream(item.frame.hd.stream_id);
	Frame* frame = &item.frame;

	if (frame.hd.type != FrameType.DATA) {
		switch (frame.hd.type) {
			case FrameType.RST_STREAM:
				if (stream)
					stream.state = StreamState.CLOSING;
				break;
			case FrameType.SETTINGS:
				item.weight = OB_SETTINGS_WEIGHT;
				break;
			case FrameType.PING:
				/* Ping has highest priority. */
				item.weight = OB_PING_WEIGHT;
				break;
			default:
				break;
		}

		if (frame.hd.type == FrameType.HEADERS) {
			/* We push request HEADERS and push response HEADERS to
			   dedicated queue because their transmission is affected by
			   Setting.MAX_CONCURRENT_STREAMS */
			/* TODO: If 2 HEADERS are submitted for reserved stream, then
			   both of them are queued into ob_ss_pq, which is not
			   desirable. */
			if (frame.headers.cat == HeadersCategory.REQUEST) {
				ob_ss_pq.push(item);
				item.queued = 1;
			} else if (stream && (stream.state == StreamState.RESERVED || item.aux_data.headers.attach_stream)) {
				item.weight = stream.effectiveWeight;
				item.cycle = last_cycle;
				stream.attachItem(item, this);
			} else {
				ob_pq.push(item);
				item.queued = 1;
			}
		} else {
			ob_pq.push(item);
			item.queued = 1;
		}

		return ErrorCode.OK;
	}

	/* DATA frame from here on. */
	if (!stream) {
		return ErrorCode.STREAM_CLOSED;
	}

	if (stream.item) {
		return ErrorCode.DATA_EXIST;
	}

	item.weight = stream.effectiveWeight;
	item.cycle = last_cycle;

	stream.attachItem(item, this);

	return ErrorCode.OK;
}

/*
 * Adds RST_STREAM frame for the stream |stream_id| with the error
 * code |error_code|. This is a convenient function built on top of
 * $(D addItem) to add RST_STREAM easily.
 *
 * This function simply returns without adding RST_STREAM frame if
 * given stream is in StreamState.CLOSING state, because multiple
 * RST_STREAM for a stream is redundant.
 *
 * If the RST_STREAM refers to a request whose HEADERS is still waiting
 * in ob_ss_pq, that pending item is marked as canceled instead of
 * queueing a RST_STREAM frame.
 */
void addRstStream(int stream_id, FrameError error_code)
{
	Stream stream = getStream(stream_id);
	if (stream && stream.state == StreamState.CLOSING)
		return;

	/* Cancel pending request HEADERS in ob_ss_pq if this RST_STREAM refers to that stream. */
	if (!is_server && isMyStreamId(stream_id) && ob_ss_pq.top)
	{
		OutboundItem top = ob_ss_pq.top;
		Frame* headers_frame = &top.frame;

		assert(headers_frame.hd.type == FrameType.HEADERS);

		/* Only scan the queue when |stream_id| can be among the queued requests. */
		if (headers_frame.hd.stream_id <= stream_id && cast(uint)stream_id < next_stream_id)
		{
			foreach (OutboundItem pending; ob_ss_pq) {

				HeadersAuxData* aux_data = &pending.aux_data.headers;

				if (pending.frame.hd.stream_id != stream_id || aux_data.canceled)
				{
					continue;
				}

				aux_data.error_code = error_code;
				aux_data.canceled = 1;
				return;
			}
		}
	}

	OutboundItem item = Mem.alloc!OutboundItem(this);
	Frame* frame = &item.frame;

	frame.rst_stream = RstStream(stream_id, error_code);
	/* addItem() only reports errors for DATA frames, so the result is not checked. */
	addItem(item);
}

/*
 * Adds PING frame. This is a convenient function built on top of
 * $(D addItem) to add PING easily.
 *
 * If the |opaque_data| is not null, it must point to 8 bytes memory
 * region of data. The data pointed by |opaque_data| is copied. It can
 * be null. In this case, 8 bytes null is used.
 *
 */
void addPing(FrameFlags flags, in ubyte[] opaque_data)
{
	OutboundItem item = Mem.alloc!OutboundItem(this);
	Frame* frame = &item.frame;

	frame.ping = Ping(flags, opaque_data);

	/* addItem() only reports errors for DATA frames, so the result is not checked. */
	addItem(item);
}

/*
 * Adds GOAWAY frame with the last-stream-ID |last_stream_id| and the
 * error code |error_code|. This is a convenient function built on top
 * of $(D addItem) to add GOAWAY easily. The
 * |aux_flags| are bitwise-OR of one or more of
 * GoAwayAuxFlags.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.INVALID_ARGUMENT
 *     The |opaque_data| is too large; or |last_stream_id| is a
 *     locally initiated stream ID.
*/
ErrorCode addGoAway(int last_stream_id, FrameError error_code, in string opaque_data, GoAwayAuxFlags aux_flags)
{
	/* |last_stream_id| must not be one of our own stream IDs. */
	if (isMyStreamId(last_stream_id))
		return ErrorCode.INVALID_ARGUMENT;

	string debug_data;
	if (opaque_data.length > 0) {
		/* The debug data shares the payload with 8 more bytes of GOAWAY
		   fields, so both together must fit into MAX_PAYLOADLEN. */
		if (opaque_data.length + 8 > MAX_PAYLOADLEN)
			return ErrorCode.INVALID_ARGUMENT;

		debug_data = cast(string)Mem.copy(opaque_data);
	}

	OutboundItem item = Mem.alloc!OutboundItem(this);

	/* last_stream_id must not be increased from the value previously sent */
	last_stream_id = min(last_stream_id, local_last_stream_id);

	item.frame.goaway = GoAway(last_stream_id, error_code, debug_data);
	item.aux_data.goaway.flags = aux_flags;

	addItem(item);
	return ErrorCode.OK;
}
/*
 * Adds WINDOW_UPDATE frame with stream ID |stream_id| and
 * window-size-increment |window_size_increment|. This is a convenient
 * function built on top of $(D addItem) to add
 * WINDOW_UPDATE easily.
 */
void addWindowUpdate(FrameFlags flags, int stream_id, int window_size_increment)
{
	OutboundItem item = Mem.alloc!OutboundItem(this);

	item.frame.window_update = WindowUpdate(flags, stream_id, window_size_increment);

	addItem(item);
}

/*
 * Adds SETTINGS frame.
*/
ErrorCode addSettings(FrameFlags flags, in Setting[] iva)
{
	immutable bool is_ack = (flags & FrameFlags.ACK) != 0;

	if (is_ack) {
		/* A SETTINGS ACK must not carry any entries. */
		if (iva.length != 0)
			return ErrorCode.INVALID_ARGUMENT;
	} else if (inflight_iva.length != 0) {
		/* Only one non-ACK SETTINGS may be in flight at a time. */
		return ErrorCode.TOO_MANY_INFLIGHT_SETTINGS;
	}

	if (!iva.check())
		return ErrorCode.INVALID_ARGUMENT;

	OutboundItem item = Mem.alloc!OutboundItem(this);
	scope(failure) Mem.free(item);

	/* The frame gets its own copy of the entries. */
	Setting[] iva_copy = iva.length > 0 ? iva.copy() : null;
	scope(failure) if (iva_copy) Mem.free(iva_copy);

	/* A second copy is remembered as the in-flight settings. */
	if (!is_ack)
		inflight_iva = iva.length > 0 ? iva.copy() : null;

	item.frame.settings = Settings(flags, iva_copy);

	addItem(item);

	/* Extract Setting.MAX_CONCURRENT_STREAMS and ENABLE_PUSH here. We use it to refuse the
	 * incoming stream and PUSH_PROMISE with RST_STREAM. Walking backwards makes the
	 * last occurrence of each identifier win. */
	bool seen_max_streams;
	bool seen_enable_push;
	foreach_reverse (ref iv; iva)
	{
		if (!seen_max_streams && iv.id == Setting.MAX_CONCURRENT_STREAMS) {
			pending_local_max_concurrent_stream = iv.value;
			seen_max_streams = true;
		} else if (!seen_enable_push && iv.id == Setting.ENABLE_PUSH) {
			pending_enable_push = iv.value > 0;
			seen_enable_push = true;
		}

		if (seen_max_streams && seen_enable_push)
			break;
	}

	return ErrorCode.OK;
}

/**
 * Creates new stream in $(D Session) with stream ID |stream_id|,
 * priority |pri_spec| and flags |flags|. The |flags| is bitwise OR
 * of StreamFlags. Since this function is called when initial
 * HEADERS is sent or received, these flags are taken from it. The
 * state of stream is set to |initial_state|. The |stream_user_data|
 * is a pointer to the arbitrary user supplied data to be associated
 * to this stream.
*
 * If |initial_state| is StreamState.RESERVED, this function sets the
 * StreamFlags.PUSH flag.
 *
 * This function returns a pointer to created new stream object.
 */
Stream openStream(int stream_id, StreamFlags flags, PrioritySpec pri_spec_in, StreamState initial_state, void *stream_user_data = null)
{
	PrioritySpec default_spec;
	PrioritySpec spec = pri_spec_in;
	Stream parent;
	bool is_new;

	Stream stream = getStreamRaw(stream_id);

	if (!stream) {
		/* Nothing known under this ID: a fresh object is allocated below. On the
		   server, a non-idle peer stream first trims the closed-stream list,
		   counting itself through the offset of 1. */
		if (is_server && initial_state != StreamState.IDLE && !isMyStreamId(stream_id))
			adjustClosedStream(1);
		is_new = true;
	} else {
		/* An existing entry must be an idle anchor. It is unlinked from the idle
		   list and from the dependency tree, then re-initialized below. */
		assert(stream.state == StreamState.IDLE);
		assert(stream.inDepTree());
		detachIdleStream(stream);
		stream.remove();
	}

	if (spec.stream_id != 0) {
		parent = getStreamRaw(spec.stream_id);
		if (is_server && !parent && idleStreamDetect(spec.stream_id))
		{
			/* The parent is an idle stream not yet in memory: create it as an
			   anchor with the default priority. */
			parent = openStream(spec.stream_id, StreamFlags.NONE, default_spec, StreamState.IDLE, null);
		} else if (!parent || !parent.inDepTree()) {
			/* A parent that is missing or outside the dependency tree gives this
			   stream the default priority. */
			spec = default_spec;
		}
	}

	if (initial_state == StreamState.RESERVED)
		flags |= StreamFlags.PUSH;

	if (is_new)
		stream = Mem.alloc!Stream(stream_id, flags, initial_state, spec.weight, roots,
			remote_settings.initial_window_size, local_settings.initial_window_size, stream_user_data);
	else
		stream.initialize(stream_id, flags, initial_state, spec.weight, roots,
			remote_settings.initial_window_size, local_settings.initial_window_size, stream_user_data);
	scope(failure) if (is_new) Mem.free(stream);

	if (is_new)
		streams[stream_id] = stream;
	scope(failure) if (is_new) streams.remove(stream_id);

	if (initial_state == StreamState.RESERVED) {
		/* Reserved streams are not counted in the concurrent-streams limit. */
		if (isMyStreamId(stream_id)) {
			/* half closed (remote) */
			stream.shutdown(ShutdownFlag.RD);
		} else {
			/* half closed (local) */
			stream.shutdown(ShutdownFlag.WR);
		}
	} else if (initial_state == StreamState.IDLE) {
		/* Idle streams are not counted either; they serve as anchor nodes in the
		   dependency tree. */
		assert(is_server);
		keepIdleStream(stream);
	} else if (isMyStreamId(stream_id)) {
		++num_outgoing_streams;
	} else {
		++num_incoming_streams;
	}

	/* A received reserved stream (write side shut) is not tracked in the tree. */
	if (stream.shutFlags & ShutdownFlag.WR)
		return stream;

	if (spec.stream_id == 0)
	{
		/* No parent: the stream becomes a root. */
		++roots.num_streams;

		if (spec.exclusive && roots.num_streams <= MAX_DEP_TREE_LENGTH)
			stream.makeTopmostRoot(this);
		else
			roots.add(stream);

		return stream;
	}

	/* TODO Client does not have to track dependencies of streams except
	   for those which have upload data. Currently, we just track
	   everything. */
	assert(parent);

	if (parent.getRoot().subStreams < MAX_DEP_TREE_LENGTH) {
		if (spec.exclusive)
			parent.insert(stream);
		else
			parent.add(stream);
	} else {
		/* The parent's tree is full: fall back to a root with default weight. */
		stream.weight = DEFAULT_WEIGHT;
		roots.add(stream);
	}

	return stream;
}

/*
 * Closes the stream whose ID is |stream_id|, reporting |error_code| to
 * $(D Connector.onStreamExit).
 *
 * On a server, a stream that is still part of the dependency tree is
 * only flagged closed and handed to keepClosedStream(); in every other
 * case it is released through destroyStream().
 *
 * Returns ErrorCode.OK on success, or:
 *
 * ErrorCode.INVALID_ARGUMENT
 *     getStream() found no stream for |stream_id|.
 * ErrorCode.CALLBACK_FAILURE
 *     onStreamExit returned false or threw an Exception.
 */
ErrorCode closeStream(int stream_id, FrameError error_code)
{
	Stream stream = getStream(stream_id);

	if (!stream)
		return ErrorCode.INVALID_ARGUMENT;

	LOGF("stream: stream(%s=%d close", stream, stream.id);

	if (stream.item) {
		OutboundItem attached = stream.item;

		stream.detachItem(this);

		/* A queued item is deleted when it is popped (prepareFrame() will
		   fail), and the active outbound item is freed when aob is reset.
		   Anything else has to be released here. */
		if (!attached.queued && attached != aob.item) {
			attached.free();
			Mem.free(attached);
		}
	}

	/* The exit callback fires even when the stream is still in its initial
	   state, e.g. when RST_STREAM arrives while the request HEADERS is being
	   sent. That may be a PROTOCOL_ERROR, but without the notification the
	   stream would hang on the local endpoint. */
	try {
		if (!connector.onStreamExit(stream_id, error_code))
			return ErrorCode.CALLBACK_FAILURE;
	} catch (Exception e) {
		return ErrorCode.CALLBACK_FAILURE;
	}

	/* Streams flagged PUSH were never counted toward the concurrency limits. */
	if ((stream.flags & StreamFlags.PUSH) == 0) {
		if (isMyStreamId(stream_id))
			--num_outgoing_streams;
		else
			--num_incoming_streams;
	}

	/* Mark both directions closed in case they are not closed yet. */
	stream.flags = cast(StreamFlags)(stream.flags | StreamFlags.CLOSED);

	if (is_server && stream.inDepTree())
	{
		/* The server keeps closed streams, bounded together with the active
		   incoming streams, so that the dependency tree works better. */
		keepClosedStream(stream);
	} else {
		destroyStream(stream);
	}

	return ErrorCode.OK;
}

/*
 * Releases |stream| completely: unlinks it via stream.remove(), removes
 * its entry from |streams|, then frees its resources and the object.
 * |stream| must not be used after this returns.
 */
void destroyStream(Stream stream)
{
	LOGF("stream: destroy closed stream(%s=%d", stream, stream.id);

	stream.remove();

	streams.remove(stream.id);
	stream.free();
	Mem.free(stream);
}

/*
 * Tries to keep incoming closed stream |stream|. Due to the
 * limitation of maximum number of streams in memory, |stream| is not
 * closed and just deleted from memory (see destroyStream).
*/
void keepClosedStream(Stream stream)
{
	LOGF("stream: keep closed stream(%s=%d, state=%d", stream, stream.id, stream.state);

	/* Append at the tail of the closed-stream list. */
	if (!closed_stream_tail) {
		closed_stream_head = stream;
	} else {
		closed_stream_tail.closedNext = stream;
		stream.closedPrev = closed_stream_tail;
	}
	closed_stream_tail = stream;

	++num_closed_streams;

	/* Trim the list again; the oldest entries are destroyed first. */
	adjustClosedStream(0);
}

/*
 * Appends |stream| to the tail of the idle-stream list, then calls
 * adjustIdleStream(), which destroys streams from the head of the
 * list while it is over its limit.
 */
void keepIdleStream(Stream stream)
{
	LOGF("stream: keep idle stream(%s=%d, state=%d", stream, stream.id, stream.state);

	if (!idle_stream_tail) {
		idle_stream_head = stream;
	} else {
		idle_stream_tail.closedNext = stream;
		stream.closedPrev = idle_stream_tail;
	}
	idle_stream_tail = stream;

	++num_idle_streams;

	adjustIdleStream();
}

/*
 * Unlinks |stream| from the idle-stream list, fixing up the head and
 * tail pointers, and clears its own list links.
 */
void detachIdleStream(Stream stream)
{
	LOGF("stream: detach idle stream(%s=%d, state=%d", stream, stream.id, stream.state);

	Stream before = stream.closedPrev;
	Stream after = stream.closedNext;

	/* Bridge the successor back to the predecessor. */
	if (after)
		after.closedPrev = before;
	else
		idle_stream_tail = before;

	/* Bridge the predecessor forward to the successor. */
	if (before)
		before.closedNext = after;
	else
		idle_stream_head = after;

	stream.closedPrev = null;
	stream.closedNext = null;

	--num_idle_streams;
}

/*
 * Deletes closed stream to ensure that number of incoming streams
 * including active and closed is in the maximum number of allowed
 * stream.
* If |offset| is nonzero, it is decreased from the maximum
 * number of allowed stream when comparing number of active and closed
 * stream and the maximum number.
 */
void adjustClosedStream(int offset)
{
	/* The limit is the smaller of the acknowledged and the pending local value. */
	size_t limit = min(local_settings.max_concurrent_streams, pending_local_max_concurrent_stream);

	LOGF("stream: adjusting kept closed streams num_closed_streams=%d, num_incoming_streams=%d, max_concurrent_streams=%d",
		num_closed_streams, num_incoming_streams, limit);

	while (num_closed_streams > 0 && num_closed_streams + num_incoming_streams + offset > limit)
	{
		/* Drop the oldest closed stream, i.e. the list head. */
		Stream oldest = closed_stream_head;

		assert(oldest);

		closed_stream_head = oldest.closedNext;

		if (!closed_stream_head)
			closed_stream_tail = null;
		else
			closed_stream_head.closedPrev = null;

		destroyStream(oldest);
		/* oldest is freed from here on */
		--num_closed_streams;
	}
}

/*
 * Destroys idle streams from the head of the idle list until their
 * number is within the limit computed below.
 */
void adjustIdleStream()
{
	/* The limit never drops below 2, so that two idle streams can be created
	   at once. That happens when a PRIORITY frame for an idle stream depends
	   on another idle stream that does not exist yet. */
	size_t limit = max(2, min(local_settings.max_concurrent_streams, pending_local_max_concurrent_stream));

	LOGF("stream: adjusting kept idle streams num_idle_streams=%d, max=%d", num_idle_streams, limit);

	while (num_idle_streams > limit) {
		Stream oldest = idle_stream_head;
		assert(oldest);

		idle_stream_head = oldest.closedNext;

		if (!idle_stream_head)
			idle_stream_tail = null;
		else
			idle_stream_head.closedPrev = null;

		destroyStream(oldest);
		/* oldest is destroyed from here on */
		--num_idle_streams;
	}
}

/*
 * Closes |stream| with FrameError.NO_ERROR when both its read and its
 * write side are shut down; otherwise does nothing.
 *
 * Returns ErrorCode.OK when nothing had to be closed, else whatever
 * closeStream() returns.
 */
ErrorCode closeStreamIfShutRdWr(Stream stream)
{
	const bool both_shut = (stream.shutFlags & ShutdownFlag.RDWR) == ShutdownFlag.RDWR;
	return both_shut ? closeStream(stream.id, FrameError.NO_ERROR) : ErrorCode.OK;
}

/*
 * Finishes a request HEADERS: END_STREAM shuts the read side of |stream|.
 * The write side is assumed to be still open here.
 */
void endRequestHeadersReceived(Frame frame, Stream stream)
{
	if (!(frame.hd.flags & FrameFlags.END_STREAM))
		return;

	stream.shutdown(ShutdownFlag.RD);
}

/*
 * Finishes a response HEADERS: END_STREAM shuts the read side of |stream|
 * and closes it if the write side is shut too.
 */
ErrorCode endResponseHeadersReceived(Frame frame, Stream stream)
{
	if (!(frame.hd.flags & FrameFlags.END_STREAM))
		return ErrorCode.OK;

	/* Last frame of this stream: disallow further receptions. */
	stream.shutdown(ShutdownFlag.RD);
	return closeStreamIfShutRdWr(stream);
}

/*
 * Finishes any other HEADERS: END_STREAM shuts the read side of |stream|
 * and closes it if the write side is shut too.
 */
ErrorCode endHeadersReceived(Frame frame, Stream stream)
{
	if (!(frame.hd.flags & FrameFlags.END_STREAM))
		return ErrorCode.OK;

	stream.shutdown(ShutdownFlag.RD);
	return closeStreamIfShutRdWr(stream);
}

/*
 * Handles a HEADERS frame that would open a new request stream.
 * Validates the stream ID and the concurrency limits, opens the stream
 * in StreamState.OPENING and reports the frame through callOnHeaders().
 *
 * Returns ErrorCode.OK, ErrorCode.IGN_HEADER_BLOCK when the frame is
 * ignored, ErrorCode.CALLBACK_FAILURE when callOnHeaders() fails, or
 * the result of the invalid-connection/invalid-stream handlers.
 */
ErrorCode onRequestHeaders(Frame frame)
{
	const stream_id = frame.hd.stream_id;

	if (stream_id == 0)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: stream_id == 0");

	/* A client is not expected to receive requests. HEADERS for an idle
	   stream is invalid whether its ID is even or odd; anything else is
	   ignored. */
	if (!is_server) {
		if (idleStreamDetect(stream_id))
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: client received request");

		return ErrorCode.IGN_HEADER_BLOCK;
	}

	if (!isNewPeerStreamId(stream_id))
	{
		/* The spec asks for a connection error on any invalid stream ID. But
		   trailer HEADERS can still arrive after we sent RST_STREAM and before
		   the peer saw it, and a connection error is too harsh for that. So
		   only an ID that refers to an idle stream is fatal; other HEADERS are
		   ignored for now. */
		if (idleStreamDetect(stream_id))
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: invalid stream_id");

		return ErrorCode.IGN_HEADER_BLOCK;
	}

	last_recv_stream_id = stream_id;

	/* Streams that arrive after GOAWAY was queued are ignored. */
	if (goaway_flags & GoAwayFlags.SENT)
		return ErrorCode.IGN_HEADER_BLOCK;

	if (isIncomingConcurrentStreamsMax())
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: max concurrent streams exceeded");

	if (frame.headers.pri_spec.stream_id == stream_id)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: depend on itself");

	if (isIncomingConcurrentStreamsPendingMax())
		return handleInflateInvalidStream(frame, FrameError.REFUSED_STREAM);

	openStream(stream_id, StreamFlags.NONE, frame.headers.pri_spec, StreamState.OPENING, null);
	last_proc_stream_id = last_recv_stream_id;

	return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Handles the response HEADERS of a locally initiated stream that is
 * still in StreamState.OPENING, and moves it to StreamState.OPENED.
 */
ErrorCode onResponseHeaders(Frame frame, Stream stream)
{
	/* Callers only get here for an OPENING stream that we initiated. */
	assert(stream.state == StreamState.OPENING && isMyStreamId(frame.hd.stream_id));

	if (frame.hd.stream_id == 0)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "response HEADERS: stream_id == 0");

	/* half closed (remote): per the spec, additional frames on a stream in
	   this state MUST be answered with a stream error (Section 5.4.2) of
	   type STREAM_CLOSED. */
	if (stream.shutFlags & ShutdownFlag.RD)
		return handleInflateInvalidStream(frame, FrameError.STREAM_CLOSED);

	stream.state = StreamState.OPENED;

	return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Handles the response HEADERS for a reserved (pushed) stream: the
 * promise is fulfilled and the stream starts counting as an incoming
 * stream.
 */
ErrorCode onPushResponseHeaders(Frame frame, Stream stream)
{
	assert(stream.state == StreamState.RESERVED);

	if (frame.hd.stream_id == 0)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "push response HEADERS: stream_id == 0");

	/* No new stream is accepted once GOAWAY was sent or received. */
	if (goaway_flags)
		return ErrorCode.IGN_HEADER_BLOCK;

	if (isIncomingConcurrentStreamsMax())
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "push response HEADERS: max concurrent streams exceeded");

	if (isIncomingConcurrentStreamsPendingMax())
		return handleInflateInvalidStream(frame, FrameError.REFUSED_STREAM);

	stream.promiseFulfilled();
	++num_incoming_streams;

	return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Called when HEADERS is received, assuming |frame| is properly
 * initialized. This function will first validate received frame and
 * then open stream sending it through callback functions.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.IGN_HEADER_BLOCK
 *     Frame was rejected and header block must be decoded but
 *     result must be ignored.
* ErrorCode.CALLBACK_FAILURE
 *     The DataProvider failed
 */
ErrorCode onHeaders(Frame frame, Stream stream)
{
	if (frame.hd.stream_id == 0)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "HEADERS: stream_id == 0");

	/* A valid push response HEADERS goes through onPushResponseHeaders().
	   Reaching this generic handler with a reserved stream is an invalid case. */
	if (stream.state == StreamState.RESERVED)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "HEADERS: stream in reserved");

	/* half closed (remote): per the spec, additional frames on a stream in
	   this state MUST be answered with a stream error (Section 5.4.2) of
	   type STREAM_CLOSED. */
	if (stream.shutFlags & ShutdownFlag.RD)
		return handleInflateInvalidStream(frame, FrameError.STREAM_CLOSED);

	const bool locally_initiated = isMyStreamId(frame.hd.stream_id);

	/* Race condition: CLOSING means RST_STREAM is queued but not sent yet. It
	   will be sent eventually, so the frame is dropped for either initiator. */
	if (stream.state == StreamState.CLOSING)
		return ErrorCode.IGN_HEADER_BLOCK;

	/* On a stream we initiated, HEADERS is only valid once it is OPENED. A
	   peer-initiated stream accepts it in any remaining state. */
	if (locally_initiated && stream.state != StreamState.OPENED)
		return handleInflateInvalidStream(frame, FrameError.PROTOCOL_ERROR);

	return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Handles a received PRIORITY frame. Only a server acts on it: a known
 * stream is re-prioritized, and an idle stream is created as an anchor
 * node in the dependency tree. A client just reports the frame.
 *
 * Returns ErrorCode.OK, ErrorCode.CALLBACK_FAILURE when callOnFrame()
 * fails, or the result of handleInvalidConnection() for stream ID 0.
 */
ErrorCode onPriority(Frame frame)
{
	if (frame.hd.stream_id == 0)
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PRIORITY: stream_id == 0");

	/* Re-prioritization works only in server. */
	if (is_server) {
		Stream stream = getStreamRaw(frame.hd.stream_id);

		if (stream) {
			reprioritizeStream(stream, frame.priority.pri_spec);
		} else {
			/* A stream that is neither known nor idle is ignored without
			   invoking the frame callback. */
			if (!idleStreamDetect(frame.hd.stream_id))
				return ErrorCode.OK;

			/* PRIORITY against an idle stream creates an anchor node. */
			openStream(frame.hd.stream_id, StreamFlags.NONE, frame.priority.pri_spec, StreamState.IDLE, null);
		}
	}

	return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Called when RST_STREAM is received, assuming |frame| is properly
 * initialized.
*
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.CALLBACK_FAILURE
 *     The DataProvider failed
 */
ErrorCode onRstStream(Frame frame)
{
	if (frame.hd.stream_id == 0)
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "RST_STREAM: stream_id == 0");

	/* RST_STREAM for a stream that is still idle is a connection error. */
	if (!getStream(frame.hd.stream_id) && idleStreamDetect(frame.hd.stream_id))
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "RST_STREAM: stream in idle");

	if (!callOnFrame(frame))
		return ErrorCode.CALLBACK_FAILURE;

	/* Only fatal results of closeStream() are propagated; others are dropped. */
	ErrorCode rv = closeStream(frame.hd.stream_id, frame.rst_stream.error_code);
	return isFatal(rv) ? rv : ErrorCode.OK;
}

/*
 * Called when SETTINGS is received, assuming |frame| is properly
 * initialized. If |noack| is non-zero, SETTINGS with ACK will not be
 * submitted. If |frame| has NGFrameFlags.ACK flag set, no SETTINGS
 * with ACK will not be submitted regardless of |noack|.
*
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.CALLBACK_FAILURE
 *     The DataProvider failed
 */

ErrorCode onSettings(Frame frame, bool noack)
{
	ErrorCode rv;

	/* SETTINGS always applies to the whole connection. RFC 7540 section 6.5
	   requires a PROTOCOL_ERROR connection error for any non-zero stream
	   identifier, so this is checked first and for every SETTINGS frame,
	   including ACKs and frames without entries. */
	if (frame.hd.stream_id != 0) {
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: stream_id != 0");
	}

	if (frame.hd.flags & FrameFlags.ACK) {
		/* An ACK must be empty and must answer a SETTINGS we have in flight. */
		if (frame.settings.iva.length != 0) {
			return handleInvalidConnection(frame, FrameError.FRAME_SIZE_ERROR, "SETTINGS: ACK and payload != 0");
		}
		/* NOTE(review): addSettings() stores null in inflight_iva for an empty
		   SETTINGS, so the ACK of an empty SETTINGS looks unexpected here --
		   confirm whether that is intended. */
		if (!inflight_iva) {
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: unexpected ACK");
		}

		/* The peer acknowledged our settings: apply them locally, then release
		   the in-flight copy. */
		rv = updateLocalSettings(inflight_iva);
		Mem.free(inflight_iva);
		inflight_iva = null;

		if (rv != ErrorCode.OK) {
			FrameError error_code = FrameError.INTERNAL_ERROR;
			if (isFatal(rv)) {
				return rv;
			}
			if (rv == ErrorCode.HEADER_COMP) {
				error_code = FrameError.COMPRESSION_ERROR;
			}
			return handleInvalidConnection(frame, error_code, null);
		}

		return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/* Validate each entry and apply it to remote_settings, in order. Unknown
	   IDs are skipped. */
	foreach (Setting entry; frame.settings.iva) {
		with(Setting) switch (entry.id) {
			case HEADER_TABLE_SIZE:

				if (entry.value > MAX_HEADER_TABLE_SIZE) {
					return handleInvalidConnection(frame, FrameError.COMPRESSION_ERROR, "SETTINGS: too large Setting.HEADER_TABLE_SIZE");
				}

				hd_deflater.changeTableSize(entry.value);

				remote_settings.header_table_size = entry.value;

				break;
			case ENABLE_PUSH:

				if (entry.value != 0 && entry.value != 1) {
					return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: invalid Setting.ENABLE_PUSH");
				}

				/* A client rejects a peer value that enables push. */
				if (!is_server && entry.value != 0) {
					return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: server attempted to enable push");
				}

				remote_settings.enable_push = entry.value;

				break;
			case MAX_CONCURRENT_STREAMS:

				remote_settings.max_concurrent_streams = entry.value;

				break;
			case INITIAL_WINDOW_SIZE:
				/* Update the initial window size of all active streams. */
				/* Check that initial_window_size < (1u << 31) */
				if (entry.value > MAX_WINDOW_SIZE) {
					return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, "SETTINGS: too large Setting.INITIAL_WINDOW_SIZE");
				}

				rv = updateRemoteInitialWindowSize(entry.value);

				if (isFatal(rv)) {
					return rv;
				}

				if (rv != ErrorCode.OK) {
					return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, null);
				}

				remote_settings.initial_window_size = entry.value;

				break;
			case MAX_FRAME_SIZE:

				if (entry.value < MAX_FRAME_SIZE_MIN ||
					entry.value > MAX_FRAME_SIZE_MAX) {
					return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: invalid Setting.MAX_FRAME_SIZE");
				}

				remote_settings.max_frame_size = entry.value;

				break;
			case MAX_HEADER_LIST_SIZE:

				remote_settings.max_header_list_size = entry.value;

				break;
			default: break;
		}
	}

	/* Acknowledge the peer's settings unless the caller suppressed it or the
	   session is closing. */
	if (!noack && !isClosing()) {
		rv = addSettings(FrameFlags.ACK, null);

		if (rv != ErrorCode.OK) {
			if (isFatal(rv)) {
				return rv;
			}

			return handleInvalidConnection(frame, FrameError.INTERNAL_ERROR, null);
		}
	}

	return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}
/*
 * Called when PUSH_PROMISE is received, assuming |frame| is properly
 * initialized.
*
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.IGN_HEADER_BLOCK
 *     Frame was rejected and header block must be decoded but
 *     result must be ignored.
 * ErrorCode.CALLBACK_FAILURE
 *     The DataProvider failed
 */

ErrorCode onPushPromise(Frame frame)
{
	if (frame.hd.stream_id == 0)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: stream_id == 0");

	/* Only a client with push enabled in its local settings accepts PUSH_PROMISE. */
	if (is_server || local_settings.enable_push == 0)
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: push disabled");

	/* PUSH_PROMISE is discarded after GOAWAY was sent or received. */
	if (goaway_flags)
		return ErrorCode.IGN_HEADER_BLOCK;

	/* The associated stream must be one we initiated. */
	if (!isMyStreamId(frame.hd.stream_id))
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid stream_id");

	/* Per the spec, an illegal promised stream ID is a connection error of
	   type PROTOCOL_ERROR. */
	if (!isNewPeerStreamId(frame.push_promise.promised_stream_id))
		return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid promised_stream_id");

	last_recv_stream_id = frame.push_promise.promised_stream_id;

	Stream stream = getStream(frame.hd.stream_id);

	if (!stream || stream.state == StreamState.CLOSING || !pending_enable_push) {
		/* An associated stream that is still idle is a connection error;
		   otherwise the promised stream is refused. */
		if (!stream && idleStreamDetect(frame.hd.stream_id))
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: stream in idle");

		addRstStream(frame.push_promise.promised_stream_id, FrameError.REFUSED_STREAM);
		return ErrorCode.IGN_HEADER_BLOCK;
	}

	if (stream.shutFlags & ShutdownFlag.RD) {
		/* The read side of the associated stream is already shut: report the
		   invalid frame and reset the promised stream. */
		try {
			if (!connector.onInvalidFrame(frame, FrameError.PROTOCOL_ERROR))
				return ErrorCode.CALLBACK_FAILURE;
		} catch(Exception e) {
			return ErrorCode.CALLBACK_FAILURE;
		}

		addRstStream(frame.push_promise.promised_stream_id, FrameError.PROTOCOL_ERROR);
		return ErrorCode.IGN_HEADER_BLOCK;
	}

	/* TODO: It is unclear reserved stream depends on associated stream with or without exclusive flag set */
	openStream(frame.push_promise.promised_stream_id, StreamFlags.NONE,
		PrioritySpec(stream.id, DEFAULT_WEIGHT, 0), StreamState.RESERVED, null);

	last_proc_stream_id = last_recv_stream_id;

	return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Called when PING is received, assuming |frame| is properly
 * initialized.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.CALLBACK_FAILURE
 *     The callback function failed.
*/
ErrorCode onPing(Frame frame)
{
	if (frame.hd.stream_id != 0)
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PING: stream_id != 0");

	/* A PING without ACK came from the peer: echo its opaque data back,
	   unless the session is closing. */
	const bool needs_reply = (frame.hd.flags & FrameFlags.ACK) == 0;
	if (needs_reply && !isClosing())
		addPing(FrameFlags.ACK, frame.ping.opaque_data);

	return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Handles a received GOAWAY frame: validates its last_stream_id, records
 * that GOAWAY was received, reports the frame and then calls
 * closeStreamOnGoAway() with that last_stream_id.
 *
 * Returns the result of closeStreamOnGoAway() on success, or:
 *
 * ErrorCode.CALLBACK_FAILURE
 *     callOnFrame() failed.
 */
ErrorCode onGoAway(Frame frame)
{
	if (frame.hd.stream_id != 0)
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "GOAWAY: stream_id != 0");

	const last_id = frame.goaway.last_stream_id;

	/* A positive last_stream_id must be one of our stream IDs. The spec also
	   says endpoints MUST NOT increase the value they send in it. */
	const bool foreign_id = last_id > 0 && !isMyStreamId(last_id);
	if (foreign_id || remote_last_stream_id < last_id)
		return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "GOAWAY: invalid last_stream_id");

	goaway_flags |= GoAwayFlags.RECV;

	remote_last_stream_id = last_id;

	if (!callOnFrame(frame))
		return ErrorCode.CALLBACK_FAILURE;

	return closeStreamOnGoAway(frame.goaway.last_stream_id, false);
}

/*
 * Called when WINDOW_UPDATE is recieved, assuming |frame| is properly
 * initialized.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 * ErrorCode.CALLBACK_FAILURE
 *     The callback function failed.
*/
ErrorCode onWindowUpdate(Frame frame)
{
	const increment = frame.window_update.window_size_increment;

	if (frame.hd.stream_id != 0) {
		/* Stream-level flow control. */
		Stream stream = getStream(frame.hd.stream_id);

		/* An update for a stream getStream() does not return is accepted
		   silently: no idle-stream check is made and the frame callback is
		   not invoked. */
		if (!stream)
			return ErrorCode.OK;

		if (isReservedRemote(stream))
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "WINDOW_UPADATE to reserved stream");

		if (increment == 0)
			return handleInvalidStream(frame, FrameError.PROTOCOL_ERROR);

		/* Reject an increment that would push the window past MAX_WINDOW_SIZE. */
		if (MAX_WINDOW_SIZE - increment < stream.remoteWindowSize)
			return handleInvalidStream(frame, FrameError.FLOW_CONTROL_ERROR);

		stream.remoteWindowSize = stream.remoteWindowSize + increment;

		/* A window that is positive again resumes an item deferred by flow control. */
		if (stream.remoteWindowSize > 0 && stream.isDeferredByFlowControl())
			stream.resumeDeferredItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);
	} else {
		/* Connection-level flow control. */
		if (increment == 0)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, null);

		if (MAX_WINDOW_SIZE - increment < remote_window_size)
			return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, null);

		remote_window_size += increment;
	}

	return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
}

/*
 * Called when DATA is received, assuming |frame| is properly
 * initialized.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 * ErrorCode.CALLBACK_FAILURE
 *     The callback function failed.
*/
	ErrorCode onData(Frame frame)
	{
		Stream stream = getStream(frame.hd.stream_id);

		/* The callback is not invoked if the stream has been closed already
		   or is being closed. This should be treated as a stream error, but
		   that results in lots of RST_STREAM, so frames against such streams
		   are just ignored for now. */
		if (!stream || stream.state == StreamState.CLOSING)
			return ErrorCode.OK;

		/* With HTTP messaging enabled, END_STREAM has to pass the stream's
		   own validation; on failure the stream is reported as invalid and
		   the connector is not notified. */
		bool notify = true;
		if (isHTTPMessagingEnabled()
			&& (frame.hd.flags & FrameFlags.END_STREAM)
			&& !stream.validateRemoteEndStream())
		{
			notify = false;
			handleInvalidStream2(stream.id, frame, FrameError.PROTOCOL_ERROR);
		}

		if (notify && !callOnFrame(frame))
			return ErrorCode.CALLBACK_FAILURE;

		/* The peer finished sending: shut down the read side and close the
		   stream if both directions are now shut. */
		if (frame.hd.flags & FrameFlags.END_STREAM) {
			stream.shutdown(ShutdownFlag.RD);
			ErrorCode rv = closeStreamIfShutRdWr(stream);
			if (isFatal(rv))
				return rv;
		}

		return ErrorCode.OK;
	}

	/*
	 * Packs DATA frame |frame| in wire frame format and stores it in
	 * |bufs|. Payload will be read using |aux_data.data_prd|. The
	 * length of payload is at most |datamax| bytes.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.DEFERRED
	 *   The DATA frame is postponed.
	 * ErrorCode.TEMPORAL_CALLBACK_FAILURE
	 *   The DataProvider failed (stream error).
	 * ErrorCode.CALLBACK_FAILURE
	 *   The DataProvider failed (session error).
*/
	ErrorCode packData(Buffers bufs, int datamax, ref Frame frame, ref DataAuxData aux_data) {
		assert(bufs.head == bufs.cur);

		Buffer* buf = &bufs.cur.buf;

		Stream stream = getStream(frame.hd.stream_id);
		if (!stream)
			return ErrorCode.INVALID_ARGUMENT;

		/* Ask the connector for the payload size; if it throws, use the
		   smallest of both windows and the peer's max frame size. */
		int payloadlen;
		try {
			payloadlen = connector.maxFrameSize(frame.hd.type, stream.id, remote_window_size, stream.remoteWindowSize, remote_settings.max_frame_size);
		} catch (Exception e) {
			payloadlen = min(remote_window_size, stream.remoteWindowSize, remote_settings.max_frame_size);
		}
		LOGF("send: read_length_callback=%d", payloadlen);

		payloadlen = enforceFlowControlLimits(stream, payloadlen);
		LOGF("send: read_length_callback after flow control=%d", payloadlen);

		if (payloadlen <= 0)
			return ErrorCode.CALLBACK_FAILURE;

		if (payloadlen > buf.available) {
			import core.exception : OutOfMemoryError;
			/* Resize the current buffer(s). The +1 in the buffer size is for a possible padding field. */
			try {
				aob.framebufs.realloc(FRAME_HDLEN + 1 + payloadlen);
				assert(aob.framebufs == bufs);
				buf = &bufs.cur.buf;
			} catch (OutOfMemoryError oom) {
				/* If reallocation failed, the old buffers are still intact,
				   so carry on with the caller-provided safe limit. */
				payloadlen = datamax;
			}
		}

		datamax = payloadlen;

		/* Current max DATA length is less than the buffer chunk size */
		assert(buf.available >= datamax);

		/* Let the data provider fill the payload area */
		DataFlags data_flags = DataFlags.NONE;
		payloadlen = aux_data.data_prd(buf.pos[0 .. datamax], data_flags);

		if (payloadlen == ErrorCode.DEFERRED ||
			payloadlen == ErrorCode.TEMPORAL_CALLBACK_FAILURE)
		{
			import libhttp2.types : toString;
			LOGF("send: DATA postponed due to %s", toString(cast(ErrorCode)payloadlen));

			return cast(ErrorCode)payloadlen;
		}

		/* Any other negative value, or more data than was asked for, means the callback failed. */
		if (payloadlen < 0 || datamax < cast(size_t)payloadlen)
			return ErrorCode.CALLBACK_FAILURE;

		/* Mark the payload and step back so that the frame header fits in front of it */
		buf.last = buf.pos + payloadlen;
		buf.pos -= FRAME_HDLEN;

		/* Clear flags, because this may contain flags of a previous DATA frame */
		frame.hd.flags = FrameFlags.NONE;

		if (data_flags & DataFlags.EOF) {
			aux_data.eof = true;
			if (aux_data.flags & DataFlags.EOF)
				frame.hd.flags |= FrameFlags.END_STREAM;
		}

		if (data_flags & DataFlags.NO_COPY)
			aux_data.no_copy = true;

		frame.hd.length = payloadlen;
		frame.data.padlen = 0;

		/* Padding may grow the frame up to |datamax| */
		size_t max_payloadlen = min(datamax, frame.hd.length + MAX_PADLEN);

		int padded_payloadlen = callSelectPadding(frame, max_payloadlen);
		if (isFatal(cast(int)padded_payloadlen))
			return cast(ErrorCode)padded_payloadlen;

		frame.data.padlen = padded_payloadlen - payloadlen;

		frame.hd.pack((*buf)[]);
		frame.hd.addPad(bufs, frame.data.padlen, aux_data.no_copy);

		return ErrorCode.OK;
	}

	/*
	 * This function is called when HTTP header field |hf| in |frame| is
	 * received for |stream|. This function will validate |hf| against
	 * the current state of stream.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.HTTP_HEADER
	 *   Invalid HTTP header field was received.
	 * ErrorCode.IGN_HTTP_HEADER
	 *   Invalid HTTP header field was received but it can be treated as
	 *   if it was not received because of compatibility reasons.
*/
	ErrorCode validateHeaderField(Stream stream, in Frame frame, HeaderField hf, bool trailer)
	{
		/* When ignoring a regular header, this flag is set so that the
		   ordering rule for pseudo header fields is still enforced. */
		ErrorCode ignoreRegularHeader()
		{
			stream.httpFlags = cast(HTTPFlags)(stream.httpFlags | HTTPFlags.PSEUDO_HEADER_DISALLOWED);
			return ErrorCode.IGN_HTTP_HEADER;
		}

		/* We are strict for pseudo header fields: one bad character should
		   lead to failure. OTOH, we should be a bit forgiving for regular
		   headers, since the public internet has so many illegal headers
		   floating around that killing the stream because of them may
		   disrupt many web sites and/or libraries. So illegal regular
		   headers are just ignored. */
		if (!hf.validateName()) {
			if (hf.name.length > 0 && hf.name[0] == ':')
				return ErrorCode.HTTP_HEADER;

			/* header field name must be lower-cased without exception */
			foreach (idx; 0 .. hf.name.length) {
				char c = hf.name[idx];
				if ('A' <= c && c <= 'Z')
					return ErrorCode.HTTP_HEADER;
			}

			return ignoreRegularHeader();
		}

		if (!hf.validateValue()) {
			assert(hf.name.length > 0);
			if (hf.name[0] == ':')
				return ErrorCode.HTTP_HEADER;

			return ignoreRegularHeader();
		}

		/* Servers and PUSH_PROMISE frames carry request headers; everything else is a response. */
		const bool as_request = is_server || frame.hd.type == FrameType.PUSH_PROMISE;
		const bool valid = as_request
			? hf.validateRequestHeader(stream, trailer)
			: hf.validateResponseHeader(stream, trailer);

		return valid ? ErrorCode.OK : ErrorCode.HTTP_HEADER;
	}

	/*
	 * Pops and returns next item to send.
* If there is no such item, returns null.
	 *
	 * Items in ob_pq are always eligible. Items in ob_ss_pq are only taken
	 * while isOutgoingConcurrentStreamsMax() is false. When neither queue
	 * can provide an item, the top of ob_da_pq is used, unless the
	 * connection-level remote window is zero or that queue is empty, in
	 * which case null is returned. When both ob_pq and ob_ss_pq are
	 * eligible, the heavier item wins and ties go to the lower seq.
	 */
	OutboundItem popNextOutboundItem() {
		OutboundItem picked;

		if (ob_pq.empty) {
			if (ob_ss_pq.empty || isOutgoingConcurrentStreamsMax()) {
				/* Only DATA can be sent, and only if the connection window allows it */
				if (remote_window_size == 0 || ob_da_pq.empty)
					return null;

				picked = ob_da_pq.top;
				ob_da_pq.pop();
			} else {
				picked = ob_ss_pq.top;
				ob_ss_pq.pop();
			}
		} else if (ob_ss_pq.empty) {
			picked = ob_pq.top;
			ob_pq.pop();
		} else {
			OutboundItem regular = ob_pq.top;
			OutboundItem opener = ob_ss_pq.top;

			const bool regular_first = isOutgoingConcurrentStreamsMax()
				|| regular.weight > opener.weight
				|| (regular.weight == opener.weight && regular.seq < opener.seq);

			if (regular_first) {
				ob_pq.pop();
				picked = regular;
			} else {
				ob_ss_pq.pop();
				picked = opener;
			}
		}

		picked.queued = 0;
		return picked;
	}

	/*
	 * Returns next item to send without removing it from its queue. If
	 * there is no such item, this function returns null. The selection
	 * takes into account max concurrent streams: an item of
	 * session.ob_ss_pq is only returned while the limit has not been
	 * reached; otherwise the top of session.ob_da_pq is considered,
	 * provided the connection-level remote window is not zero.
*/
	OutboundItem getNextOutboundItem() {
		if (ob_pq.empty) {
			if (ob_ss_pq.empty || isOutgoingConcurrentStreamsMax()) {
				/* Only DATA can be sent, and only if the connection window allows it */
				if (remote_window_size == 0 || ob_da_pq.empty)
					return null;

				return ob_da_pq.top;
			}

			return ob_ss_pq.top;
		}

		if (ob_ss_pq.empty)
			return ob_pq.top;

		OutboundItem regular = ob_pq.top;
		OutboundItem opener = ob_ss_pq.top;

		/* Heavier item first; ties go to the lower seq */
		const bool regular_first = isOutgoingConcurrentStreamsMax()
			|| regular.weight > opener.weight
			|| (regular.weight == opener.weight && regular.seq < opener.seq);

		return regular_first ? regular : opener;
	}

	/*
	 * Updates local settings with the |iva|. The number of elements in the
	 * array pointed by the |iva| is given by the |iva.length|. This function
	 * assumes that the all settings_id member in |iva| are in range 1 to
	 * Setting.MAX_HEADER_LIST_SIZE, inclusive.
	 *
	 * While updating individual stream's local window size, if the window
	 * size becomes strictly larger than MAX_WINDOW_SIZE,
	 * RST_STREAM is issued against such a stream.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.HEADER_COMP
	 *   The header table size is out of range
	 */
	ErrorCode updateLocalSettings(in Setting[] iva)
	{
		/* First pass: pick up the two settings that need processing before
		   local_settings is overwritten. Use the value last seen. */
		int pending_window_size = -1;
		int pending_table_size = -1;
		bool table_size_seen;

		foreach (entry; iva) {
			if (entry.id == Setting.HEADER_TABLE_SIZE) {
				table_size_seen = true;
				pending_table_size = entry.value;
			} else if (entry.id == Setting.INITIAL_WINDOW_SIZE) {
				pending_window_size = entry.value;
			}
		}

		if (table_size_seen)
			hd_inflater.changeTableSize(pending_table_size);

		/* Needs the old initial window size, so it has to run before the second pass */
		if (pending_window_size != -1) {
			ErrorCode rv = updateLocalInitialWindowSize(pending_window_size, local_settings.initial_window_size);
			if (rv != ErrorCode.OK)
				return rv;
		}

		/* Second pass: store every known setting */
		foreach (entry; iva) {
			switch (entry.id) {
				case Setting.HEADER_TABLE_SIZE:
					local_settings.header_table_size = entry.value;
					break;
				case Setting.ENABLE_PUSH:
					local_settings.enable_push = entry.value;
					break;
				case Setting.MAX_CONCURRENT_STREAMS:
					local_settings.max_concurrent_streams = entry.value;
					break;
				case Setting.INITIAL_WINDOW_SIZE:
					local_settings.initial_window_size = entry.value;
					break;
				case Setting.MAX_FRAME_SIZE:
					local_settings.max_frame_size = entry.value;
					break;
				case Setting.MAX_HEADER_LIST_SIZE:
					local_settings.max_header_list_size = entry.value;
					break;
				default:
					break;
			}
		}

		/* Reset the pending values */
		pending_local_max_concurrent_stream = INITIAL_MAX_CONCURRENT_STREAMS;
		pending_enable_push = true;

		return ErrorCode.OK;
	}

	/*
	 * Re-prioritize |stream|. The new priority specification is |pri_spec|.
*/
	void reprioritizeStream(Stream stream, ref PrioritySpec pri_spec)
	{
		PrioritySpec fallback_spec;
		Stream parent;

		if (!stream.inDepTree())
			return;

		/* A stream cannot depend on itself */
		if (pri_spec.stream_id == stream.id) {
			terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "depend on itself");
			return;
		}

		if (pri_spec.stream_id != 0) {
			parent = getStreamRaw(pri_spec.stream_id);

			if (is_server && !parent && idleStreamDetect(pri_spec.stream_id)) {
				/* On the server, a dependency on an idle stream opens that stream in IDLE state */
				parent = openStream(pri_spec.stream_id, StreamFlags.NONE, fallback_spec, StreamState.IDLE, null);
			} else if (!parent || !parent.inDepTree()) {
				/* Unusable dependency: continue with the default specification */
				pri_spec = fallback_spec;
			}
		}

		if (pri_spec.stream_id == 0) {
			/* The stream becomes a root */
			stream.removeSubtree();

			/* We have to update weight after removing stream from tree */
			stream.weight = pri_spec.weight;

			if (pri_spec.exclusive && roots.num_streams <= MAX_DEP_TREE_LENGTH)
				stream.makeTopmostRoot(this);
			else
				stream.makeRoot(this);

			return;
		}

		assert(parent);

		/* Break a would-be cycle by moving the new parent out to a root first */
		if (stream.subtreeContains(parent)) {
			LOGF("stream: cycle detected, dep_stream(%s=%d stream(%s)=%d", parent, parent.id, stream, stream.id);

			parent.removeSubtree();
			parent.makeRoot(this);
		}

		stream.removeSubtree();

		/* We have to update weight after removing stream from tree */
		stream.weight = pri_spec.weight;

		Stream tree_root = parent.getRoot();

		if (tree_root.subStreams + stream.subStreams > MAX_DEP_TREE_LENGTH) {
			/* The combined tree would be too large: make the stream a root with default weight */
			stream.weight = DEFAULT_WEIGHT;
			stream.makeRoot(this);
			return;
		}

		if (pri_spec.exclusive)
			parent.insertSubtree(stream, this);
		else
			parent.addSubtree(stream, this);
	}

	/*
	 * Terminates current $(D Session) with the |error_code|.
* The |reason| is a debug string. The session's last processed stream ID
	 * is used as the last stream ID.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.INVALID_ARGUMENT
	 *   The |reason| is too long.
	 */
	ErrorCode terminateSessionWithReason(FrameError error_code, string reason)
	{
		const int last_id = last_proc_stream_id;
		return terminateSession(last_id, error_code, reason);
	}

	/*
	 * Returns true if the number of outgoing opened streams has reached (or
	 * exceeded) remote_settings.max_concurrent_streams.
	 */
	bool isOutgoingConcurrentStreamsMax()
	{
		return num_outgoing_streams >= remote_settings.max_concurrent_streams;
	}

	/*
	 * Returns true if the number of incoming opened streams has reached (or
	 * exceeded) local_settings.max_concurrent_streams.
	 */
	bool isIncomingConcurrentStreamsMax()
	{
		return num_incoming_streams >= local_settings.max_concurrent_streams;
	}

	/*
	 * Returns true if the number of incoming opened streams has reached (or
	 * exceeded) session.pending_local_max_concurrent_stream.
	 */
	bool isIncomingConcurrentStreamsPendingMax()
	{
		return num_incoming_streams >= pending_local_max_concurrent_stream;
	}

	/* Returns true unless OptionsMask.NO_HTTP_MESSAGING is set in the options. */
	bool isHTTPMessagingEnabled()
	{
		return !(opt_flags & OptionsMask.NO_HTTP_MESSAGING);
	}

	/*
	 * Returns true if |frame| is trailer headers.
*/
	bool isTrailerHeaders(Stream stream, in Frame frame)
	{
		if (!stream)
			return false;

		/* Only a HEADERS frame of category HEADERS can be a trailer */
		if (frame.hd.type != FrameType.HEADERS || frame.headers.cat != HeadersCategory.HEADERS)
			return false;

		/* On the client it is only a trailer if no final response is expected anymore */
		return is_server || (stream.httpFlags & HTTPFlags.EXPECT_FINAL_RESPONSE) == 0;
	}

	/* Returns true if the |stream| is in reserved(remote) state */
	bool isReservedRemote(Stream stream)
	{
		if (stream.state != StreamState.RESERVED)
			return false;

		return !isMyStreamId(stream.id);
	}

	/* Returns true if the |stream| is in reserved(local) state */
	bool isReservedLocal(Stream stream) {
		if (stream.state != StreamState.RESERVED)
			return false;

		return isMyStreamId(stream.id);
	}

	/*
	 * Checks whether received stream_id is valid: it must be non-zero, must
	 * not be one of our own IDs and must be larger than the last stream ID
	 * received so far.
	 */
	bool isNewPeerStreamId(int stream_id)
	{
		if (stream_id == 0 || isMyStreamId(stream_id))
			return false;

		return last_recv_stream_id < stream_id;
	}


	/**
	 * @function
	 *
	 * Returns the last stream ID of a stream for which
	 * $(D Connector.onFrame) was invoked most recently.
	 * The returned value can be used as last_stream_id parameter for
	 * `submitGoAway()` and `terminateSession()`.
	 *
	 * This function always succeeds.
*/
	int getLastProcStreamID()
	{
		return last_proc_stream_id;
	}

	/*
	 * Returns true if |stream_id| refers to a stream that has not been used
	 * yet: for our own IDs, one that is not below next_stream_id; for peer
	 * IDs, one accepted by isNewPeerStreamId().
	 */
	bool idleStreamDetect(int stream_id)
	{
		/* Assume that stream object with stream_id does not exist */
		if (isMyStreamId(stream_id))
			return next_stream_id <= cast(uint)stream_id;

		return isNewPeerStreamId(stream_id);
	}

	/* Frees every stream of the session, together with stream items that nobody else will free. */
	void freeAllStreams() {
		foreach (stream; streams)
		{
			OutboundItem pending = stream.item;

			/* Items that are still queued, or that are the active outbound
			   item, are left alone here. */
			if (pending && !pending.queued && pending != aob.item)
			{
				pending.free();
				Mem.free(pending);
			}

			stream.free();
			Mem.free(stream);
		}
	}

	/*
	 * Returns Stream object whose stream ID is |stream_id|. It
	 * could be null if such stream does not exist. This function returns
	 * null if stream is marked as closed or is in StreamState.IDLE state.
	 */
	Stream getStream(int stream_id)
	{
		Stream found = streams.get(stream_id);

		if (!found)
			return null;

		if (found.flags & StreamFlags.CLOSED)
			return null;

		if (found.state == StreamState.IDLE)
			return null;

		return found;
	}
	/*
	 * This function behaves like getStream(), but it
	 * returns stream object even if it is marked as closed or in
	 * StreamState.IDLE state.
*/
	Stream getStreamRaw(int stream_id)
	{
		Stream raw = streams.get(stream_id);
		return raw;
	}

	/*
	 * Terminates the session: queues a GOAWAY with |last_stream_id|,
	 * |error_code| and |reason| as debug data, flagged to terminate once it
	 * is sent. Does nothing if termination was already requested.
	 */
	ErrorCode terminateSession(int last_stream_id, FrameError error_code, string reason)
	{
		/* Already terminating */
		if (goaway_flags & GoAwayFlags.TERM_ON_SEND)
			return ErrorCode.OK;

		string debug_data = reason ? reason : null;

		ErrorCode rv = addGoAway(last_stream_id, error_code, debug_data, GoAwayAuxFlags.TERM_ON_SEND);
		if (rv != ErrorCode.OK)
			return rv;

		/* Only remember the termination once the GOAWAY has been queued */
		goaway_flags |= GoAwayFlags.TERM_ON_SEND;

		return ErrorCode.OK;
	}

	/*
	 * This function returns true if session is closing, i.e. termination
	 * has been requested through terminateSession().
	 */
	bool isClosing()
	{
		const flagged = goaway_flags & GoAwayFlags.TERM_ON_SEND;
		return flagged != 0;
	}

	/*
	 * Check that we can send a frame to the |stream|. This function
	 * returns 0 if we can send a frame to the |stream|, or one of the
	 * following negative error codes:
	 *
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed.
	 * ErrorCode.STREAM_SHUT_WR
	 *   The stream is half-closed for transmission.
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
	 */
	ErrorCode predicateForStreamSend(Stream stream)
	{
		if (!stream)
			return ErrorCode.STREAM_CLOSED;

		if (isClosing())
			return ErrorCode.SESSION_CLOSING;

		const bool write_shut = (stream.shutFlags & ShutdownFlag.WR) != 0;
		return write_shut ? ErrorCode.STREAM_SHUT_WR : ErrorCode.OK;
	}

	/*
	 * This function checks request HEADERS frame, which opens stream, can
	 * be sent at this time.
*
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.START_STREAM_NOT_ALLOWED
	 *   New stream cannot be created because of GOAWAY: session is
	 *   going down or received last_stream_id is strictly less than
	 *   frame.hd.stream_id.
	 * ErrorCode.STREAM_CLOSING
	 *   request HEADERS was canceled by RST_STREAM while it is in queue.
	 */
	ErrorCode predicateRequestHeadersSend(OutboundItem item)
	{
		if (item.aux_data.headers.canceled)
			return ErrorCode.STREAM_CLOSING;

		/* If we are terminating session (GoAwayFlags.TERM_ON_SEND) or
		 * GOAWAY was received from peer, new request is not allowed. */
		const blocking_flags = GoAwayFlags.TERM_ON_SEND | GoAwayFlags.RECV;
		const bool going_away = (goaway_flags & blocking_flags) != 0;

		return going_away ? ErrorCode.START_STREAM_NOT_ALLOWED : ErrorCode.OK;
	}

	/*
	 * This function checks HEADERS, which is the first frame from the
	 * server, with the |stream| can be sent at this time. The |stream|
	 * can be null.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed or does not exist.
	 * ErrorCode.STREAM_SHUT_WR
	 *   The transmission is not allowed for this stream (e.g., a frame
	 *   with END_STREAM flag set has already sent)
	 * ErrorCode.INVALID_STREAM_ID
	 *   The stream ID is invalid.
	 * ErrorCode.STREAM_CLOSING
	 *   RST_STREAM was queued for this stream.
	 * ErrorCode.INVALID_STREAM_STATE
	 *   The state of the stream is not valid.
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
*/
	ErrorCode predicateResponseHeadersSend(Stream stream)
	{
		ErrorCode rv = predicateForStreamSend(stream);
		if (rv != ErrorCode.OK)
			return rv;

		assert(stream);

		/* The first response HEADERS can only go out on a stream opened by the peer */
		if (isMyStreamId(stream.id))
			return ErrorCode.INVALID_STREAM_ID;

		switch (stream.state) {
			case StreamState.OPENING:
				return ErrorCode.OK;
			case StreamState.CLOSING:
				return ErrorCode.STREAM_CLOSING;
			default:
				return ErrorCode.INVALID_STREAM_STATE;
		}
	}

	/*
	 * This function checks HEADERS for reserved stream can be sent. The
	 * |stream| must be reserved state and the |session| is server side.
	 * The |stream| can be null.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * error codes:
	 *
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed.
	 * ErrorCode.STREAM_SHUT_WR
	 *   The stream is half-closed for transmission.
	 * ErrorCode.STREAM_CLOSING
	 *   RST_STREAM was queued for this stream.
	 * ErrorCode.PROTO
	 *   The stream is not reserved state
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
	 */
	ErrorCode predicatePushResponseHeadersSend(Stream stream)
	{
		/* TODO Should disallow HEADERS if GOAWAY has already been issued? */
		ErrorCode rv = predicateForStreamSend(stream);
		if (rv != ErrorCode.OK)
			return rv;

		assert(stream);

		/* The CLOSING check has to come first: a closing stream is never in
		   RESERVED state, so testing RESERVED first made this branch
		   unreachable and reported PROTO instead of STREAM_CLOSING. */
		if (stream.state == StreamState.CLOSING)
			return ErrorCode.STREAM_CLOSING;

		if (stream.state != StreamState.RESERVED)
			return ErrorCode.PROTO;

		return ErrorCode.OK;
	}

	/*
	 * This function checks HEADERS, which is neither stream-opening nor
	 * first response header, with the |stream| can be sent at this time.
	 * The |stream| can be null.
*
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed or does not exist.
	 * ErrorCode.STREAM_SHUT_WR
	 *   The transmission is not allowed for this stream (e.g., a frame
	 *   with END_STREAM flag set has already sent)
	 * ErrorCode.STREAM_CLOSING
	 *   RST_STREAM was queued for this stream.
	 * ErrorCode.INVALID_STREAM_STATE
	 *   The state of the stream is not valid.
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
	 */
	ErrorCode predicateHeadersSend(Stream stream)
	{
		ErrorCode rv = predicateForStreamSend(stream);
		if (rv != ErrorCode.OK)
			return rv;

		assert(stream);

		/* A closing stream is refused no matter who opened it */
		if (stream.state == StreamState.CLOSING)
			return ErrorCode.STREAM_CLOSING;

		/* Our own streams accept HEADERS in any other state; peer streams must be OPENED */
		if (isMyStreamId(stream.id) || stream.state == StreamState.OPENED)
			return ErrorCode.OK;

		return ErrorCode.INVALID_STREAM_STATE;
	}

	/*
	 * This function checks PUSH_PROMISE frame |frame| with the |stream|
	 * can be sent at this time. The |stream| can be null.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.START_STREAM_NOT_ALLOWED
	 *   New stream cannot be created because GOAWAY is already sent or
	 *   received.
	 * ErrorCode.PROTO
	 *   The client side attempts to send PUSH_PROMISE, or the server
	 *   sends PUSH_PROMISE for the stream not initiated by the client.
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed or does not exist.
	 * ErrorCode.STREAM_CLOSING
	 *   RST_STREAM was queued for this stream.
* ErrorCode.STREAM_SHUT_WR
	 *   The transmission is not allowed for this stream (e.g., a frame
	 *   with END_STREAM flag set has already sent)
	 * ErrorCode.PUSH_DISABLED
	 *   The remote peer disabled reception of PUSH_PROMISE.
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
	 */
	ErrorCode predicatePushPromiseSend(Stream stream)
	{
		/* Only the server side may push */
		if (!is_server)
			return ErrorCode.PROTO;

		ErrorCode rv = predicateForStreamSend(stream);
		if (rv != ErrorCode.OK)
			return rv;

		assert(stream);

		if (remote_settings.enable_push == 0)
			return ErrorCode.PUSH_DISABLED;

		if (stream.state == StreamState.CLOSING)
			return ErrorCode.STREAM_CLOSING;

		/* After a received GOAWAY no new stream may be started */
		const bool goaway_received = (goaway_flags & GoAwayFlags.RECV) != 0;
		return goaway_received ? ErrorCode.START_STREAM_NOT_ALLOWED : ErrorCode.OK;
	}

	/*
	 * This function checks WINDOW_UPDATE with the stream ID |stream_id|
	 * can be sent at this time. Note that END_STREAM flag of the previous
	 * frame does not affect the transmission of the WINDOW_UPDATE frame.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed or does not exist.
	 * ErrorCode.STREAM_CLOSING
	 *   RST_STREAM was queued for this stream.
	 * ErrorCode.INVALID_STREAM_STATE
	 *   The state of the stream is not valid.
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
*/
	ErrorCode predicateWindowUpdateSend(int stream_id)
	{
		/* Connection-level window update */
		if (stream_id == 0)
			return ErrorCode.OK;

		Stream stream = getStream(stream_id);
		if (!stream)
			return ErrorCode.STREAM_CLOSED;

		if (isClosing())
			return ErrorCode.SESSION_CLOSING;

		if (stream.state == StreamState.CLOSING)
			return ErrorCode.STREAM_CLOSING;

		return isReservedLocal(stream) ? ErrorCode.INVALID_STREAM_STATE : ErrorCode.OK;
	}

	/*
	 * This function checks DATA with the |stream| can be sent at this
	 * time. The |stream| can be null.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.STREAM_CLOSED
	 *   The stream is already closed or does not exist.
	 * ErrorCode.STREAM_SHUT_WR
	 *   The transmission is not allowed for this stream (e.g., a frame
	 *   with END_STREAM flag set has already sent)
	 * ErrorCode.STREAM_CLOSING
	 *   RST_STREAM was queued for this stream.
	 * ErrorCode.INVALID_STREAM_STATE
	 *   The state of the stream is not valid.
	 * ErrorCode.SESSION_CLOSING
	 *   This session is closing.
	 */
	ErrorCode predicateDataSend(Stream stream)
	{
		ErrorCode rv = predicateForStreamSend(stream);
		if (rv != ErrorCode.OK)
			return rv;

		assert(stream);

		const state = stream.state;

		/* If the state is StreamState.CLOSING, RST_STREAM was queued but not
		   yet sent. In this case, we won't send DATA frames. */
		if (state == StreamState.CLOSING)
			return ErrorCode.STREAM_CLOSING;

		if (isMyStreamId(stream.id)) {
			/* Request body data: allowed unless the stream is only reserved */
			return state == StreamState.RESERVED ? ErrorCode.INVALID_STREAM_STATE : ErrorCode.OK;
		}

		/* Response body data: the stream has to be opened */
		return state == StreamState.OPENED ? ErrorCode.OK : ErrorCode.INVALID_STREAM_STATE;
	}


	/*
	 * Caps |requested_window_size| by the stream-level remote window, the
	 * connection-level remote window and the remote max frame size, and
	 * returns the smallest of the four.
	 */
	int enforceFlowControlLimits(Stream stream, int requested_window_size)
	{
		LOGF("send: remote windowsize connection=%d, remote maxframsize=%u, stream(id %d=%d",
			remote_window_size,
			remote_settings.max_frame_size, stream.id,
			stream.remoteWindowSize);

		auto stream_capped = min(requested_window_size, stream.remoteWindowSize);
		auto connection_capped = min(stream_capped, remote_window_size);

		return min(connection_capped, cast(int)remote_settings.max_frame_size);
	}

	/*
	 * Now we have SETTINGS synchronization, flow control error can be
	 * detected strictly. If DATA frame is received with length > 0 and
	 * current received window size + delta length is strictly larger than
	 * local window size, it is subject to FLOW_CONTROL_ERROR, so return
	 * false. Note that local_window_size is calculated after SETTINGS ACK is
	 * received from peer, so peer must honor this limit. If the resulting
	 * recv_window_size is strictly larger than MAX_WINDOW_SIZE,
	 * return false too.
*/
	bool adjustRecvWindowSize(ref int _recv_window_size, size_t delta, int local_window_size)
	{
		const int received = cast(int)delta;

		const bool over_local = _recv_window_size > local_window_size - received;
		const bool over_max = _recv_window_size > MAX_WINDOW_SIZE - received;

		if (over_local || over_max)
			return false;

		_recv_window_size += delta;
		return true;
	}
	/*
	 * Accumulates received bytes |delta_size| for stream-level flow
	 * control and decides whether to send WINDOW_UPDATE to that stream.
	 * If the window would be exceeded, RST_STREAM with FLOW_CONTROL_ERROR
	 * is queued instead. If OptionsMask.NO_AUTO_WINDOW_UPDATE is set or
	 * |send_window_update| is zero, WINDOW_UPDATE will not be sent.
	 */
	void updateRecvStreamWindowSize(Stream stream, size_t delta_size, int send_window_update)
	{
		if (!adjustRecvWindowSize(stream.recvWindowSize, delta_size, stream.localWindowSize)) {
			addRstStream(stream.id, FrameError.FLOW_CONTROL_ERROR);
			return;
		}

		/* We don't have to send WINDOW_UPDATE if the data received is the
		   last chunk in the incoming stream. */
		if (!send_window_update || (opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE))
			return;

		/* We have to use local_settings here because it is the constraint
		   the remote endpoint should honor. */
		if (!shouldSendWindowUpdate(stream.localWindowSize, stream.recvWindowSize))
			return;

		addWindowUpdate(FrameFlags.NONE, stream.id, stream.recvWindowSize);
		stream.recvWindowSize = 0;
	}

	/*
	 * Accumulates received bytes |delta_size| for connection-level flow
	 * control and decides whether to send WINDOW_UPDATE to the
	 * connection. If OptionsMask.NO_AUTO_WINDOW_UPDATE is set,
	 * WINDOW_UPDATE will not be sent.
*/
/*
 * Returns the result of terminateSession(FLOW_CONTROL_ERROR) when the
 * connection window would be exceeded, ErrorCode.OK otherwise.
 */
ErrorCode updateRecvConnectionWindowSize(size_t delta_size)
{
    bool ok = adjustRecvWindowSize(recv_window_size, delta_size, local_window_size);
    if (!ok) {
        return terminateSession(FrameError.FLOW_CONTROL_ERROR);
    }
    if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE))
    {
        if (shouldSendWindowUpdate(local_window_size, recv_window_size))
        {
            /* Use stream ID 0 to update connection-level flow control window */
            addWindowUpdate(FrameFlags.NONE, 0, recv_window_size);
            recv_window_size = 0;
        }
    }
    return ErrorCode.OK;
}

/*
 * Adds |delta_size| to |consumed_size| and, once shouldSendWindowUpdate()
 * agrees, queues a WINDOW_UPDATE for |stream_id| (0 addresses the
 * connection) and subtracts the announced amount from both
 * |recv_window_size| and |consumed_size|.
 *
 * If the running total would exceed MAX_WINDOW_SIZE the session is
 * terminated with FLOW_CONTROL_ERROR and that result is returned.
 */
ErrorCode updateConsumedSize(ref int consumed_size, ref int recv_window_size, int stream_id, size_t delta_size, int local_window_size)
{
    int recv_size;

    /* Test |delta_size| on its own first: MAX_WINDOW_SIZE - delta_size is
       evaluated in unsigned arithmetic and would wrap around for a larger
       delta, silently disabling the overflow check. */
    if (delta_size > cast(size_t)MAX_WINDOW_SIZE ||
        cast(size_t)consumed_size > cast(size_t)MAX_WINDOW_SIZE - delta_size)
    {
        return terminateSession(FrameError.FLOW_CONTROL_ERROR);
    }

    consumed_size += cast(int)delta_size;

    /* recv_window_size may be smaller than consumed_size, because it may be decreased by negative value with http2_submit_window_update(). */
    recv_size = min(consumed_size, recv_window_size);

    if (shouldSendWindowUpdate(local_window_size, recv_size))
    {
        addWindowUpdate(FrameFlags.NONE, stream_id, recv_size);
        recv_window_size -= recv_size;
        consumed_size -= recv_size;
    }

    return ErrorCode.OK;
}

/// Stream-level variant of updateConsumedSize(), operating on |stream|'s counters.
ErrorCode updateStreamConsumedSize(Stream stream, size_t delta_size)
{
    return updateConsumedSize(stream.consumedSize, stream.recvWindowSize, stream.id, delta_size, stream.localWindowSize);
}

/// Connection-level variant of updateConsumedSize(); uses stream ID 0.
ErrorCode updateConnectionConsumedSize(size_t delta_size)
{
    return updateConsumedSize(consumed_size, recv_window_size, 0, delta_size, local_window_size);
}


/*
 * Returns the maximum length of next data read.
* If the connection-level and/or stream-wise flow control are enabled, the
 * return value takes into account those current window sizes. The remote
 * settings for max frame size is also taken into account.
 *
 * Never returns a negative value: a non-positive limit is reported as 0.
 */
int nextDataRead(Stream stream)
{
    int window_size;
    window_size = enforceFlowControlLimits(stream, DATA_PAYLOADLEN);

    LOGF("send: available window=%d", window_size);

    return window_size > 0 ? window_size : 0;
}

/*
 * Asks the connector for the padded payload length of |frame|.
 *
 * When the frame already fills |max_payloadlen| the current length is
 * returned without consulting the connector. Otherwise the connector may
 * choose any value between frame.hd.length and
 * min(frame.hd.length + MAX_PADLEN, max_payloadlen); a value outside that
 * range, or a thrown Exception, yields ErrorCode.CALLBACK_FAILURE (as int).
 */
int callSelectPadding(in Frame frame, size_t max_payloadlen)
{
    int rv;

    if (frame.hd.length >= max_payloadlen) {
        return frame.hd.length;
    }

    int max_paddedlen = cast(int) min(frame.hd.length + MAX_PADLEN, max_payloadlen);

    try rv = connector.selectPaddingLength(frame, max_paddedlen);
    catch (Exception e) return cast(int) ErrorCode.CALLBACK_FAILURE;
    if (rv < cast(int)frame.hd.length || rv > max_paddedlen) {
        return cast(int) ErrorCode.CALLBACK_FAILURE;
    }
    return rv;
}

/*
 * Hands the serialized frame header found at the current buffer position
 * and the payload length (frame length minus padding) to
 * connector.writeData().
 *
 * OK, WOULDBLOCK and TEMPORAL_CALLBACK_FAILURE are passed through; every
 * other result, and any Exception thrown by the connector, is reported as
 * ErrorCode.CALLBACK_FAILURE, in line with the other connector wrappers.
 */
ErrorCode callWriteData(OutboundItem item, Buffers framebufs)
{
    Buffer* buf = &framebufs.cur.buf;
    Frame* frame = &item.frame;
    uint length = frame.hd.length - frame.data.padlen;
    ErrorCode rv;

    try rv = connector.writeData(*frame, buf.pos[0 .. FRAME_HDLEN], length);
    catch (Exception e) return ErrorCode.CALLBACK_FAILURE;

    if (rv == ErrorCode.OK || rv == ErrorCode.WOULDBLOCK || rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE)
        return rv;
    return ErrorCode.CALLBACK_FAILURE;
}

/// Forwards to connector.onFrameReady(); a thrown Exception is reported as false.
bool callOnFrameReady(in Frame frame)
{
    try return connector.onFrameReady(frame);
    catch(Exception e) return false;
}

/// Forwards to connector.onFrameSent(); a thrown Exception is reported as false.
bool callOnFrameSent(in Frame frame)
{
    try return connector.onFrameSent(frame);
    catch(Exception e) return false;
}

/// Forwards to connector.onFrameHeader(); a thrown Exception is reported as false.
bool callOnFrameHeader(in FrameHeader hd)
{
    try return connector.onFrameHeader(hd);
    catch(Exception e) return false;
}

/// Forwards to connector.onHeaders(); a thrown Exception is reported as false.
bool callOnHeaders(in Frame frame)
{
    LOGF("recv: call onHeaders callback stream_id=%d", frame.hd.stream_id);
    try return connector.onHeaders(frame);
    catch(Exception e) return false;
}

/// Forwards to connector.onHeaderField(); a thrown Exception is reported as false.
bool callOnHeaderField(in Frame frame, in HeaderField hf, ref bool pause, ref bool close)
{
    try return connector.onHeaderField(frame, hf, pause, close);
    catch(Exception e) return false;
}

/*
 * Reads into |buf| through connector.read().
 *
 * Returns the connector's result when it is a length no larger than
 * buf.length, zero, WOULDBLOCK or EOF. A larger length, any other
 * negative value or a thrown Exception yields ErrorCode.CALLBACK_FAILURE.
 */
int callRead(ubyte[] buf)
{
    int len;
    try len = connector.read(buf);
    catch(Exception e) return ErrorCode.CALLBACK_FAILURE;

    if (len > 0) {
        if (cast(size_t) len > buf.length)
            return ErrorCode.CALLBACK_FAILURE;
    } else if (len < 0 && len != cast(int) ErrorCode.WOULDBLOCK && len != cast(int)ErrorCode.EOF)
        return ErrorCode.CALLBACK_FAILURE;

    return len;
}

/// Forwards to connector.onFrame(); a thrown Exception is reported as false.
bool callOnFrame(in Frame frame)
{
    try return connector.onFrame(frame);
    catch(Exception e) return false;
}

/*
 * Checks that we can receive the DATA frame for stream, which is
 * indicated by |session.iframe.frame.hd.stream_id|. If it is a
 * connection error situation, GOAWAY frame will be issued by this
 * function.
 *
 * If the DATA frame is allowed, returns 0.
*
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.IGN_PAYLOAD
 *   The reception of DATA frame is connection error; or should be
 *   ignored.
 */
ErrorCode onDataFailFast()
{
    ErrorCode rv;
    Stream stream;
    int stream_id;
    string failure_reason;
    FrameError error_code = FrameError.PROTOCOL_ERROR;

    stream_id = iframe.frame.hd.stream_id;

    if (stream_id == 0) {
        /* The spec says that if a DATA frame is received whose stream ID
           is 0, the recipient MUST respond with a connection error of
           type PROTOCOL_ERROR. */
        failure_reason = "DATA: stream_id == 0";
        goto fail;
    }
    stream = getStream(stream_id);
    if (!stream) {
        if (idleStreamDetect(stream_id))
        {
            failure_reason = "DATA: stream in idle";
            error_code = FrameError.STREAM_CLOSED;
            goto fail;
        }
        return ErrorCode.IGN_PAYLOAD;
    }
    if (stream.shutFlags & ShutdownFlag.RD) {
        failure_reason = "DATA: stream in half-closed(remote)";
        error_code = FrameError.STREAM_CLOSED;
        goto fail;
    }

    if (isMyStreamId(stream_id)) {
        if (stream.state == StreamState.CLOSING) {
            return ErrorCode.IGN_PAYLOAD;
        }
        if (stream.state != StreamState.OPENED) {
            failure_reason = "DATA: stream not opened";
            goto fail;
        }
        return ErrorCode.OK;
    }
    if (stream.state == StreamState.RESERVED) {
        failure_reason = "DATA: stream in reserved";
        goto fail;
    }
    if (stream.state == StreamState.CLOSING) {
        return ErrorCode.IGN_PAYLOAD;
    }
    return ErrorCode.OK;
fail:
    /* Connection error: terminate the session; the payload is ignored
       unless termination itself failed fatally. */
    rv = terminateSessionWithReason(error_code, failure_reason);
    if (isFatal(rv)) {
        return rv;
    }
    return ErrorCode.IGN_PAYLOAD;
}


/*
 * Runs once the header block of the frame in |iframe| has been fully
 * received.
 *
 * When HTTP messaging is enabled the headers are validated first. A
 * validation failure queues RST_STREAM (PROTOCOL_ERROR) via
 * handleInvalidStream2() and suppresses the onFrame callback. A fatal
 * result from that handling (e.g. a failing onInvalidFrame callback) is
 * returned to the caller instead of being dropped.
 *
 * Returns ErrorCode.OK without doing anything when the stream is gone or
 * closing, ErrorCode.CALLBACK_FAILURE when onFrame fails, otherwise the
 * result of the category-specific end*HeadersReceived step.
 */
ErrorCode afterHeaderBlockReceived()
{
    ErrorCode rv = ErrorCode.OK;
    bool call_cb = true;
    Frame* frame = &iframe.frame;
    Stream stream;

    /* We don't call Connector.onFrame if stream has been closed already or being closed. */
    stream = getStream(frame.hd.stream_id);
    if (!stream || stream.state == StreamState.CLOSING)
    {
        return ErrorCode.OK;
    }

    if (isHTTPMessagingEnabled()) {
        if (frame.hd.type == FrameType.PUSH_PROMISE) {
            Stream subject_stream;

            subject_stream = getStream(frame.push_promise.promised_stream_id);
            if (subject_stream) {
                if (!subject_stream.onRequestHeaders(*frame))
                    rv = ErrorCode.ERROR;
            }
        } else {
            assert(frame.hd.type == FrameType.HEADERS);
            with(HeadersCategory) switch (frame.headers.cat) {
                case REQUEST:
                    if (!stream.onRequestHeaders(*frame))
                        rv = ErrorCode.ERROR;
                    break;
                case RESPONSE:
                case PUSH_RESPONSE:
                    if (!stream.onResponseHeaders())
                        rv = ErrorCode.ERROR;
                    break;
                case HEADERS:
                    if (stream.httpFlags & HTTPFlags.EXPECT_FINAL_RESPONSE) {
                        assert(!is_server);
                        if (!stream.onResponseHeaders())
                            rv = ErrorCode.ERROR;
                    } else {
                        if (!stream.validateTrailerHeaders(*frame))
                            rv = ErrorCode.ERROR;
                    }
                    break;
                default:
                    assert(0);
            }
            if (rv == ErrorCode.OK && (frame.hd.flags & FrameFlags.END_STREAM)) {
                if (!stream.validateRemoteEndStream())
                    rv = ErrorCode.ERROR;
            }
        }
        if (rv != ErrorCode.OK) {
            int stream_id;

            /* For PUSH_PROMISE the offending stream is the promised one. */
            if (frame.hd.type == FrameType.PUSH_PROMISE) {
                stream_id = frame.push_promise.promised_stream_id;
            } else {
                stream_id = frame.hd.stream_id;
            }

            call_cb = false;

            /* A fatal result here must reach the caller. */
            ErrorCode invalid_rv = handleInvalidStream2(stream_id, *frame, FrameError.PROTOCOL_ERROR);
            if (isFatal(invalid_rv)) {
                return invalid_rv;
            }
        }
    }

    if (call_cb) {
        bool ok = callOnFrame(*frame);
        if (!ok)
            return ErrorCode.CALLBACK_FAILURE;
    }

    if (frame.hd.type != FrameType.HEADERS) {
        return ErrorCode.OK;
    }

    switch (frame.headers.cat) {
        case HeadersCategory.REQUEST:
            endRequestHeadersReceived(*frame, stream);
            return ErrorCode.OK;
        case HeadersCategory.RESPONSE:
        case HeadersCategory.PUSH_RESPONSE:
            return endResponseHeadersReceived(*frame, stream);
        case HeadersCategory.HEADERS:
            return endHeadersReceived(*frame, stream);
        default:
            assert(0);
    }
}

/*
 * Unpacks the HEADERS frame held in |iframe|, assigns its category and
 * dispatches it:
 *   - no such stream             -> REQUEST       -> onRequestHeaders
 *   - own stream in OPENING      -> RESPONSE      -> onResponseHeaders
 *   - peer stream in RESERVED    -> PUSH_RESPONSE -> onPushResponseHeaders
 *   - anything else              -> HEADERS       -> onHeaders
 */
ErrorCode processHeadersFrame()
{
    Frame* frame = &iframe.frame;
    Stream stream;

    frame.headers.unpack(iframe.sbuf[]);

    stream = getStream(frame.hd.stream_id);
    if (!stream) {
        frame.headers.cat = HeadersCategory.REQUEST;
        return onRequestHeaders(*frame);
    }

    if (isMyStreamId(frame.hd.stream_id))
    {
        if (stream.state == StreamState.OPENING) {
            frame.headers.cat = HeadersCategory.RESPONSE;
            return onResponseHeaders(*frame, stream);
        }
        frame.headers.cat = HeadersCategory.HEADERS;
        return onHeaders(*frame, stream);
    }
    if (stream.state == StreamState.RESERVED) {
        frame.headers.cat = HeadersCategory.PUSH_RESPONSE;
        return onPushResponseHeaders(*frame, stream);
    }
    frame.headers.cat = HeadersCategory.HEADERS;
    return onHeaders(*frame, stream);
}

/// Unpacks the PRIORITY frame from the small inbound buffer and passes it to onPriority().
ErrorCode processPriorityFrame()
{
    Frame* frame = &iframe.frame;

    frame.priority.unpack(iframe.sbuf[]);

    return onPriority(*frame);
}

/// Unpacks the RST_STREAM frame from the small inbound buffer and passes it to onRstStream().
ErrorCode processRstStreamFrame()
{
    Frame* frame = &iframe.frame;

    frame.rst_stream.unpack(iframe.sbuf[]);

    return onRstStream(*frame);
}


/*
 * Finalizes the collected SETTINGS entries and passes the frame to
 * onSettings().
 *
 * The last slot of |iframe.iva| carries the minimum header table size
 * seen in this frame. If it differs from the recorded
 * Setting.HEADER_TABLE_SIZE entry, the recorded value is appended as an
 * extra entry and the minimum takes its original place.
 */
ErrorCode processSettingsFrame()
{
    Frame* frame = &iframe.frame;
    size_t i;
    Setting min_header_size_entry;
    min_header_size_entry = iframe.iva[INBOUND_NUM_IV - 1];

    if (min_header_size_entry.value < uint.max) {
        /* If we have less value, then we must have Setting.HEADER_TABLE_SIZE in i < iframe.niv */
        for (i = 0; i < iframe.niv; ++i) {
            if (iframe.iva[i].id == Setting.HEADER_TABLE_SIZE) {
                break;
            }
        }

        assert(i < iframe.niv);

        if (min_header_size_entry.value != iframe.iva[i].value) {
            iframe.iva[iframe.niv++] = iframe.iva[i];
            iframe.iva[i] = min_header_size_entry;
        }
    }

    frame.settings.unpack(iframe.iva[0 .. iframe.niv]);

    return onSettings(*frame, false /* ACK */);
}

/// Unpacks the PUSH_PROMISE frame from the small inbound buffer and passes it to onPushPromise().
ErrorCode processPushPromiseFrame()
{
    Frame* frame = &iframe.frame;

    frame.push_promise.unpack(iframe.sbuf[]);

    return onPushPromise(*frame);
}

/// Unpacks the PING frame from the small inbound buffer and passes it to onPing().
ErrorCode processPingFrame()
{
    Frame* frame = &iframe.frame;

    frame.ping.unpack(iframe.sbuf[]);

    return onPing(*frame);
}

/*
 * Unpacks the GOAWAY frame from the small buffer plus the large buffer,
 * then resets |iframe.lbuf| to an empty Buffer before calling onGoAway().
 */
ErrorCode processGoAwayFrame()
{
    Frame* frame = &iframe.frame;

    frame.goaway.unpack(iframe.sbuf[], iframe.lbuf[]);

    iframe.lbuf = Buffer(null);

    return onGoAway(*frame);
}

/// Unpacks the WINDOW_UPDATE frame from the small inbound buffer and passes it to onWindowUpdate().
ErrorCode processWindowUpdateFrame()
{
    Frame* frame = &iframe.frame;

    frame.window_update.unpack(iframe.sbuf[]);

    return onWindowUpdate(*frame);
}

/* For errors, this function only returns FATAL error.
*/
ErrorCode processDataFrame()
{
    /* Non-fatal results of onData() are reported as success. */
    const ErrorCode status = onData(iframe.frame);
    return isFatal(status) ? status : ErrorCode.OK;
}

/// Same as handleInvalidStream2(), addressing the stream named in the frame header.
ErrorCode handleInvalidStream(Frame frame, FrameError error_code)
{
    const int stream_id = frame.hd.stream_id;
    return handleInvalidStream2(stream_id, frame, error_code);
}

/*
 * Queues RST_STREAM with |error_code| for |stream_id| and reports the
 * offending |frame| through connector.onInvalidFrame().
 *
 * Returns ErrorCode.CALLBACK_FAILURE when the callback returns false or
 * throws, ErrorCode.OK otherwise.
 */
ErrorCode handleInvalidStream2(int stream_id, Frame frame, FrameError error_code)
{
    addRstStream(stream_id, error_code);

    bool rejected;
    try rejected = !connector.onInvalidFrame(frame, error_code);
    catch (Exception e) rejected = true;

    return rejected ? ErrorCode.CALLBACK_FAILURE : ErrorCode.OK;
}

/*
 * Header-block flavour of handleInvalidStream(): fatal results are
 * returned unchanged, everything else becomes ErrorCode.IGN_HEADER_BLOCK.
 */
ErrorCode handleInflateInvalidStream(Frame frame, FrameError error_code)
{
    const ErrorCode status = handleInvalidStream(frame, error_code);
    return isFatal(status) ? status : ErrorCode.IGN_HEADER_BLOCK;
}

/*
 * Handles an invalid frame that amounts to a connection error: reports
 * it through connector.onInvalidFrame() and then terminates the session
 * with |error_code| and |reason|.
 *
 * Returns ErrorCode.CALLBACK_FAILURE, without terminating the session,
 * when the callback returns false or throws.
 */
ErrorCode handleInvalidConnection(Frame frame, FrameError error_code, string reason)
{
    bool rejected;
    try rejected = !connector.onInvalidFrame(frame, error_code, reason);
    catch (Exception e) rejected = true;

    if (rejected)
        return ErrorCode.CALLBACK_FAILURE;

    return terminateSessionWithReason(error_code, reason);
}

/*
 * Header-block flavour of handleInvalidConnection(): fatal results are
 * returned unchanged, everything else becomes ErrorCode.IGN_HEADER_BLOCK.
 */
ErrorCode handleInflateInvalidConnection(Frame frame, FrameError error_code, string reason)
{
    const ErrorCode status = handleInvalidConnection(frame, error_code, reason);
    return isFatal(status) ? status : ErrorCode.IGN_HEADER_BLOCK;
}


/* Add padding to HEADERS or PUSH_PROMISE. We use frame.headers.padlen in this function
 * to use the fact that frame.push_promise has also padlen in the same position.
*/
/*
 * The padded length is chosen by callSelectPadding(); a fatal value from
 * it is returned as the error code.
 *
 * NOTE(review): |frame| is received by value. If Frame is a value type,
 * the header update done by addPad() and the padlen store below only
 * affect this local copy - confirm against the callers.
 */
ErrorCode headersAddPad(Frame frame)
{
    Buffers bufs = aob.framebufs;

    /* Largest payload the padding callback is allowed to choose. */
    int payload_limit = min(MAX_PAYLOADLEN, frame.hd.length + MAX_PADLEN);

    int padded_len = callSelectPadding(frame, payload_limit);
    if (isFatal(padded_len))
        return cast(ErrorCode)padded_len;

    int pad = padded_len - frame.hd.length;

    LOGF("send: padding selected: payloadlen=%d, padlen=%d", padded_len, pad);

    frame.hd.addPad(bufs, pad, false);
    frame.headers.padlen = pad;

    return ErrorCode.OK;
}

/// Upper bound of the deflated size of |hfa|, plus |additional| bytes.
size_t estimateHeadersPayload(in HeaderField[] hfa, size_t additional)
{
    const bound = hd_deflater.upperBound(hfa);
    return bound + additional;
}

/*
 * Updates the remote initial window size of all active streams. If
 * error occurs, all streams may not be updated.
 *
 * A stream that rejects the update terminates the session with
 * FLOW_CONTROL_ERROR. A stream whose window becomes positive and which
 * was deferred by flow control gets its deferred item resumed.
 */
ErrorCode updateRemoteInitialWindowSize(int new_initial_window_size)
{
    ErrorCode result;
    auto previous_size = remote_settings.initial_window_size;

    foreach (stream; streams)
    {
        if (!stream.updateRemoteInitialWindowSize(new_initial_window_size, previous_size))
            return terminateSession(FrameError.FLOW_CONTROL_ERROR);

        /* If window size gets positive, push deferred DATA frame to outbound queue. */
        if (stream.remoteWindowSize <= 0)
            continue;
        if (stream.isDeferredByFlowControl())
            stream.resumeDeferredItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);
    }

    return result;
}


/*
 * Updates the local initial window size of all active streams. If
 * error occurs, all streams may not be updated.
*/
/*
 * A stream that rejects the update terminates the session with
 * FLOW_CONTROL_ERROR. Unless OptionsMask.NO_AUTO_WINDOW_UPDATE is set,
 * each stream that now warrants it gets a WINDOW_UPDATE for its
 * accumulated receive count, which is then reset to zero.
 */
ErrorCode updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size)
{
    ErrorCode result;

    foreach (stream; streams)
    {
        const bool updated = stream.updateLocalInitialWindowSize(new_initial_window_size, old_initial_window_size);
        if (!updated)
            return terminateSession(FrameError.FLOW_CONTROL_ERROR);

        if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)
            && shouldSendWindowUpdate(stream.localWindowSize, stream.recvWindowSize))
        {
            addWindowUpdate(FrameFlags.NONE, stream.id, stream.recvWindowSize);
            stream.recvWindowSize = 0;
        }
    }

    return result;
}

/*
 * Returns the number of active streams, which includes streams in
 * reserved state. Closed and idle streams kept in |streams| are not
 * counted.
 */
size_t getNumActiveStreams()
{
    return streams.length
        - num_closed_streams
        - num_idle_streams;
}

/* Closes non-idle and non-closed streams whose stream ID > last_stream_id.
 * If incoming is nonzero, we are going to close incoming streams.
 * Otherwise, close outgoing streams.
*/
/*
 * Each matching stream is closed with REFUSED_STREAM. The first fatal
 * result aborts the walk and is returned; otherwise the result of the
 * last close attempt (or the default value if none was made) is returned.
 *
 * NOTE(review): streams are closed while |streams| is being iterated -
 * confirm that closeStream() cannot invalidate this iteration.
 */
ErrorCode closeStreamOnGoAway(int last_stream_id, bool incoming)
{
    ErrorCode status;

    foreach (stream; streams)
    {
        /* Locally initiated streams are the outgoing ones; skip the
           direction we were not asked to close. */
        const bool mine = isMyStreamId(stream.id);
        if (mine ? incoming : !incoming)
            continue;

        if (stream.state == StreamState.IDLE)
            continue;
        if (stream.flags & StreamFlags.CLOSED)
            continue;
        if (stream.id <= last_stream_id)
            continue;

        status = closeStream(stream.id, FrameError.REFUSED_STREAM);
        if (isFatal(status))
            return status;
    }

    return status;
}

/*
 * Advances the scheduling weight of |item|.
 *
 * While the weight is above MIN_WEIGHT and not above |ini_weight| it is
 * simply decremented. Otherwise it restarts from |ini_weight| and the
 * item is placed in the latest cycle; if it already was in the latest
 * cycle, a new cycle is opened for it.
 */
void cycleWeightOutboundItem(OutboundItem item, int ini_weight)
{
    const bool restart = item.weight == MIN_WEIGHT || item.weight > ini_weight;

    if (!restart) {
        --item.weight;
        return;
    }

    item.weight = ini_weight;

    if (item.cycle == last_cycle)
        ++last_cycle;

    item.cycle = last_cycle;
}

/*
 * This function serializes frame for transmission.
 *
 * This function returns 0 if it succeeds, or one of negative error
 * codes, including both fatal and non-fatal ones.
*/
/*
 * Non-DATA frames are packed into |aob.framebufs| after the per-type
 * predicate checks; HEADERS and PUSH_PROMISE are also padded. DATA frames
 * are packed through packData() with a length limited by nextDataRead().
 *
 * Returns ErrorCode.DEFERRED when a DATA item was deferred (by flow
 * control or by the data provider); in that case |aob.item| is cleared
 * before |aob| is reset so that the deferred item is not freed.
 */
ErrorCode prepareFrame(OutboundItem item)
{
    ErrorCode rv;
    Frame* frame = &item.frame;
    if (frame.hd.type != FrameType.DATA) {
        with(FrameType) switch (frame.hd.type) {
            case HEADERS: {
                HeadersAuxData *aux_data;
                size_t estimated_payloadlen;

                aux_data = &item.aux_data.headers;

                estimated_payloadlen = estimateHeadersPayload(frame.headers.hfa, PRIORITY_SPECLEN);

                if (estimated_payloadlen > MAX_HEADERSLEN) {
                    return ErrorCode.FRAME_SIZE_ERROR;
                }

                if (frame.headers.cat == HeadersCategory.REQUEST) {
                    /* initial HEADERS, which opens stream */
                    Stream stream = openStream(frame.hd.stream_id, StreamFlags.NONE, frame.headers.pri_spec, StreamState.INITIAL, aux_data.stream_user_data);

                    rv = predicateRequestHeadersSend(item);
                    if (rv != ErrorCode.OK) {
                        return rv;
                    }

                    if (isHTTPMessagingEnabled()) {
                        stream.setRequestMethod(*frame);
                    }
                } else {
                    Stream stream = getStream(frame.hd.stream_id);

                    /* The first predicate that accepts the stream decides the category. */
                    if (predicatePushResponseHeadersSend(stream) == 0)
                    {
                        frame.headers.cat = HeadersCategory.PUSH_RESPONSE;
                        if (aux_data.stream_user_data)
                            stream.userData = aux_data.stream_user_data;
                    } else if (predicateResponseHeadersSend(stream) == 0) {
                        frame.headers.cat = HeadersCategory.RESPONSE;
                    } else {
                        frame.headers.cat = HeadersCategory.HEADERS;

                        rv = predicateHeadersSend(stream);

                        if (rv != ErrorCode.OK) {
                            if (stream && stream.item == item)
                                stream.detachItem(this);
                            return rv;
                        }
                    }
                }

                rv = frame.headers.pack(aob.framebufs, hd_deflater);

                if (rv != ErrorCode.OK) {
                    return rv;
                }

                LOGF("send: before padding, HEADERS serialized in %d bytes", aob.framebufs.length);

                rv = headersAddPad(*frame);

                if (rv != ErrorCode.OK) {
                    return rv;
                }

                LOGF("send: HEADERS finally serialized in %d bytes", aob.framebufs.length);

                break;
            }
            case PRIORITY: {
                if (isClosing()) {
                    return ErrorCode.SESSION_CLOSING;
                }
                /* PRIORITY frame can be sent at any time and to any stream ID. */
                frame.priority.pack(aob.framebufs);

                /* Peer can send PRIORITY frame against idle stream to create
                   "anchor" in dependency tree. Only client can do this in
                   libhttp2. In libhttp2, only server retains non-active (closed
                   or idle) streams in memory, so we don't open stream here. */
                break;
            }
            case RST_STREAM:
                if (isClosing()) {
                    return ErrorCode.SESSION_CLOSING;
                }
                frame.rst_stream.pack(aob.framebufs);
                break;
            case SETTINGS: {
                rv = frame.settings.pack(aob.framebufs);
                if (rv != ErrorCode.OK) {
                    return rv;
                }
                break;
            }
            case PUSH_PROMISE: {
                Stream stream;
                HeadersAuxData *aux_data;
                PrioritySpec pri_spec;
                size_t estimated_payloadlen;

                aux_data = &item.aux_data.headers;

                stream = getStream(frame.hd.stream_id);

                /* stream could be null if associated stream was already closed. */
                if (stream)
                    pri_spec = PrioritySpec(stream.id, DEFAULT_WEIGHT, 0);

                openStream(frame.push_promise.promised_stream_id, StreamFlags.NONE, pri_spec, StreamState.RESERVED, aux_data.stream_user_data);

                estimated_payloadlen = estimateHeadersPayload(frame.push_promise.hfa, 0);

                if (estimated_payloadlen > MAX_HEADERSLEN)
                    return ErrorCode.FRAME_SIZE_ERROR;

                /* predicate should fail if stream is null. */
                rv = predicatePushPromiseSend(stream);
                if (rv != ErrorCode.OK) {
                    return rv;
                }

                assert(stream);

                rv = frame.push_promise.pack(aob.framebufs, hd_deflater);
                if (rv != 0)
                    return rv;

                rv = headersAddPad(*frame);
                if (rv != 0)
                    return rv;

                break;
            }
            case PING:
                if (isClosing()) {
                    return ErrorCode.SESSION_CLOSING;
                }
                frame.ping.pack(aob.framebufs);
                break;
            case WINDOW_UPDATE: {
                rv = predicateWindowUpdateSend(frame.hd.stream_id);
                if (rv != ErrorCode.OK) {
                    return rv;
                }
                frame.window_update.pack(aob.framebufs);
                break;
            }
            case GOAWAY:
                rv = frame.goaway.pack(aob.framebufs);
                if (rv != ErrorCode.OK) {
                    return rv;
                }
                local_last_stream_id = frame.goaway.last_stream_id;

                break;
            default:
                return ErrorCode.INVALID_ARGUMENT;
        }
        return ErrorCode.OK;
    } else {
        int next_readmax;
        Stream stream = getStream(frame.hd.stream_id);

        if (stream) {
            assert(stream.item == item);
        }

        rv = predicateDataSend(stream);
        if (rv != ErrorCode.OK) {
            if (stream)
                stream.detachItem(this);

            return rv;
        }
        /* Assuming stream is not null */
        assert(stream);
        next_readmax = nextDataRead(stream);

        if (next_readmax == 0) {

            /* This must be true since we only pop DATA frame item from queue when session.remote_window_size > 0 */
            assert(remote_window_size > 0);

            stream.deferItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);
            /* Clear the reference first: aob.reset() frees a non-null item. */
            aob.item = null;
            aob.reset();
            return ErrorCode.DEFERRED;
        }

        rv = packData(aob.framebufs, next_readmax, *frame, item.aux_data.data);
        if (rv == ErrorCode.DEFERRED) {
            stream.deferItem(StreamFlags.DEFERRED_USER, this);
            aob.item = null;
            aob.reset();
            return ErrorCode.DEFERRED;
        }
        if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) {
            stream.detachItem(this);
            addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR);
            return ErrorCode.TEMPORAL_CALLBACK_FAILURE;
        }
        if (rv != 0) {
            stream.detachItem(this);
            return rv;
        }
        return ErrorCode.OK;
    }
}

/*
 * Called after a frame is sent. This function runs
 * $(D Connector.onFrameSent) and handles stream closure upon END_STREAM
 * or RST_STREAM. This function does not reset aob. It is a
 * responsibility of $(D resetActiveOutboundItem).
 *
 * For HEADERS and PUSH_PROMISE nothing is done while CONTINUATION
 * buffers are still pending. For DATA the connection and stream remote
 * windows are reduced by the frame length.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.CALLBACK_FAILURE
 *   The callback function failed.
 *
 * Fatal results from closing streams or submitting follow-up DATA are
 * returned as well.
 */
ErrorCode afterFrameSent()
{
    ErrorCode rv;
    OutboundItem item = aob.item;
    Buffers framebufs = aob.framebufs;
    Frame* frame = &item.frame;

    if (frame.hd.type != FrameType.DATA) {

        if (frame.hd.type == FrameType.HEADERS || frame.hd.type == FrameType.PUSH_PROMISE) {

            if (framebufs.nextPresent()) {
                LOGF("send: CONTINUATION exists, just return");
                return ErrorCode.OK;
            }
        }
        bool ok = callOnFrameSent(*frame);
        if (!ok) {
            return ErrorCode.CALLBACK_FAILURE;
        }
        with(FrameType) switch (frame.hd.type) {
            case HEADERS: {
                HeadersAuxData *aux_data;
                Stream stream = getStream(frame.hd.stream_id);
                if (!stream)
                    break;
                if (stream.item == item)
                    stream.detachItem(this);

                final switch (frame.headers.cat) {
                    case HeadersCategory.REQUEST: {
                        stream.state = StreamState.OPENING;
                        if (frame.hd.flags & FrameFlags.END_STREAM) {
                            stream.shutdown(ShutdownFlag.WR);
                        }
                        rv = closeStreamIfShutRdWr(stream);
                        if (isFatal(rv)) {
                            return rv;
                        }
                        /* We assume aux_data is a pointer to HeadersAuxData */
                        aux_data = &item.aux_data.headers;
                        if (aux_data.data_prd) {
                            /* submitData() makes a copy of aux_data.data_prd */
                            rv = submitData(this, FrameFlags.END_STREAM, frame.hd.stream_id, aux_data.data_prd);
                            if (isFatal(rv)) {
                                return rv;
                            }
                            /* TODO: submitData() may fail if stream has already DATA frame item. We might have to handle it here. */
                        }
                        break;
                    }
                    case HeadersCategory.PUSH_RESPONSE:
                        /* Clear the PUSH flag and count the stream as outgoing. */
                        stream.flags = cast(StreamFlags)(stream.flags & ~cast(int)StreamFlags.PUSH);
                        ++num_outgoing_streams;
                        goto case HeadersCategory.RESPONSE;
                    case HeadersCategory.RESPONSE:
                        stream.state = StreamState.OPENED;
                        goto case HeadersCategory.HEADERS;
                    case HeadersCategory.HEADERS:
                        if (frame.hd.flags & FrameFlags.END_STREAM) {
                            stream.shutdown(ShutdownFlag.WR);
                        }
                        rv = closeStreamIfShutRdWr(stream);
                        if (isFatal(rv)) {
                            return rv;
                        }
                        /* We assume aux_data is a pointer to HeadersAuxData */
                        aux_data = &item.aux_data.headers;
                        if (aux_data.data_prd) {
                            rv = submitData(this, FrameFlags.END_STREAM, frame.hd.stream_id, aux_data.data_prd);
                            if (isFatal(rv)) {
                                return rv;
                            }
                            /* TODO submitData() may fail if stream has already DATA frame item.
                             * We might have to handle it here. */
                        }
                        break;
                }
                break;
            }
            case PRIORITY: {
                Stream stream;

                if (is_server) {
                    break;
                }

                stream = getStreamRaw(frame.hd.stream_id);

                if (!stream) {
                    break;
                }

                reprioritizeStream(stream, frame.priority.pri_spec);

                break;
            }
            case RST_STREAM:
                rv = closeStream(frame.hd.stream_id, frame.rst_stream.error_code);
                if (isFatal(rv)) {
                    return rv;
                }
                break;
            case GOAWAY: {
                GoAwayAuxData aux_data = item.aux_data.goaway;

                if ((aux_data.flags & GoAwayAuxFlags.SHUTDOWN_NOTICE) == 0) {

                    if (aux_data.flags & GoAwayAuxFlags.TERM_ON_SEND) {
                        goaway_flags |= GoAwayFlags.TERM_SENT;
                    }

                    goaway_flags |= GoAwayFlags.SENT;

                    rv = closeStreamOnGoAway(frame.goaway.last_stream_id, true);

                    if (isFatal(rv)) {
                        return rv;
                    }
                }

                break;
            }
            default:
                break;
        }

        return ErrorCode.OK;
    }

    Stream stream = getStream(frame.hd.stream_id);
    DataAuxData *aux_data = &item.aux_data.data;
    /* We update flow control window after a frame was completely
       sent. This is possible because we choose payload length not to
       exceed the window */
    remote_window_size -= frame.hd.length;
    if (stream) {
        stream.remoteWindowSize = stream.remoteWindowSize - frame.hd.length;
    }

    if (stream && aux_data.eof) {
        stream.detachItem(this);

        /* Call onFrameSent after detachItem(), so that application can issue submitData() in the callback. */
        bool ok = callOnFrameSent(*frame);
        if (!ok) {
            return ErrorCode.CALLBACK_FAILURE;
        }

        if (frame.hd.flags & FrameFlags.END_STREAM) {
            stream.shutdown(ShutdownFlag.WR);

            /* The stream may be closed by this call; it is not used afterwards. */
            rv = closeStreamIfShutRdWr(stream);
            if (isFatal(rv)) {
                return rv;
            }
        }
        return ErrorCode.OK;
    }

    bool ok = callOnFrameSent(*frame);

    if (!ok) {
        return ErrorCode.CALLBACK_FAILURE;
    }

    return ErrorCode.OK;
}

/*
 * Called after a frame is sent and after $(D afterFrameSent).
 * This function is responsible for resetting aob.
 *
 * For HEADERS/PUSH_PROMISE with pending CONTINUATION buffers it only
 * advances to the next buffer. For DATA that has not reached EOF it
 * either packs the next chunk right away, re-queues the item, or defers
 * it. Whenever the item must outlive this call, |aob.item| is cleared
 * before |aob| is reset so that the item is not freed.
 *
 * This function returns 0 if it succeeds, or a fatal error code
 * reported by packData().
 */
ErrorCode resetActiveOutboundItem()
{
    ErrorCode rv;
    OutboundItem item = aob.item;
    Buffers framebufs = aob.framebufs;
    Frame* frame = &item.frame;

    if (frame.hd.type != FrameType.DATA) {

        if (frame.hd.type == FrameType.HEADERS ||
            frame.hd.type == FrameType.PUSH_PROMISE) {

            if (framebufs.nextPresent()) {
                framebufs.cur = framebufs.cur.next;

                LOGF("send: next CONTINUATION frame, %d bytes", framebufs.cur.buf.length);

                return ErrorCode.OK;
            }
        }

        aob.reset();

        return ErrorCode.OK;

    }

    OutboundItem next_item;
    Stream stream;
    DataAuxData* aux_data = &item.aux_data.data;

    /* On EOF, we have already detached data. Please note that
       application may issue submitData() in
       $(D Connector.onFrameSent) (call from afterFrameSent),
       which attach data to stream. We don't want to detach it. */
    if (aux_data.eof) {
        aob.reset();
        return ErrorCode.OK;
    }

    aux_data.no_copy = false;

    stream = getStream(frame.hd.stream_id);

    /* If Session is closed or RST_STREAM was queued, we won't send further data. */
    if (predicateDataSend(stream) != 0) {
        if (stream)
            stream.detachItem(this);
        aob.reset();

        return ErrorCode.OK;
    }

    /* Assuming stream is not null */
    assert(stream);
    next_item = getNextOutboundItem();

    /* Imagine we hit connection window size limit while sending DATA
       frame. If we decrement weight here, its stream might get
       inferior share because the other streams' weight is not
       decremented because of flow control. */
    if (remote_window_size > 0 || stream.remoteWindowSize <= 0) {
        cycleWeightOutboundItem(aob.item, stream.effectiveWeight);
    }

    /* If priority of this stream is higher or equal to other stream
       waiting at the top of the queue, we continue to send this
       data. */
    if (stream.dpri == StreamDPRI.TOP && (!next_item || PriorityQueue.compare(item, next_item) < 0))
    {
        int next_readmax = nextDataRead(stream);

        if (next_readmax == 0) {

            if (remote_window_size == 0 && stream.remoteWindowSize > 0) {

                /* If DATA cannot be sent solely due to connection level
                   window size, just push item to queue again. We never pop
                   DATA item while connection level window size is 0. */
                ob_da_pq.push(aob.item);

                aob.item.queued = true;
            } else
                stream.deferItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);

            aob.item = null;
            aob.reset();

            return ErrorCode.OK;
        }

        framebufs.reset();

        rv = packData(framebufs, next_readmax, *frame, item.aux_data.data);
        if (isFatal(rv)) {
            return rv;
        }

        if (rv == ErrorCode.DEFERRED) {
            stream.deferItem(StreamFlags.DEFERRED_USER, this);

            aob.item = null;
            aob.reset();

            return ErrorCode.OK;
        }
        if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE)
        {
            /* Stop DATA frame chain and issue RST_STREAM to close the stream. We don't return ErrorCode.TEMPORAL_CALLBACK_FAILURE intentionally. */
            addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR);
            stream.detachItem(this);
            aob.reset();
            return ErrorCode.OK;
        }

        assert(rv == 0);

        if (aux_data.no_copy)
            aob.state = OutboundState.SEND_NO_COPY;
        else
            aob.state = OutboundState.SEND_DATA;
        return ErrorCode.OK;
    }

    if (stream.dpri == StreamDPRI.TOP) {
        ob_da_pq.push(aob.item);

        aob.item.queued = true;
    }

    aob.item = null;
    aob.reset();
    return ErrorCode.OK;
}

// fetch data and feed it to data_arr
ErrorCode memSendInternal(ref ubyte[] data_arr, bool fast_cb)
{
    ErrorCode rv;
    Buffers framebufs = aob.framebufs;

    data_arr = null;

    for (;;) {
        final switch (aob.state) {
            case OutboundState.POP_ITEM: {
                OutboundItem item;

                item = popNextOutboundItem();
                if (!item) {
                    return ErrorCode.OK;
                }

                if (item.frame.hd.type == FrameType.DATA || item.frame.hd.type == FrameType.HEADERS) {
                    Frame* frame = &item.frame;
                    Stream stream = getStream(frame.hd.stream_id);

                    if (stream && item == stream.item && stream.dpri != 
StreamDPRI.TOP) { 5730 // We have DATA with higher priority in queue within the same dependency tree. 5731 break; 5732 } 5733 } 5734 5735 rv = prepareFrame(item); 5736 if (rv == ErrorCode.DEFERRED) { 5737 LOGF("send: frame transmission deferred"); 5738 break; 5739 } 5740 5741 if (rv < 0) { 5742 int opened_stream_id; 5743 FrameError error_code = FrameError.INTERNAL_ERROR; 5744 import libhttp2.types : toString; 5745 LOGF("send: frame preparation failed with %s", toString(cast(ErrorCode)rv)); 5746 /* TODO: If the error comes from compressor, the connection must be closed. */ 5747 if (item.frame.hd.type != FrameType.DATA && !isFatal(rv)) { 5748 Frame* frame = &item.frame; 5749 /* The library is responsible for the transmission of WINDOW_UPDATE frame, so we don't call error callback for it. */ 5750 try if (frame.hd.type != FrameType.WINDOW_UPDATE && !connector.onFrameFailure(*frame, rv)) 5751 { 5752 item.free(); 5753 Mem.free(item); 5754 return ErrorCode.CALLBACK_FAILURE; 5755 } catch (Exception e) { 5756 item.free(); 5757 Mem.free(item); 5758 return ErrorCode.CALLBACK_FAILURE; 5759 } 5760 } 5761 5762 /* We have to close stream opened by failed request HEADERS or PUSH_PROMISE. 
*/ 5763 switch (item.frame.hd.type) { 5764 case FrameType.HEADERS: 5765 if (item.frame.headers.cat == HeadersCategory.REQUEST) { 5766 opened_stream_id = item.frame.hd.stream_id; 5767 if (item.aux_data.headers.canceled) { 5768 error_code = item.aux_data.headers.error_code; 5769 } 5770 } 5771 break; 5772 case FrameType.PUSH_PROMISE: 5773 opened_stream_id = item.frame.push_promise.promised_stream_id; 5774 break; 5775 5776 default: break; 5777 } 5778 if (opened_stream_id) { 5779 /* careful not to override rv */ 5780 ErrorCode rv2; 5781 rv2 = closeStream(opened_stream_id, error_code); 5782 5783 if (isFatal(rv2)) { 5784 return rv2; 5785 } 5786 } 5787 5788 item.free(); 5789 Mem.free(item); 5790 aob.reset(); 5791 5792 if (rv == ErrorCode.HEADER_COMP) { 5793 /* If header compression error occurred, should terminiate connection. */ 5794 rv = terminateSession(FrameError.INTERNAL_ERROR); 5795 } 5796 if (isFatal(rv)) { 5797 return rv; 5798 } 5799 break; 5800 } 5801 5802 aob.item = item; 5803 5804 framebufs.rewind(); 5805 5806 if (item.frame.hd.type != FrameType.DATA) { 5807 Frame* frame = &item.frame; 5808 5809 LOGF("send: next frame: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", 5810 frame.hd.length, frame.hd.type, frame.hd.flags, 5811 frame.hd.stream_id); 5812 5813 bool ok = callOnFrameReady(*frame); 5814 if (!ok) { 5815 return ErrorCode.CALLBACK_FAILURE; 5816 } 5817 } else { 5818 LOGF("send: next frame: DATA"); 5819 5820 if (item.aux_data.data.no_copy) 5821 { 5822 aob.state = OutboundState.SEND_NO_COPY; 5823 break; 5824 } 5825 } 5826 5827 LOGF("send: start transmitting frame type=%u, length=%d", 5828 framebufs.cur.buf.pos[3], 5829 framebufs.cur.buf.last - framebufs.cur.buf.pos); 5830 5831 aob.state = OutboundState.SEND_DATA; 5832 5833 break; 5834 } 5835 case OutboundState.SEND_DATA: { 5836 size_t datalen; 5837 Buffer* buf = &framebufs.cur.buf; 5838 5839 if (buf.pos is buf.last) { 5840 LOGF("send: end transmission of a frame"); 5841 5842 /* Frame has completely sent */ 
5843 if (fast_cb) { 5844 rv = resetActiveOutboundItem(); 5845 } else { 5846 rv = afterFrameSent(); 5847 if (rv < 0) { 5848 /* FATAL */ 5849 assert(isFatal(rv)); 5850 return rv; 5851 } 5852 rv = resetActiveOutboundItem(); 5853 } 5854 if (rv < 0) { 5855 /* FATAL */ 5856 assert(isFatal(rv)); 5857 return rv; 5858 } 5859 /* We have already adjusted the next state */ 5860 break; 5861 } 5862 5863 datalen = buf.length; 5864 data_arr = buf.pos[0 .. datalen]; 5865 5866 /* We increment the offset here. If write() does not send everything, we will adjust it. */ 5867 buf.pos += datalen; 5868 5869 return ErrorCode.OK; 5870 } 5871 case OutboundState.SEND_NO_COPY: 5872 { 5873 LOGF("send: no copy DATA\n"); 5874 5875 Frame* frame = &aob.item.frame; 5876 Stream stream = getStream(frame.hd.stream_id); 5877 5878 5879 if (!stream) { 5880 LOGF("send: no copy DATA cancelled because stream was closed\n"); 5881 aob.reset(); 5882 break; 5883 } 5884 5885 rv = callWriteData(aob.item, framebufs); 5886 if (isFatal(rv)) { 5887 return rv; 5888 } 5889 5890 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 5891 stream.detachItem(this); 5892 5893 addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR); 5894 5895 aob.reset(); 5896 5897 break; 5898 } 5899 5900 if (rv == ErrorCode.WOULDBLOCK) { 5901 return ErrorCode.OK; 5902 } 5903 5904 assert(rv == ErrorCode.OK); 5905 5906 rv = afterFrameSent(); 5907 if (rv < 0) { 5908 assert(isFatal(rv)); 5909 return rv; 5910 } 5911 rv = resetActiveOutboundItem(); 5912 if (rv < 0) { 5913 assert(isFatal(rv)); 5914 return rv; 5915 } 5916 5917 /* We have already adjusted the next state */ 5918 5919 break; 5920 } 5921 } 5922 } 5923 } 5924 5925 /* 5926 * Inflates header block in the memory pointed by |input| with |input.length| 5927 * bytes. If this function returns ErrorCode.PAUSE, the caller must 5928 * call this function again, until it returns 0 or one of negative 5929 * error code. 
* If |call_header_cb| is false, the header field callback
	 * is not invoked and the function never returns ErrorCode.PAUSE.  If
	 * the given |input| is the last chunk of header block, |is_final| must
	 * be true.  |readlen_ref| is set to 0 on entry and accumulates the number
	 * of input bytes consumed by each successful inflate step, so it holds
	 * the processed byte count when the function returns 0, ErrorCode.PAUSE
	 * or ErrorCode.TEMPORAL_CALLBACK_FAILURE.
	 *
	 * This function return 0 if it succeeds, or one of the negative error
	 * codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *     The callback function failed.
	 * ErrorCode.TEMPORAL_CALLBACK_FAILURE
	 *     The callback returns this error code, indicating that this
	 *     stream should be RST_STREAMed. Also returned when header field
	 *     validation reports ErrorCode.HTTP_HEADER.
	 * ErrorCode.PAUSE
	 *     The callback function returned ErrorCode.PAUSE
	 * ErrorCode.HEADER_COMP
	 *     Header decompression failed
	 */
	ErrorCode inflateHeaderBlock(Frame frame, ref size_t readlen_ref, ubyte[] input, bool is_final, bool call_header_cb)
	{
		int proclen;
		ErrorCode rv;
		InflateFlag inflate_flag;
		HeaderField hf;
		Stream stream;
		Stream subject_stream;
		bool trailer;

		readlen_ref = 0;
		stream = getStream(frame.hd.stream_id);

		/* For PUSH_PROMISE the header fields are validated against the promised stream. */
		if (frame.hd.type == FrameType.PUSH_PROMISE) {
			subject_stream = getStream(frame.push_promise.promised_stream_id);
		} else {
			subject_stream = stream;
			trailer = isTrailerHeaders(stream, frame);
		}

		LOGF("recv: decoding header block %d bytes", input.length);
		size_t inlen = input.length;
		ubyte* inptr = input.ptr;
		for (;;) {
			inflate_flag = InflateFlag.NONE;
			proclen = hd_inflater.inflate(hf, inflate_flag, inptr[0 .. inlen], is_final);

			if (isFatal(cast(int)proclen)) {
				return cast(ErrorCode)proclen;
			}

			/* Non-fatal inflate error: terminate the session and report HEADER_COMP. */
			if (proclen < 0) {
				if (iframe.state == InboundState.READ_HEADER_BLOCK)
				{
					if (stream && stream.state != StreamState.CLOSING)
					{
						/* Adding RST_STREAM here is very important. It prevents
						   from invoking subsequent callbacks for the same stream ID. */
						addRstStream(frame.hd.stream_id, FrameError.COMPRESSION_ERROR);

					}
				}
				rv = terminateSession(FrameError.COMPRESSION_ERROR);
				if (isFatal(rv)) {
					return rv;
				}

				return ErrorCode.HEADER_COMP;
			}

			/* Advance past the bytes consumed by this inflate step. */
			inptr += proclen;
			inlen -= proclen;
			readlen_ref += proclen;

			LOGF("recv: proclen=%d", proclen);

			if (call_header_cb && (inflate_flag & InflateFlag.EMIT)) {
				rv = ErrorCode.OK;
				if (subject_stream && isHTTPMessagingEnabled()) {
					rv = validateHeaderField(subject_stream, frame, hf, trailer);
					if (rv == ErrorCode.HTTP_HEADER) {
						LOGF("recv: HTTP error: type=%d, id=%d, header %.*s: %.*s",
							frame.hd.type, subject_stream.id, cast(int)hf.name.length,
							hf.name, cast(int)hf.value.length, hf.value);
						frame.headers.hfa = (&hf)[0 .. 1]; // keep the invalid header for debug
						handleInvalidStream2(subject_stream.id, frame, FrameError.PROTOCOL_ERROR);
						return ErrorCode.TEMPORAL_CALLBACK_FAILURE;
					}
					else if (rv == ErrorCode.IGN_HTTP_HEADER) {
						/* header is ignored */
						LOGF("recv: HTTP ignored: type=%d, id=%d, header %s: %s", frame.hd.type, subject_stream.id, hf.name, hf.value);
					}

				}

				/* The callback only sees header fields that validation did not reject or ignore. */
				if (rv == ErrorCode.OK) {
					bool pause;
					bool close;
					bool ok = callOnHeaderField(frame, hf, pause, close);
					if (!ok)
						return ErrorCode.CALLBACK_FAILURE;
					if (close)
						return ErrorCode.TEMPORAL_CALLBACK_FAILURE;
					if (pause)
						return ErrorCode.PAUSE;
				}
			}

			if (inflate_flag & InflateFlag.FINAL) {
				hd_inflater.endHeaders();
				break;
			}
			/* Nothing emitted and no input left: wait for more data. */
			if ((inflate_flag & InflateFlag.EMIT) == 0 && inlen == 0) {
				break;
			}
		}
		return ErrorCode.OK;
	}

package: /* Used only for tests */
	/*
	 * Returns top of outbound frame queue. This function returns null if
	 * queue is empty.
	 */
	@property OutboundItem ob_pq_top() {
		return ob_pq.top;
	}

package:
	HashMap!(int, Stream) streams;

	StreamRoots roots;

	/// Priority Queue for outbound frames other than stream-starting HEADERS and DATA
	PriorityQueue ob_pq;

	/// Priority Queue for outbound stream-starting HEADERS frame
	PriorityQueue ob_ss_pq;

	/// Priority Queue for DATA frame
	PriorityQueue ob_da_pq;

	ActiveOutboundItem aob;
	InboundFrame iframe;
	Deflater hd_deflater;
	Inflater hd_inflater;
	Connector connector;

	/// Sequence number of outbound frame to maintain the order of enqueue if priority is equal.
	long next_seq;

	/** Reset count of OutboundItem's weight.  We decrement
	    weight each time DATA is sent to simulate resource sharing.  We
	    use priority queue and larger weight has the precedence.
If weight is reached to lowest weight, it resets to its initial
	    weight.  If this happens, other items which have the lower weight
	    currently but same initial weight cannot send DATA until item
	    having large weight is decreased.  To avoid this, we use this
	    cycle variable.  Initially, this is set to 1.  If weight gets
	    lowest weight, and if item's cycle == last_cycle, we increment
	    last_cycle and assign it to item's cycle.  Otherwise, just
	    assign last_cycle.  In priority queue comparator, we first
	    compare items' cycle value.  Lower cycle value has the
	    precedence. */
	ulong last_cycle = 1;

	/// Points to the latest closed stream.  null if there is no closed stream.
	/// Notes: Only used when session is initialized as server.
	Stream closed_stream_head;

	/// Points to the oldest closed stream.  null if there is no closed stream.
	/// Notes: Only used when session is initialized as server.
	Stream closed_stream_tail;

	/// Points to the latest idle stream.  null if there is no idle stream.
	/// Notes: Only used when session is initialized as server.
	Stream idle_stream_head;

	/// Points to the oldest idle stream.  null if there is no idle stream.
	/// Notes: Only used when session is initialized as server.
	Stream idle_stream_tail;

	/// In-flight SETTINGS values. null for no in-flight SETTINGS.
	Setting[] inflight_iva;

	/// The number of outgoing streams. This will be capped by remote_settings.max_concurrent_streams.
	size_t num_outgoing_streams;

	/// The number of incoming streams. This will be capped by local_settings.max_concurrent_streams.
	size_t num_incoming_streams;

	/// The number of closed streams still kept in |streams| hash.  The closed streams can be accessed
	/// through single linked list |closed_stream_head|.
	/// Notes: The current implementation only keeps incoming streams if session is initialized as server.
	size_t num_closed_streams;

	/// The number of idle streams kept in |streams| hash. The idle streams can be accessed through doubly linked list
	/// |idle_stream_head|.
	/// Notes: The current implementation only keeps idle streams if session is initialized as server.
	size_t num_idle_streams;

	/// Next Stream ID. Made unsigned int to detect >= (1 << 31).
	uint next_stream_id;

	/// The largest stream ID received so far
	int last_recv_stream_id;

	/// The largest stream ID which has been processed in some way.
	/// Notes: This value will be used as last-stream-id when sending GOAWAY frame.
	int last_proc_stream_id;

	/// Counter of unique ID of PING. Wraps when it exceeds max_UNIQUE_ID
	uint next_unique_id;

	/// This is the last-stream-ID we have sent in GOAWAY
	int local_last_stream_id = (1u << 31) - 1;

	/// This is the value in GOAWAY frame received from remote endpoint.
	int remote_last_stream_id = (1u << 31) - 1;

	/// Current sender window size. This value is computed against the current initial window size of remote endpoint.
	int remote_window_size = INITIAL_CONNECTION_WINDOW_SIZE;

	/// Keep track of the number of bytes received without WINDOW_UPDATE. This could be negative after
	/// submitting negative value to WINDOW_UPDATE.
	int recv_window_size;

	/// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE.
	/// Notes: This is only used when auto WINDOW_UPDATE is turned off.
	int consumed_size;

	/// The amount of recv_window_size cut by submitting negative value to WINDOW_UPDATE
	int recv_reduction;

	/// window size for local flow control. It is initially set to INITIAL_CONNECTION_WINDOW_SIZE and could be
	/// increased/decreased by submitting WINDOW_UPDATE. See submitWindowUpdate().
	int local_window_size = INITIAL_CONNECTION_WINDOW_SIZE;

	/// Settings value received from the remote endpoint. We just use ID as index. The index = 0 is unused.
	SettingsStorage remote_settings;

	/// Settings value of the local endpoint.
	SettingsStorage local_settings;

	/// Option flags. This is bitwise-OR of 0 or more of OptionsMask.
	OptionsMask opt_flags;

	/// Unacked local Setting.MAX_CONCURRENT_STREAMS value. We use this to refuse the incoming stream if it exceeds this value.
	uint pending_local_max_concurrent_stream = INITIAL_MAX_CONCURRENT_STREAMS;

	/// Unacked local ENABLE_PUSH value.  We use this to refuse PUSH_PROMISE before SETTINGS ACK is received.
	bool pending_enable_push = true;

	/// true if the session is server side.
	bool is_server;

	/// Flags indicating GOAWAY is sent and/or received.
	GoAwayFlags goaway_flags = GoAwayFlags.NONE;


}
/**
 * @function
 *
 * Serializes the SETTINGS values |iva| in the |buf|.  The required
 * space in |buf| for the `iva.length` entries is
 * `iva.length * FRAME_SETTINGS_ENTRY_LENGTH` bytes and if the given
 * buffer is too small, an error is returned.  This function is used
 * mainly for creating a SETTINGS payload to be sent with the
 * `HTTP2-Settings` header field in an HTTP Upgrade request.  The data
 * written in |buf| is NOT base64url encoded and the application is
 * responsible for encoding.
 *
 * This function returns the number of bytes written in |buf|, or one
 * of the following negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |iva| contains duplicate settings ID or invalid value.
*
 * $(D ErrorCode.INSUFF_BUFSIZE)
 *     The provided |buf| is too small to hold the output.
 */
int packSettingsPayload(ubyte[] buf, in Setting[] iva)
{
	// The settings are checked before the buffer size is looked at.
	if (!iva.check())
		return ErrorCode.INVALID_ARGUMENT;

	// Every entry occupies FRAME_SETTINGS_ENTRY_LENGTH bytes on the wire.
	const size_t required = iva.length * FRAME_SETTINGS_ENTRY_LENGTH;
	if (buf.length < required)
		return ErrorCode.INSUFF_BUFSIZE;

	return Settings.pack(buf, iva);
}

/**
 * Submits HEADERS frame and optionally one or more DATA frames.
 *
 * The |pri_spec| is priority specification of this request.
 * To specify the priority, use `PrioritySpec()`.
 *
 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT),
 * $(D MAX_WEIGHT)], inclusive.  If `pri_spec.weight` is
 * strictly less than $(D MIN_WEIGHT), it becomes
 * $(D MIN_WEIGHT).  If it is strictly greater than
 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT).
 *
 * The |hfa| is an array of header fields $(D HeaderField) with
 * |hfa.length| elements.  The application is responsible to include
 * required pseudo-header fields (header field whose name starts with
 * ":") in |hfa| and must place pseudo-headers before regular header
 * fields.
 *
 * This function creates copies of all header fields in |hfa|.  It
 * also lower-cases all names in |hfa|.  The order of elements in
 * |hfa| is preserved.
 *
 * HTTP/2 specification has requirement about header fields in the
 * request HEADERS.  See the specification for more details.
 *
 * If |data_prd| is not `null`, it provides data which will be sent
 * in subsequent DATA frames.  In this case, a method that allows
 * request message bodies
 * (http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9) must
 * be specified with `:method` key in |hfa| (e.g. `POST`).  This
 * function does not take ownership of the |data_prd|.  The function
 * copies the members of the |data_prd|.
* If |data_prd| is `null`,
 * HEADERS have END_STREAM set.  The |stream_user_data| is data
 * associated to the stream opened by this request and can be an
 * arbitrary pointer, which can be retrieved later by
 * `getStreamUserData()`.
 *
 * This function returns assigned stream ID if it succeeds, or one of
 * the following negative error codes:
 *
 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE)
 *     No stream ID is available because maximum stream ID was
 *     reached.
 *
 * .. warning::
 *
 *   This function returns assigned stream ID if it succeeds.  But
 *   that stream is not opened yet.  The application must not submit
 *   a frame to that stream ID before $(D Connector.onFrameReady) is
 *   called for this frame.
 *
 */
int submitRequest(Session session, in PrioritySpec pri_spec, in HeaderField[] hfa, in DataProvider data_prd, void* stream_user_data = null)
{
	// The frame flags for a request are derived from the priority spec and the data provider.
	FrameFlags request_flags = setRequestFlags(pri_spec, data_prd);

	// -1 as stream ID requests a newly assigned stream ID (see submitHeaders()).
	return submitHeadersSharedHfa(
		session,
		request_flags,
		-1,
		pri_spec,
		hfa,
		data_prd,
		stream_user_data,
		false);
}

/**
 * Submits response HEADERS frame and optionally one or more DATA
 * frames against the stream |stream_id|.
 *
 * The |hfa| is an array of $(D HeaderField) with
 * |hfa.length| elements.  The application is responsible to include
 * required pseudo-header fields (header field whose name starts with
 * ":") in |hfa| and must place pseudo-headers before regular header
 * fields.
 *
 * This function creates copies of all header fields in |hfa|.  It
 * also lower-cases all names in |hfa|.  The order of elements in
 * |hfa| is preserved.
 *
 * HTTP/2 specification has requirement about header fields in the
 * response HEADERS.  See the specification for more details.
 *
 * If |data_prd| is not `null`, it provides data which will be sent
 * in subsequent DATA frames.
* If |data_prd| is `null`, HEADERS will have
 * END_STREAM flag set.
 *
 * This method can be used as normal HTTP response and push response.
 * When pushing a resource using this function, the $(D Session) must be
 * configured using `new Session()` or its variants and
 * the target stream denoted by the |stream_id| must be reserved using
 * `submitPushPromise()`.
 *
 * To send non-final response headers (e.g., HTTP status 101), don't
 * use this function because this function half-closes the outbound
 * stream.  Instead, use `submitHeaders()` for this purpose.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 *
 * .. warning::
 *
 *   Calling this function twice for the same stream ID may lead to
 *   program crash.  It is generally considered a programming error
 *   to commit response twice.
 */
ErrorCode submitResponse(Session session, int stream_id, in HeaderField[] hfa, in DataProvider data_prd)
{
	// The frame flags for a response are derived from the data provider alone.
	FrameFlags response_flags = setResponseFlags(data_prd);

	// A response carries the default priority spec and no stream user data.
	auto result = submitHeadersSharedHfa(
		session,
		response_flags,
		stream_id,
		PrioritySpec.init,
		hfa,
		data_prd,
		null,
		true);

	return cast(ErrorCode)result;
}

/**
 * Submits HEADERS frame. The |flags| is bitwise OR of the
 * following values:
 *
 * * $(D FrameFlags.END_STREAM)
 *
 * If |flags| includes $(D FrameFlags.END_STREAM), this frame has
 * END_STREAM flag set.
 *
 * The library handles the CONTINUATION frame internally and it
 * correctly sets END_HEADERS to the last sequence of the PUSH_PROMISE
 * or CONTINUATION frame.
 *
 * If the |stream_id| is -1, this frame is assumed as request (i.e.,
 * request HEADERS frame which opens new stream).  In this case, the
 * assigned stream ID will be returned.  Otherwise, specify stream ID
 * in |stream_id|.
*
 * The |pri_spec| is priority specification of this request.
 * `PrioritySpec.init` means the default priority.  To specify the
 * priority, use the $(D PrioritySpec) constructor.
 *
 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT),
 * $(D MAX_WEIGHT)], inclusive.  If `pri_spec.weight` is
 * strictly less than $(D MIN_WEIGHT), it becomes
 * $(D MIN_WEIGHT).  If it is strictly greater than
 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT).
 *
 * The |hfa| is an array of header fields $(D HeaderField) with
 * |hfa.length| elements.  The application is responsible to include
 * required pseudo-header fields (header field whose name starts with
 * ":") in |hfa| and must place pseudo-headers before regular header
 * fields.
 *
 * This function creates copies of all header fields in |hfa|.  It
 * also lower-cases all names in |hfa|.  The order of elements in
 * |hfa| is preserved.
 *
 * The |stream_user_data| is a pointer to arbitrary data which is
 * associated to the stream this frame will open.  Therefore it is
 * only used if this frame opens streams, in other words, it changes
 * stream state from idle or reserved to open.
 *
 * This function is low-level in the sense that the application code can
 * specify flags directly.  For usual HTTP request,
 * `submitRequest()` is useful.
 *
 * This function returns newly assigned stream ID if it succeeds and
 * |stream_id| is -1.  Otherwise, this function returns 0 if it
 * succeeds, or one of the following negative error codes:
 *
 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE)
 *     No stream ID is available because maximum stream ID was
 *     reached.
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 *
 * .. warning::
 *
 *   This function returns assigned stream ID if it succeeds and
 *   |stream_id| is -1.  But that stream is not opened yet.
* The
 *   application must not submit frame to that stream ID before
 *   $(D Connector.onFrameHeader) is called for this
 *   frame.
 *
 */
int submitHeaders(Session session, FrameFlags flags, int stream_id = -1, in PrioritySpec pri_spec = PrioritySpec.init, in HeaderField[] hfa = null, void *stream_user_data = null)
{
	// Of the caller-supplied flags only END_STREAM is kept.
	flags &= FrameFlags.END_STREAM;

	// Any priority spec other than the default one turns on the PRIORITY flag.
	const bool has_priority = pri_spec != PrioritySpec.init;
	if (has_priority) {
		flags |= FrameFlags.PRIORITY;
	}

	// No data provider: this entry point submits the HEADERS frame only.
	return submitHeadersSharedHfa(
		session,
		flags,
		stream_id,
		pri_spec,
		hfa,
		DataProvider.init,
		stream_user_data,
		false);
}

/**
 * Submits one or more DATA frames to the stream |stream_id|.  The
 * data to be sent are provided by |data_prd|.  If |flags| contains
 * $(D FrameFlags.END_STREAM), the last DATA frame has END_STREAM
 * flag set.
 *
 * This function does not take ownership of the |data_prd|.  The
 * function copies the members of the |data_prd|.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.DATA_EXIST)
 *     DATA has been already submitted and not fully processed yet.
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 * $(D ErrorCode.STREAM_CLOSED)
 *     The stream was already closed; or the |stream_id| is invalid.
 *
 * .. note::
 *
 *   Currently, only one data is allowed for a stream at a time.
 *   Submitting data more than once before first data is finished
 *   results in $(D ErrorCode.DATA_EXIST) error code.  The
 *   earliest callback which tells that previous data is done is
 *   $(D Connector.onFrameSent).  Inside that callback,
 *   new data can be submitted using `submitData()`.  Of
 *   course, all data except for last one must not have
 *   $(D FrameFlags.END_STREAM) flag set in |flags|.
*/
ErrorCode submitData(Session session, FrameFlags flags, int stream_id, in DataProvider data_prd)
{
	if (stream_id == 0)
		return ErrorCode.INVALID_ARGUMENT;

	// Only END_STREAM is carried over from the caller-supplied flags.
	DataFlags data_flags = cast(DataFlags)(flags & FrameFlags.END_STREAM);

	OutboundItem item = Mem.alloc!OutboundItem(session);
	scope(failure) Mem.free(item);

	// Record the provider and flags in the item's DATA auxiliary data.
	DataAuxData* aux = &item.aux_data.data;
	aux.data_prd = data_prd;
	aux.eof = false;
	aux.flags = data_flags;

	/* flags are sent on transmission */
	Frame* frame = &item.frame;
	frame.data = Data(FrameFlags.NONE, stream_id);
	scope(failure) frame.data.free();

	ErrorCode rv = session.addItem(item);
	if (rv == 0)
		return ErrorCode.OK;

	// The session did not accept the item: release what was built here.
	frame.data.free();
	Mem.free(item);
	return rv;
}

/**
 * Submits PRIORITY frame to change the priority of stream |stream_id|
 * to the priority specification |pri_spec|.
 *
 *
 * The |pri_spec| is priority specification of this request.  The
 * default value `PrioritySpec.init` is not allowed for this function;
 * construct a $(D PrioritySpec) explicitly.  This function will copy
 * its data members.
 *
 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT),
 * $(D MAX_WEIGHT)], inclusive.  If `pri_spec.weight` is
 * strictly less than $(D MIN_WEIGHT), it becomes
 * $(D MIN_WEIGHT).  If it is strictly greater than
 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT).
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0; or the |pri_spec| is `PrioritySpec.init`;
 *     or trying to depend on itself.
*/
ErrorCode submitPriority(Session session, int stream_id, in PrioritySpec pri_spec)
{
	OutboundItem item;
	Frame* frame;
	PrioritySpec copy_pri_spec;

	if (stream_id == 0 || pri_spec == PrioritySpec.init)
		return ErrorCode.INVALID_ARGUMENT;

	/* A stream cannot depend on itself. */
	if (stream_id == pri_spec.stream_id)
		return ErrorCode.INVALID_ARGUMENT;

	/* Work on a copy so the caller's spec is left untouched by adjustWeight(). */
	copy_pri_spec = pri_spec;

	copy_pri_spec.adjustWeight();

	item = Mem.alloc!OutboundItem(session);
	scope(failure) Mem.free(item);
	frame = &item.frame;

	frame.priority = Priority(stream_id, copy_pri_spec);
	scope(failure) frame.priority.free();

	/* If the session does not accept the item, release it here and report
	   the error instead of returning OK, the same way submitData() does. */
	ErrorCode rv = session.addItem(item);
	if (rv != 0) {
		frame.priority.free();
		Mem.free(item);
		return rv;
	}
	return ErrorCode.OK;
}


/**
 * @function
 *
 * Submits RST_STREAM frame to cancel/reject the stream |stream_id|
 * with the error code |error_code|.
 *
 * The pre-defined error code is one of $(D FrameError).
 *
 * This function returns 0 if it succeeds, or the following
 * negative error code:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 */
ErrorCode submitRstStream(Session session, int stream_id, FrameError error_code)
{
	if (stream_id == 0)
		return ErrorCode.INVALID_ARGUMENT;

	session.addRstStream(stream_id, error_code);
	return ErrorCode.OK;
}

/**
 * @function
 *
 * Stores local settings and submits SETTINGS frame.  The |iva| is the
 * array of $(D Setting).  The |iva.length|
 * indicates the number of settings.
 *
 * This function does not take ownership of the |iva|.  This function
 * copies all the elements in the |iva|.
 *
 * While updating individual stream's local window size, if the window
 * size becomes strictly larger than max_WINDOW_SIZE,
 * RST_STREAM is issued against such a stream.
*
 * SETTINGS with $(D FrameFlags.ACK) is automatically submitted
 * by the library and application could not send it at its will.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |iva| contains invalid value (e.g., initial window size
 *     strictly greater than (1 << 31) - 1).
 * $(D ErrorCode.TOO_MANY_INFLIGHT_SETTINGS)
 *     There is already another in-flight SETTINGS.  Note that the
 *     current implementation only allows 1 in-flight SETTINGS frame
 *     without ACK flag set.
 * $(D ErrorCode.NOMEM)
 *     Out of memory.
 */
ErrorCode submitSettings(Session session, in Setting[] iva)
{
	// Application-submitted SETTINGS are always queued without the ACK flag.
	ErrorCode status = session.addSettings(FrameFlags.NONE, iva);
	return status;
}

/**
 * Submits PUSH_PROMISE frame.
 *
 * The |stream_id| must be client initiated stream ID.
 *
 * The |hfa| is an array of $(D HeaderField) with
 * |hfa.length| elements.  The application is responsible to include
 * required pseudo-header fields (header field whose name starts with
 * ":") in |hfa| and must place pseudo-headers before regular header
 * fields.
 *
 * This function creates copies of all header fields in |hfa|.  It
 * also lower-cases all names in |hfa|.  The order of elements in
 * |hfa| is preserved.
 *
 * The |promised_stream_user_data| is a pointer to an arbitrary data
 * which is associated to the promised stream this frame will open and
 * make it in reserved state.  It is available using $(D Session.getStreamUserData).
 * The application can access it in $(D Connector.onFrameHeader) and
 * $(D Connector.onFrameSent) of this frame.
 *
 * The client side is not allowed to use this function.
 *
 * To submit response headers and data, use
 * `submitResponse()`.
6606 * 6607 * This function returns assigned promised stream ID if it succeeds, 6608 * or one of the following negative error codes: 6609 * 6610 * $(D ErrorCode.PROTO) 6611 * This function was invoked when $(D Session) is initialized as 6612 * client. 6613 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE) 6614 * No stream ID is available because maximum stream ID was 6615 * reached. 6616 * $(D ErrorCode.INVALID_ARGUMENT) 6617 * The |stream_id| is 0; The |stream_id| does not designate stream 6618 * that peer initiated. 6619 * 6620 * .. warning:: 6621 * 6622 * This function returns assigned promised stream ID if it succeeds. 6623 * But that stream is not opened yet. The application must not 6624 * submit frame to that stream ID before 6625 * $(D Connector.onFrameHeader) is called for this 6626 * frame. 6627 * 6628 */ 6629 int submitPushPromise(Session session, int stream_id, in HeaderField[] hfa, void* promised_stream_user_data) 6630 { 6631 OutboundItem item; 6632 Frame* frame; 6633 HeaderField[] hfa_copy; 6634 FrameFlags flags_copy; 6635 int promised_stream_id; 6636 6637 if (stream_id == 0 || session.isMyStreamId(stream_id)) { 6638 return ErrorCode.INVALID_ARGUMENT; 6639 } 6640 6641 if (!session.is_server) 6642 return ErrorCode.PROTO; 6643 6644 /* All 32bit signed stream IDs are spent. 
*/ 6645 if (session.next_stream_id > int.max) { 6646 return ErrorCode.STREAM_ID_NOT_AVAILABLE; 6647 } 6648 6649 item = Mem.alloc!OutboundItem(session); 6650 scope(failure) 6651 Mem.free(item); 6652 6653 item.aux_data.headers.stream_user_data = promised_stream_user_data; 6654 6655 frame = &item.frame; 6656 bool is_owner; 6657 hfa_copy = hfa.copy(); 6658 is_owner = true; 6659 scope(failure) if (is_owner) Mem.free(hfa_copy); 6660 flags_copy = FrameFlags.END_HEADERS; 6661 6662 promised_stream_id = session.next_stream_id; 6663 session.next_stream_id += 2; 6664 6665 is_owner = false; 6666 frame.push_promise = PushPromise(flags_copy, stream_id, promised_stream_id, hfa_copy); 6667 scope(failure) frame.push_promise.free(); 6668 6669 session.addItem(item); 6670 6671 return promised_stream_id; 6672 } 6673 6674 /** 6675 * Submits PING frame. You don't have to send PING back when you 6676 * received PING frame. The library automatically submits PING frame 6677 * in this case. 6678 * 6679 * 6680 * If the |opaque_data| is non `null`, then it should point to the 8 6681 * bytes array of memory to specify opaque data to send with PING 6682 * frame. If the |opaque_data| is `null`, zero-cleared 8 bytes will 6683 * be sent as opaque data. 6684 */ 6685 void submitPing(Session session, in ubyte[] opaque_data) 6686 { 6687 return session.addPing(FrameFlags.NONE, opaque_data); 6688 } 6689 6690 /** 6691 * @function 6692 * 6693 * Submits GOAWAY frame with the last stream ID |last_stream_id| and 6694 * the error code |error_code|. 6695 * 6696 * The pre-defined error code is one of $(D FrameError). 6697 * 6698 * The |flags| is currently ignored and should be 6699 * $(D FrameFlags.NONE). 6700 * 6701 * The |last_stream_id| is peer's stream ID or 0. So if $(D Session) is 6702 * initialized as client, |last_stream_id| must be even or 0. If 6703 * $(D Session) is initialized as server, |last_stream_id| must be odd or 6704 * 0. 
6705 * 6706 * The HTTP/2 specification says last_stream_id must not be increased 6707 * from the value previously sent. So the actual value sent as 6708 * last_stream_id is the minimum value between the given 6709 * |last_stream_id| and the last_stream_id previously sent to the 6710 * peer. 6711 * 6712 * If the |opaque_data| is not `null` and |opaque_data_len| is not 6713 * zero, those data will be sent as additional debug data. The 6714 * library makes a copy of the memory region pointed by |opaque_data| 6715 * with the length |opaque_data_len|, so the caller does not need to 6716 * keep this memory after the return of this function. If the 6717 * |opaque_data_len| is 0, the |opaque_data| could be `null`. 6718 * 6719 * After successful transmission of GOAWAY, following things happen. 6720 * All incoming streams having strictly more than |last_stream_id| are 6721 * closed. All incoming HEADERS which starts new stream are simply 6722 * ignored. After all active streams are handled, both 6723 * `wantRead()` and `wantWrite()` return 0 and the application can close session. 6724 * 6725 * This function returns 0 if it succeeds, or one of the following 6726 * negative error codes: 6727 * 6728 * $(D ErrorCode.INVALID_ARGUMENT) 6729 * The |opaque_data.length| is too large; the |last_stream_id| is invalid. 6730 */ 6731 ErrorCode submitGoAway(Session session, int last_stream_id, FrameError error_code, in string opaque_data) 6732 { 6733 if (session.goaway_flags & GoAwayFlags.TERM_ON_SEND) { 6734 return ErrorCode.OK; 6735 } 6736 return session.addGoAway(last_stream_id, error_code, opaque_data, GoAwayAuxFlags.NONE); 6737 } 6738 6739 /** 6740 * Submits WINDOW_UPDATE frame. 6741 * 6742 * The |flags| is currently ignored and should be 6743 * $(D FrameFlags.NONE). 6744 * 6745 * If the |window_size_increment| is positive, the WINDOW_UPDATE with 6746 * that value as window_size_increment is queued. 
If the 6747 * |window_size_increment| is larger than the received bytes from the 6748 * remote endpoint, the local window size is increased by that 6749 * difference. 6750 * 6751 * If the |window_size_increment| is negative, the local window size 6752 * is decreased by -|window_size_increment|. If automatic 6753 * WINDOW_UPDATE is enabled 6754 * $(D Options.setNoAutoWindowUpdate), and the library 6755 * decided that the WINDOW_UPDATE should be submitted, then 6756 * WINDOW_UPDATE is queued with the current received bytes count. 6757 * 6758 * If the |window_size_increment| is 0, the function does nothing and 6759 * returns 0. 6760 * 6761 * This function returns 0 if it succeeds, or one of the following 6762 * negative error codes: 6763 * 6764 * $(D ErrorCode.FLOW_CONTROL) 6765 * The local window size overflow or gets negative. 6766 */ 6767 ErrorCode submitWindowUpdate(Session session, int stream_id, int window_size_increment) 6768 { 6769 ErrorCode rv; 6770 Stream stream; 6771 if (window_size_increment == 0) { 6772 return ErrorCode.OK; 6773 } 6774 FrameFlags flags; 6775 if (stream_id == 0) { 6776 rv = adjustLocalWindowSize(session.local_window_size, session.recv_window_size, session.recv_reduction, window_size_increment); 6777 if (rv != ErrorCode.OK) { 6778 return rv; 6779 } 6780 } else { 6781 stream = session.getStream(stream_id); 6782 if (!stream) { 6783 return ErrorCode.OK; 6784 } 6785 6786 rv = adjustLocalWindowSize(stream.localWindowSize, stream.recvWindowSize, stream.recvReduction, window_size_increment); 6787 if (rv != ErrorCode.OK) { 6788 return rv; 6789 } 6790 } 6791 6792 if (window_size_increment > 0) { 6793 if (stream_id == 0) { 6794 session.consumed_size = max(0, session.consumed_size - window_size_increment); 6795 } else { 6796 stream.consumedSize = max(0, stream.consumedSize - window_size_increment); 6797 } 6798 6799 session.addWindowUpdate(flags, stream_id, window_size_increment); 6800 } 6801 return ErrorCode.OK; 6802 } 6803 6804 6805 /** 6806 * 
 * Signals to the client that the server started graceful shutdown
 * procedure.
 *
 * This function is only usable for server.  If this function is
 * called with client side session, this function returns
 * $(D ErrorCode.INVALID_STATE).
 *
 * To gracefully shutdown HTTP/2 session, server should call this
 * function to send GOAWAY with last_stream_id (1u << 31) - 1.  And
 * after some delay (e.g., 1 RTT), send another GOAWAY with the stream
 * ID that the server has some processing using
 * `submitGoAway()`.  See also `getLastProcStreamID()`.
 *
 * Unlike `submitGoAway()`, this function just sends GOAWAY
 * and does nothing more.  This is a mere indication to the client
 * that session shutdown is imminent.  The application should call
 * `submitGoAway()` with appropriate last_stream_id after
 * this call.
 *
 * If one or more GOAWAY frames have already been sent by either
 * `submitGoAway()` or `terminateSession()`, this function has no effect.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.NOMEM)
 *     Out of memory.
 * $(D ErrorCode.INVALID_STATE)
 *     The $(D Session) is initialized as client.
6835 */ 6836 ErrorCode submitShutdownNotice(Session session) 6837 { 6838 if (!session.is_server) { 6839 return ErrorCode.INVALID_STATE; 6840 } 6841 if (session.goaway_flags) 6842 return ErrorCode.OK; 6843 6844 return session.addGoAway((1u << 31) - 1, FrameError.NO_ERROR, null, GoAwayAuxFlags.SHUTDOWN_NOTICE); 6845 } 6846 6847 private: 6848 6849 FrameFlags setResponseFlags(in DataProvider data_prd) 6850 { 6851 FrameFlags flags = FrameFlags.NONE; 6852 6853 if (!data_prd) 6854 flags |= FrameFlags.END_STREAM; 6855 6856 return flags; 6857 } 6858 6859 FrameFlags setRequestFlags(in PrioritySpec pri_spec, in DataProvider data_prd) 6860 { 6861 FrameFlags flags = FrameFlags.NONE; 6862 if (!data_prd) 6863 flags |= FrameFlags.END_STREAM; 6864 6865 if (pri_spec != PrioritySpec.init) 6866 flags |= FrameFlags.PRIORITY; 6867 6868 return flags; 6869 } 6870 6871 /* This function takes ownership of |hfa_copy|. Regardless of the 6872 return value, the caller must not free |hfa_copy| after this 6873 function returns. 
*/ 6874 int submitHeadersShared(Session session, FrameFlags flags, int stream_id, 6875 const ref PrioritySpec pri_spec, HeaderField[] hfa_copy, 6876 in DataProvider data_prd, void *stream_user_data, bool attach_stream) 6877 { 6878 ErrorCode rv; 6879 FrameFlags flags_copy; 6880 OutboundItem item; 6881 Frame* frame; 6882 HeadersCategory hcat; 6883 bool owns_hfa = true; 6884 scope(failure) 6885 if (owns_hfa && hfa_copy) 6886 hfa_copy.free(); 6887 6888 if (stream_id == 0) { 6889 hfa_copy.free(); 6890 return ErrorCode.INVALID_ARGUMENT; 6891 } 6892 6893 item = Mem.alloc!OutboundItem(session); 6894 scope(failure) { 6895 if (item) Mem.free(item); 6896 } 6897 if (data_prd) { 6898 item.aux_data.headers.data_prd = data_prd; 6899 } 6900 6901 item.aux_data.headers.stream_user_data = stream_user_data; 6902 item.aux_data.headers.attach_stream = attach_stream; 6903 6904 flags_copy = cast(FrameFlags)((flags & (FrameFlags.END_STREAM | FrameFlags.PRIORITY)) | FrameFlags.END_HEADERS); 6905 6906 if (stream_id == -1) { 6907 if (session.next_stream_id > int.max) { 6908 if (item) Mem.free(item); 6909 if (hfa_copy) 6910 hfa_copy.free(); 6911 return ErrorCode.STREAM_ID_NOT_AVAILABLE; 6912 } 6913 6914 stream_id = session.next_stream_id; 6915 session.next_stream_id += 2; 6916 6917 hcat = HeadersCategory.REQUEST; 6918 } else { 6919 /* More specific categorization will be done later. 
*/ 6920 hcat = HeadersCategory.HEADERS; 6921 } 6922 6923 frame = &item.frame; 6924 6925 owns_hfa = false; 6926 frame.headers = Headers(flags_copy, stream_id, hcat, pri_spec, hfa_copy); 6927 session.addItem(item); 6928 6929 if (rv != ErrorCode.OK) { 6930 if (item) { Mem.free(item); } 6931 if (hfa_copy) 6932 hfa_copy.free(); 6933 return rv; 6934 } 6935 6936 if (hcat == HeadersCategory.REQUEST) 6937 return stream_id; 6938 6939 return ErrorCode.OK; 6940 } 6941 6942 6943 6944 int submitHeadersSharedHfa(Session session, FrameFlags flags, int stream_id, in PrioritySpec pri_spec, in HeaderField[] hfa, 6945 in DataProvider data_prd, void *stream_user_data, bool attach_stream) 6946 { 6947 HeaderField[] hfa_copy = hfa.copy(); 6948 PrioritySpec copy_pri_spec = pri_spec; 6949 copy_pri_spec.adjustWeight(); 6950 6951 return submitHeadersShared(session, flags, stream_id, copy_pri_spec, hfa_copy, data_prd, stream_user_data, attach_stream); 6952 } 6953 6954 public: 6955 6956 /** 6957 * A helper function for dealing with NPN in client side or ALPN in 6958 * server side. The |input| contains peer's protocol list in preferable 6959 * order. The format of |input| is length-prefixed and not 6960 * null-terminated. For example, `HTTP-draft-04/2.0` and 6961 * `http/1.1` stored in |input| like this:: 6962 * 6963 * in[0] = 17 6964 * in[1..17] = "HTTP-draft-04/2.0" 6965 * in[18] = 8 6966 * in[19..26] = "http/1.1" 6967 * inlen = 27 6968 * 6969 * The selection algorithm is as follows: 6970 * 6971 * 1. If peer's list contains HTTP/2 protocol the library supports, 6972 * it is selected and returns 1. The following step is not taken. 6973 * 6974 * 2. If peer's list contains `http/1.1`, this function selects 6975 * `http/1.1` and returns 0. The following step is not taken. 6976 * 6977 * 3. This function selects nothing and returns -1 (So called 6978 * non-overlap case). In this case, |output| is left 6979 * untouched. 
6980 * 6981 * Selecting `HTTP-draft-04/2.0` means that `HTTP-draft-04/2.0` is 6982 * written into |*out| and its length (which is 17) is assigned to 6983 * |*outlen|. 6984 * 6985 * For ALPN, refer to 6986 * https://tools.ietf.org/html/draft-ietf-tls-applayerprotoneg-05 6987 * 6988 * See http://technotes.googlecode.com/git/nextprotoneg.html for more 6989 * details about NPN. 6990 * 6991 * For NPN, to use this method you should do something like:: 6992 * 6993 * static int select_next_proto_cb(SSL* ssl, 6994 * unsigned char **out, 6995 * unsigned char *outlen, 6996 * const unsigned char *in, 6997 * unsigned int inlen, 6998 * void *arg) 6999 * { 7000 * int rv; 7001 * rv = selectNextProtocol(out, outlen, in, inlen); 7002 * if(rv == 1) { 7003 * (cast(MyType*)arg).http2_selected = 1; 7004 * } 7005 * return SSL_TLSEXT_ERR_OK; 7006 * } 7007 * ... 7008 * SSL_CTX_set_next_proto_select_cb(ssl_ctx, select_next_proto_cb, my_obj); 7009 * 7010 */ 7011 int selectNextProtocol(ref ubyte[] output, in ubyte[] input, ubyte[] other_proto = null) 7012 { 7013 size_t i; 7014 size_t len; 7015 while (i < input.length) 7016 { 7017 len = input[i]; 7018 ++i; 7019 ubyte[] proto = cast(ubyte[]) input[i .. i+len]; 7020 i += len; 7021 if (other_proto && other_proto == proto) 7022 { 7023 output = proto; 7024 return 1; 7025 } 7026 7027 if (proto == PROTOCOL_ALPN) { 7028 output = proto; 7029 return 1; 7030 } 7031 if (proto == HTTP_1_1_ALPN) { 7032 output = proto; 7033 return ErrorCode.OK; 7034 } 7035 } 7036 return -1; 7037 } 7038 7039 7040 7041 /** 7042 * Returns true if the $(D RV) library error code 7043 * |lib_error| is fatal. 7044 */ 7045 bool isFatal(int lib_error) { return lib_error < ErrorCode.FATAL; } 7046 7047 7048 /// Configuration options 7049 enum OptionFlags { 7050 /** 7051 * This option prevents the library from sending WINDOW_UPDATE for a 7052 * connection automatically. 
If this option is set to nonzero, the 7053 * library won't send WINDOW_UPDATE for DATA until application calls 7054 * $(D Session.consume) to indicate the amount of consumed 7055 * DATA. By default, this option is set to zero. 7056 */ 7057 NO_AUTO_WINDOW_UPDATE = 1, 7058 /** 7059 * This option sets the Setting.MAX_CONCURRENT_STREAMS value of 7060 * remote endpoint as if it is received in SETTINGS frame. Without 7061 * specifying this option, before the local endpoint receives 7062 * Setting.MAX_CONCURRENT_STREAMS in SETTINGS frame from remote 7063 * endpoint, Setting.MAX_CONCURRENT_STREAMS is unlimited. This may 7064 * cause problem if local endpoint submits lots of requests 7065 * initially and sending them at once to the remote peer may lead to 7066 * the rejection of some requests. Specifying this option to the 7067 * sensible value, say 100, may avoid this kind of issue. This value 7068 * will be overwritten if the local endpoint receives 7069 * Setting.MAX_CONCURRENT_STREAMS from the remote endpoint. 7070 */ 7071 PEER_MAX_CONCURRENT_STREAMS = 1 << 1, 7072 RECV_CLIENT_PREFACE = 1 << 2, 7073 NO_HTTP_MESSAGING = 1 << 3, 7074 } 7075 7076 /// Struct to store option values for http2_session. 7077 struct Options { 7078 private: 7079 /// Bitwise OR of http2_option_flag to determine which fields are specified. 
7080 uint m_opt_set_mask; 7081 7082 uint m_peer_max_concurrent_streams; 7083 7084 bool m_no_auto_window_update; 7085 7086 bool m_recv_client_preface; 7087 7088 bool m_no_http_messaging; 7089 public: 7090 @property uint peer_max_concurrent_streams() const { return m_peer_max_concurrent_streams; } 7091 @property uint opt_set_mask() const { return m_opt_set_mask; } 7092 @property bool no_auto_window_update() const { return m_no_auto_window_update; } 7093 @property bool recv_client_preface() const { return m_recv_client_preface; } 7094 @property bool no_http_messaging() const { return m_no_http_messaging; } 7095 7096 /** 7097 * This option prevents the library from sending WINDOW_UPDATE for a 7098 * connection automatically. If this option is set to nonzero, the 7099 * library won't send WINDOW_UPDATE for DATA until application calls 7100 * `consume()` to indicate the consumed amount of 7101 * data. Don't use $(D submitWindowUpdate) for this purpose. 7102 * By default, this option is set to zero. 7103 */ 7104 @property void setNoAutoWindowUpdate(bool val) 7105 { 7106 if (val) m_opt_set_mask |= OptionFlags.NO_AUTO_WINDOW_UPDATE; 7107 else m_opt_set_mask |= ~OptionFlags.NO_AUTO_WINDOW_UPDATE; 7108 m_no_auto_window_update = val; 7109 } 7110 7111 /** 7112 * This option sets the Setting.MAX_CONCURRENT_STREAMS value of 7113 * remote endpoint as if it is received in SETTINGS frame. Without 7114 * specifying this option, before the local endpoint receives 7115 * Setting.MAX_CONCURRENT_STREAMS in SETTINGS frame from remote 7116 * endpoint, Setting.MAX_CONCURRENT_STREAMS is unlimited. This may 7117 * cause problem if local endpoint submits lots of requests initially 7118 * and sending them at once to the remote peer may lead to the 7119 * rejection of some requests. Specifying this option to the sensible 7120 * value, say 100, may avoid this kind of issue. 
This value will be 7121 * overwritten if the local endpoint receives 7122 * Setting.MAX_CONCURRENT_STREAMS from the remote endpoint. 7123 */ 7124 void setPeerMaxConcurrentStreams(uint val) 7125 { 7126 m_opt_set_mask |= OptionFlags.PEER_MAX_CONCURRENT_STREAMS; 7127 m_peer_max_concurrent_streams = val; 7128 } 7129 7130 /** 7131 * By default, libhttp2 library only handles HTTP/2 frames and does not 7132 * recognize first 24 bytes of client connection preface. This design 7133 * choice is done due to the fact that server may want to detect the 7134 * application protocol based on first few bytes on clear text 7135 * communication. But for simple servers which only speak HTTP/2, it 7136 * is easier for developers if libhttp2 library takes care of client 7137 * connection preface. 7138 * 7139 * If this option is used with nonzero |val|, libhttp2 library checks 7140 * first 24 bytes client connection preface. If it is not a valid 7141 * one, $(D Session.recv) and $(D Session.memRecv) will 7142 * return error $(D ErrorCode.BAD_PREFACE), which is fatal error. 7143 */ 7144 void setRecvClientPreface(bool val) 7145 { 7146 m_opt_set_mask |= OptionFlags.RECV_CLIENT_PREFACE; 7147 m_recv_client_preface = val; 7148 } 7149 7150 /** 7151 * By default, libhttp2 library enforces subset of HTTP Messaging rules 7152 * described in `HTTP/2 specification, section 8 7153 * <https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8>`_. 7154 * See `HTTP Messaging`_ section for details. For those applications 7155 * who use libhttp2 library as non-HTTP use, give nonzero to |val| to 7156 * disable this enforcement. 7157 */ 7158 void setNoHTTPMessaging(bool val) 7159 { 7160 m_opt_set_mask |= OptionFlags.NO_HTTP_MESSAGING; 7161 m_no_http_messaging = val; 7162 } 7163 7164 }