/**
 * Session
 * 
 * Copyright:
 * (C) 2012-2015 Tatsuhiro Tsujikawa
 * (C) 2014-2015 Etienne Cimon
 *
 * License: 
 * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project. 
 * Consult the provided LICENSE.md file for details
 */
module libhttp2.session;

import libhttp2.constants;
import libhttp2.types;
import libhttp2.frame;
import libhttp2.stream;
import libhttp2.connector;
import libhttp2.deflater;
import libhttp2.inflater;
import libhttp2.buffers;
import libhttp2.priority_queue;
import libhttp2.helpers;
import libhttp2.huffman;
import core.exception : RangeError;

import memutils.circularbuffer;
import memutils.vector;
import memutils.hashmap;

import std.algorithm : min, max;

/// Bit flags describing the options a session was created with.
enum OptionsMask {
	NONE = 0,
	NO_AUTO_WINDOW_UPDATE = 1 << 0,
	RECV_CLIENT_PREFACE = 1 << 1,
	NO_HTTP_MESSAGING = 1 << 2,
}

/// Processing state of the active outbound item (see ActiveOutboundItem.state).
enum OutboundState {
	POP_ITEM,
	SEND_DATA,
	SEND_NO_COPY
}

/// The outbound item currently being transmitted, together with its frame buffers.
struct ActiveOutboundItem {
	OutboundItem item;
	Buffers framebufs;
	OutboundState state = OutboundState.POP_ITEM;

	/// Frees the current item (if any), resets the frame buffers and
	/// returns to the POP_ITEM state.
	void reset() {
		LOGF("send: reset http2_active_outbound_item");
		LOGF("send: aob.item = %s", item);
		if(item) {
			item.free();
			Mem.free(item);
			item = null;
		}
		framebufs.reset();
		state = OutboundState.POP_ITEM;
	}
}

/// Internal state when receiving incoming frame
enum InboundState : ubyte {
	/* Receiving frame header */
	READ_CLIENT_PREFACE,
	READ_FIRST_SETTINGS,
	READ_HEAD,
	READ_NBYTE,
	READ_HEADER_BLOCK,
	IGN_HEADER_BLOCK,
	IGN_PAYLOAD,
	FRAME_SIZE_ERROR,
	READ_SETTINGS,
	READ_GOAWAY_DEBUG,
	EXPECT_CONTINUATION,
	IGN_CONTINUATION,
	READ_PAD_DATA,
	READ_DATA,
	IGN_DATA,
	IGN_ALL,
}

/// Bookkeeping for the frame that is currently being received.
struct InboundFrame {
	Frame frame;

	/* The received SETTINGS entries. The protocol says that we only care
	   about the defined settings IDs. If an unknown ID is received, it is
	   ignored.  We use the last entry to hold the minimum header table size
	   if the same setting is seen multiple times. */
	Setting[INBOUND_NUM_IV] iva;

	/// buffer pointers to small buffer, raw_sbuf 
	Buffer sbuf;

	/// buffer pointers to large buffer, raw_lbuf
	Buffer lbuf;

	/// Large buffer, malloced on demand
	ubyte[] raw_lbuf;

	/* The number of entries filled in |iva| */
	size_t niv;

	/* How many bytes we still need to receive for current frame */
	size_t payloadleft;

	/* padding length for the current frame */
	size_t padlen;

	InboundState state;

	/* Small buffer.  Currently the largest contiguous chunk to buffer
	   is frame header.  We buffer part of payload, but they are smaller
	   than frame header. */
	ubyte[FRAME_HDLEN] raw_sbuf;

	/// Returns how many bytes of [input, last) belong to the current frame:
	/// the smaller of the available input and |payloadleft|.
	size_t readLength(const ubyte* input, const ubyte* last)
	{
		return min(cast(size_t)(last - input), payloadleft);
	}

	/*
	 * Resets iframe.sbuf and advance its mark pointer by |left| bytes.
	 */
	void setMark(size_t left)
	{
		sbuf.reset;
		sbuf.mark += left;
	}

	/// Copies bytes from [input, last) into sbuf, limited by sbuf.markAvailable,
	/// and returns the number of bytes copied.
	size_t read(in ubyte* input, in ubyte* last)
	{
		// std.c.string is deprecated; core.stdc.string provides the same memcpy.
		import core.stdc.string : memcpy;

		// Same cast as readLength() so both operands of min() are unsigned.
		size_t readlen = min(cast(size_t)(last - input), sbuf.markAvailable);

		memcpy(sbuf.last, input, readlen);
		sbuf.last += readlen;

		return readlen;
	}

	/*
	 * Unpacks SETTINGS entry in iframe.sbuf.
	 *
	 * Entries with an unknown ID are logged and dropped. A known ID that is
	 * already present in |iva| overwrites the earlier entry, otherwise the
	 * entry is appended. The smallest HEADER_TABLE_SIZE seen so far is kept
	 * in the last slot of |iva|.
	 */
	void unpackSetting() 
	{
		Setting _iv;
		_iv.unpack(sbuf[]);

		size_t i;

		with(Setting) switch (_iv.id) {
			case HEADER_TABLE_SIZE:
			case ENABLE_PUSH:
			case MAX_CONCURRENT_STREAMS:
			case INITIAL_WINDOW_SIZE:
			case MAX_FRAME_SIZE:
			case MAX_HEADER_LIST_SIZE:
				break;
			default:
				LOGF("recv: ignore unknown settings id=0x%02x", _iv.id);
				return;
		}

		for(i = 0; i < niv; ++i) {
			if (iva[i].id == _iv.id) {
				iva[i] = _iv;
				break;
			}
		}

		if (i == niv) {
			iva[niv] = _iv;
			niv++;
		}

		if (_iv.id == Setting.HEADER_TABLE_SIZE && _iv.value < iva[INBOUND_NUM_IV - 1].value) 
		{
			iva[INBOUND_NUM_IV - 1] = _iv;
		}
	}
private:
	/*
	 * Checks PADDED flags and set iframe.sbuf to read them accordingly.
	 * If padding is set, this function returns 1.  If no padding is set,
	 * this function returns 0.  On error, returns -1.
	 */
	int handlePad()
	{
		if (frame.hd.flags & FrameFlags.PADDED) {
			if (frame.hd.length < 1) {
				return -1;
			}
			setMark(1);
			return 1;
		}
		LOGF("recv: no padding in payload");
		return ErrorCode.OK;
	}

	/*
	 * Computes number of padding based on flags. This function returns
	 * padlen if it succeeds, or -1.
	 */
	int computePad() 
	{
		/* 1 for Pad Length field */
		int _padlen = sbuf.pos[0] + 1;

		// Log the value just computed; the |padlen| member is only updated below.
		LOGF("recv: padlen=%d", _padlen);

		/* We cannot use iframe.frame.hd.length because of CONTINUATION */
		if (_padlen - 1 > payloadleft) {
			return -1;
		}

		padlen = _padlen;

		return _padlen;
	}

	/*
	 * This function returns the effective payload length in the data of
	 * length |readlen| when the remaining payload is |payloadleft|. The
	 * |payloadleft| does not include |readlen|. If padding was started
	 * strictly before this data chunk, this function returns -1.
	 */
	int effectiveReadLength(size_t _payloadleft, size_t readlen) 
	{
		size_t trail_padlen = frame.trailPadlen(padlen);

		if (trail_padlen > _payloadleft) {
			// Number of padding bytes that fall inside this chunk. Named
			// differently from the |padlen| member to avoid shadowing it.
			size_t pad_in_chunk = trail_padlen - _payloadleft;
			if (readlen < pad_in_chunk) {
				return -1;
			} else {
				return cast(int)(readlen - pad_in_chunk);
			}
		}
		return cast(int)readlen;
	}

	/// Releases the payload of the current frame and prepares the struct for
	/// reading the next frame header.
	void reset()
	{
		/* A bit risky code, since if this function is called from Session(), we rely on the fact that
	     frame.hd.type is 0, so that no free is performed. */
		with (FrameType) switch (frame.hd.type) {
			case HEADERS:
				frame.headers.free();
				break;
			case PRIORITY:
				frame.priority.free();
				break;
			case RST_STREAM:
				frame.rst_stream.free();
				break;
			case SETTINGS:
				frame.settings.free();
				break;
			case PUSH_PROMISE:
				frame.push_promise.free();
				break;
			case PING:
				frame.ping.free();
				break;
			case GOAWAY:
				frame.goaway.free();
				break;
			case WINDOW_UPDATE:
				frame.window_update.free();
				break;
			default: break;
		}

		destroy(frame);

		state = InboundState.READ_HEAD;

		sbuf = Buffer(raw_sbuf.ptr[0 .. raw_sbuf.sizeof]);
		sbuf.mark += FRAME_HDLEN;

		lbuf.free();
		lbuf = Buffer();
		destroy(iva);
		payloadleft = 0;
		padlen = 0;
		// The last slot tracks the minimum HEADER_TABLE_SIZE (see unpackSetting),
		// so it starts at the largest possible value.
		iva[INBOUND_NUM_IV - 1].id = Setting.HEADER_TABLE_SIZE;
		iva[INBOUND_NUM_IV - 1].value = uint.max;
		niv = 0;
	}
}

/// One set of SETTINGS values, initialised to the defaults listed here.
struct SettingsStorage {
	uint header_table_size = HD_DEFAULT_MAX_BUFFER_SIZE;
	uint enable_push = 1;
	uint max_concurrent_streams = INITIAL_MAX_CONCURRENT_STREAMS;
	uint initial_window_size = INITIAL_WINDOW_SIZE;
	uint max_frame_size = MAX_FRAME_SIZE_MIN;
	uint max_header_list_size = uint.max;
}

enum GoAwayFlags {
	NONE = 0,
	/* Flag means that connection should be terminated after sending GOAWAY.
*/
	TERM_ON_SEND = 0x1,
	/* Flag means GOAWAY to terminate session has been sent */
	TERM_SENT = 0x2,
	/* Flag means GOAWAY was sent */
	SENT = 0x4,
	/* Flag means GOAWAY was received */
	RECV = 0x8,
}

/// Values for the `server` argument of the Session constructor.
enum {
	CLIENT = false,
	SERVER = true
}

class Session {
	/// Releases the session's resources unless free() has already run
	/// (free() clears |connector|).
	~this() {
		if (connector !is null) free();
	}

	/**
	 * Creates a session.
	 *
	 * Params:
	 *  server    = `true` (SERVER) for a server session, `false` (CLIENT) for a client session.
	 *  callbacks = the $(D Connector) stored in this session.
	 *  options   = optional settings; only the fields enabled in `options.opt_set_mask` are applied.
	 */
	this(bool server, Connector callbacks, in Options options = Options.init)
	{
		if (server) {
			is_server = true;
			next_stream_id = 2; // server-initiated stream IDs are even
		}
		else
			next_stream_id = 1; // client-initiated stream IDs are odd

		roots = Mem.alloc!StreamRoots();
		scope(failure) Mem.free(roots);

		hd_inflater = Inflater(true);
		scope(failure) hd_inflater.free();

		hd_deflater = Deflater(DEFAULT_MAX_DEFLATE_BUFFER_SIZE);
		scope(failure) hd_deflater.free();

		ob_pq = PriorityQueue(128);
		scope(failure) ob_pq.free();

		ob_ss_pq = PriorityQueue(128);
		scope(failure) ob_ss_pq.free();

		ob_da_pq = PriorityQueue(128);
		scope(failure) ob_da_pq.free();

		/* 1 for Pad Field. */
		aob.framebufs = Mem.alloc!Buffers(FRAMEBUF_CHUNKLEN, FRAMEBUF_MAX_NUM, 1, FRAME_HDLEN + 1);
		scope(failure) { aob.framebufs.free(); Mem.free(aob.framebufs); }

		aob.reset();

		if (options != Options.init) {
			if ((options.opt_set_mask & OptionFlags.NO_AUTO_WINDOW_UPDATE) && options.no_auto_window_update) 
			{
				opt_flags |= OptionsMask.NO_AUTO_WINDOW_UPDATE;
			}

			if (options.opt_set_mask & OptionFlags.PEER_MAX_CONCURRENT_STREAMS) 
			{
				remote_settings.max_concurrent_streams = options.peer_max_concurrent_streams;
			}

			if ((options.opt_set_mask & OptionFlags.RECV_CLIENT_PREFACE) && options.recv_client_preface) 
			{
				opt_flags |= OptionsMask.RECV_CLIENT_PREFACE;
			}

			if ((options.opt_set_mask & OptionFlags.NO_HTTP_MESSAGING) && options.no_http_messaging) 
			{
				opt_flags |= OptionsMask.NO_HTTP_MESSAGING;
			}
		}

		iframe.reset();

		if (is_server && opt_flags & OptionsMask.RECV_CLIENT_PREFACE) 
		{
			iframe.state = InboundState.READ_CLIENT_PREFACE;
			iframe.payloadleft = CLIENT_CONNECTION_PREFACE.length;
		} else static if (ENABLE_FIRST_SETTING_CHECK) 
		{
			iframe.state = InboundState.READ_FIRST_SETTINGS;
		}

		// Assigned last on purpose: the destructor calls free() whenever
		// |connector| is set, so assigning it before the steps above would let
		// a late failure run the scope(failure) cleanups and then free() again.
		connector = callbacks;
	}

	/**
	 * Frees the resources owned by this $(D Session) and clears |connector|,
	 * so that the destructor does not free them a second time.
	 */
	void free() {
		if (inflight_iva)
			Mem.free(inflight_iva);
		roots.free();
		Mem.free(roots);
		freeAllStreams();
		iframe.reset();
		ob_pq.free();
		ob_ss_pq.free();
		ob_da_pq.free();
		aob.reset();
		hd_deflater.free();
		hd_inflater.free();
		// Test for null before touching the buffers, and drop the reference
		// once it has been released.
		if (aob.framebufs) {
			aob.framebufs.free();
			Mem.free(aob.framebufs);
			aob.framebufs = null;
		}
		destroy(streams);
		connector = null;
	}

	/**
	 * Sends pending frames to the remote peer.
	 *
	 * This function retrieves the highest prioritized frame from the
	 * outbound queue and sends it to the remote peer.
It does this as
	 * many times as possible until the user callback $(D Connector.write) returns
	 * $(D ErrorCode.WOULDBLOCK) or the outbound queue becomes empty.
	 * 
	 * This function calls several $(D Connector) functions which are passed
	 * when initializing the $(D Session).  Here is the simple time chart
	 * which tells when each callback is invoked:
	 *
	 * 1. Get the next frame to be sent from a priority sorted outbound queue.
	 *
	 * 2. Prepare transmission of the frame.
	 *
	 * 3. $(D Connector.onFrameFailure) may be invoked if the control frame cannot 
	 *    be sent because some preconditions are not met (e.g., request HEADERS 
	 *    cannot be sent after GOAWAY). This then aborts the following steps.
	 *
	 * 4. $(D Connector.selectPaddingLength) is invoked if the frame is HEADERS, 
	 *    PUSH_PROMISE or DATA.
	 *
	 * 5. If the frame is request HEADERS, the stream is opened here.
	 *
	 * 6. $(D Connector.onFrameReady) is invoked.
	 *
	 * 7. $(D Connector.write) is invoked one or more times to send the frame.
	 *
	 * 8. $(D Connector.onFrameSent) is invoked after all data is transmitted.
	 *
	 * 9. $(D Connector.onStreamExit) may be invoked if the transmission of the frame 
	 *    triggers closure of the stream, it is destroyed afterwards.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * $(D ErrorCode.CALLBACK_FAILURE)
	 *     The callback function failed. This includes $(D Connector.write)
	 *     throwing, returning a negative value other than
	 *     $(D ErrorCode.WOULDBLOCK), or reporting more bytes than it was given.
	 */
	ErrorCode send() {
		ErrorCode rv;
		ubyte[] data;
		int sentlen;
		Buffers framebufs = aob.framebufs;

		for (;;) {
			rv = memSendInternal(data, false);
			if (rv < 0)
				return rv;
			else if (data.length == 0)
				return ErrorCode.OK;
			try sentlen = connector.write(data);
			catch (Exception) return ErrorCode.CALLBACK_FAILURE;

			if (sentlen < 0) {
				if (cast(ErrorCode) sentlen == ErrorCode.WOULDBLOCK) {
					/* Transmission canceled. Rewind the offset */
					framebufs.cur.buf.pos -= data.length;				
					return ErrorCode.OK;
				}

				return ErrorCode.CALLBACK_FAILURE;
			}

			/* A callback claiming to have written more than it was handed would
			   make the unsigned subtraction below wrap and push the read offset
			   forward past the data; treat it as a callback failure instead. */
			if (cast(size_t) sentlen > data.length) {
				return ErrorCode.CALLBACK_FAILURE;
			}

			/* Rewind the offset to the amount of unsent bytes */
			framebufs.cur.buf.pos -= (data.length - sentlen);
		}

		assert(false);
	}

	/**
	 * Returns the serialized data to send.
	 *
	 * This function behaves like `send()` except that it
	 * does not use $(D Connector.write) to transmit data.
	 * Instead, it assigns the serialized data to the given $(D ubyte[])
	 * |data_arr|.  The other callbacks are called in the same way as they are
	 * in `send()`.
	 *
	 * This function may not return all serialized data in one invocation.
	 * To get all data, call this function repeatedly until it returns an
	 * array of 0 length or one of negative error codes.
	 *
	 * The assigned |data_arr| is valid until the next call of
	 * `memSend()` or `send()`.
	 *
	 * The caller must send all data before sending the next chunk of
	 * data.
	 *
	 * This function returns an error code on failure or 0 on success
	 */
	ErrorCode memSend(ref ubyte[] data_arr) 
	{
		ErrorCode rv;

		rv = memSendInternal(data_arr, true);
		if (rv < 0) {
			return rv;
		}

		/* We have to call afterFrameSent here to handle stream
		   closure upon transmission of frames.  Otherwise, END_STREAM may
		   be reached to client before we call memSend
		   again and we may get exceeding number of incoming streams. */
		rv = afterFrameSent();
		if (rv < 0) {
			/* FATAL */
			assert(isFatal(rv));
			return rv;
		}

		return ErrorCode.OK;
	}

	/**
	 * Receives frames from the remote peer.
	 *
	 * This function receives as many frames as possible until the user
	 * callback $(D Connector.read) returns $(D ErrorCode.WOULDBLOCK).
	 * This function calls several $(D Connector) functions which are passed
	 * when initializing the $(D Session).
*
	 * Here is the simple time chart which tells when each callback is invoked:
	 *
	 * 1. $(D Connector.read) is invoked one or more times to receive the frame header.
	 *
	 * 2. $(D Connector.onFrameHeader) is invoked after the frame header is received.
	 *
	 * 3. If the frame is DATA frame:
	 *
	 *    1. $(D Connector.read) is invoked one or more times to receive the DATA payload. 
	 *
	 *    2. $(D Connector.onDataChunk) is invoked alternately with $(D Connector.read) 
	 *       for each chunk of data.
	 *
	 *    3. $(D Connector.onFrame) may be invoked if one DATA frame is completely received.
	 *
	 *    4. $(D Connector.onStreamExit) may be invoked if the reception of the frame triggers 
	 *       closure of the stream.
	 *
	 * 4. If the frame is the control frame:
	 *
	 *    1. $(D Connector.read) is invoked one or more times to receive the whole frame.
	 *
	 *    2. If the received frame is valid, then following actions are
	 *       taken. 
	 *       - If the frame is either HEADERS or PUSH_PROMISE:
	 *         - $(D Connector.onHeaders) is invoked first.
	 *         - $(D Connector.onHeaderField) is invoked for each header field.
	 *         - $(D Connector.onFrame) is invoked after all header fields.
	 *       - For other frames:
	 *         - $(D Connector.onFrame) is invoked.
	 *         - $(D Connector.onStreamExit) may be invoked if the reception of the frame 
	 *           triggers the closure of the stream.
	 *
	 *    3. $(D Connector.onInvalidFrame) may be invoked if the received frame is unpacked 
	 *       but is interpreted as invalid.
	 *
	 * This function returns 0 if it succeeds (the read callback reported 0 bytes
	 * or $(D ErrorCode.WOULDBLOCK)), or one of the following negative error codes:
	 *
	 * $(D ErrorCode.EOF)
	 *     The remote peer did shutdown on the connection.
	 * $(D ErrorCode.CALLBACK_FAILURE)
	 *     The callback function failed.
	 * $(D ErrorCode.BAD_PREFACE)
	 *     Invalid client preface was detected.  This error only returns
	 *     when $(D Session) was configured as server and
	 *     `setRecvClientPreface()` is used.
	 */
	ErrorCode recv() {
		ubyte[INBOUND_BUFFER_LENGTH] chunk;

		for (;;) {
			const int nread = callRead(chunk[]);

			if (nread > 0) {
				// Hand the freshly read bytes to the frame parser.
				const int consumed = memRecv(chunk[0 .. nread]);
				if (consumed < 0) {
					return cast(ErrorCode)consumed;
				}
				assert(consumed == nread);
				continue;
			}

			// Nothing was read: either no data is available right now ...
			if (nread == 0 || nread == ErrorCode.WOULDBLOCK) {
				return ErrorCode.OK;
			}
			// ... or the peer closed the connection ...
			if (nread == ErrorCode.EOF) {
				return ErrorCode.EOF;
			}
			// ... or the read callback reported some other (negative) error.
			return ErrorCode.CALLBACK_FAILURE;
		}
	}

	/**
	 * Processes data |input| as an input from the remote endpoint.  The
	 * |inlen| indicates the number of bytes in the |in|.
	 *
	 * This function behaves like $(D Session.recv) except that it
	 * does not use $(D Connector.read) to receive data; the
	 * |input| is the only data for the invocation of this function.  If all
	 * bytes are processed, this function returns.  The other connector
	 * are called in the same way as they are in $(D Session.recv).
	 *
	 * In the current implementation, this function always tries to
	 * process all input data unless either an error occurs or
	 * $(D ErrorCode.PAUSE) is returned from $(D Connector.onHeaderField) or
	 * $(D Connector.onDataChunk).  If $(D ErrorCode.PAUSE) is used, 
	 * the return value includes the number of bytes which was used to 
	 * produce the data or frame for the callback.
	 *
	 * This function returns the number of processed bytes, or one of the
	 * following negative error codes:
	 *
	 * $(D ErrorCode.CALLBACK_FAILURE)
	 *     The callback function failed.
	 * $(D ErrorCode.BAD_PREFACE)
	 *     Invalid client preface was detected.  This error only returns
	 *     when $(D Session) was configured as server and
	 *     `setRecvClientPreface()` is used.
646 */ 647 int memRecv(in ubyte[] input) 648 { 649 const(ubyte)* pos = input.ptr; 650 const ubyte* first = input.ptr; 651 const ubyte* last = input.ptr + input.length; 652 size_t readlen; 653 int padlen; 654 ErrorCode rv; 655 bool busy; 656 FrameHeader cont_hd; 657 Stream stream; 658 size_t pri_fieldlen; 659 660 LOGF("recv: connection recv_window_size=%d, local_window=%d", recv_window_size, local_window_size); 661 662 for (;;) { 663 with(InboundState) final switch (iframe.state) { 664 case READ_CLIENT_PREFACE: 665 readlen = min(input.length, iframe.payloadleft); 666 667 if (CLIENT_CONNECTION_PREFACE[$ - iframe.payloadleft .. $ - iframe.payloadleft + readlen] != pos[0 .. readlen]) 668 { 669 return ErrorCode.BAD_PREFACE; 670 } 671 672 iframe.payloadleft -= readlen; 673 pos += readlen; 674 675 if (iframe.payloadleft == 0) { 676 iframe.reset(); 677 iframe.state = READ_FIRST_SETTINGS; 678 } 679 680 break; 681 case READ_FIRST_SETTINGS: 682 LOGF("recv: [READ_FIRST_SETTINGS]"); 683 684 readlen = iframe.read(pos, last); 685 pos += readlen; 686 687 if (iframe.sbuf.markAvailable) { 688 return cast(int)(pos - first); 689 } 690 691 if (iframe.sbuf.pos[3] != FrameType.SETTINGS || (iframe.sbuf.pos[4] & FrameFlags.ACK)) 692 { 693 694 iframe.state = IGN_ALL; 695 696 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "SETTINGS expected"); 697 698 if (isFatal(rv)) { 699 return rv; 700 } 701 702 return cast(int)input.length; 703 } 704 705 iframe.state = READ_HEAD; 706 707 goto case READ_HEAD; 708 case READ_HEAD: { 709 bool on_frame_header_called; 710 711 LOGF("recv: [READ_HEAD]"); 712 713 readlen = iframe.read(pos, last); 714 pos += readlen; 715 716 if (iframe.sbuf.markAvailable) { 717 return cast(int)(pos - first); 718 } 719 720 iframe.frame.hd.unpack(iframe.sbuf[]); 721 iframe.payloadleft = iframe.frame.hd.length; 722 723 LOGF("recv: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", 724 iframe.frame.hd.length, iframe.frame.hd.type, iframe.frame.hd.flags, 
iframe.frame.hd.stream_id); 725 726 if (iframe.frame.hd.length > local_settings.max_frame_size) { 727 LOGF("recv: length is too large %d > %u", iframe.frame.hd.length, local_settings.max_frame_size); 728 729 busy = true; 730 731 iframe.state = IGN_PAYLOAD; 732 733 rv = terminateSessionWithReason(FrameError.FRAME_SIZE_ERROR, "too large frame size"); 734 735 if (isFatal(rv)) { 736 return rv; 737 } 738 739 break; 740 } 741 742 switch (iframe.frame.hd.type) { 743 case FrameType.DATA: { 744 LOGF("recv: DATA"); 745 746 iframe.frame.hd.flags &= (FrameFlags.END_STREAM | FrameFlags.PADDED); 747 /* Check stream is open. If it is not open or closing, ignore payload. */ 748 busy = true; 749 750 rv = onDataFailFast(); 751 if (rv == ErrorCode.IGN_PAYLOAD) { 752 LOGF("recv: DATA not allowed stream_id=%d", iframe.frame.hd.stream_id); 753 iframe.state = IGN_DATA; 754 break; 755 } 756 757 if (isFatal(rv)) { 758 return rv; 759 } 760 761 rv = cast(ErrorCode)iframe.handlePad(); 762 if (rv < 0) { 763 iframe.state = IGN_DATA; 764 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "DATA: insufficient padding space"); 765 766 if (isFatal(rv)) { 767 return rv; 768 } 769 break; 770 } 771 772 if (rv == 1) { 773 iframe.state = READ_PAD_DATA; 774 break; 775 } 776 777 iframe.state = READ_DATA; 778 break; 779 } 780 case FrameType.HEADERS: 781 782 LOGF("recv: HEADERS"); 783 784 iframe.frame.hd.flags &= (FrameFlags.END_STREAM | FrameFlags.END_HEADERS | FrameFlags.PADDED | FrameFlags.PRIORITY); 785 786 rv = cast(ErrorCode)iframe.handlePad(); 787 if (rv < 0) { 788 busy = true; 789 790 iframe.state = IGN_PAYLOAD; 791 792 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "HEADERS: insufficient padding space"); 793 if (isFatal(rv)) { 794 return rv; 795 } 796 break; 797 } 798 799 if (rv == 1) { 800 iframe.state = READ_NBYTE; 801 break; 802 } 803 804 pri_fieldlen = priorityLength(iframe.frame.hd.flags); 805 806 if (pri_fieldlen > 0) { 807 if (iframe.payloadleft < pri_fieldlen) { 808 busy 
= true; 809 iframe.state = FRAME_SIZE_ERROR; 810 break; 811 } 812 813 iframe.state = READ_NBYTE; 814 815 iframe.setMark(pri_fieldlen); 816 break; 817 } 818 819 /* Call onFrameHeader here because processHeadersFrame() may call onHeaders callback */ 820 bool ok = callOnFrameHeader(iframe.frame.hd); 821 822 if (!ok) { 823 return ErrorCode.CALLBACK_FAILURE; 824 } 825 826 on_frame_header_called = true; 827 828 rv = processHeadersFrame(); 829 830 if (isFatal(rv)) { 831 return rv; 832 } 833 834 busy = true; 835 836 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 837 iframe.state = IGN_HEADER_BLOCK; 838 break; 839 } 840 841 iframe.state = READ_HEADER_BLOCK; 842 843 break; 844 case FrameType.PRIORITY: 845 LOGF("recv: PRIORITY"); 846 847 iframe.frame.hd.flags = FrameFlags.NONE; 848 849 if (iframe.payloadleft != PRIORITY_SPECLEN) { 850 busy = true; 851 852 iframe.state = FRAME_SIZE_ERROR; 853 854 break; 855 } 856 857 iframe.state = READ_NBYTE; 858 859 iframe.setMark(PRIORITY_SPECLEN); 860 861 break; 862 case FrameType.RST_STREAM: 863 case FrameType.WINDOW_UPDATE: 864 static if (DEBUG) { 865 switch (iframe.frame.hd.type) { 866 case FrameType.RST_STREAM: 867 LOGF("recv: RST_STREAM"); 868 break; 869 case FrameType.WINDOW_UPDATE: 870 LOGF("recv: WINDOW_UPDATE"); 871 break; 872 default: break; 873 } 874 } 875 876 iframe.frame.hd.flags = FrameFlags.NONE; 877 878 if (iframe.payloadleft != 4) { 879 busy = true; 880 iframe.state = FRAME_SIZE_ERROR; 881 break; 882 } 883 884 iframe.state = READ_NBYTE; 885 886 iframe.setMark(4); 887 888 break; 889 case FrameType.SETTINGS: 890 LOGF("recv: SETTINGS"); 891 892 iframe.frame.hd.flags &= FrameFlags.ACK; 893 894 if ((iframe.frame.hd.length % FRAME_SETTINGS_ENTRY_LENGTH) || 895 ((iframe.frame.hd.flags & FrameFlags.ACK) && iframe.payloadleft > 0)) { 896 busy = true; 897 iframe.state = FRAME_SIZE_ERROR; 898 break; 899 } 900 901 iframe.state = READ_SETTINGS; 902 903 if (iframe.payloadleft) { 904 iframe.setMark(FRAME_SETTINGS_ENTRY_LENGTH); 905 break; 906 } 
907 908 busy = true; 909 910 iframe.setMark(0); 911 912 break; 913 case FrameType.PUSH_PROMISE: 914 LOGF("recv: PUSH_PROMISE"); 915 916 iframe.frame.hd.flags &= (FrameFlags.END_HEADERS | FrameFlags.PADDED); 917 918 rv = cast(ErrorCode)iframe.handlePad(); 919 if (rv < 0) { 920 busy = true; 921 iframe.state = IGN_PAYLOAD; 922 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: insufficient padding space"); 923 if (isFatal(rv)) { 924 return rv; 925 } 926 break; 927 } 928 929 if (rv == 1) { 930 iframe.state = READ_NBYTE; 931 break; 932 } 933 934 if (iframe.payloadleft < 4) { 935 busy = true; 936 iframe.state = FRAME_SIZE_ERROR; 937 break; 938 } 939 940 iframe.state = READ_NBYTE; 941 942 iframe.setMark(4); 943 944 break; 945 case FrameType.PING: 946 LOGF("recv: PING"); 947 948 iframe.frame.hd.flags &= FrameFlags.ACK; 949 950 if (iframe.payloadleft != 8) { 951 busy = true; 952 iframe.state = FRAME_SIZE_ERROR; 953 break; 954 } 955 956 iframe.state = READ_NBYTE; 957 iframe.setMark(8); 958 959 break; 960 case FrameType.GOAWAY: 961 LOGF("recv: GOAWAY"); 962 963 iframe.frame.hd.flags = FrameFlags.NONE; 964 965 if (iframe.payloadleft < 8) { 966 busy = true; 967 iframe.state = FRAME_SIZE_ERROR; 968 break; 969 } 970 971 iframe.state = READ_NBYTE; 972 iframe.setMark(8); 973 974 break; 975 case FrameType.CONTINUATION: 976 LOGF("recv: unexpected CONTINUATION"); 977 978 /* Receiving CONTINUATION in this state are subject to connection error of type PROTOCOL_ERROR */ 979 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "CONTINUATION: unexpected"); 980 if (isFatal(rv)) 981 { 982 return rv; 983 } 984 985 busy = true; 986 987 iframe.state = IGN_PAYLOAD; 988 989 break; 990 default: 991 LOGF("recv: unknown frame"); 992 993 /* Silently ignore unknown frame type. 
*/ 994 995 busy = true; 996 997 iframe.state = IGN_PAYLOAD; 998 999 break; 1000 } 1001 1002 if (!on_frame_header_called) { 1003 switch (iframe.state) { 1004 case IGN_HEADER_BLOCK: 1005 case IGN_PAYLOAD: 1006 case FRAME_SIZE_ERROR: 1007 case IGN_DATA: 1008 break; 1009 default: 1010 bool ok = callOnFrameHeader(iframe.frame.hd); 1011 1012 if (!ok) { 1013 return ErrorCode.CALLBACK_FAILURE; 1014 } 1015 } 1016 } 1017 1018 break; 1019 } 1020 case READ_NBYTE: 1021 LOGF("recv: [READ_NBYTE]"); 1022 1023 readlen = iframe.read(pos, last); 1024 pos += readlen; 1025 iframe.payloadleft -= readlen; 1026 1027 LOGF("recv: readlen=%d, payloadleft=%d, left=%d, type=%s", readlen, iframe.payloadleft, iframe.sbuf.markAvailable, iframe.frame.hd.type); 1028 1029 if (iframe.sbuf.markAvailable) { 1030 return cast(int)(pos - first); 1031 } 1032 1033 switch (iframe.frame.hd.type) { 1034 case FrameType.HEADERS: 1035 if (iframe.padlen == 0 && (iframe.frame.hd.flags & FrameFlags.PADDED)) { 1036 padlen = iframe.computePad(); 1037 if (padlen < 0) { 1038 busy = true; 1039 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "HEADERS: invalid padding"); 1040 if (isFatal(rv)) { 1041 return rv; 1042 } 1043 iframe.state = IGN_PAYLOAD; 1044 break; 1045 } 1046 iframe.frame.headers.padlen = padlen; 1047 1048 pri_fieldlen = priorityLength(iframe.frame.hd.flags); 1049 if (pri_fieldlen > 0) { 1050 if (iframe.payloadleft < pri_fieldlen) { 1051 busy = true; 1052 iframe.state = FRAME_SIZE_ERROR; 1053 break; 1054 } 1055 iframe.state = READ_NBYTE; 1056 iframe.setMark(pri_fieldlen); 1057 break; 1058 } else { 1059 /* Truncate buffers used for padding spec */ 1060 iframe.setMark(0); 1061 } 1062 } 1063 1064 rv = processHeadersFrame(); 1065 if (isFatal(rv)) { 1066 return rv; 1067 } 1068 1069 busy = true; 1070 1071 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 1072 iframe.state = IGN_HEADER_BLOCK; 1073 break; 1074 } 1075 1076 iframe.state = READ_HEADER_BLOCK; 1077 1078 break; 1079 case FrameType.PRIORITY: 1080 rv = 
processPriorityFrame(); 1081 if (isFatal(rv)) { 1082 return rv; 1083 } 1084 1085 iframe.reset(); 1086 1087 break; 1088 case FrameType.RST_STREAM: 1089 rv = processRstStreamFrame(); 1090 if (isFatal(rv)) { 1091 return rv; 1092 } 1093 1094 iframe.reset(); 1095 1096 break; 1097 case FrameType.PUSH_PROMISE: 1098 if (iframe.padlen == 0 && (iframe.frame.hd.flags & FrameFlags.PADDED)) { 1099 padlen = iframe.computePad(); 1100 if (padlen < 0) { 1101 busy = true; 1102 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid padding"); 1103 if (isFatal(rv)) { 1104 return rv; 1105 } 1106 iframe.state = IGN_PAYLOAD; 1107 break; 1108 } 1109 1110 iframe.frame.push_promise.padlen = padlen; 1111 1112 if (iframe.payloadleft < 4) { 1113 busy = true; 1114 iframe.state = FRAME_SIZE_ERROR; 1115 break; 1116 } 1117 1118 iframe.state = READ_NBYTE; 1119 1120 iframe.setMark(4); 1121 1122 break; 1123 } 1124 1125 rv = processPushPromiseFrame(); 1126 if (isFatal(rv)) { 1127 return rv; 1128 } 1129 1130 busy = true; 1131 1132 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 1133 iframe.state = IGN_HEADER_BLOCK; 1134 break; 1135 } 1136 1137 iframe.state = READ_HEADER_BLOCK; 1138 1139 break; 1140 case FrameType.PING: 1141 rv = processPingFrame(); 1142 if (isFatal(rv)) { 1143 return rv; 1144 } 1145 1146 iframe.reset(); 1147 1148 break; 1149 case FrameType.GOAWAY: { 1150 size_t debuglen; 1151 1152 /* 8 is Last-stream-ID + Error Code */ 1153 debuglen = iframe.frame.hd.length - 8; 1154 1155 if (debuglen > 0) { 1156 iframe.raw_lbuf = Mem.alloc!(ubyte[])(debuglen); 1157 iframe.lbuf = Buffer(iframe.raw_lbuf); 1158 } 1159 1160 busy = true; 1161 1162 iframe.state = READ_GOAWAY_DEBUG; 1163 1164 break; 1165 } 1166 case FrameType.WINDOW_UPDATE: 1167 rv = processWindowUpdateFrame(); 1168 if (isFatal(rv)) { 1169 return rv; 1170 } 1171 1172 iframe.reset(); 1173 1174 break; 1175 default: 1176 /* This is unknown frame */ 1177 iframe.reset(); 1178 1179 break; 1180 } 1181 break; 1182 case 
READ_HEADER_BLOCK: 1183 case IGN_HEADER_BLOCK: { 1184 int data_readlen; 1185 static if (DEBUG) { 1186 if (iframe.state == READ_HEADER_BLOCK) { 1187 LOGF("recv: [READ_HEADER_BLOCK]"); 1188 } else { 1189 LOGF("recv: [IGN_HEADER_BLOCK]"); 1190 } 1191 } 1192 1193 readlen = iframe.readLength(pos, last); 1194 1195 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft - readlen); 1196 1197 data_readlen = iframe.effectiveReadLength(iframe.payloadleft - readlen, readlen); 1198 1199 if (data_readlen >= 0) { 1200 size_t trail_padlen; 1201 size_t hd_proclen = 0; 1202 trail_padlen = iframe.frame.trailPadlen(iframe.padlen); 1203 LOGF("recv: block final=%d", (iframe.frame.hd.flags & FrameFlags.END_HEADERS) && iframe.payloadleft - data_readlen == trail_padlen); 1204 1205 rv = inflateHeaderBlock(iframe.frame, hd_proclen, cast(ubyte[])pos[0 .. data_readlen], 1206 (iframe.frame.hd.flags & FrameFlags.END_HEADERS) && iframe.payloadleft - data_readlen == trail_padlen, 1207 iframe.state == READ_HEADER_BLOCK); 1208 1209 if (isFatal(rv)) { 1210 return rv; 1211 } 1212 1213 if (rv == ErrorCode.PAUSE) { 1214 pos += hd_proclen; 1215 iframe.payloadleft -= hd_proclen; 1216 1217 return cast(int)(pos - first); 1218 } 1219 1220 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 1221 /* The application says no more headers. We decompress the 1222 rest of the header block but not invoke on_header_callback 1223 and on_frame_recv_callback. 
*/ 1224 pos += hd_proclen; 1225 iframe.payloadleft -= hd_proclen; 1226 1227 addRstStream(iframe.frame.hd.stream_id, FrameError.INTERNAL_ERROR); 1228 busy = true; 1229 iframe.state = IGN_HEADER_BLOCK; 1230 break; 1231 } 1232 1233 pos += readlen; 1234 iframe.payloadleft -= readlen; 1235 1236 if (rv == ErrorCode.HEADER_COMP) { 1237 /* GOAWAY is already issued */ 1238 if (iframe.payloadleft == 0) { 1239 iframe.reset(); 1240 } else { 1241 busy = true; 1242 iframe.state = IGN_PAYLOAD; 1243 } 1244 break; 1245 } 1246 } else { 1247 pos += readlen; 1248 iframe.payloadleft -= readlen; 1249 } 1250 1251 if (iframe.payloadleft) { 1252 break; 1253 } 1254 1255 if ((iframe.frame.hd.flags & FrameFlags.END_HEADERS) == 0) { 1256 1257 iframe.setMark(FRAME_HDLEN); 1258 1259 iframe.padlen = 0; 1260 1261 if (iframe.state == READ_HEADER_BLOCK) 1262 iframe.state = EXPECT_CONTINUATION; 1263 else 1264 iframe.state = IGN_CONTINUATION; 1265 1266 } else { 1267 if (iframe.state == READ_HEADER_BLOCK) { 1268 rv = afterHeaderBlockReceived(); 1269 if (isFatal(rv)) { 1270 return rv; 1271 } 1272 } 1273 iframe.reset(); 1274 } 1275 break; 1276 } 1277 case IGN_PAYLOAD: 1278 LOGF("recv: [IGN_PAYLOAD]"); 1279 1280 readlen = iframe.readLength(pos, last); 1281 iframe.payloadleft -= readlen; 1282 pos += readlen; 1283 1284 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1285 1286 if (iframe.payloadleft) { 1287 break; 1288 } 1289 1290 switch (iframe.frame.hd.type) { 1291 case FrameType.HEADERS: 1292 case FrameType.PUSH_PROMISE: 1293 case FrameType.CONTINUATION: 1294 /* Mark inflater bad so that we won't perform further decoding */ 1295 hd_inflater.ctx.bad = 1; 1296 break; 1297 default: 1298 break; 1299 } 1300 1301 iframe.reset(); 1302 1303 break; 1304 case FRAME_SIZE_ERROR: 1305 LOGF("recv: [FRAME_SIZE_ERROR]"); 1306 1307 rv = terminateSession(FrameError.FRAME_SIZE_ERROR); 1308 if (isFatal(rv)) { 1309 return rv; 1310 } 1311 1312 busy = true; 1313 1314 iframe.state = IGN_PAYLOAD; 1315 1316 
break; 1317 case READ_SETTINGS: 1318 LOGF("recv: [READ_SETTINGS]"); 1319 1320 readlen = iframe.read(pos, last); 1321 iframe.payloadleft -= readlen; 1322 pos += readlen; 1323 1324 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1325 1326 if (iframe.sbuf.markAvailable) { 1327 break; 1328 } 1329 1330 if (readlen > 0) 1331 iframe.unpackSetting(); 1332 1333 if (iframe.payloadleft) { 1334 iframe.setMark(FRAME_SETTINGS_ENTRY_LENGTH); 1335 break; 1336 } 1337 1338 rv = processSettingsFrame(); 1339 1340 if (isFatal(rv)) { 1341 return rv; 1342 } 1343 1344 iframe.reset(); 1345 1346 break; 1347 case READ_GOAWAY_DEBUG: 1348 LOGF("recv: [READ_GOAWAY_DEBUG]"); 1349 1350 readlen = iframe.readLength(pos, last); 1351 1352 iframe.lbuf.last[0 .. readlen] = pos[0 .. readlen]; 1353 iframe.lbuf.last += readlen; 1354 1355 iframe.payloadleft -= readlen; 1356 pos += readlen; 1357 1358 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1359 1360 if (iframe.payloadleft) { 1361 assert(iframe.lbuf.available > 0); 1362 1363 break; 1364 } 1365 1366 rv = processGoAwayFrame(); 1367 1368 if (isFatal(rv)) { 1369 return rv; 1370 } 1371 1372 iframe.reset(); 1373 1374 break; 1375 case EXPECT_CONTINUATION: 1376 case IGN_CONTINUATION: 1377 static if (DEBUG) { 1378 if (iframe.state == EXPECT_CONTINUATION) { 1379 LOGF("recv: [EXPECT_CONTINUATION]"); 1380 } else { 1381 LOGF("recv: [IGN_CONTINUATION]"); 1382 } 1383 } 1384 1385 readlen = iframe.read(pos, last); 1386 pos += readlen; 1387 1388 if (iframe.sbuf.markAvailable) { 1389 return cast(int)(pos - first); 1390 } 1391 1392 cont_hd.unpack(iframe.sbuf.pos); 1393 iframe.payloadleft = cont_hd.length; 1394 1395 LOGF("recv: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", cont_hd.length, cont_hd.type, cont_hd.flags, cont_hd.stream_id); 1396 1397 if (cont_hd.type != FrameType.CONTINUATION || 1398 cont_hd.stream_id != iframe.frame.hd.stream_id) { 1399 LOGF("recv: expected stream_id=%d, type=%d, but got stream_id=%d, 
type=%d", 1400 iframe.frame.hd.stream_id, FrameType.CONTINUATION, cont_hd.stream_id, cont_hd.type); 1401 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "unexpected non-CONTINUATION frame or stream_id is invalid"); 1402 if (isFatal(rv)) { 1403 return rv; 1404 } 1405 1406 busy = true; 1407 1408 iframe.state = IGN_PAYLOAD; 1409 1410 break; 1411 } 1412 1413 /* CONTINUATION won't bear FrameFlags.PADDED flag */ 1414 iframe.frame.hd.flags |= cont_hd.flags & FrameFlags.END_HEADERS; 1415 iframe.frame.hd.length += cont_hd.length; 1416 1417 busy = true; 1418 1419 if (iframe.state == EXPECT_CONTINUATION) { 1420 iframe.state = READ_HEADER_BLOCK; 1421 1422 bool ok = callOnFrameHeader(cont_hd); 1423 1424 if (!ok) { 1425 return ErrorCode.CALLBACK_FAILURE; 1426 } 1427 } else { 1428 iframe.state = IGN_HEADER_BLOCK; 1429 } 1430 1431 break; 1432 case READ_PAD_DATA: 1433 LOGF("recv: [READ_PAD_DATA]"); 1434 1435 readlen = iframe.read(pos, last); 1436 pos += readlen; 1437 iframe.payloadleft -= readlen; 1438 1439 LOGF("recv: readlen=%d, payloadleft=%d, left=%d", readlen, iframe.payloadleft, iframe.sbuf.markAvailable); 1440 1441 if (iframe.sbuf.markAvailable) { 1442 return cast(int)(pos - first); 1443 } 1444 1445 /* Pad Length field is subject to flow control */ 1446 rv = updateRecvConnectionWindowSize(readlen); 1447 if (isFatal(rv)) { 1448 return rv; 1449 } 1450 1451 /* Pad Length field is consumed immediately */ 1452 rv = consume(iframe.frame.hd.stream_id, readlen); 1453 1454 if (isFatal(rv)) { 1455 return rv; 1456 } 1457 1458 stream = getStream(iframe.frame.hd.stream_id); 1459 if (stream) 1460 updateRecvStreamWindowSize(stream, readlen, iframe.payloadleft || (iframe.frame.hd.flags & FrameFlags.END_STREAM) == 0); 1461 1462 busy = true; 1463 1464 padlen = iframe.computePad(); 1465 if (padlen < 0) { 1466 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "DATA: invalid padding"); 1467 if (isFatal(rv)) { 1468 return rv; 1469 } 1470 iframe.state = IGN_DATA; 1471 break; 
1472 } 1473 1474 iframe.frame.data.padlen = padlen; 1475 1476 iframe.state = READ_DATA; 1477 1478 break; 1479 case READ_DATA: 1480 LOGF("recv: [READ_DATA]"); 1481 1482 readlen = iframe.readLength(pos, last); 1483 iframe.payloadleft -= readlen; 1484 pos += readlen; 1485 1486 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1487 1488 if (readlen > 0) { 1489 int data_readlen; 1490 1491 rv = updateRecvConnectionWindowSize(readlen); 1492 if (isFatal(rv)) { 1493 return rv; 1494 } 1495 1496 stream = getStream(iframe.frame.hd.stream_id); 1497 if (stream) 1498 updateRecvStreamWindowSize(stream, readlen, iframe.payloadleft || (iframe.frame.hd.flags & FrameFlags.END_STREAM) == 0); 1499 1500 data_readlen = iframe.effectiveReadLength(iframe.payloadleft, readlen); 1501 1502 padlen = cast(int)(readlen - data_readlen); 1503 1504 if (padlen > 0) { 1505 /* Padding is considered as "consumed" immediately */ 1506 rv = consume(iframe.frame.hd.stream_id, padlen); 1507 1508 if (isFatal(rv)) { 1509 return rv; 1510 } 1511 } 1512 1513 LOGF("recv: data_readlen=%d", data_readlen); 1514 1515 if (stream && data_readlen > 0) { 1516 if (isHTTPMessagingEnabled()) { 1517 if (!stream.onDataChunk(data_readlen)) { 1518 addRstStream(iframe.frame.hd.stream_id, FrameError.PROTOCOL_ERROR); 1519 busy = true; 1520 iframe.state = IGN_DATA; 1521 break; 1522 } 1523 } 1524 1525 ubyte[] data_nopad = cast(ubyte[])(pos - readlen)[0 .. 
data_readlen]; 1526 FrameFlags flags = iframe.frame.hd.flags; 1527 int stream_id = iframe.frame.hd.stream_id; 1528 bool pause; 1529 bool ok; 1530 try ok = connector.onDataChunk(flags, stream_id, data_nopad, pause); 1531 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 1532 1533 if (pause) { 1534 return cast(int)(pos - first); 1535 } 1536 1537 if (!ok) { 1538 return ErrorCode.CALLBACK_FAILURE; 1539 } 1540 1541 } 1542 } 1543 1544 if (iframe.payloadleft) { 1545 break; 1546 } 1547 1548 rv = processDataFrame(); 1549 if (isFatal(rv)) { 1550 return rv; 1551 } 1552 1553 iframe.reset(); 1554 1555 break; 1556 case IGN_DATA: 1557 LOGF("recv: [IGN_DATA]"); 1558 1559 readlen = iframe.readLength(pos, last); 1560 iframe.payloadleft -= readlen; 1561 pos += readlen; 1562 1563 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1564 1565 if (readlen > 0) { 1566 /* Update connection-level flow control window for ignored DATA frame too */ 1567 rv = updateRecvConnectionWindowSize(readlen); 1568 if (isFatal(rv)) { 1569 return rv; 1570 } 1571 1572 if (opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) { 1573 1574 /* Ignored DATA is considered as "consumed" immediately. */ 1575 rv = updateConnectionConsumedSize(readlen); 1576 1577 if (isFatal(rv)) { 1578 return rv; 1579 } 1580 } 1581 } 1582 1583 if (iframe.payloadleft) { 1584 break; 1585 } 1586 1587 iframe.reset(); 1588 1589 break; 1590 case IGN_ALL: 1591 return cast(int)input.length; 1592 } 1593 1594 if (!busy && pos == last) { 1595 break; 1596 } 1597 1598 busy = false; 1599 } 1600 1601 assert(pos == last); 1602 1603 return cast(int)(pos - first); 1604 } 1605 1606 /** 1607 * Puts back previously deferred DATA frame in the stream |stream_id| 1608 * to the outbound queue. 1609 * 1610 * This function returns 0 if it succeeds, or one of the following 1611 * negative error codes: 1612 * 1613 * $(D ErrorCode.INVALID_ARGUMENT) 1614 * The stream does not exist; or no deferred data exist. 
*/
ErrorCode resumeData(int stream_id)
{
    Stream target = getStream(stream_id);

    /* Only an existing stream that currently holds a deferred item can be resumed. */
    if (target && target.isItemDeferred()) {
        target.resumeDeferredItem(StreamFlags.DEFERRED_USER, this);
        return ErrorCode.OK;
    }

    return ErrorCode.INVALID_ARGUMENT;
}

/**
 * Tells whether $(D Session) wants to receive data from the remote peer.
 *
 * When `wantRead()` and `wantWrite()` both return false, the application
 * should drop the connection.
 */
bool wantRead()
{
    /* If this flag is set, we don't want to read. The application should drop the connection. */
    if (goaway_flags & GoAwayFlags.TERM_SENT)
        return false;

    /* Unless termination GOAWAY is sent or received, we always want to read incoming frames. */
    if (getNumActiveStreams() > 0)
        return true;

    /* With no active streams left, we are done as soon as a GOAWAY has been sent or received. */
    return !(goaway_flags & (GoAwayFlags.SENT | GoAwayFlags.RECV));
}

/**
 * Tells whether $(D Session) wants to send data to the remote peer.
 *
 * When `wantRead()` and `wantWrite()` both return false, the application
 * should drop the connection.
 */
bool wantWrite()
{
    /* If this flag is set, we don't want to write any data. The application should drop the connection. */
    if (goaway_flags & GoAwayFlags.TERM_SENT)
        return false;

    /*
     * Unless termination GOAWAY is sent or received, we want to write
     * frames if there are pending ones. If the pending frame is a
     * request/push response HEADERS and the concurrent stream limit is
     * reached, we don't want to write it. Pending DATA does not count
     * either while the connection-level remote window is exhausted.
     */
    if (!aob.item && ob_pq.empty) {
        if (ob_da_pq.empty || remote_window_size == 0) {
            if (ob_ss_pq.empty || isOutgoingConcurrentStreamsMax())
                return false;
        }
    }

    /* Something is pending: write it only if no GOAWAY has been sent or received. */
    return !(goaway_flags & (GoAwayFlags.SENT | GoAwayFlags.RECV));
}

/**
 * Returns the stream_user_data attached to the stream |stream_id|, as
 * provided by `submitRequest()`, `submitHeaders()` or
 * `setStreamUserData()`.
 *
 * For a stream initiated by the remote endpoint the value is `null`
 * unless it was set with `setStreamUserData()`. `null` is also returned
 * when the stream does not exist.
 */
void* getStreamUserData(int stream_id) {
    Stream stream = getStream(stream_id);
    return stream ? stream.userData : null;
}

/**
 * Attaches |stream_user_data| to the stream denoted by |stream_id|,
 * replacing any user data that was set before. Passing `null` is valid
 * and clears the associated data pointer.
 *
 * It is valid to set the |stream_user_data| on a stream reserved by a
 * PUSH_PROMISE frame.
 *
 * This function returns 0 if it succeeds, or one of following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The stream does not exist
 */
ErrorCode setStreamUserData(int stream_id, void* stream_user_data){
    Stream stream = getStream(stream_id);

    if (stream) {
        stream.userData = stream_user_data;
        return ErrorCode.OK;
    }

    return ErrorCode.INVALID_ARGUMENT;
}

/**
 * Returns the number of frames in the outbound queue. This does not
 * include the deferred DATA frames.
*/
size_t getOutboundQueueSize() {
    /* Sum of the three outbound priority queues. */
    size_t queued = ob_pq.length;
    queued += ob_ss_pq.length;
    queued += ob_da_pq.length;
    return queued;
}

/**
 * Returns the number of DATA payload bytes received for the stream
 * |stream_id| for which no WINDOW_UPDATE has been transmitted yet.
 *
 * The local (receive) window size can be adjusted by
 * $(D submitWindowUpdate), and the returned length takes that into
 * account: after the window was reduced with a negative
 * window_size_increment, the value can be smaller than the number of
 * bytes actually received. It is never negative.
 *
 * This function returns -1 if the stream does not exist.
 */
int getStreamEffectiveRecvDataLength(int stream_id)
{
    Stream stream = getStream(stream_id);

    if (stream)
        return max(0, stream.recvWindowSize);

    return -1;
}

/**
 * Returns the local (receive) window size for the stream |stream_id|.
 * The local window size can be adjusted by $(D submitWindowUpdate);
 * the returned value is the effective window size after such
 * adjustments.
 *
 * This function returns -1 if the stream does not exist.
 */
int getStreamEffectiveLocalWindowSize(int stream_id)
{
    Stream stream = getStream(stream_id);
    return stream ? stream.localWindowSize : -1;
}

/**
 * Returns the number of DATA payload in bytes received without
 * WINDOW_UPDATE transmission for a connection. The local (receive)
 * window size can be adjusted by $(D submitWindowUpdate).
 * This function takes into account that and returns effective data
 * length. In particular, if the local window size is reduced by
 * submitting negative window_size_increment with
 * $(D submitWindowUpdate), this function returns the number
 * of bytes less than actually received.
 *
 * This function returns -1 if it fails.
*/
int getEffectiveRecvDataLength()
{
    /* A negative counter is reported as 0. */
    return max(0, recv_window_size);
}

/**
 * Returns the local (receive) window size for a connection. The
 * local window size can be adjusted by $(D submitWindowUpdate); the
 * returned value is the effective window size after such adjustments.
 */
int getEffectiveLocalWindowSize()
{
    auto window = local_window_size;
    return window;
}

/**
 * Returns the remote window size for a given stream |stream_id|.
 *
 * This is the amount of flow-controlled payload (e.g., DATA) that the
 * local endpoint can send without stream level WINDOW_UPDATE. There
 * is also connection level flow control, so the effective size of
 * payload that the local endpoint can actually send is
 * min(getStreamRemoteWindowSize(), getRemoteWindowSize()).
 *
 * This function returns -1 if the stream does not exist.
 */
int getStreamRemoteWindowSize(int stream_id)
{
    Stream stream = getStream(stream_id);

    if (stream) {
        /* stream.remoteWindowSize can be negative when Setting.INITIAL_WINDOW_SIZE is changed. */
        auto window = stream.remoteWindowSize;
        return window < 0 ? 0 : window;
    }

    return -1;
}

/**
 * Returns the remote window size for a connection.
 *
 * This function always succeeds.
 */
int getRemoteWindowSize() {
    auto window = remote_window_size;
    return window;
}

/**
 * Returns 1 if local peer half closed the given stream |stream_id|.
 * Returns 0 if it did not. Returns -1 if no such stream exists.
 */
int getStreamLocalClose(int stream_id)
{
    Stream stream = getStream(stream_id);

    if (stream)
        return (stream.shutFlags & ShutdownFlag.WR) ? 1 : 0;

    return -1;
}

/**
 * Returns 1 if remote peer half closed the given stream |stream_id|.
 * Returns 0 if it did not. Returns -1 if no such stream exists.
*/
int getStreamRemoteClose(int stream_id)
{
    Stream stream = getStream(stream_id);

    if (stream)
        return (stream.shutFlags & ShutdownFlag.RD) ? 1 : 0;

    return -1;
}

/**
 * Signals the session so that the connection should be terminated.
 *
 * The last stream ID is the minimum value between the stream ID of a
 * stream for which $(D Connector.onFrame) was called
 * most recently and the last stream ID we have sent to the peer
 * previously.
 *
 * The |error_code| is the error code of this GOAWAY frame. The
 * pre-defined error code is one of $(D FrameError).
 *
 * After the transmission, both `wantRead()` and
 * `wantWrite()` return false.
 *
 * This function should be called when the connection should be
 * terminated after sending GOAWAY. If the remaining streams should
 * be processed after GOAWAY, use `submitGoAway()` instead.
 */
ErrorCode terminateSession(FrameError error_code)
{
    /* Forward to the three-argument overload using the last processed stream ID. */
    auto last_stream_id = last_proc_stream_id;
    return terminateSession(last_stream_id, error_code, null);
}


/**
 * Signals the session so that the connection should be terminated.
 *
 * This function behaves like $(D Session.terminateSession),
 * but the last stream ID can be specified by the application for fine
 * grained control of stream. The HTTP/2 specification does not allow
 * last_stream_id to be increased. So the actual value sent as
 * last_stream_id is the minimum value between the given
 * |last_stream_id| and the last_stream_id we have previously sent to
 * the peer.
 *
 * The |last_stream_id| is peer's stream ID or 0. So if $(D Session) is
 * initialized as client, |last_stream_id| must be even or 0. If
 * $(D Session) is initialized as server, |last_stream_id| must be odd or
 * 0.
*
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |last_stream_id| is invalid.
 */
ErrorCode terminateSession(int last_stream_id, FrameError error_code) {
    return terminateSession(last_stream_id, error_code, null);
}

/**
 * Returns the value of SETTINGS |id| notified by a remote endpoint.
 * The |id| must be one of values defined in $(D SettingsID).
 *
 * For any other |id| the function returns -1 converted to `uint`
 * (i.e. `uint.max`).
 */
uint getRemoteSettings(SettingsID id) {
    switch (id) {
        case Setting.HEADER_TABLE_SIZE:
            return remote_settings.header_table_size;
        case Setting.ENABLE_PUSH:
            return remote_settings.enable_push;
        case Setting.MAX_CONCURRENT_STREAMS:
            return remote_settings.max_concurrent_streams;
        case Setting.INITIAL_WINDOW_SIZE:
            return remote_settings.initial_window_size;
        case Setting.MAX_FRAME_SIZE:
            return remote_settings.max_frame_size;
        case Setting.MAX_HEADER_LIST_SIZE:
            return remote_settings.max_header_list_size;
        default:
            return -1;
    }
}

/**
 * Tells the $(D Session) that next stream ID is |_next_stream_id|. The
 * |_next_stream_id| must be equal or greater than the value returned
 * by `getNextStreamID()`.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |_next_stream_id| is negative, or strictly less than the
 *     value `getNextStreamID()` returns.
 */
ErrorCode setNextStreamID(int _next_stream_id)
{
    /* Compare the current member against the *requested* value: the stream ID
     * may only move forward. (The previous check compared the member with
     * itself, so it could never reject a smaller ID.) */
    if (_next_stream_id < 0 || next_stream_id > cast(uint)_next_stream_id) {
        return ErrorCode.INVALID_ARGUMENT;
    }

    next_stream_id = _next_stream_id;

    return ErrorCode.OK;
}

/**
 * Returns the next outgoing stream ID. Notice that return type is
 * uint.
If we run out of stream ID for this session, this
 * function returns 1 << 31.
 */
uint getNextStreamID()
{
    uint id = next_stream_id;
    return id;
}

/**
 * Tells the $(D Session) that |size| bytes for the stream denoted by
 * |stream_id| were consumed by the application and are ready to
 * WINDOW_UPDATE. The consumed bytes are counted towards both the
 * connection and the stream level WINDOW_UPDATE (see
 * `consumeConnection` and `consumeStream`). If the stream does not
 * exist, only the connection level is updated.
 * This function is intended to be used without
 * automatic window update (see $(D Options.setNoAutoWindowUpdate)).
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 * $(D ErrorCode.INVALID_STATE)
 *     Automatic WINDOW_UPDATE is not disabled.
 */
ErrorCode consume(int stream_id, size_t size) {
    if (stream_id == 0)
        return ErrorCode.INVALID_ARGUMENT;

    if ((opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) == 0)
        return ErrorCode.INVALID_STATE;

    /* Connection level first; only fatal errors are propagated. */
    ErrorCode rv = updateConnectionConsumedSize(size);
    if (isFatal(rv))
        return rv;

    /* Then the stream level, when the stream is still known. */
    Stream stream = getStream(stream_id);
    if (stream) {
        rv = updateStreamConsumedSize(stream, size);
        if (isFatal(rv))
            return rv;
    }

    return ErrorCode.OK;
}

/**
 * Like $(D consume), but this only tells library that
 * |size| bytes were consumed only for connection level. Note that
 * HTTP/2 maintains connection and stream level flow control windows
 * independently.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_STATE)
 * Automatic WINDOW_UPDATE is not disabled.
*/
ErrorCode consumeConnection(size_t size)
{
    if ((opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) == 0)
        return ErrorCode.INVALID_STATE;

    /* Only fatal errors are reported to the caller. */
    ErrorCode rv = updateConnectionConsumedSize(size);
    return isFatal(rv) ? rv : ErrorCode.OK;
}

/**
 * Like $(D consume), but this only tells the library that |size| bytes
 * were consumed for the stream denoted by |stream_id|; the connection
 * level is left untouched. Note that HTTP/2 maintains connection and
 * stream level flow control windows independently. A |stream_id| that
 * does not refer to an existing stream is silently accepted.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 * $(D ErrorCode.INVALID_STATE)
 *     Automatic WINDOW_UPDATE is not disabled.
 */
ErrorCode consumeStream(int stream_id, size_t size)
{
    if (stream_id == 0)
        return ErrorCode.INVALID_ARGUMENT;

    if ((opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) == 0)
        return ErrorCode.INVALID_STATE;

    Stream stream = getStream(stream_id);
    if (stream) {
        /* Only fatal errors are reported to the caller. */
        ErrorCode rv = updateStreamConsumedSize(stream, size);
        if (isFatal(rv))
            return rv;
    }

    return ErrorCode.OK;
}

/**
 * Performs post-process of HTTP Upgrade request. This function can
 * be called from both client and server, but the behavior is very
 * different in each other.
 *
 * If called from client side, the |settings_payload| must be the
 * value sent in `HTTP2-Settings` header field and must be decoded
 * by base64url decoder. The |settings_payload| is unpacked and its
 * setting values will be submitted using $(D submitSettings).
 * This means that the client application code does not need to submit
 * SETTINGS by itself.
The stream with stream ID=1 is opened and the
 * |stream_user_data| is used for its stream_user_data. The opened
 * stream becomes half-closed (local) state.
 *
 * If called from server side, the |settings_payload| must be the
 * value received in `HTTP2-Settings` header field and must be
 * decoded by base64url decoder. It is treated as if the SETTINGS
 * frame with that payload is received. Thus, callback functions for
 * the reception of SETTINGS frame will be invoked. The stream with
 * stream ID=1 is opened. The |stream_user_data| is ignored. The
 * opened stream becomes half-closed (remote).
 *
 * The settings array unpacked from |settings_payload| is released
 * before returning, on both the success and the error path.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |settings_payload| is badly formed.
 * $(D ErrorCode.PROTO)
 *     The stream ID 1 is already used or closed; or is not available.
 */
ErrorCode upgrade(in ubyte[] settings_payload, void* stream_user_data = null)
{
    Stream stream;
    Frame frame;
    Setting[] iva;
    ErrorCode rv;
    PrioritySpec pri_spec;

    /* Stream 1 must still be unused on whichever side we are. */
    if ((!is_server && next_stream_id != 1) || (is_server && last_recv_stream_id >= 1)) {
        return ErrorCode.PROTO;
    }

    /* The payload must be a whole number of SETTINGS entries. */
    if (settings_payload.length % FRAME_SETTINGS_ENTRY_LENGTH) {
        return ErrorCode.INVALID_ARGUMENT;
    }

    Settings.unpack(iva, settings_payload);

    if (is_server) {
        frame.hd = FrameHeader(cast(uint)settings_payload.length, FrameType.SETTINGS, FrameFlags.NONE, 0);
        frame.settings.iva = iva;
        rv = onSettings(frame, true /* No ACK */);
    } else {
        rv = submitSettings(this, iva);
    }

    /* Release the unpacked entries before looking at |rv| so that the error
     * path does not leak them (previously they were only freed on success). */
    if (iva) Mem.free(iva);

    if (rv != 0)
        return rv;

    stream = openStream(1, StreamFlags.NONE, pri_spec, StreamState.OPENING, stream_user_data);

    if (is_server)
    {
        /* Server: the request was already received, so the stream is half-closed (remote). */
        stream.shutdown(ShutdownFlag.RD);
        last_recv_stream_id = 1;
        last_proc_stream_id = 1;
    } else {
        /* Client: the request was already sent, so the stream is half-closed (local). */
        stream.shutdown(ShutdownFlag.WR);
        next_stream_id += 2;
    }

    return ErrorCode.OK;
}

/*
 * Returns true if |stream_id| is initiated by local endpoint: even IDs
 * when acting as server, odd IDs otherwise. Stream ID 0 is never local.
 */
bool isMyStreamId(int stream_id)
{
    if (stream_id == 0) {
        return false;
    }

    const bool is_odd = (stream_id & 0x1) == 1;
    return is_server ? !is_odd : is_odd;
}

/*
 * Adds |item| to the outbound queue in $(D Session). When this function
 * succeeds, it takes ownership of |item|. So caller must not free it
 * on success.
 *
 * Non-DATA frames always succeed: request HEADERS go to ob_ss_pq,
 * HEADERS for a reserved stream (or with attach_stream set) are attached
 * to the stream, everything else goes to ob_pq. DATA frames are attached
 * to their stream.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.STREAM_CLOSED
 *     Stream already closed (DATA frame only)
 *
 * ErrorCode.DATA_EXIST
 *     The stream already has an item attached (DATA frame only)
 */
ErrorCode addItem(OutboundItem item)
{
    /* TODO: Return error if stream is not found for the frame requiring stream presence. */
    Stream stream = getStream(item.frame.hd.stream_id);
    Frame* frame = &item.frame;

    if (frame.hd.type != FrameType.DATA) {
        switch (frame.hd.type) {
            case FrameType.RST_STREAM:
                if (stream)
                    stream.state = StreamState.CLOSING;
                break;
            case FrameType.SETTINGS:
                item.weight = OB_SETTINGS_WEIGHT;
                break;
            case FrameType.PING:
                /* Ping has highest priority. */
                item.weight = OB_PING_WEIGHT;
                break;
            default:
                break;
        }

        if (frame.hd.type == FrameType.HEADERS) {
            /* We push request HEADERS and push response HEADERS to
               dedicated queue because their transmission is affected by
               Setting.MAX_CONCURRENT_STREAMS */
            /* TODO: If 2 HEADERS are submitted for reserved stream, then
               both of them are queued into ob_ss_pq, which is not
               desirable. */
            if (frame.headers.cat == HeadersCategory.REQUEST) {
                ob_ss_pq.push(item);
                item.queued = 1;
            } else if (stream && (stream.state == StreamState.RESERVED || item.aux_data.headers.attach_stream)) {
                item.weight = stream.effectiveWeight;
                item.cycle = last_cycle;
                stream.attachItem(item, this);
            } else {
                ob_pq.push(item);
                item.queued = 1;
            }
        } else {
            ob_pq.push(item);
            item.queued = 1;
        }

        return ErrorCode.OK;
    }

    /* DATA frame: it needs a live stream without an item already attached. */
    if (!stream) {
        return ErrorCode.STREAM_CLOSED;
    }

    if (stream.item) {
        return ErrorCode.DATA_EXIST;
    }

    item.weight = stream.effectiveWeight;
    item.cycle = last_cycle;

    stream.attachItem(item, this);

    return ErrorCode.OK;
}

/*
 * Adds RST_STREAM frame for the stream |stream_id| with the error
 * code |error_code|. This is a convenient function built on top of
 * $(D Session.addFrame) to add RST_STREAM easily.
 *
 * This function simply returns without adding RST_STREAM frame if
 * given stream is in StreamState.CLOSING state, because multiple
 * RST_STREAM for a stream is redundant.
 *
 * On the client side, if a request HEADERS for |stream_id| is still
 * waiting in ob_ss_pq, that pending item is marked as canceled with
 * |error_code| instead of queueing a RST_STREAM.
 */
void addRstStream(int stream_id, FrameError error_code)
{
    OutboundItem item;
    Frame* frame;
    Stream stream;

    stream = getStream(stream_id);
    if (stream && stream.state == StreamState.CLOSING)
        return;

    /* Cancel pending request HEADERS in ob_ss_pq if this RST_STREAM refers to that stream. */
    if (!is_server && isMyStreamId(stream_id) && ob_ss_pq.top)
    {
        OutboundItem top;
        Frame* headers_frame;

        top = ob_ss_pq.top;
        headers_frame = &top.frame;

        assert(headers_frame.hd.type == FrameType.HEADERS);

        /* Only scan the queue when |stream_id| can still be waiting in it. */
        if (headers_frame.hd.stream_id <= stream_id && cast(uint)stream_id < next_stream_id)
        {
            /* The loop variable has its own name so it no longer shadows |item| above. */
            foreach (OutboundItem pending; ob_ss_pq) {

                HeadersAuxData* aux_data = &pending.aux_data.headers;

                if (pending.frame.hd.stream_id != stream_id || aux_data.canceled)
                {
                    continue;
                }

                aux_data.error_code = error_code;
                aux_data.canceled = 1;
                return;
            }
        }
    }

    item = Mem.alloc!OutboundItem(this);
    frame = &item.frame;

    frame.rst_stream = RstStream(stream_id, error_code);
    addItem(item);
}

/*
 * Adds PING frame. This is a convenient function built on top of
 * $(D Session.addFrame) to add PING easily.
 *
 * If the |opaque_data| is not null, it must point to 8 bytes memory
 * region of data. The data pointed by |opaque_data| is copied. It can
 * be null. In this case, 8 bytes null is used.
 *
 */
void addPing(FrameFlags flags, in ubyte[] opaque_data)
{
    OutboundItem item;
    Frame* frame;

    item = Mem.alloc!OutboundItem(this);

    frame = &item.frame;

    frame.ping = Ping(flags, opaque_data);

    addItem(item);
}

/*
 * Adds GOAWAY frame with the last-stream-ID |last_stream_id| and the
 * error code |error_code|. This is a convenient function built on top
 * of $(D Session.addFrame) to add GOAWAY easily. The
 * |aux_flags| are bitwise-OR of one or more of
 * GoAwayAuxFlags.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * ErrorCode.INVALID_ARGUMENT
 * The |opaque_data_len| is too large.
*/
ErrorCode addGoAway(int last_stream_id, FrameError error_code, in string opaque_data, GoAwayAuxFlags aux_flags) {
    /* The last-stream-ID refers to a peer-initiated stream (or 0), never to one of ours. */
    if (isMyStreamId(last_stream_id))
        return ErrorCode.INVALID_ARGUMENT;

    string opaque_data_copy;
    if (opaque_data.length > 0) {
        /* Reject debug data that does not fit next to the 8 extra bytes counted here. */
        if (opaque_data.length + 8 > MAX_PAYLOADLEN)
            return ErrorCode.INVALID_ARGUMENT;

        opaque_data_copy = cast(string)Mem.copy(opaque_data);
    }

    OutboundItem item = Mem.alloc!OutboundItem(this);

    /* last_stream_id must not be increased from the value previously sent */
    last_stream_id = min(last_stream_id, local_last_stream_id);

    item.frame.goaway = GoAway(last_stream_id, error_code, opaque_data_copy);
    item.aux_data.goaway.flags = aux_flags;

    addItem(item);
    return ErrorCode.OK;
}

/*
 * Adds WINDOW_UPDATE frame with stream ID |stream_id| and
 * window-size-increment |window_size_increment|. This is a convenient
 * function built on top of $(D Session.addFrame) to add
 * WINDOW_UPDATE easily.
 */
void addWindowUpdate(FrameFlags flags, int stream_id, int window_size_increment) {
    OutboundItem item = Mem.alloc!OutboundItem(this);

    item.frame.window_update = WindowUpdate(flags, stream_id, window_size_increment);

    addItem(item);
}

/*
 * Adds SETTINGS frame.
*/
ErrorCode addSettings(FrameFlags flags, in Setting[] iva)
{
    const bool is_ack = (flags & FrameFlags.ACK) != 0;

    if (is_ack) {
        /* An ACK must not carry any entries. */
        if (iva.length != 0)
            return ErrorCode.INVALID_ARGUMENT;
    } else if (inflight_iva.length != 0) {
        /* A previous non-ACK SETTINGS is still recorded as in flight. */
        return ErrorCode.TOO_MANY_INFLIGHT_SETTINGS;
    }

    if (!iva.check())
        return ErrorCode.INVALID_ARGUMENT;

    OutboundItem item = Mem.alloc!OutboundItem(this);
    scope(failure) Mem.free(item);

    /* The frame gets its own copy of the entries. */
    Setting[] iva_copy = iva.length > 0 ? iva.copy() : null;

    scope(failure) if(iva_copy) Mem.free(iva_copy);

    /* For a non-ACK SETTINGS, keep a second copy as the in-flight entries. */
    if (!is_ack)
        inflight_iva = iva.length > 0 ? iva.copy() : null;

    item.frame.settings = Settings(flags, iva_copy);

    addItem(item);

    /* Extract Setting.MAX_CONCURRENT_STREAMS and ENABLE_PUSH here. We use it to refuse the
     * incoming stream and PUSH_PROMISE with RST_STREAM. Scanning in reverse makes the last
     * occurrence of each setting win. */
    foreach_reverse(ref entry; iva)
    {
        if (entry.id == Setting.MAX_CONCURRENT_STREAMS) {
            pending_local_max_concurrent_stream = entry.value;
            break;
        }
    }
    foreach_reverse(ref entry; iva)
    {
        if (entry.id == Setting.ENABLE_PUSH) {
            pending_enable_push = entry.value>0;
            break;
        }
    }

    return ErrorCode.OK;
}

/**
 * Creates new stream in $(D Session) with stream ID |stream_id|,
 * priority |pri_spec| and flags |flags|. The |flags| is bitwise OR
 * of StreamFlags. Since this function is called when initial
 * HEADERS is sent or received, these flags are taken from it. The
 * state of stream is set to |initial_state|. The |stream_user_data|
 * is a pointer to the arbitrary user supplied data to be associated
 * to this stream.
*
	 * If |initial_state| is StreamState.RESERVED, this function sets the
	 * StreamFlags.PUSH flag.
	 *
	 * This function returns a pointer to created new stream object.
	 */
	Stream openStream(int stream_id, StreamFlags flags, PrioritySpec pri_spec_in, StreamState initial_state, void *stream_user_data = null)
	{
		PrioritySpec default_spec;
		PrioritySpec pri_spec = pri_spec_in;
		Stream parent;

		Stream stream = getStreamRaw(stream_id);
		const bool is_new = !stream;

		if (is_new) {
			// Make room among the kept closed streams before counting a new
			// peer-initiated, non-idle stream on the server.
			if (is_server && initial_state != StreamState.IDLE && !isMyStreamId(stream_id))
				adjustClosedStream(1);
		} else {
			// An existing entry is expected to be an idle anchor; take it out
			// of the idle list and of its current position so it can be
			// re-initialized below.
			assert(stream.state == StreamState.IDLE);
			assert(stream.inDepTree());
			detachIdleStream(stream);
			stream.remove();
		}

		if (pri_spec.stream_id != 0) {
			parent = getStreamRaw(pri_spec.stream_id);
			if (is_server && !parent && idleStreamDetect(pri_spec.stream_id)) {
				/* Depends on idle stream, which does not exist in memory. Assign default priority for it. */
				parent = openStream(pri_spec.stream_id, StreamFlags.NONE, default_spec, StreamState.IDLE, null);
			} else if (!parent || !parent.inDepTree()) {
				/* If the parent is not part of dependency tree, stream will get default priority. */
				pri_spec = default_spec;
			}
		}

		if (initial_state == StreamState.RESERVED)
			flags |= StreamFlags.PUSH;

		if (is_new)
			stream = Mem.alloc!Stream(stream_id, flags, initial_state, pri_spec.weight, roots,
				remote_settings.initial_window_size, local_settings.initial_window_size, stream_user_data);
		else
			stream.initialize(stream_id, flags, initial_state, pri_spec.weight, roots,
				remote_settings.initial_window_size, local_settings.initial_window_size, stream_user_data);
		scope(failure) if (is_new) Mem.free(stream);

		if (is_new)
			streams[stream_id] = stream;
		scope(failure) if (is_new) streams.remove(stream_id);

		if (initial_state == StreamState.RESERVED) {
			/* Our own reserved stream becomes half closed (remote), the
			   peer's becomes half closed (local). Reserved stream does not
			   count in the concurrent streams limit. That is one of the DOS vector. */
			stream.shutdown(isMyStreamId(stream_id) ? ShutdownFlag.RD : ShutdownFlag.WR);
		} else if (initial_state == StreamState.IDLE) {
			/* Idle stream does not count toward the concurrent streams limit. This is used as anchor node in dependency tree. */
			assert(is_server);
			keepIdleStream(stream);
		} else if (isMyStreamId(stream_id)) {
			++num_outgoing_streams;
		} else {
			++num_incoming_streams;
		}

		/* We don't have to track dependency of received reserved stream */
		if (stream.shutFlags & ShutdownFlag.WR)
			return stream;

		if (pri_spec.stream_id == 0) {
			// No parent: the stream becomes a root.
			++roots.num_streams;

			if (pri_spec.exclusive && roots.num_streams <= MAX_DEP_TREE_LENGTH)
				stream.makeTopmostRoot(this);
			else
				roots.add(stream);

			return stream;
		}

		/* TODO Client does not have to track dependencies of streams except
		   for those which have upload data.  Currently, we just track
		   everything. */
		assert(parent);

		if (parent.getRoot().subStreams < MAX_DEP_TREE_LENGTH) {
			if (pri_spec.exclusive)
				parent.insert(stream);
			else
				parent.add(stream);
		} else {
			// The parent's tree is full: fall back to a root with default weight.
			stream.weight = DEFAULT_WEIGHT;
			roots.add(stream);
		}

		return stream;
	}

	/*
	 * Closes the stream whose stream ID is |stream_id|; |error_code| is the
	 * reason passed on to $(D Connector.onStreamExit).
	 *
	 * On a server, a stream that is still part of the dependency tree is
	 * handed to keepClosedStream(); in every other case it is destroyed
	 * with destroyStream().
	 *
	 * Returns ErrorCode.OK on success, or:
	 *
	 * ErrorCode.INVALID_ARGUMENT
	 *   getStream() found no stream for |stream_id|.
	 * ErrorCode.CALLBACK_FAILURE
	 *   $(D Connector.onStreamExit) returned false or threw an Exception.
	 */
	ErrorCode closeStream(int stream_id, FrameError error_code)
	{
		Stream stream = getStream(stream_id);
		if (!stream)
			return ErrorCode.INVALID_ARGUMENT;

		LOGF("stream: stream(%s=%d close", stream, stream.id);

		if (stream.item) {
			OutboundItem pending = stream.item;

			stream.detachItem(this);

			/* If the item is queued, it will be deleted when it is popped
			   (prepareFrame() will fail).  If aob.item points to this item,
			   let the active outbound item's reset free it. */
			if (!pending.queued && pending != aob.item) {
				pending.free();
				Mem.free(pending);
			}
		}

		/* We call $(D Connector.onStreamExit) even if stream.state is
		   StreamState.INITIAL. This will happen while sending request
		   HEADERS, a local endpoint receives RST_STREAM for that stream. It
		   may be PROTOCOL_ERROR, but without notifying stream closure will
		   hang the stream in a local endpoint. */
		try {
			if (!connector.onStreamExit(stream_id, error_code))
				return ErrorCode.CALLBACK_FAILURE;
		}
		catch (Exception e) {
			return ErrorCode.CALLBACK_FAILURE;
		}

		/* pushed streams which is not opened yet is not counted toward max concurrent limits */
		if ((stream.flags & StreamFlags.PUSH) == 0) {
			if (isMyStreamId(stream_id))
				--num_outgoing_streams;
			else
				--num_incoming_streams;
		}

		// Mark the stream closed.
		stream.flags = cast(StreamFlags)(stream.flags | StreamFlags.CLOSED);

		if (is_server && stream.inDepTree()) {
			/* On server side, retain stream at most MAX_CONCURRENT_STREAMS
			   combined with the current active incoming streams to make
			   dependency tree work better. */
			keepClosedStream(stream);
		} else {
			destroyStream(stream);
		}
		return ErrorCode.OK;
	}

	/*
	 * Deletes |stream| from memory: it is removed via stream.remove(),
	 * dropped from the |streams| map, then freed. After this function
	 * returns, |stream| must not be accessed.
	 */
	void destroyStream(Stream stream)
	{
		LOGF("stream: destroy closed stream(%s=%d", stream, stream.id);

		stream.remove();
		streams.remove(stream.id);

		stream.free();
		Mem.free(stream);
	}

	/*
	 * Tries to keep incoming closed stream |stream|.  Due to the
	 * limitation of maximum number of streams in memory, |stream| is not
	 * closed and just deleted from memory (see destroyStream).
*/
	void keepClosedStream(Stream stream)
	{
		LOGF("stream: keep closed stream(%s=%d, state=%d", stream, stream.id, stream.state);

		// Link |stream| behind the current tail, or make it the head of an
		// empty list.
		Stream prev_tail = closed_stream_tail;
		if (!prev_tail) {
			closed_stream_head = stream;
		} else {
			prev_tail.closedNext = stream;
			stream.closedPrev = prev_tail;
		}
		closed_stream_tail = stream;

		++num_closed_streams;

		// May destroy streams from the head of the list to get back under
		// the limit.
		adjustClosedStream(0);
	}

	/*
	 * Appends |stream| to the idle-stream linked list (idle_stream_head /
	 * idle_stream_tail) and then calls adjustIdleStream(), which removes
	 * streams from the head of the list while the list is over its limit.
	 */
	void keepIdleStream(Stream stream)
	{
		LOGF("stream: keep idle stream(%s=%d, state=%d", stream, stream.id, stream.state);

		Stream prev_tail = idle_stream_tail;
		if (!prev_tail) {
			idle_stream_head = stream;
		} else {
			prev_tail.closedNext = stream;
			stream.closedPrev = prev_tail;
		}
		idle_stream_tail = stream;

		++num_idle_streams;

		adjustIdleStream();
	}

	/*
	 * Unlinks |stream| from the idle-stream linked list, fixing up the
	 * list head/tail when |stream| was at either end, and clears the
	 * stream's own closedPrev/closedNext links.
	 */
	void detachIdleStream(Stream stream)
	{
		LOGF("stream: detach idle stream(%s=%d, state=%d", stream, stream.id, stream.state);

		Stream before = stream.closedPrev;
		Stream after = stream.closedNext;

		if (before)
			before.closedNext = after;
		else
			idle_stream_head = after;

		if (after)
			after.closedPrev = before;
		else
			idle_stream_tail = before;

		stream.closedPrev = null;
		stream.closedNext = null;

		--num_idle_streams;
	}

	/*
	 * Deletes closed stream to ensure that number of incoming streams
	 * including active and closed is in the maximum number of allowed
	 * stream.
* If |offset| is nonzero, it is decreased from the maximum
	 * number of allowed stream when comparing number of active and closed
	 * stream and the maximum number.
	 */
	void adjustClosedStream(int offset)
	{
		// The effective limit is the smaller of the applied and the pending
		// local MAX_CONCURRENT_STREAMS values.
		size_t num_stream_max = min(local_settings.max_concurrent_streams, pending_local_max_concurrent_stream);

		LOGF("stream: adjusting kept closed streams num_closed_streams=%d, num_incoming_streams=%d, max_concurrent_streams=%d",
			num_closed_streams, num_incoming_streams, num_stream_max);

		while (num_closed_streams > 0 && num_closed_streams + num_incoming_streams + offset > num_stream_max)
		{
			// Pop the head of the closed-stream list ...
			Stream oldest = closed_stream_head;
			assert(oldest);

			closed_stream_head = oldest.closedNext;
			if (closed_stream_head)
				closed_stream_head.closedPrev = null;
			else
				closed_stream_tail = null;

			// ... and release it; |oldest| is freed after this call.
			destroyStream(oldest);
			--num_closed_streams;
		}
	}

	/*
	 * Destroys idle streams from the head of the idle list until
	 * num_idle_streams is within the limit computed below.
	 */
	void adjustIdleStream()
	{
		/* Make minimum number of idle streams 2 so that allocating 2
		   streams at once is easy.  This happens when PRIORITY frame to
		   idle stream, which depends on idle stream which does not
		   exist. */
		size_t limit = max(2, min(local_settings.max_concurrent_streams, pending_local_max_concurrent_stream));

		LOGF("stream: adjusting kept idle streams num_idle_streams=%d, max=%d", num_idle_streams, limit);

		while (num_idle_streams > limit) {
			Stream oldest = idle_stream_head;
			assert(oldest);

			idle_stream_head = oldest.closedNext;
			if (idle_stream_head)
				idle_stream_head.closedPrev = null;
			else
				idle_stream_tail = null;

			// |oldest| is destroyed by this call.
			destroyStream(oldest);
			--num_idle_streams;
		}
	}

	/*
	 * Closes |stream| with FrameError.NO_ERROR if both ShutdownFlag.RD and
	 * ShutdownFlag.WR are set in its shutFlags; otherwise does nothing.
	 *
	 * Returns ErrorCode.OK when the stream is not yet shut in both
	 * directions, else whatever closeStream() returns.
	 */
	ErrorCode closeStreamIfShutRdWr(Stream stream)
	{
		if ((stream.shutFlags & ShutdownFlag.RDWR) != ShutdownFlag.RDWR)
			return ErrorCode.OK;
		return closeStream(stream.id, FrameError.NO_ERROR);
	}

	/*
	 * Shuts the read side of |stream| when the request HEADERS in |frame|
	 * carries END_STREAM.
	 */
	void endRequestHeadersReceived(Frame frame, Stream stream)
	{
		/* Here we assume that stream is not shutdown in ShutdownFlag.WR */
		if (frame.hd.flags & FrameFlags.END_STREAM)
			stream.shutdown(ShutdownFlag.RD);
	}

	/*
	 * When the response HEADERS in |frame| carries END_STREAM, shuts the
	 * read side of |stream| and closes it if it is now shut both ways.
	 */
	ErrorCode endResponseHeadersReceived(Frame frame, Stream stream)
	{
		if ((frame.hd.flags & FrameFlags.END_STREAM) == 0)
			return ErrorCode.OK;

		/* This is the last frame of this stream, so disallow further receptions. */
		stream.shutdown(ShutdownFlag.RD);
		return closeStreamIfShutRdWr(stream);
	}

	/*
	 * Same END_STREAM handling as endResponseHeadersReceived().
	 */
	ErrorCode endHeadersReceived(Frame frame, Stream stream)
	{
		if ((frame.hd.flags & FrameFlags.END_STREAM) == 0)
			return ErrorCode.OK;

		stream.shutdown(ShutdownFlag.RD);
		return closeStreamIfShutRdWr(stream);
	}


	/*
	 * Validates a request HEADERS |frame| and, when it is acceptable,
	 * opens the new stream in StreamState.OPENING and invokes
	 * callOnHeaders().
	 *
	 * Returns ErrorCode.OK, ErrorCode.IGN_HEADER_BLOCK when the frame is to
	 * be ignored, ErrorCode.CALLBACK_FAILURE when callOnHeaders() fails, or
	 * the result of the handleInflateInvalid*() helpers.
	 */
	ErrorCode onRequestHeaders(Frame frame)
	{
		auto sid = frame.hd.stream_id;

		if (sid == 0)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: stream_id == 0");

		/* If client receives idle stream from server, it is invalid
		   regardless stream ID is even or odd.  This is because client is
		   not expected to receive request from server. */
		if (!is_server) {
			if (idleStreamDetect(sid))
				return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: client received request");
			return ErrorCode.IGN_HEADER_BLOCK;
		}

		if (!isNewPeerStreamId(sid)) {
			/* The spec says if an endpoint receives a HEADERS with invalid
			   stream ID, it MUST issue connection error with error code
			   PROTOCOL_ERROR.  But we could get trailer HEADERS after we have
			   sent RST_STREAM to this stream and peer have not received it.
			   Then connection error is too harsh.  It means that we only use
			   connection error if stream ID refers idle stream.  Otherwise we
			   just ignore HEADERS for now. */
			if (idleStreamDetect(sid))
				return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: invalid stream_id");
			return ErrorCode.IGN_HEADER_BLOCK;
		}

		last_recv_stream_id = sid;

		/* We just ignore stream after GOAWAY was queued */
		if (goaway_flags & GoAwayFlags.SENT)
			return ErrorCode.IGN_HEADER_BLOCK;

		if (isIncomingConcurrentStreamsMax())
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: max concurrent streams exceeded");

		if (frame.headers.pri_spec.stream_id == sid)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: depend on itself");

		if (isIncomingConcurrentStreamsPendingMax())
			return handleInflateInvalidStream(frame, FrameError.REFUSED_STREAM);

		openStream(sid, StreamFlags.NONE, frame.headers.pri_spec, StreamState.OPENING, null);
		last_proc_stream_id = last_recv_stream_id;

		return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Handles the first response HEADERS on a locally initiated stream
	 * that is in StreamState.OPENING, moving it to StreamState.OPENED.
	 */
	ErrorCode onResponseHeaders(Frame frame, Stream stream)
	{
		/* This function is only called if stream.state == StreamState.OPENING and stream_id is local side initiated. */
		assert(stream.state == StreamState.OPENING && isMyStreamId(frame.hd.stream_id));

		if (frame.hd.stream_id == 0)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "response HEADERS: stream_id == 0");

		/* half closed (remote): from the spec:

		   If an endpoint receives additional frames for a stream that is
		   in this state it MUST respond with a stream error (Section
		   5.4.2) of type STREAM_CLOSED. */
		if (stream.shutFlags & ShutdownFlag.RD)
			return handleInflateInvalidStream(frame, FrameError.STREAM_CLOSED);

		stream.state = StreamState.OPENED;

		return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Handles the response HEADERS for a stream in StreamState.RESERVED:
	 * fulfils the promise and counts the stream as an incoming stream.
	 */
	ErrorCode onPushResponseHeaders(Frame frame, Stream stream)
	{
		assert(stream.state == StreamState.RESERVED);

		if (frame.hd.stream_id == 0)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "push response HEADERS: stream_id == 0");

		/* We don't accept new stream after GOAWAY is sent or received. */
		if (goaway_flags)
			return ErrorCode.IGN_HEADER_BLOCK;

		if (isIncomingConcurrentStreamsMax())
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "push response HEADERS: max concurrent streams exceeded");

		if (isIncomingConcurrentStreamsPendingMax())
			return handleInflateInvalidStream(frame, FrameError.REFUSED_STREAM);

		stream.promiseFulfilled();
		++num_incoming_streams;

		return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Called when HEADERS is received, assuming |frame| is properly
	 * initialized.  This function will first validate received frame and
	 * then open stream sending it through callback functions.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.IGN_HEADER_BLOCK
	 *   Frame was rejected and header block must be decoded but
	 *   result must be ignored.
* ErrorCode.CALLBACK_FAILURE
	 *   The DataProvider failed
	 */
	ErrorCode onHeaders(Frame frame, Stream stream)
	{
		if (frame.hd.stream_id == 0)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "HEADERS: stream_id == 0");

		/* reserved. The valid push response HEADERS is processed by
		   onPushResponseHeaders().  This generic handler only sees the
		   invalid cases for HEADERS against reserved state. */
		if (stream.state == StreamState.RESERVED)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "HEADERS: stream in reserved");

		/* half closed (remote): from the spec:

		   If an endpoint receives additional frames for a stream that is
		   in this state it MUST respond with a stream error (Section
		   5.4.2) of type STREAM_CLOSED. */
		if (stream.shutFlags & ShutdownFlag.RD)
			return handleInflateInvalidStream(frame, FrameError.STREAM_CLOSED);

		const bool locally_initiated = isMyStreamId(frame.hd.stream_id);

		/* This is race condition.  StreamState.CLOSING indicates
		   that we queued RST_STREAM but it has not been sent.  It will
		   eventually sent, so we just ignore this frame.  This applies to
		   both locally and remotely initiated streams. */
		if (stream.state == StreamState.CLOSING)
			return ErrorCode.IGN_HEADER_BLOCK;

		// On a locally initiated stream, HEADERS is only accepted in OPENED.
		if (locally_initiated && stream.state != StreamState.OPENED)
			return handleInflateInvalidStream(frame, FrameError.PROTOCOL_ERROR);

		return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Called when PRIORITY is received, assuming |frame| is properly
	 * initialized.
	 *
	 * On a server the frame either re-prioritizes an existing stream or,
	 * for an idle stream ID, creates an idle anchor stream.  A client only
	 * forwards the frame to callOnFrame().
	 *
	 * Returns ErrorCode.OK on success, ErrorCode.CALLBACK_FAILURE when
	 * callOnFrame() fails, or the result of handleInvalidConnection().
	 */
	ErrorCode onPriority(Frame frame)
	{
		if (frame.hd.stream_id == 0)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PRIORITY: stream_id == 0");

		/* Re-prioritization works only in server */
		if (is_server) {
			Stream target = getStreamRaw(frame.hd.stream_id);

			if (target) {
				reprioritizeStream(target, frame.priority.pri_spec);
			} else {
				/* PRIORITY against idle stream can create anchor node in dependency tree. */
				if (!idleStreamDetect(frame.hd.stream_id))
					return ErrorCode.OK;

				openStream(frame.hd.stream_id, StreamFlags.NONE, frame.priority.pri_spec, StreamState.IDLE, null);
			}
		}

		return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Called when RST_STREAM is received, assuming |frame| is properly
	 * initialized.
*
	 * This function returns 0 if it succeeds, or one the following
	 * negative error codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *   The DataProvider failed
	 */
	ErrorCode onRstStream(Frame frame)
	{
		if (frame.hd.stream_id == 0)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "RST_STREAM: stream_id == 0");

		// RST_STREAM for a stream we do not know is only a connection error
		// when the ID refers to an idle stream.
		if (!getStream(frame.hd.stream_id) && idleStreamDetect(frame.hd.stream_id))
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "RST_STREAM: stream in idle");

		if (!callOnFrame(frame))
			return ErrorCode.CALLBACK_FAILURE;

		// Non-fatal results of closeStream() are deliberately not propagated.
		ErrorCode rv = closeStream(frame.hd.stream_id, frame.rst_stream.error_code);
		return isFatal(rv) ? rv : ErrorCode.OK;
	}

	/*
	 * Called when SETTINGS is received, assuming |frame| is properly
	 * initialized. If |noack| is non-zero, SETTINGS with ACK will not be
	 * submitted. If |frame| has NGFrameFlags.ACK flag set, no SETTINGS
	 * with ACK will not be submitted regardless of |noack|.
*
	 * This function returns 0 if it succeeds, or one the following
	 * negative error codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *   The DataProvider failed
	 */

	ErrorCode onSettings(Frame frame, bool noack)
	{
		ErrorCode rv;

		if (frame.hd.stream_id != 0)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: stream_id != 0");

		if (frame.hd.flags & FrameFlags.ACK) {
			// The peer acknowledged our SETTINGS: apply the values we kept
			// in |inflight_iva| to the local settings.
			if (frame.settings.iva.length != 0)
				return handleInvalidConnection(frame, FrameError.FRAME_SIZE_ERROR, "SETTINGS: ACK and payload != 0");

			// NOTE(review): addSettings() stores null here for an empty
			// non-ACK SETTINGS, so its ACK looks "unexpected" -- confirm.
			if (!inflight_iva)
				return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: unexpected ACK");

			rv = updateLocalSettings(inflight_iva);
			Mem.free(inflight_iva);
			inflight_iva = null;

			if (rv != ErrorCode.OK) {
				if (isFatal(rv))
					return rv;

				FrameError error_code = (rv == ErrorCode.HEADER_COMP)
					? FrameError.COMPRESSION_ERROR
					: FrameError.INTERNAL_ERROR;
				return handleInvalidConnection(frame, error_code, null);
			}

			return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
		}

		// Apply every received entry to |remote_settings|, validating the
		// ones that have a restricted range. Unknown IDs are skipped.
		foreach (Setting entry; frame.settings.iva) {
			with(Setting) switch (entry.id) {
				case HEADER_TABLE_SIZE:
					if (entry.value > MAX_HEADER_TABLE_SIZE)
						return handleInvalidConnection(frame, FrameError.COMPRESSION_ERROR, "SETTINGS: too large Setting.HEADER_TABLE_SIZE");

					hd_deflater.changeTableSize(entry.value);
					remote_settings.header_table_size = entry.value;
					break;

				case ENABLE_PUSH:
					if (entry.value != 0 && entry.value != 1)
						return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: invalid Setting.ENABLE_PUSH");

					if (!is_server && entry.value != 0)
						return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: server attempted to enable push");

					remote_settings.enable_push = entry.value;
					break;

				case MAX_CONCURRENT_STREAMS:
					remote_settings.max_concurrent_streams = entry.value;
					break;

				case INITIAL_WINDOW_SIZE:
					/* Update the initial window size of the all active streams */
					/* Check that initial_window_size < (1u << 31) */
					if (entry.value > MAX_WINDOW_SIZE)
						return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, "SETTINGS: too large Setting.INITIAL_WINDOW_SIZE");

					rv = updateRemoteInitialWindowSize(entry.value);

					if (isFatal(rv))
						return rv;

					if (rv != ErrorCode.OK)
						return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, null);

					remote_settings.initial_window_size = entry.value;
					break;

				case MAX_FRAME_SIZE:
					if (entry.value < MAX_FRAME_SIZE_MIN || entry.value > MAX_FRAME_SIZE_MAX)
						return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: invalid Setting.MAX_FRAME_SIZE");

					remote_settings.max_frame_size = entry.value;
					break;

				case MAX_HEADER_LIST_SIZE:
					remote_settings.max_header_list_size = entry.value;
					break;

				default:
					break;
			}
		}

		// Acknowledge the peer's SETTINGS unless suppressed or closing.
		if (!noack && !isClosing()) {
			rv = addSettings(FrameFlags.ACK, null);

			if (rv != ErrorCode.OK) {
				if (isFatal(rv))
					return rv;

				return handleInvalidConnection(frame, FrameError.INTERNAL_ERROR, null);
			}
		}

		return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}
	/*
	 * Called when PUSH_PROMISE is received, assuming |frame| is properly
	 * initialized.
*
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.IGN_HEADER_BLOCK
	 *   Frame was rejected and header block must be decoded but
	 *   result must be ignored.
	 * ErrorCode.CALLBACK_FAILURE
	 *   The DataProvider failed
	 */

	ErrorCode onPushPromise(Frame frame)
	{
		if (frame.hd.stream_id == 0)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: stream_id == 0");

		if (is_server || local_settings.enable_push == 0)
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: push disabled");

		/* We just discard PUSH_PROMISE after GOAWAY is sent or received. */
		if (goaway_flags)
			return ErrorCode.IGN_HEADER_BLOCK;

		if (!isMyStreamId(frame.hd.stream_id))
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid stream_id");

		/* The spec says if an endpoint receives a PUSH_PROMISE with
		   illegal stream ID is subject to a connection error of type
		   PROTOCOL_ERROR. */
		if (!isNewPeerStreamId(frame.push_promise.promised_stream_id))
			return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid promised_stream_id");

		last_recv_stream_id = frame.push_promise.promised_stream_id;

		Stream assoc = getStream(frame.hd.stream_id);

		if (!assoc || assoc.state == StreamState.CLOSING || !pending_enable_push) {
			// An unknown associated stream that is still idle is a
			// connection error; every other case refuses the promise.
			if (!assoc && idleStreamDetect(frame.hd.stream_id))
				return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: stream in idle");

			addRstStream(frame.push_promise.promised_stream_id, FrameError.REFUSED_STREAM);
			return ErrorCode.IGN_HEADER_BLOCK;
		}

		if (assoc.shutFlags & ShutdownFlag.RD) {
			// The associated stream no longer accepts frames: report the
			// invalid frame and reset the promised stream.
			try {
				if (!connector.onInvalidFrame(frame, FrameError.PROTOCOL_ERROR))
					return ErrorCode.CALLBACK_FAILURE;
			}
			catch (Exception e) {
				return ErrorCode.CALLBACK_FAILURE;
			}

			addRstStream(frame.push_promise.promised_stream_id, FrameError.PROTOCOL_ERROR);
			return ErrorCode.IGN_HEADER_BLOCK;
		}

		/* TODO: It is unclear reserved stream depends on associated stream with or without exclusive flag set */
		PrioritySpec promised_spec = PrioritySpec(assoc.id, DEFAULT_WEIGHT, 0);

		openStream(frame.push_promise.promised_stream_id, StreamFlags.NONE, promised_spec, StreamState.RESERVED, null);

		last_proc_stream_id = last_recv_stream_id;

		return callOnHeaders(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Called when PING is received, assuming |frame| is properly
	 * initialized.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *   The callback function failed.
*/
	ErrorCode onPing(Frame frame)
	{
		if (frame.hd.stream_id != 0)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PING: stream_id != 0");

		/* Peer sent ping, so ping it back (unless this already is an ACK
		   or the session is closing). */
		const bool is_request = (frame.hd.flags & FrameFlags.ACK) == 0;
		if (is_request && !isClosing())
			addPing(FrameFlags.ACK, frame.ping.opaque_data);

		return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Called when GOAWAY is received, assuming |frame| is properly
	 * initialized.
	 *
	 * Records the reception in |goaway_flags| and |remote_last_stream_id|,
	 * notifies callOnFrame(), then returns the result of
	 * closeStreamOnGoAway().
	 *
	 * Returns ErrorCode.CALLBACK_FAILURE when callOnFrame() fails, or the
	 * result of handleInvalidConnection() for an invalid frame.
	 */
	ErrorCode onGoAway(Frame frame)
	{
		if (frame.hd.stream_id != 0)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "GOAWAY: stream_id != 0");

		auto announced_id = frame.goaway.last_stream_id;

		/* Spec says Endpoints MUST NOT increase the value they send in the last stream identifier. */
		const bool not_ours = announced_id > 0 && !isMyStreamId(announced_id);
		if (not_ours || remote_last_stream_id < announced_id)
			return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "GOAWAY: invalid last_stream_id");

		goaway_flags |= GoAwayFlags.RECV;
		remote_last_stream_id = announced_id;

		if (!callOnFrame(frame))
			return ErrorCode.CALLBACK_FAILURE;

		return closeStreamOnGoAway(frame.goaway.last_stream_id, false);
	}

	/*
	 * Called when WINDOW_UPDATE is recieved, assuming |frame| is properly
	 * initialized.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 * ErrorCode.CALLBACK_FAILURE
	 *   The callback function failed.
*/
	ErrorCode onWindowUpdate(Frame frame)
	{
		auto increment = frame.window_update.window_size_increment;

		if (frame.hd.stream_id != 0) {
			/* handle stream window update */
			Stream stream = getStream(frame.hd.stream_id);

			if (!stream) {
				// Unknown stream: only an idle stream ID is an error.
				if (idleStreamDetect(frame.hd.stream_id))
					return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "WINDOW_UPDATE to idle stream");
				return ErrorCode.OK;
			}

			if (isReservedRemote(stream))
				return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "WINDOW_UPADATE to reserved stream");

			if (increment == 0)
				return handleInvalidStream(frame, FrameError.PROTOCOL_ERROR);

			// Reject an increment that would push the window past MAX_WINDOW_SIZE.
			if (MAX_WINDOW_SIZE - increment < stream.remoteWindowSize)
				return handleInvalidStream(frame, FrameError.FLOW_CONTROL_ERROR);

			stream.remoteWindowSize = stream.remoteWindowSize + increment;

			// A stream deferred by flow control can resume once its window is positive.
			if (stream.remoteWindowSize > 0 && stream.isDeferredByFlowControl())
				stream.resumeDeferredItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);
		} else {
			/* Handle connection-level flow control */
			if (increment == 0)
				return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, null);

			if (MAX_WINDOW_SIZE - increment < remote_window_size)
				return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, null);

			remote_window_size += increment;
		}

		return callOnFrame(frame) ? ErrorCode.OK : ErrorCode.CALLBACK_FAILURE;
	}

	/*
	 * Called when DATA is received, assuming |frame| is properly
	 * initialized.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 * ErrorCode.CALLBACK_FAILURE
	 *   The callback function failed.
*/
    ErrorCode onData(Frame frame)
    {
        Stream stream = getStream(frame.hd.stream_id);

        /* We don't call on_frame_recv_callback if stream has been closed already or being closed.
           This should be treated as stream error, but it results in lots of RST_STREAM.
           So just ignore frame against nonexistent stream for now. */
        if (!stream || stream.state == StreamState.CLOSING)
            return ErrorCode.OK;

        /* With HTTP messaging enabled, an END_STREAM that fails validation is reported
           as a stream error and the frame callback is skipped. */
        bool notify = true;
        if (isHTTPMessagingEnabled() && (frame.hd.flags & FrameFlags.END_STREAM) && !stream.validateRemoteEndStream()) {
            notify = false;
            handleInvalidStream2(stream.id, frame, FrameError.PROTOCOL_ERROR);
        }

        if (notify && !callOnFrame(frame))
            return ErrorCode.CALLBACK_FAILURE;

        if (frame.hd.flags & FrameFlags.END_STREAM) {
            /* The peer finished sending: shut the read side and close the stream
               if both directions are now shut down. */
            stream.shutdown(ShutdownFlag.RD);
            ErrorCode rv = closeStreamIfShutRdWr(stream);
            if (isFatal(rv))
                return rv;
        }

        return ErrorCode.OK;
    }

    /*
     * Packs DATA frame |frame| in wire frame format and stores it in
     * |bufs|.  Payload will be read using |aux_data.data_prd|.  The
     * length of payload is at most |datamax| bytes.
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.DEFERRED
     *   The DATA frame is postponed.
     * ErrorCode.TEMPORAL_CALLBACK_FAILURE
     *   The DataProvider failed (stream error).
     * ErrorCode.CALLBACK_FAILURE
     *   The DataProvider failed (session error).
*/
    ErrorCode packData(Buffers bufs, int datamax, ref Frame frame, ref DataAuxData aux_data)
    {
        /* The buffer chain is expected to be positioned at its head. */
        assert(bufs.head == bufs.cur);

        Buffer* buf = &bufs.cur.buf;

        Stream stream = getStream(frame.hd.stream_id);
        if (!stream)
            return ErrorCode.INVALID_ARGUMENT;

        /* Let the connector choose the payload length; if it throws, fall back to the
           smallest of the flow control windows and the peer's max frame size. */
        int payloadlen;
        try {
            payloadlen = connector.maxFrameSize(frame.hd.type, stream.id, remote_window_size, stream.remoteWindowSize, remote_settings.max_frame_size);
        } catch (Exception e) {
            payloadlen = min(remote_window_size, stream.remoteWindowSize, remote_settings.max_frame_size);
        }
        LOGF("send: read_length_callback=%d", payloadlen);

        payloadlen = enforceFlowControlLimits(stream, payloadlen);
        LOGF("send: read_length_callback after flow control=%d", payloadlen);

        if (payloadlen <= 0)
            return ErrorCode.CALLBACK_FAILURE;

        if (payloadlen > buf.available) {
            import core.exception : OutOfMemoryError;
            /* Resize the current buffer(s).  The reason why we do +1 for buffer size is for possible padding field. */
            try {
                aob.framebufs.realloc(FRAME_HDLEN + 1 + payloadlen);
                assert(aob.framebufs == bufs);
                buf = &bufs.cur.buf;
            } catch (OutOfMemoryError oom) {
                /* If reallocation failed, old buffers are still intact.  So use safe limit. */
                payloadlen = datamax;
            }
        }

        datamax = payloadlen;

        /* Current max DATA length is less than buffer chunk size */
        assert(buf.available >= datamax);

        /* Read the payload straight into the frame buffer. */
        DataFlags data_flags = DataFlags.NONE;
        payloadlen = aux_data.data_prd(buf.pos[0 .. datamax], data_flags);

        if (payloadlen == ErrorCode.DEFERRED ||
            payloadlen == ErrorCode.TEMPORAL_CALLBACK_FAILURE)
        {
            import libhttp2.helpers : toString;
            LOGF("send: DATA postponed due to %s", toString(cast(ErrorCode)payloadlen));

            return cast(ErrorCode)payloadlen;
        }

        /* Any other negative value, or more data than was asked for, is a callback failure. */
        if (payloadlen < 0 || datamax < cast(size_t)payloadlen)
            return ErrorCode.CALLBACK_FAILURE;

        /* Mark the payload end, then step back to make room for the frame header. */
        buf.last = buf.pos + payloadlen;
        buf.pos -= FRAME_HDLEN;

        /* Clear flags, because this may contain previous flags of previous DATA */
        frame.hd.flags = FrameFlags.NONE;

        if (data_flags & DataFlags.EOF) {
            aux_data.eof = true;
            /* END_STREAM is only set when the submitter asked for it as well. */
            if (aux_data.flags & DataFlags.EOF)
                frame.hd.flags |= FrameFlags.END_STREAM;
        }

        if (data_flags & DataFlags.NO_COPY)
            aux_data.no_copy = true;

        frame.hd.length = payloadlen;
        frame.data.padlen = 0;

        size_t max_payloadlen = min(datamax, frame.hd.length + MAX_PADLEN);

        int padded_payloadlen = callSelectPadding(frame, max_payloadlen);
        if (isFatal(cast(int)padded_payloadlen))
            return cast(ErrorCode)padded_payloadlen;

        frame.data.padlen = padded_payloadlen - payloadlen;

        frame.hd.pack((*buf)[]);
        frame.hd.addPad(bufs, frame.data.padlen, aux_data.no_copy);

        return ErrorCode.OK;
    }

    /*
     * This function is called when HTTP header field |hf| in |frame| is
     * received for |stream|.  This function will validate |hf| against
     * the current state of stream.
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.HTTP_HEADER
     *   Invalid HTTP header field was received.
     * ErrorCode.IGN_HTTP_HEADER
     *   Invalid HTTP header field was received but it can be treated as
     *   if it was not received because of compatibility reasons.
*/
    ErrorCode validateHeaderField(Stream stream, in Frame frame, HeaderField hf, bool trailer)
    {
        /* When ignoring regular headers, we set this flag so that we
           still enforce header field ordering rule for pseudo header
           fields. */
        ErrorCode ignoreRegularHeader()
        {
            stream.httpFlags = cast(HTTPFlags)(stream.httpFlags | HTTPFlags.PSEUDO_HEADER_DISALLOWED);
            return ErrorCode.IGN_HTTP_HEADER;
        }

        /* We are strict for pseudo header field.  One bad character
           should lead to fail.  OTOH, we should be a bit forgiving for
           regular headers, since existing public internet has so much
           illegal headers floating around and if we kill the stream
           because of this, we may disrupt many web sites and/or
           libraries.  So we become conservative here, and just ignore
           those illegal regular headers. */
        if (!hf.validateName())
        {
            if (hf.name.length > 0 && hf.name[0] == ':')
                return ErrorCode.HTTP_HEADER;

            /* header field name must be lower-cased without exception */
            foreach (idx; 0 .. hf.name.length) {
                immutable char ch = hf.name[idx];
                if ('A' <= ch && ch <= 'Z')
                    return ErrorCode.HTTP_HEADER;
            }

            return ignoreRegularHeader();
        }

        if (!hf.validateValue())
        {
            assert(hf.name.length > 0);
            if (hf.name[0] == ':')
                return ErrorCode.HTTP_HEADER;

            return ignoreRegularHeader();
        }

        /* Servers, and clients looking at a PUSH_PROMISE, are validating a request;
           everything else is a response. */
        immutable bool as_request = is_server || frame.hd.type == FrameType.PUSH_PROMISE;
        if (as_request)
            return hf.validateRequestHeader(stream, trailer) ? ErrorCode.OK : ErrorCode.HTTP_HEADER;

        return hf.validateResponseHeader(stream, trailer) ? ErrorCode.OK : ErrorCode.HTTP_HEADER;
    }

    /*
     * Pops and returns next item to send.
If there is no such item,
     * returns null.  This function takes into account max concurrent
     * streams.  That means if session.ob_pq is empty but
     * session.ob_ss_pq has item and max concurrent streams is reached,
     * the session.ob_ss_pq item is not returned: session.ob_da_pq is
     * tried instead, and null is returned if that yields nothing.
     */
    OutboundItem popNextOutboundItem()
    {
        if (ob_pq.empty) {
            /* Nothing in ob_pq.  An ob_ss_pq item may only go out while the concurrent
               stream limit is not reached; otherwise fall back to ob_da_pq, which is
               only served while remote_window_size is non-zero. */
            if (ob_ss_pq.empty || isOutgoingConcurrentStreamsMax()) {
                if (remote_window_size == 0 || ob_da_pq.empty)
                    return null;

                OutboundItem da_item = ob_da_pq.top;
                ob_da_pq.pop();
                da_item.queued = 0;
                return da_item;
            }

            OutboundItem ss_item = ob_ss_pq.top;
            ob_ss_pq.pop();
            ss_item.queued = 0;
            return ss_item;
        }

        OutboundItem regular = ob_pq.top;
        OutboundItem opener;
        bool prefer_regular = true;

        if (!ob_ss_pq.empty) {
            opener = ob_ss_pq.top;

            /* Both queues have a candidate: ob_pq wins when the stream limit is reached,
               when its item is heavier, or on equal weight when it has the lower seq. */
            prefer_regular = isOutgoingConcurrentStreamsMax() ||
                regular.weight > opener.weight ||
                (regular.weight == opener.weight && regular.seq < opener.seq);
        }

        if (prefer_regular) {
            ob_pq.pop();
            regular.queued = 0;
            return regular;
        }

        ob_ss_pq.pop();
        opener.queued = 0;
        return opener;
    }

    /*
     * Returns next item to send.  If there is no such item, this function
     * returns null.  This function takes into account max concurrent
     * streams.  That means if session.ob_pq is empty but
     * session.ob_ss_pq has item and max concurrent streams is reached,
     * then this function returns null.
*/
    OutboundItem getNextOutboundItem()
    {
        if (ob_pq.empty) {
            /* Return the ob_ss_pq item only when concurrent connection limit is not reached;
               otherwise peek at ob_da_pq, which is only served while remote_window_size is non-zero. */
            if (ob_ss_pq.empty || isOutgoingConcurrentStreamsMax()) {
                if (remote_window_size == 0 || ob_da_pq.empty)
                    return null;

                return ob_da_pq.top;
            }

            return ob_ss_pq.top;
        }

        if (ob_ss_pq.empty)
            return ob_pq.top;

        OutboundItem regular = ob_pq.top;
        OutboundItem opener = ob_ss_pq.top;

        /* ob_pq wins when the stream limit is reached, when its item is heavier,
           or on equal weight when it has the lower seq. */
        immutable bool prefer_regular = isOutgoingConcurrentStreamsMax() ||
            regular.weight > opener.weight ||
            (regular.weight == opener.weight && regular.seq < opener.seq);

        return prefer_regular ? regular : opener;
    }

    /*
     * Updates local settings with the |iva|.  The number of elements in the
     * array pointed by the |iva| is given by the |iva.length|.  This function
     * assumes that the all settings_id member in |iva| are in range 1 to
     * Setting.MAX_HEADER_LIST_SIZE, inclusive.
     *
     * While updating individual stream's local window size, if the window
     * size becomes strictly larger than MAX_WINDOW_SIZE,
     * RST_STREAM is issued against such a stream.
     *
     * This function returns 0 if it succeeds; any error reported by
     * updateLocalInitialWindowSize() is returned unchanged.
     *
     * ErrorCode.HEADER_COMP
     *   The header table size is out of range
     *   NOTE(review): the result of hd_inflater.changeTableSize() is not
     *   inspected here, so it is unclear how this code is produced -- confirm.
     */
    ErrorCode updateLocalSettings(in Setting[] iva)
    {
        int new_initial_window_size = -1;
        int header_table_size = -1;
        bool header_table_size_seen;

        /* First pass: pick up the values that need extra processing.  Use the value last seen. */
        foreach (entry; iva) {
            if (entry.id == Setting.HEADER_TABLE_SIZE) {
                header_table_size_seen = true;
                header_table_size = entry.value;
            } else if (entry.id == Setting.INITIAL_WINDOW_SIZE) {
                new_initial_window_size = entry.value;
            }
        }

        if (header_table_size_seen)
            hd_inflater.changeTableSize(header_table_size);

        if (new_initial_window_size != -1) {
            /* Apply the new initial window size while local_settings still holds the old one. */
            ErrorCode rv = updateLocalInitialWindowSize(new_initial_window_size, local_settings.initial_window_size);
            if (rv != ErrorCode.OK)
                return rv;
        }

        /* Second pass: store every recognized setting. */
        foreach (entry; iva) {
            switch (entry.id) {
                case Setting.HEADER_TABLE_SIZE:
                    local_settings.header_table_size = entry.value;
                    break;
                case Setting.ENABLE_PUSH:
                    local_settings.enable_push = entry.value;
                    break;
                case Setting.MAX_CONCURRENT_STREAMS:
                    local_settings.max_concurrent_streams = entry.value;
                    break;
                case Setting.INITIAL_WINDOW_SIZE:
                    local_settings.initial_window_size = entry.value;
                    break;
                case Setting.MAX_FRAME_SIZE:
                    local_settings.max_frame_size = entry.value;
                    break;
                case Setting.MAX_HEADER_LIST_SIZE:
                    local_settings.max_header_list_size = entry.value;
                    break;
                default:
                    break;
            }
        }

        /* Nothing is pending any more: put the pending values back to their defaults. */
        pending_local_max_concurrent_stream = INITIAL_MAX_CONCURRENT_STREAMS;
        pending_enable_push = true;

        return ErrorCode.OK;
    }

    /*
     * Re-prioritize |stream|.  The new priority specification is |pri_spec|.
*/
    void reprioritizeStream(Stream stream, ref PrioritySpec pri_spec)
    {
        /* Only streams that are part of the dependency tree can be moved. */
        if (!stream.inDepTree())
            return;

        if (pri_spec.stream_id == stream.id) {
            terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "depend on itself");
            return;
        }

        PrioritySpec default_spec;
        Stream parent;

        if (pri_spec.stream_id != 0) {
            parent = getStreamRaw(pri_spec.stream_id);

            if (is_server && !parent && idleStreamDetect(pri_spec.stream_id)) {
                /* A server opens the idle stream it is asked to depend on. */
                parent = openStream(pri_spec.stream_id, StreamFlags.NONE, default_spec, StreamState.IDLE, null);
            } else if (!parent || !parent.inDepTree()) {
                /* Unusable parent: fall back to a default-initialized specification.
                   NOTE(review): |pri_spec| is a ref parameter, so this overwrites the
                   caller's specification too -- confirm that is intended. */
                pri_spec = default_spec;
            }
        }

        if (pri_spec.stream_id == 0) {
            /* No parent: the stream becomes a root. */
            stream.removeSubtree();

            /* We have to update weight after removing stream from tree */
            stream.weight = pri_spec.weight;

            if (pri_spec.exclusive && roots.num_streams <= MAX_DEP_TREE_LENGTH)
                stream.makeTopmostRoot(this);
            else
                stream.makeRoot(this);

            return;
        }

        assert(parent);

        if (stream.subtreeContains(parent)) {
            LOGF("stream: cycle detected, dep_stream(%s=%d stream(%s)=%d", parent, parent.id, stream, stream.id);

            /* The new parent currently lives below |stream|: detach it first so that
               no cycle is created. */
            parent.removeSubtree();
            parent.makeRoot(this);
        }

        stream.removeSubtree();

        /* We have to update weight after removing stream from tree */
        stream.weight = pri_spec.weight;

        Stream tree_root = parent.getRoot();

        if (tree_root.subStreams + stream.subStreams > MAX_DEP_TREE_LENGTH) {
            /* The combined tree would exceed MAX_DEP_TREE_LENGTH: keep |stream| as its own root
               with the default weight. */
            stream.weight = DEFAULT_WEIGHT;
            stream.makeRoot(this);
            return;
        }

        if (pri_spec.exclusive)
            parent.insertSubtree(stream, this);
        else
            parent.addSubtree(stream, this);
    }

    /*
     * Terminates current $(D Session) with the |error_code|.
The |reason|
     * is the debug string handed to terminateSession().
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.INVALID_ARGUMENT
     *   The |reason| is too long.
     */
    ErrorCode terminateSessionWithReason(FrameError error_code, string reason)
    {
        /* The last processed stream id is reported as last_stream_id. */
        auto last_id = last_proc_stream_id;
        return terminateSession(last_id, error_code, reason);
    }

    /*
     * Returns true if the number of outgoing opened streams is larger than or equal to
     * remote_settings.max_concurrent_streams.
     */
    bool isOutgoingConcurrentStreamsMax()
    {
        return num_outgoing_streams >= remote_settings.max_concurrent_streams;
    }

    /*
     * Returns true if the number of incoming opened streams is larger
     * than or equal to local_settings.max_concurrent_streams.
     */
    bool isIncomingConcurrentStreamsMax()
    {
        return num_incoming_streams >= local_settings.max_concurrent_streams;
    }

    /*
     * Returns true if the number of incoming opened streams is larger
     * than or equal to session.pending_local_max_concurrent_stream.
     */
    bool isIncomingConcurrentStreamsPendingMax()
    {
        return num_incoming_streams >= pending_local_max_concurrent_stream;
    }

    /*
     * Returns true unless OptionsMask.NO_HTTP_MESSAGING is set in opt_flags.
     */
    bool isHTTPMessagingEnabled()
    {
        return !(opt_flags & OptionsMask.NO_HTTP_MESSAGING);
    }

    /*
     * Returns true if |frame| is trailer headers.
*/
    bool isTrailerHeaders(Stream stream, in Frame frame)
    {
        if (!stream || frame.hd.type != FrameType.HEADERS)
            return false;

        if (frame.headers.cat != HeadersCategory.HEADERS)
            return false;

        /* On the client side the HEADERS only counts as trailer when no final response is expected any more. */
        return is_server || (stream.httpFlags & HTTPFlags.EXPECT_FINAL_RESPONSE) == 0;
    }

    /* Returns true if the |stream| is in reserved(remote) state */
    bool isReservedRemote(Stream stream)
    {
        if (stream.state != StreamState.RESERVED)
            return false;
        return !isMyStreamId(stream.id);
    }

    /* Returns true if the |stream| is in reserved(local) state */
    bool isReservedLocal(Stream stream)
    {
        if (stream.state != StreamState.RESERVED)
            return false;
        return isMyStreamId(stream.id);
    }

    /*
     * Checks whether received stream_id is valid: it must be non-zero, must not
     * be one of our own stream ids and must be larger than last_recv_stream_id.
     */
    bool isNewPeerStreamId(int stream_id)
    {
        if (stream_id == 0 || isMyStreamId(stream_id))
            return false;
        return last_recv_stream_id < stream_id;
    }


    /**
     * @function
     *
     * Returns the last stream ID of a stream for which
     * $(D Connector.onFrame) was invoked most recently.
     * The returned value can be used as last_stream_id parameter for
     * `submitGoAway()` and `terminateSession()`.
     *
     * This function always succeeds.
*/
    int getLastProcStreamID()
    {
        return last_proc_stream_id;
    }

    /*
     * Returns true if |stream_id| refers to a stream that has not been used yet:
     * one of our own ids at or beyond next_stream_id, or a new peer stream id.
     */
    bool idleStreamDetect(int stream_id)
    {
        /* Assume that stream object with stream_id does not exist */
        if (isMyStreamId(stream_id))
            return next_stream_id <= cast(uint)stream_id;

        return isNewPeerStreamId(stream_id);
    }

    /*
     * Frees every stream in |streams|, together with its attached item
     * when that item is not freed through another path.
     */
    void freeAllStreams()
    {
        foreach (stream; streams)
        {
            OutboundItem pending = stream.item;

            /* Queued items and the active outbound item (aob.item) are skipped here. */
            if (pending && !pending.queued && pending != aob.item)
            {
                pending.free();
                Mem.free(pending);
            }

            stream.free();
            Mem.free(stream);
        }
    }

    /*
     * Returns Stream object whose stream ID is |stream_id|.  It
     * could be null if such stream does not exist.  This function returns
     * null if stream is marked as closed or is in StreamState.IDLE state.
     */
    Stream getStream(int stream_id)
    {
        Stream stream = streams.get(stream_id);

        if (!stream)
            return null;
        if (stream.flags & StreamFlags.CLOSED)
            return null;
        if (stream.state == StreamState.IDLE)
            return null;

        return stream;
    }

    /*
     * This function behaves like getStream(), but it
     * returns stream object even if it is marked as closed or in
     * StreamState.IDLE state.
*/
    Stream getStreamRaw(int stream_id)
    {
        Stream found = streams.get(stream_id);
        return found;
    }

    /*
     * Terminates the session: queues a GOAWAY carrying |last_stream_id|,
     * |error_code| and |reason| with GoAwayAuxFlags.TERM_ON_SEND, then marks
     * the session with GoAwayFlags.TERM_ON_SEND.  Does nothing when that
     * flag is already set.  An error from addGoAway() is returned unchanged.
     */
    ErrorCode terminateSession(int last_stream_id, FrameError error_code, string reason)
    {
        /* Termination was already requested */
        if (goaway_flags & GoAwayFlags.TERM_ON_SEND)
            return ErrorCode.OK;

        string debug_data = reason ? reason : null;

        ErrorCode rv = addGoAway(last_stream_id, error_code, debug_data, GoAwayAuxFlags.TERM_ON_SEND);
        if (rv != ErrorCode.OK)
            return rv;

        goaway_flags |= GoAwayFlags.TERM_ON_SEND;

        return ErrorCode.OK;
    }

    /*
     * This function returns true if session is closing, i.e.
     * GoAwayFlags.TERM_ON_SEND is set.
     */
    bool isClosing()
    {
        auto term_bit = goaway_flags & GoAwayFlags.TERM_ON_SEND;
        return term_bit != 0;
    }

    /*
     * Check that we can send a frame to the |stream|.  This function
     * returns 0 if we can send a frame to the |stream|, or one of the
     * following negative error codes:
     *
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed.
     * ErrorCode.STREAM_SHUT_WR
     *   The stream is half-closed for transmission.
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
     */
    ErrorCode predicateForStreamSend(Stream stream)
    {
        if (!stream)
            return ErrorCode.STREAM_CLOSED;

        if (isClosing())
            return ErrorCode.SESSION_CLOSING;

        return (stream.shutFlags & ShutdownFlag.WR) ? ErrorCode.STREAM_SHUT_WR : ErrorCode.OK;
    }

    /*
     * This function checks request HEADERS frame, which opens stream, can
     * be sent at this time.
*
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.START_STREAM_NOT_ALLOWED
     *   New stream cannot be created because of GOAWAY: session is
     *   going down or received last_stream_id is strictly less than
     *   frame.hd.stream_id.
     * ErrorCode.STREAM_CLOSING
     *   request HEADERS was canceled by RST_STREAM while it is in queue.
     */
    ErrorCode predicateRequestHeadersSend(OutboundItem item)
    {
        if (item.aux_data.headers.canceled)
            return ErrorCode.STREAM_CLOSING;

        /* If we are terminating session (GoAwayFlags.TERM_ON_SEND) or
         * GOAWAY was received from peer, new request is not allowed. */
        immutable bool going_away = (goaway_flags & (GoAwayFlags.TERM_ON_SEND | GoAwayFlags.RECV)) != 0;

        return going_away ? ErrorCode.START_STREAM_NOT_ALLOWED : ErrorCode.OK;
    }

    /*
     * This function checks HEADERS, which is the first frame from the
     * server, with the |stream| can be sent at this time.  The |stream|
     * can be null.
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed or does not exist.
     * ErrorCode.STREAM_SHUT_WR
     *   The transmission is not allowed for this stream (e.g., a frame
     *   with END_STREAM flag set has already sent)
     * ErrorCode.INVALID_STREAM_ID
     *   The stream ID is invalid.
     * ErrorCode.STREAM_CLOSING
     *   RST_STREAM was queued for this stream.
     * ErrorCode.INVALID_STREAM_STATE
     *   The state of the stream is not valid.
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
*/
    ErrorCode predicateResponseHeadersSend(Stream stream)
    {
        ErrorCode rv = predicateForStreamSend(stream);
        if (rv != ErrorCode.OK)
            return rv;

        assert(stream);

        /* A response can only be sent on a stream the peer initiated. */
        if (isMyStreamId(stream.id))
            return ErrorCode.INVALID_STREAM_ID;

        switch (stream.state) {
            case StreamState.OPENING:
                return ErrorCode.OK;
            case StreamState.CLOSING:
                return ErrorCode.STREAM_CLOSING;
            default:
                return ErrorCode.INVALID_STREAM_STATE;
        }
    }

    /*
     * This function checks HEADERS for reserved stream can be sent.  The
     * |stream| must be reserved state and the |session| is server side.
     * The |stream| can be null.
     *
     * This function returns 0 if it succeeds, or one of the following
     * error codes:
     *
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed.
     * ErrorCode.STREAM_SHUT_WR
     *   The stream is half-closed for transmission.
     * ErrorCode.STREAM_CLOSING
     *   RST_STREAM was queued for this stream.
     * ErrorCode.PROTO
     *   The stream is not reserved state
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
     */
    ErrorCode predicatePushResponseHeadersSend(Stream stream)
    {
        /* TODO Should disallow HEADERS if GOAWAY has already been issued? */
        ErrorCode rv = predicateForStreamSend(stream);
        if (rv != ErrorCode.OK)
            return rv;

        assert(stream);

        /* The CLOSING test has to come before the RESERVED test: a CLOSING stream is
           by definition not RESERVED, so the other order makes STREAM_CLOSING
           unreachable and reports ErrorCode.PROTO for a stream that was merely reset. */
        if (stream.state == StreamState.CLOSING)
            return ErrorCode.STREAM_CLOSING;

        if (stream.state != StreamState.RESERVED)
            return ErrorCode.PROTO;

        return ErrorCode.OK;
    }

    /*
     * This function checks HEADERS, which is neither stream-opening nor
     * first response header, with the |stream| can be sent at this time.
     * The |stream| can be null.
*
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed or does not exist.
     * ErrorCode.STREAM_SHUT_WR
     *   The transmission is not allowed for this stream (e.g., a frame
     *   with END_STREAM flag set has already sent)
     * ErrorCode.STREAM_CLOSING
     *   RST_STREAM was queued for this stream.
     * ErrorCode.INVALID_STREAM_STATE
     *   The state of the stream is not valid.
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
     */
    ErrorCode predicateHeadersSend(Stream stream)
    {
        ErrorCode rv = predicateForStreamSend(stream);
        if (rv != ErrorCode.OK)
            return rv;

        assert(stream);

        /* A stream with a queued RST_STREAM is rejected no matter who initiated it. */
        if (stream.state == StreamState.CLOSING)
            return ErrorCode.STREAM_CLOSING;

        /* Our own streams may carry HEADERS in any other state; peer-initiated
           streams have to be OPENED. */
        if (isMyStreamId(stream.id) || stream.state == StreamState.OPENED)
            return ErrorCode.OK;

        return ErrorCode.INVALID_STREAM_STATE;
    }

    /*
     * This function checks PUSH_PROMISE frame |frame| with the |stream|
     * can be sent at this time.  The |stream| can be null.
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.START_STREAM_NOT_ALLOWED
     *   New stream cannot be created because GOAWAY is already sent or
     *   received.
     * ErrorCode.PROTO
     *   The client side attempts to send PUSH_PROMISE, or the server
     *   sends PUSH_PROMISE for the stream not initiated by the client.
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed or does not exist.
     * ErrorCode.STREAM_CLOSING
     *   RST_STREAM was queued for this stream.
* ErrorCode.STREAM_SHUT_WR
     *   The transmission is not allowed for this stream (e.g., a frame
     *   with END_STREAM flag set has already sent)
     * ErrorCode.PUSH_DISABLED
     *   The remote peer disabled reception of PUSH_PROMISE.
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
     */
    ErrorCode predicatePushPromiseSend(Stream stream)
    {
        /* Only a server may push. */
        if (!is_server)
            return ErrorCode.PROTO;

        ErrorCode rv = predicateForStreamSend(stream);
        if (rv != ErrorCode.OK)
            return rv;

        assert(stream);

        if (remote_settings.enable_push == 0)
            rv = ErrorCode.PUSH_DISABLED;
        else if (stream.state == StreamState.CLOSING)
            rv = ErrorCode.STREAM_CLOSING;
        else if (goaway_flags & GoAwayFlags.RECV)
            rv = ErrorCode.START_STREAM_NOT_ALLOWED;

        /* rv is still ErrorCode.OK when none of the checks above fired. */
        return rv;
    }

    /*
     * This function checks WINDOW_UPDATE with the stream ID |stream_id|
     * can be sent at this time.  Note that END_STREAM flag of the previous
     * frame does not affect the transmission of the WINDOW_UPDATE frame.
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed or does not exist.
     * ErrorCode.STREAM_CLOSING
     *   RST_STREAM was queued for this stream.
     * ErrorCode.INVALID_STREAM_STATE
     *   The state of the stream is not valid.
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
*/
    ErrorCode predicateWindowUpdateSend(int stream_id)
    {
        /* Connection-level window update */
        if (stream_id == 0)
            return ErrorCode.OK;

        Stream stream = getStream(stream_id);

        if (!stream)
            return ErrorCode.STREAM_CLOSED;

        if (isClosing())
            return ErrorCode.SESSION_CLOSING;

        if (stream.state == StreamState.CLOSING)
            return ErrorCode.STREAM_CLOSING;

        return isReservedLocal(stream) ? ErrorCode.INVALID_STREAM_STATE : ErrorCode.OK;
    }

    /*
     * This function checks DATA with the |stream| can be sent at this
     * time.  The |stream| can be null.
     *
     * This function returns 0 if it succeeds, or one of the following
     * negative error codes:
     *
     * ErrorCode.STREAM_CLOSED
     *   The stream is already closed or does not exist.
     * ErrorCode.STREAM_SHUT_WR
     *   The transmission is not allowed for this stream (e.g., a frame
     *   with END_STREAM flag set has already sent)
     * ErrorCode.STREAM_CLOSING
     *   RST_STREAM was queued for this stream.
     * ErrorCode.INVALID_STREAM_STATE
     *   The state of the stream is not valid.
     * ErrorCode.SESSION_CLOSING
     *   This session is closing.
     */
    ErrorCode predicateDataSend(Stream stream)
    {
        ErrorCode rv = predicateForStreamSend(stream);
        if (rv != ErrorCode.OK)
            return rv;

        assert(stream);

        /* If stream.state is StreamState.CLOSING, RST_STREAM was queued but not yet sent.
           In this case, we won't send DATA frames. */
        if (stream.state == StreamState.CLOSING)
            return ErrorCode.STREAM_CLOSING;

        if (isMyStreamId(stream.id)) {
            /* Request body data */
            return stream.state == StreamState.RESERVED ? ErrorCode.INVALID_STREAM_STATE : ErrorCode.OK;
        }

        /* Response body data */
        return stream.state == StreamState.OPENED ? ErrorCode.OK : ErrorCode.INVALID_STREAM_STATE;
    }


    /*
     * Caps |requested_window_size| by the stream window, the connection window
     * and the remote max frame size, and returns the result.
     */
    int enforceFlowControlLimits(Stream stream, int requested_window_size)
    {
        LOGF("send: remote windowsize connection=%d, remote maxframsize=%u, stream(id %d=%d",
            remote_window_size,
            remote_settings.max_frame_size, stream.id,
            stream.remoteWindowSize);

        auto stream_cap = min(requested_window_size, stream.remoteWindowSize);
        auto connection_cap = min(stream_cap, remote_window_size);
        return min(connection_cap, cast(int)remote_settings.max_frame_size);
    }

    /*
     * Now we have SETTINGS synchronization, flow control error can be
     * detected strictly.  If DATA frame is received with length > 0 and
     * current received window size + delta length is strictly larger than
     * local window size, it is subject to FLOW_CONTROL_ERROR, so return
     * false.  Note that local_window_size is calculated after SETTINGS ACK is
     * received from peer, so peer must honor this limit.  If the resulting
     * recv_window_size is strictly larger than MAX_WINDOW_SIZE,
     * return false too.
*/
    bool adjustRecvWindowSize(ref int _recv_window_size, size_t delta, int local_window_size)
    {
        immutable int step = cast(int)delta;

        /* Both limits are tested by subtracting from the limit instead of adding to the current size. */
        immutable bool beyond_local = _recv_window_size > local_window_size - step;
        immutable bool beyond_max = _recv_window_size > MAX_WINDOW_SIZE - step;

        if (beyond_local || beyond_max)
            return false;

        _recv_window_size += delta;
        return true;
    }

    /*
     * Accumulates received bytes |delta_size| for stream-level flow
     * control and decides whether to send WINDOW_UPDATE to that stream.
     * If OptionsMask.NO_AUTO_WINDOW_UPDATE is set, WINDOW_UPDATE will not
     * be sent.  A stream that exceeds its window gets RST_STREAM with
     * FLOW_CONTROL_ERROR.
     */
    void updateRecvStreamWindowSize(Stream stream, size_t delta_size, int send_window_update)
    {
        if (!adjustRecvWindowSize(stream.recvWindowSize, delta_size, stream.localWindowSize)) {
            addRstStream(stream.id, FrameError.FLOW_CONTROL_ERROR);
            return;
        }

        /* We don't have to send WINDOW_UPDATE if the data received is the last chunk in the incoming stream. */
        if (!send_window_update || (opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE))
            return;

        /* We have to use local_settings here because it is the constraint the remote endpoint should honor. */
        if (shouldSendWindowUpdate(stream.localWindowSize, stream.recvWindowSize)) {
            addWindowUpdate(FrameFlags.NONE, stream.id, stream.recvWindowSize);
            stream.recvWindowSize = 0;
        }
    }

    /*
     * Accumulates received bytes |delta_size| for connection-level flow
     * control and decides whether to send WINDOW_UPDATE to the
     * connection.  If OptionsMask.NO_AUTO_WINDOW_UPDATE is set,
     * WINDOW_UPDATE will not be sent.
*/
	ErrorCode updateRecvConnectionWindowSize(size_t delta_size)
	{
		bool ok = adjustRecvWindowSize(recv_window_size, delta_size, local_window_size);
		if (!ok) {
			return terminateSession(FrameError.FLOW_CONTROL_ERROR);
		}

		if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE))
		{
			if (shouldSendWindowUpdate(local_window_size, recv_window_size))
			{
				/* Use stream ID 0 to update connection-level flow control window */
				addWindowUpdate(FrameFlags.NONE, 0, recv_window_size);
				recv_window_size = 0;
			}
		}
		return ErrorCode.OK;
	}

	/*
	 * Adds |delta_size| consumed bytes to |consumed_size| and, when
	 * shouldSendWindowUpdate() agrees, queues a WINDOW_UPDATE for
	 * |stream_id| (0 addresses the connection) and reduces both
	 * |consumed_size| and |recv_window_size| by the amount announced.
	 *
	 * Terminates the session with FLOW_CONTROL_ERROR if the accumulated
	 * consumed size would exceed MAX_WINDOW_SIZE.
	 */
	ErrorCode updateConsumedSize(ref int consumed_size, ref int recv_window_size, int stream_id, size_t delta_size, int local_window_size)
	{
		/* Check delta_size on its own first: MAX_WINDOW_SIZE - delta_size is
		   evaluated in unsigned arithmetic and would wrap around for a
		   delta larger than MAX_WINDOW_SIZE, silently disabling the guard. */
		if (delta_size > cast(size_t)MAX_WINDOW_SIZE ||
			cast(size_t)consumed_size > cast(size_t)MAX_WINDOW_SIZE - delta_size)
		{
			return terminateSession(FrameError.FLOW_CONTROL_ERROR);
		}

		consumed_size += delta_size;

		/* recv_window_size may be smaller than consumed_size, because it may be decreased by negative value with http2_submit_window_update(). */
		int recv_size = min(consumed_size, recv_window_size);

		if (shouldSendWindowUpdate(local_window_size, recv_size))
		{
			addWindowUpdate(FrameFlags.NONE, stream_id, recv_size);
			recv_window_size -= recv_size;
			consumed_size -= recv_size;
		}

		return ErrorCode.OK;
	}

	/* Stream-level variant of updateConsumedSize(), operating on the stream's own counters. */
	ErrorCode updateStreamConsumedSize(Stream stream, size_t delta_size)
	{
		return updateConsumedSize(stream.consumedSize, stream.recvWindowSize, stream.id, delta_size, stream.localWindowSize);
	}

	/* Connection-level variant of updateConsumedSize(); stream ID 0 addresses the connection. */
	ErrorCode updateConnectionConsumedSize(size_t delta_size)
	{
		return updateConsumedSize(consumed_size, recv_window_size, 0, delta_size, local_window_size);
	}


	/*
	 * Returns the maximum length of next data read. 
If the
	 * connection-level and/or stream-wise flow control are enabled, the
	 * return value takes into account those current window sizes. The remote
	 * settings for max frame size is also taken into account.
	 */
	int nextDataRead(Stream stream)
	{
		int window_size;
		window_size = enforceFlowControlLimits(stream, DATA_PAYLOADLEN);

		LOGF("send: available window=%d", window_size);

		/* An exhausted (zero or negative) window means nothing can be read. */
		return window_size > 0 ? window_size : 0;
	}

	/*
	 * Asks the connector for the padded payload length of |frame|, allowing
	 * at most |max_payloadlen| bytes. Returns the frame's own length when it
	 * already reaches |max_payloadlen|. Returns ErrorCode.CALLBACK_FAILURE
	 * (as int) if the connector throws or answers outside the range
	 * [frame.hd.length, max padded length].
	 */
	int callSelectPadding(in Frame frame, size_t max_payloadlen)
	{
		int rv;

		if (frame.hd.length >= max_payloadlen) {
			return frame.hd.length;
		}

		int max_paddedlen = cast(int) min(frame.hd.length + MAX_PADLEN, max_payloadlen);

		try rv = connector.selectPaddingLength(frame, max_paddedlen);
		catch (Exception e) return cast(int) ErrorCode.CALLBACK_FAILURE;
		if (rv < cast(int)frame.hd.length || rv > cast(int)max_paddedlen) {
			return cast(int) ErrorCode.CALLBACK_FAILURE;
		}
		return rv;
	}

	/*
	 * Hands the serialized frame header in |framebufs| and the unpadded
	 * payload length to Connector.writeData. OK, WOULDBLOCK and
	 * TEMPORAL_CALLBACK_FAILURE are passed through; any other result, or an
	 * exception thrown by the connector, is reported as CALLBACK_FAILURE.
	 */
	ErrorCode callWriteData(OutboundItem item, Buffers framebufs)
	{
		Buffer* buf = &framebufs.cur.buf;
		Frame* frame = &item.frame;
		uint length = frame.hd.length - frame.data.padlen;
		/* NOTE(review): hd is unpacked but not read afterwards in this function - confirm whether it can be dropped. */
		FrameHeader hd;
		hd.unpack(buf.pos[0 .. FRAME_HDLEN]);

		/* Guard the connector call like every other call* wrapper does, so a
		   throwing callback becomes an error code instead of unwinding the session. */
		ErrorCode rv;
		try rv = connector.writeData(*frame, buf.pos[0 .. FRAME_HDLEN], length);
		catch (Exception e) return ErrorCode.CALLBACK_FAILURE;

		if (rv == ErrorCode.OK || rv == ErrorCode.WOULDBLOCK || rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE)
			return rv;
		return ErrorCode.CALLBACK_FAILURE;
	}

	/* Invokes Connector.onFrameReady; an exception from the connector counts as failure (false). */
	bool callOnFrameReady(in Frame frame)
	{
		try return connector.onFrameReady(frame);
		catch(Exception e) return false;
	}

	/* Invokes Connector.onFrameSent; an exception from the connector counts as failure (false). */
	bool callOnFrameSent(in Frame frame)
	{
		try return connector.onFrameSent(frame);
		catch(Exception e) return false;
	}

	/* Invokes Connector.onFrameHeader; an exception from the connector counts as failure (false). */
	bool callOnFrameHeader(in FrameHeader hd)
	{
		try return connector.onFrameHeader(hd);
		catch(Exception e) return false;
	}

	/* Invokes Connector.onHeaders; an exception from the connector counts as failure (false). */
	bool callOnHeaders(in Frame frame)
	{
		LOGF("recv: call onHeaders callback stream_id=%d", frame.hd.stream_id);
		try return connector.onHeaders(frame);
		catch(Exception e) return false;
	}

	/* Invokes Connector.onHeaderField; an exception from the connector counts as failure (false). */
	bool callOnHeaderField(in Frame frame, in HeaderField hf, ref bool pause, ref bool close)
	{
		try return connector.onHeaderField(frame, hf, pause, close);
		catch(Exception e) return false;
	}

	/*
	 * Reads into |buf| through Connector.read and returns its result.
	 * A positive result larger than buf.length, a negative result other than
	 * WOULDBLOCK or EOF, or an exception is reported as CALLBACK_FAILURE.
	 */
	int callRead(ubyte[] buf)
	{
		int len;
		try len = connector.read(buf);
		catch(Exception e) return ErrorCode.CALLBACK_FAILURE;

		if (len > 0) {
			if (cast(size_t) len > buf.length)
				return ErrorCode.CALLBACK_FAILURE;
		} else if (len < 0 && len != cast(int) ErrorCode.WOULDBLOCK && len != cast(int)ErrorCode.EOF)
			return ErrorCode.CALLBACK_FAILURE;

		return len;
	}

	/* Invokes Connector.onFrame; an exception from the connector counts as failure (false). */
	bool callOnFrame(in Frame frame)
	{
		try return connector.onFrame(frame);
		catch(Exception e) return false;
	}

	/*
	 * Checks that we can receive the DATA frame for stream, which is
	 * indicated by |session.iframe.frame.hd.stream_id|. If it is a
	 * connection error situation, GOAWAY frame will be issued by this
	 * function.
	 *
	 * If the DATA frame is allowed, returns 0. 
*
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.IGN_PAYLOAD
	 *   The reception of DATA frame is connection error; or should be
	 *   ignored.
	 */
	ErrorCode onDataFailFast()
	{
		ErrorCode rv;
		Stream stream;
		int stream_id;
		string failure_reason;
		FrameError error_code = FrameError.PROTOCOL_ERROR;

		stream_id = iframe.frame.hd.stream_id;

		if (stream_id == 0) {
			/* The spec says that if a DATA frame is received whose stream ID
			   is 0, the recipient MUST respond with a connection error of
			   type PROTOCOL_ERROR. */
			failure_reason = "DATA: stream_id == 0";
			goto fail;
		}
		stream = getStream(stream_id);
		if (!stream) {
			if (idleStreamDetect(stream_id))
			{
				failure_reason = "DATA: stream in idle";
				error_code = FrameError.STREAM_CLOSED;
				goto fail;
			}
			return ErrorCode.IGN_PAYLOAD;
		}
		if (stream.shutFlags & ShutdownFlag.RD) {
			failure_reason = "DATA: stream in half-closed(remote)";
			error_code = FrameError.STREAM_CLOSED;
			goto fail;
		}

		if (isMyStreamId(stream_id)) {
			if (stream.state == StreamState.CLOSING) {
				return ErrorCode.IGN_PAYLOAD;
			}
			if (stream.state != StreamState.OPENED) {
				failure_reason = "DATA: stream not opened";
				goto fail;
			}
			return ErrorCode.OK;
		}
		if (stream.state == StreamState.RESERVED) {
			failure_reason = "DATA: stream in reserved";
			goto fail;
		}
		if (stream.state == StreamState.CLOSING) {
			return ErrorCode.IGN_PAYLOAD;
		}
		return ErrorCode.OK;
	fail:
		/* Connection error: terminate the session, then tell the caller to skip the payload. */
		rv = terminateSessionWithReason(error_code, failure_reason);
		if (isFatal(rv)) {
			return rv;
		}
		return ErrorCode.IGN_PAYLOAD;
	}


	/*
	 * Runs once the header block of the frame in |iframe| has been received
	 * completely. With HTTP messaging enabled, the stream-level header
	 * checks are performed first; a failed check resets the affected stream
	 * with PROTOCOL_ERROR and suppresses the Connector.onFrame callback.
	 * For HEADERS frames, processing then continues according to the
	 * headers category.
	 *
	 * Returns ErrorCode.CALLBACK_FAILURE if Connector.onFrame fails, or the
	 * fatal error reported while handling an invalid stream.
	 */
	ErrorCode afterHeaderBlockReceived()
	{
		ErrorCode rv;
		bool call_cb = true;
		Frame* frame = &iframe.frame;
		Stream stream;

		/* We don't call Connector.onFrame if stream has been closed already or being closed. */
		stream = getStream(frame.hd.stream_id);
		if (!stream || stream.state == StreamState.CLOSING)
		{
			return ErrorCode.OK;
		}

		if (isHTTPMessagingEnabled()) {
			if (frame.hd.type == FrameType.PUSH_PROMISE) {
				Stream subject_stream;

				subject_stream = getStream(frame.push_promise.promised_stream_id);
				if (subject_stream) {
					if (!subject_stream.onRequestHeaders(*frame))
						rv = ErrorCode.ERROR;
				}
			} else {
				assert(frame.hd.type == FrameType.HEADERS);
				with(HeadersCategory) switch (frame.headers.cat) {
					case REQUEST:
						if (!stream.onRequestHeaders(*frame))
							rv = ErrorCode.ERROR;
						break;
					case RESPONSE:
					case PUSH_RESPONSE:
						if (!stream.onResponseHeaders())
							rv = ErrorCode.ERROR;
						break;
					case HEADERS:
						if (stream.httpFlags & HTTPFlags.EXPECT_FINAL_RESPONSE) {
							assert(!is_server);
							if (!stream.onResponseHeaders())
								rv = ErrorCode.ERROR;
						} else {
							if (!stream.validateTrailerHeaders(*frame))
								rv = ErrorCode.ERROR;
						}
						break;
					default:
						assert(0);
				}
				if (rv == 0 && (frame.hd.flags & FrameFlags.END_STREAM)) {
					if (!stream.validateRemoteEndStream())
						rv = ErrorCode.ERROR;
				}
			}
			if (rv != ErrorCode.OK) {
				int stream_id;

				/* For PUSH_PROMISE the offending stream is the promised one. */
				if (frame.hd.type == FrameType.PUSH_PROMISE) {
					stream_id = frame.push_promise.promised_stream_id;
				} else {
					stream_id = frame.hd.stream_id;
				}

				call_cb = false;

				/* Do not drop the result: a failing onInvalidFrame callback
				   must stop processing instead of being silently ignored. */
				rv = handleInvalidStream2(stream_id, *frame, FrameError.PROTOCOL_ERROR);
				if (isFatal(rv)) {
					return rv;
				}
			}
		}

		if (call_cb) {
			bool ok = callOnFrame(*frame);
			if (!ok)
				return ErrorCode.CALLBACK_FAILURE;
		}

		if (frame.hd.type != FrameType.HEADERS) {
			return ErrorCode.OK;
		}

		switch (frame.headers.cat) {
			case HeadersCategory.REQUEST:
				endRequestHeadersReceived(*frame, stream);
				return ErrorCode.OK;
			case HeadersCategory.RESPONSE:
			case HeadersCategory.PUSH_RESPONSE:
				return endResponseHeadersReceived(*frame, stream);
			case HeadersCategory.HEADERS:
				return endHeadersReceived(*frame, stream);
			default:
				assert(0);
		}
	}

	/*
	 * Unpacks the buffered HEADERS payload, classifies the frame
	 * (REQUEST, RESPONSE, PUSH_RESPONSE or plain HEADERS) from the stream's
	 * existence, ownership and state, and dispatches to the matching handler.
	 */
	ErrorCode processHeadersFrame()
	{
		Frame* frame = &iframe.frame;
		Stream stream;

		frame.headers.unpack(iframe.sbuf[]);

		stream = getStream(frame.hd.stream_id);
		if (!stream) {
			/* No such stream yet: these HEADERS open a new one. */
			frame.headers.cat = HeadersCategory.REQUEST;
			return onRequestHeaders(*frame);
		}

		if (isMyStreamId(frame.hd.stream_id))
		{
			if (stream.state == StreamState.OPENING) {
				frame.headers.cat = HeadersCategory.RESPONSE;
				return onResponseHeaders(*frame, stream);
			}
			frame.headers.cat = HeadersCategory.HEADERS;
			return onHeaders(*frame, stream);
		}
		if (stream.state == StreamState.RESERVED) {
			frame.headers.cat = HeadersCategory.PUSH_RESPONSE;
			return onPushResponseHeaders(*frame, stream);
		}
		frame.headers.cat = HeadersCategory.HEADERS;
		return onHeaders(*frame, stream);
	}

	/* Unpacks the buffered PRIORITY payload and passes the frame to onPriority. */
	ErrorCode processPriorityFrame()
	{
		Frame* frame = &iframe.frame;

		frame.priority.unpack(iframe.sbuf[]);

		return onPriority(*frame);
	}

	/* Unpacks the buffered RST_STREAM payload and passes the frame to onRstStream. */
	ErrorCode processRstStreamFrame()
	{
		Frame* frame = &iframe.frame;

		frame.rst_stream.unpack(iframe.sbuf[]);

		return onRstStream(*frame);
	}


	/*
	 * Builds the SETTINGS frame from the entries collected in |iframe.iva|
	 * and passes it to onSettings. The last slot of |iframe.iva| holds the
	 * minimum header table size seen; when it differs from the stored
	 * HEADER_TABLE_SIZE entry, the minimum takes that entry's place and the
	 * stored value is appended after it.
	 */
	ErrorCode processSettingsFrame()
	{
		Frame* frame = &iframe.frame;
		size_t i;
		Setting min_header_size_entry;
		min_header_size_entry = iframe.iva[INBOUND_NUM_IV - 1];

		if (min_header_size_entry.value < uint.max) {
			/* If we have less value, then we must have Setting.HEADER_TABLE_SIZE in i < iframe.niv */
			for (i = 0; i < iframe.niv; ++i) {
				if (iframe.iva[i].id == Setting.HEADER_TABLE_SIZE) {
					break;
				}
			}

			assert(i < iframe.niv);

			if (min_header_size_entry.value != iframe.iva[i].value) {
				iframe.iva[iframe.niv++] = iframe.iva[i];
				iframe.iva[i] = min_header_size_entry;
			}
		}

		frame.settings.unpack(iframe.iva[0 .. iframe.niv]);

		return onSettings(*frame, false /* ACK */);
	}

	/* Unpacks the buffered PUSH_PROMISE payload and passes the frame to onPushPromise. */
	ErrorCode processPushPromiseFrame()
	{
		Frame* frame = &iframe.frame;

		frame.push_promise.unpack(iframe.sbuf[]);

		return onPushPromise(*frame);
	}

	/* Unpacks the buffered PING payload and passes the frame to onPing. */
	ErrorCode processPingFrame()
	{
		Frame* frame = &iframe.frame;

		frame.ping.unpack(iframe.sbuf[]);

		return onPing(*frame);
	}

	/*
	 * Unpacks the GOAWAY frame from the small buffer and the large buffer,
	 * clears |iframe.lbuf|, and passes the frame to onGoAway.
	 */
	ErrorCode processGoAwayFrame()
	{
		Frame* frame = &iframe.frame;

		frame.goaway.unpack(iframe.sbuf[], iframe.lbuf[]);

		iframe.lbuf = Buffer(null);

		return onGoAway(*frame);
	}

	/* Unpacks the buffered WINDOW_UPDATE payload and passes the frame to onWindowUpdate. */
	ErrorCode processWindowUpdateFrame()
	{
		Frame* frame = &iframe.frame;

		frame.window_update.unpack(iframe.sbuf[]);

		return onWindowUpdate(*frame);
	}

	/* For errors, this function only returns FATAL error. 
*/
	ErrorCode processDataFrame()
	{
		/* Non-fatal results of onData are deliberately reported as success. */
		ErrorCode status = onData(iframe.frame);
		return isFatal(status) ? status : ErrorCode.OK;
	}

	/* Stream error on the frame's own stream; see handleInvalidStream2. */
	ErrorCode handleInvalidStream(Frame frame, FrameError error_code) {
		return handleInvalidStream2(frame.hd.stream_id, frame, error_code);
	}

	/*
	 * Queues RST_STREAM with |error_code| for |stream_id| and then reports
	 * |frame| through Connector.onInvalidFrame. Returns CALLBACK_FAILURE
	 * when the connector returns false or throws, OK otherwise.
	 */
	ErrorCode handleInvalidStream2(int stream_id, Frame frame, FrameError error_code) {
		addRstStream(stream_id, error_code);

		try {
			if (!connector.onInvalidFrame(frame, error_code))
				return ErrorCode.CALLBACK_FAILURE;
		} catch (Exception e) {
			return ErrorCode.CALLBACK_FAILURE;
		}

		return ErrorCode.OK;
	}

	/*
	 * Like handleInvalidStream, but for use while a header block is being
	 * inflated: any non-fatal outcome becomes IGN_HEADER_BLOCK.
	 */
	ErrorCode handleInflateInvalidStream(Frame frame, FrameError error_code) {
		ErrorCode status = handleInvalidStream(frame, error_code);
		return isFatal(status) ? status : ErrorCode.IGN_HEADER_BLOCK;
	}

	/*
	 * Handles invalid frame which causes connection error: the frame is
	 * reported through Connector.onInvalidFrame and the session is then
	 * terminated with |error_code| and |reason|. A failing or throwing
	 * callback yields CALLBACK_FAILURE without terminating the session here.
	 */
	ErrorCode handleInvalidConnection(Frame frame, FrameError error_code, string reason)
	{
		try {
			if (!connector.onInvalidFrame(frame, error_code))
				return ErrorCode.CALLBACK_FAILURE;
		} catch (Exception e) {
			return ErrorCode.CALLBACK_FAILURE;
		}

		return terminateSessionWithReason(error_code, reason);
	}

	/*
	 * Like handleInvalidConnection, but for use while a header block is being
	 * inflated: any non-fatal outcome becomes IGN_HEADER_BLOCK.
	 */
	ErrorCode handleInflateInvalidConnection(Frame frame, FrameError error_code, string reason) {
		ErrorCode status = handleInvalidConnection(frame, error_code, reason);
		return isFatal(status) ? status : ErrorCode.IGN_HEADER_BLOCK;
	}


	/* Add padding to HEADERS or PUSH_PROMISE. We use frame.headers.padlen in this function
	 * to use the fact that frame.push_promise has also padlen in the same position. 
*/
	ErrorCode headersAddPad(Frame frame)
	{
		/* NOTE(review): frame is received by value; if Frame is a value type, the
		   header and padlen updates below only touch a local copy - confirm. */
		Buffers framebufs = aob.framebufs;

		int max_payloadlen = min(MAX_PAYLOADLEN, frame.hd.length + MAX_PADLEN);

		/* The connector picks the padded length; a fatal value is an error code. */
		int padded_payloadlen = callSelectPadding(frame, max_payloadlen);
		if (isFatal(padded_payloadlen)) {
			return cast(ErrorCode)padded_payloadlen;
		}

		int padlen = padded_payloadlen - frame.hd.length;

		LOGF("send: padding selected: payloadlen=%d, padlen=%d", padded_payloadlen, padlen);

		frame.hd.addPad(framebufs, padlen, false);
		frame.headers.padlen = padlen;

		return ErrorCode.OK;
	}

	/* Upper bound, per the header deflater, of the encoded size of |hfa| plus |additional| bytes. */
	size_t estimateHeadersPayload(in HeaderField[] hfa, size_t additional)
	{
		size_t bound = hd_deflater.upperBound(hfa);
		return bound + additional;
	}

	/*
	 * Updates the remote initial window size of all active streams. If
	 * error occurs, all streams may not be updated.
	 *
	 * A stream that cannot take the new size terminates the session with
	 * FLOW_CONTROL_ERROR. Streams deferred by flow control whose window
	 * became positive get their deferred item resumed.
	 */
	ErrorCode updateRemoteInitialWindowSize(int new_initial_window_size)
	{
		ErrorCode result;
		/* Snapshot the currently stored remote setting before walking the streams. */
		auto previous_size = remote_settings.initial_window_size;

		foreach (stream; streams)
		{
			if (!stream.updateRemoteInitialWindowSize(new_initial_window_size, previous_size))
				return terminateSession(FrameError.FLOW_CONTROL_ERROR);

			/* If window size gets positive, push deferred DATA frame to outbound queue. */
			if (stream.remoteWindowSize > 0 && stream.isDeferredByFlowControl())
				stream.resumeDeferredItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);
		}

		return result;
	}


	/*
	 * Updates the local initial window size of all active streams. If
	 * error occurs, all streams may not be updated.
*/
	ErrorCode updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size)
	{
		ErrorCode result;

		foreach(stream; streams) {
			/* A stream that cannot take the new size is a flow control error for the session. */
			if (!stream.updateLocalInitialWindowSize(new_initial_window_size, old_initial_window_size))
				return terminateSession(FrameError.FLOW_CONTROL_ERROR);

			/* Automatic WINDOW_UPDATE disabled: nothing more to do for this stream. */
			if (opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)
				continue;

			if (!shouldSendWindowUpdate(stream.localWindowSize, stream.recvWindowSize))
				continue;

			addWindowUpdate(FrameFlags.NONE, stream.id, stream.recvWindowSize);
			stream.recvWindowSize = 0;
		}

		return result;
	}

	/*
	 * Returns the number of active streams, which includes streams in
	 * reserved state. Closed and idle streams kept in |streams| are not counted.
	 */
	size_t getNumActiveStreams() {
		const total = streams.length;
		return total - num_closed_streams - num_idle_streams;
	}

	/* Closes non-idle and non-closed streams whose stream ID > last_stream_id.
	 * If incoming is nonzero, we are going to close incoming streams.
	 * Otherwise, close outgoing streams. 
*/
	ErrorCode closeStreamOnGoAway(int last_stream_id, bool incoming)
	{
		ErrorCode status;

		foreach(stream; streams) {
			/* Skip streams on the side we were not asked to close:
			   our own streams when closing incoming ones, and vice versa. */
			if (isMyStreamId(stream.id) ? incoming : !incoming)
				continue;

			if (stream.state == StreamState.IDLE)
				continue;
			if (stream.flags & StreamFlags.CLOSED)
				continue;
			if (stream.id <= last_stream_id)
				continue;

			status = closeStream(stream.id, FrameError.REFUSED_STREAM);
			if (isFatal(status))
				return status;
		}

		/* Non-fatal: the result of the last closeStream call, if any, is returned as is. */
		return status;
	}

	/*
	 * Advances the weight/cycle bookkeeping of |item|. The weight is
	 * decremented until it reaches MIN_WEIGHT (or is found above
	 * |ini_weight|); then it is reset to |ini_weight| and the item is moved
	 * to the latest cycle, starting a new cycle if it is already there.
	 */
	void cycleWeightOutboundItem(OutboundItem item, int ini_weight)
	{
		const bool needs_reset = item.weight == MIN_WEIGHT || item.weight > ini_weight;

		if (!needs_reset) {
			--item.weight;
			return;
		}

		item.weight = ini_weight;

		if (item.cycle == last_cycle)
			item.cycle = ++last_cycle;
		else
			item.cycle = last_cycle;
	}

	/*
	 * This function serializes frame for transmission.
	 *
	 * This function returns 0 if it succeeds, or one of negative error
	 * codes, including both fatal and non-fatal ones.
*/
	ErrorCode prepareFrame(OutboundItem item)
	{
		ErrorCode rv;
		Frame* frame = &item.frame;
		if (frame.hd.type != FrameType.DATA) {
			with(FrameType) switch (frame.hd.type) {
				case HEADERS: {
					HeadersAuxData *aux_data;
					size_t estimated_payloadlen;

					aux_data = &item.aux_data.headers;

					estimated_payloadlen = estimateHeadersPayload(frame.headers.hfa, PRIORITY_SPECLEN);

					if (estimated_payloadlen > MAX_HEADERSLEN) {
						return ErrorCode.FRAME_SIZE_ERROR;
					}

					if (frame.headers.cat == HeadersCategory.REQUEST) {
						/* initial HEADERS, which opens stream */
						Stream stream = openStream(frame.hd.stream_id, StreamFlags.NONE, frame.headers.pri_spec, StreamState.INITIAL, aux_data.stream_user_data);

						rv = predicateRequestHeadersSend(item);
						if (rv != ErrorCode.OK) {
							return rv;
						}

						if (isHTTPMessagingEnabled()) {
							stream.setRequestMethod(*frame);
						}
					} else {
						Stream stream = getStream(frame.hd.stream_id);

						/* Classify the HEADERS by the first send predicate that accepts the stream. */
						if (predicatePushResponseHeadersSend(stream) == 0)
						{
							frame.headers.cat = HeadersCategory.PUSH_RESPONSE;
							if (aux_data.stream_user_data)
								stream.userData = aux_data.stream_user_data;
						} else if (predicateResponseHeadersSend(stream) == 0) {
							frame.headers.cat = HeadersCategory.RESPONSE;
						} else {
							frame.headers.cat = HeadersCategory.HEADERS;

							rv = predicateHeadersSend(stream);

							if (rv != ErrorCode.OK) {
								if (stream && stream.item == item)
									stream.detachItem(this);
								return rv;
							}
						}
					}

					rv = frame.headers.pack(aob.framebufs, hd_deflater);

					if (rv != ErrorCode.OK) {
						return rv;
					}

					LOGF("send: before padding, HEADERS serialized in %d bytes", aob.framebufs.length);

					rv = headersAddPad(*frame);

					if (rv != ErrorCode.OK) {
						return rv;
					}

					LOGF("send: HEADERS finally serialized in %d bytes", aob.framebufs.length);

					break;
				}
				case PRIORITY: {
					if (isClosing()) {
						return ErrorCode.SESSION_CLOSING;
					}
					/* PRIORITY frame can be sent at any time and to any stream ID. */
					frame.priority.pack(aob.framebufs);

					/* Peer can send PRIORITY frame against idle stream to create
					   "anchor" in dependency tree. Only client can do this in
					   libhttp2. In libhttp2, only server retains non-active (closed
					   or idle) streams in memory, so we don't open stream here. */
					break;
				}
				case RST_STREAM:
					if (isClosing()) {
						return ErrorCode.SESSION_CLOSING;
					}
					frame.rst_stream.pack(aob.framebufs);
					break;
				case SETTINGS: {
					rv = frame.settings.pack(aob.framebufs);
					if (rv != ErrorCode.OK) {
						return rv;
					}
					break;
				}
				case PUSH_PROMISE: {
					Stream stream;
					HeadersAuxData *aux_data;
					PrioritySpec pri_spec;
					size_t estimated_payloadlen;

					aux_data = &item.aux_data.headers;

					stream = getStream(frame.hd.stream_id);

					/* stream could be null if associated stream was already closed. */
					if (stream)
						pri_spec = PrioritySpec(stream.id, DEFAULT_WEIGHT, 0);

					openStream(frame.push_promise.promised_stream_id, StreamFlags.NONE, pri_spec, StreamState.RESERVED, aux_data.stream_user_data);

					estimated_payloadlen = estimateHeadersPayload(frame.push_promise.hfa, 0);

					if (estimated_payloadlen > MAX_HEADERSLEN)
						return ErrorCode.FRAME_SIZE_ERROR;

					/* predicate should fail if stream is null. */
					rv = predicatePushPromiseSend(stream);
					if (rv != ErrorCode.OK) {
						return rv;
					}

					assert(stream);

					rv = frame.push_promise.pack(aob.framebufs, hd_deflater);
					if (rv != 0)
						return rv;

					rv = headersAddPad(*frame);
					if (rv != 0)
						return rv;

					break;
				}
				case PING:
					if (isClosing()) {
						return ErrorCode.SESSION_CLOSING;
					}
					frame.ping.pack(aob.framebufs);
					break;
				case WINDOW_UPDATE: {
					rv = predicateWindowUpdateSend(frame.hd.stream_id);
					if (rv != ErrorCode.OK) {
						return rv;
					}
					frame.window_update.pack(aob.framebufs);
					break;
				}
				case GOAWAY:
					rv = frame.goaway.pack(aob.framebufs);
					if (rv != ErrorCode.OK) {
						return rv;
					}
					local_last_stream_id = frame.goaway.last_stream_id;

					break;
				default:
					return ErrorCode.INVALID_ARGUMENT;
			}
			return ErrorCode.OK;
		} else {
			int next_readmax;
			Stream stream = getStream(frame.hd.stream_id);

			if (stream) {
				assert(stream.item == item);
			}

			rv = predicateDataSend(stream);
			if (rv != ErrorCode.OK) {
				if (stream)
					stream.detachItem(this);

				return rv;
			}
			/* Assuming stream is not null */
			assert(stream);
			next_readmax = nextDataRead(stream);

			if (next_readmax == 0) {

				/* This must be true since we only pop DATA frame item from queue when session.remote_window_size > 0 */
				assert(remote_window_size > 0);

				/* The item stays owned by the stream; clear aob.item first so
				   that aob.reset() does not free it. */
				stream.deferItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);
				aob.item = null;
				aob.reset();
				return ErrorCode.DEFERRED;
			}

			rv = packData(aob.framebufs, next_readmax, *frame, item.aux_data.data);
			if (rv == ErrorCode.DEFERRED) {
				stream.deferItem(StreamFlags.DEFERRED_USER, this);
				aob.item = null;
				aob.reset();
				return ErrorCode.DEFERRED;
			}
			if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) {
				stream.detachItem(this);
				addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR);
				return ErrorCode.TEMPORAL_CALLBACK_FAILURE;
			}
			if (rv != 0) {
				stream.detachItem(this);
				return rv;
			}
			return ErrorCode.OK;
		}
	}

	/*
	 * Called after a frame is sent. This function runs
	 * $(D Connector.onFrameSent) and handles stream closure upon END_STREAM
	 * or RST_STREAM. This function does not reset aob. It is a
	 * responsibility of $(D resetActiveOutboundItem).
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *   The callback function failed.
	 */
	ErrorCode afterFrameSent()
	{
		ErrorCode rv;
		OutboundItem item = aob.item;
		Buffers framebufs = aob.framebufs;
		Frame* frame = &item.frame;

		if (frame.hd.type != FrameType.DATA) {

			if (frame.hd.type == FrameType.HEADERS || frame.hd.type == FrameType.PUSH_PROMISE) {

				/* The header block is not completely sent yet; the callback waits for the last buffer. */
				if (framebufs.nextPresent()) {
					LOGF("send: CONTINUATION exists, just return");
					return ErrorCode.OK;
				}
			}
			bool ok = callOnFrameSent(*frame);
			if (!ok) {
				return ErrorCode.CALLBACK_FAILURE;
			}
			with(FrameType) switch (frame.hd.type) {
				case HEADERS: {
					HeadersAuxData *aux_data;
					Stream stream = getStream(frame.hd.stream_id);
					if (!stream)
						break;
					if (stream.item == item)
						stream.detachItem(this);

					final switch (frame.headers.cat) {
						case HeadersCategory.REQUEST: {
							stream.state = StreamState.OPENING;
							if (frame.hd.flags & FrameFlags.END_STREAM) {
								stream.shutdown(ShutdownFlag.WR);
							}
							rv = closeStreamIfShutRdWr(stream);
							if (isFatal(rv)) {
								return rv;
							}
							/* We assume aux_data is a pointer to HeadersAuxData */
							aux_data = &item.aux_data.headers;
							if (aux_data.data_prd) {
								/* submitData() makes a copy of aux_data.data_prd */
								rv = submitData(this, FrameFlags.END_STREAM, frame.hd.stream_id, aux_data.data_prd);
								if (isFatal(rv)) {
									return rv;
								}
								/* TODO: submitData() may fail if stream has already DATA frame item. We might have to handle it here. */
							}
							break;
						}
						case HeadersCategory.PUSH_RESPONSE:
							stream.flags = cast(StreamFlags)(stream.flags & ~StreamFlags.PUSH);
							++num_outgoing_streams;
							goto case HeadersCategory.RESPONSE;
						case HeadersCategory.RESPONSE:
							stream.state = StreamState.OPENED;
							goto case HeadersCategory.HEADERS;
						case HeadersCategory.HEADERS:
							if (frame.hd.flags & FrameFlags.END_STREAM) {
								stream.shutdown(ShutdownFlag.WR);
							}
							rv = closeStreamIfShutRdWr(stream);
							if (isFatal(rv)) {
								return rv;
							}
							/* We assume aux_data is a pointer to HeadersAuxData */
							aux_data = &item.aux_data.headers;
							if (aux_data.data_prd) {
								rv = submitData(this, FrameFlags.END_STREAM, frame.hd.stream_id, aux_data.data_prd);
								if (isFatal(rv)) {
									return rv;
								}
								/* TODO submitData() may fail if stream has already DATA frame item.
								 * We might have to handle it here. */
							}
							break;
					}
					break;
				}
				case PRIORITY: {
					Stream stream;

					if (is_server) {
						break;
					}

					stream = getStreamRaw(frame.hd.stream_id);

					if (!stream) {
						break;
					}

					reprioritizeStream(stream, frame.priority.pri_spec);

					break;
				}
				case RST_STREAM:
					rv = closeStream(frame.hd.stream_id, frame.rst_stream.error_code);
					if (isFatal(rv)) {
						return rv;
					}
					break;
				case GOAWAY: {
					GoAwayAuxData aux_data = item.aux_data.goaway;

					if ((aux_data.flags & GoAwayAuxFlags.SHUTDOWN_NOTICE) == 0) {

						if (aux_data.flags & GoAwayAuxFlags.TERM_ON_SEND) {
							goaway_flags |= GoAwayFlags.TERM_SENT;
						}

						goaway_flags |= GoAwayFlags.SENT;

						rv = closeStreamOnGoAway(frame.goaway.last_stream_id, true);

						if (isFatal(rv)) {
							return rv;
						}
					}

					break;
				}
				default:
					break;
			}

			return ErrorCode.OK;
		}

		Stream stream = getStream(frame.hd.stream_id);
		DataAuxData *aux_data = &item.aux_data.data;
		/* We update flow control window after a frame was completely
		   sent. This is possible because we choose payload length not to
		   exceed the window */
		remote_window_size -= frame.hd.length;
		if (stream) {
			stream.remoteWindowSize = stream.remoteWindowSize - frame.hd.length;
		}

		if (stream && aux_data.eof) {
			stream.detachItem(this);

			/* Call onFrameSent after detachItem(), so that application can issue submitData() in the callback. */
			bool ok = callOnFrameSent(*frame);
			if (!ok) {
				return ErrorCode.CALLBACK_FAILURE;
			}

			if (frame.hd.flags & FrameFlags.END_STREAM) {
				int stream_closed;

				stream_closed = (stream.shutFlags & ShutdownFlag.RDWR) == ShutdownFlag.RDWR;

				stream.shutdown(ShutdownFlag.WR);

				rv = closeStreamIfShutRdWr(stream);
				if (isFatal(rv)) {
					return rv;
				}
				/* stream may be null if it was closed */
				if (stream_closed)
					stream = null;
			}
			return ErrorCode.OK;
		}

		bool ok = callOnFrameSent(*frame);

		if (!ok) {
			return ErrorCode.CALLBACK_FAILURE;
		}

		return ErrorCode.OK;
	}

	/*
	 * Called after a frame is sent and after $(D afterFrameSent).
	 * This function is responsible for resetting aob.
	 *
	 * This function returns 0 if it succeeds, or one of the following
	 * negative error codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *   The callback function failed.
	 */
	ErrorCode resetActiveOutboundItem()
	{
		ErrorCode rv;
		OutboundItem item = aob.item;
		Buffers framebufs = aob.framebufs;
		Frame* frame = &item.frame;

		if (frame.hd.type != FrameType.DATA) {

			if (frame.hd.type == FrameType.HEADERS ||
				frame.hd.type == FrameType.PUSH_PROMISE) {

				/* More of the header block is pending: advance to the next buffer and keep the item active. */
				if (framebufs.nextPresent()) {
					framebufs.cur = framebufs.cur.next;

					LOGF("send: next CONTINUATION frame, %d bytes", framebufs.cur.buf.length);

					return ErrorCode.OK;
				}
			}

			aob.reset();

			return ErrorCode.OK;

		}

		OutboundItem next_item;
		Stream stream;
		DataAuxData* aux_data = &item.aux_data.data;

		/* On EOF, we have already detached data. Please note that
		   application may issue submitData() in
		   $(D Connector.onFrameSent) (call from afterFrameSent),
		   which attach data to stream. We don't want to detach it. */
		if (aux_data.eof) {
			aob.reset();
			return ErrorCode.OK;
		}

		aux_data.no_copy = false;

		stream = getStream(frame.hd.stream_id);

		/* If Session is closed or RST_STREAM was queued, we won't send further data. */
		if (predicateDataSend(stream) != 0) {
			if (stream)
				stream.detachItem(this);
			aob.reset();

			return ErrorCode.OK;
		}

		/* Assuming stream is not null */
		assert(stream);
		next_item = getNextOutboundItem();

		/* Imagine we hit connection window size limit while sending DATA
		   frame. If we decrement weight here, its stream might get
		   inferior share because the other streams' weight is not
		   decremented because of flow control. */
		if (remote_window_size > 0 || stream.remoteWindowSize <= 0) {
			cycleWeightOutboundItem(aob.item, stream.effectiveWeight);
		}

		/* If priority of this stream is higher or equal to other stream
		   waiting at the top of the queue, we continue to send this
		   data. */
		if (stream.dpri == StreamDPRI.TOP && (!next_item || PriorityQueue.compare(item, next_item) < 0))
		{
			int next_readmax = nextDataRead(stream);

			if (next_readmax == 0) {

				if (remote_window_size == 0 && stream.remoteWindowSize > 0) {

					/* If DATA cannot be sent solely due to connection level
					   window size, just push item to queue again. We never pop
					   DATA item while connection level window size is 0. */
					ob_da_pq.push(aob.item);

					/* push() reports no status here, so there is nothing to
					   check; the former isFatal(rv) test read a never-assigned rv. */
					aob.item.queued = true;
				} else
					stream.deferItem(StreamFlags.DEFERRED_FLOW_CONTROL, this);

				/* The item now lives in the queue or on the stream; detach it from aob before reset. */
				aob.item = null;
				aob.reset();

				return ErrorCode.OK;
			}

			framebufs.reset();

			rv = packData(framebufs, next_readmax, *frame, item.aux_data.data);
			if (isFatal(rv)) {
				return rv;
			}

			if (rv == ErrorCode.DEFERRED) {
				stream.deferItem(StreamFlags.DEFERRED_USER, this);

				aob.item = null;
				aob.reset();

				return ErrorCode.OK;
			}
			if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE)
			{
				/* Stop DATA frame chain and issue RST_STREAM to close the stream. We don't return ErrorCode.TEMPORAL_CALLBACK_FAILURE intentionally. */
				addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR);
				stream.detachItem(this);
				aob.reset();
				return ErrorCode.OK;
			}

			assert(rv == 0);

			if (aux_data.no_copy)
				aob.state = OutboundState.SEND_NO_COPY;
			else
				aob.state = OutboundState.SEND_DATA;
			return ErrorCode.OK;
		}

		if (stream.dpri == StreamDPRI.TOP) {
			ob_da_pq.push(aob.item);

			aob.item.queued = true;
		}

		aob.item = null;
		aob.reset();
		return ErrorCode.OK;
	}

	// fetch data and feed it to data_arr
	ErrorCode memSendInternal(ref ubyte[] data_arr, bool fast_cb)
	{
		ErrorCode rv;
		Buffers framebufs = aob.framebufs;

		data_arr = null;

		for (;;) {
			final switch (aob.state) {
				case OutboundState.POP_ITEM: {
					OutboundItem item;

					item = popNextOutboundItem();
					if (!item) {
						return ErrorCode.OK;
					}

					if (item.frame.hd.type == FrameType.DATA || item.frame.hd.type == FrameType.HEADERS) {
						Frame* frame = &item.frame;
						Stream stream = getStream(frame.hd.stream_id);

						if (stream && item == stream.item && stream.dpri != 
StreamDPRI.TOP) { 5728 // We have DATA with higher priority in queue within the same dependency tree. 5729 break; 5730 } 5731 } 5732 5733 rv = prepareFrame(item); 5734 if (rv == ErrorCode.DEFERRED) { 5735 LOGF("send: frame transmission deferred"); 5736 break; 5737 } 5738 5739 if (rv < 0) { 5740 int opened_stream_id; 5741 FrameError error_code = FrameError.INTERNAL_ERROR; 5742 import libhttp2.helpers : toString; 5743 LOGF("send: frame preparation failed with %s", toString(cast(ErrorCode)rv)); 5744 /* TODO: If the error comes from compressor, the connection must be closed. */ 5745 if (item.frame.hd.type != FrameType.DATA && !isFatal(rv)) { 5746 Frame* frame = &item.frame; 5747 /* The library is responsible for the transmission of WINDOW_UPDATE frame, so we don't call error callback for it. */ 5748 try if (frame.hd.type != FrameType.WINDOW_UPDATE && !connector.onFrameFailure(*frame, rv)) 5749 { 5750 item.free(); 5751 Mem.free(item); 5752 return ErrorCode.CALLBACK_FAILURE; 5753 } catch (Exception e) { 5754 item.free(); 5755 Mem.free(item); 5756 return ErrorCode.CALLBACK_FAILURE; 5757 } 5758 } 5759 5760 /* We have to close stream opened by failed request HEADERS or PUSH_PROMISE. 
*/ 5761 switch (item.frame.hd.type) { 5762 case FrameType.HEADERS: 5763 if (item.frame.headers.cat == HeadersCategory.REQUEST) { 5764 opened_stream_id = item.frame.hd.stream_id; 5765 if (item.aux_data.headers.canceled) { 5766 error_code = item.aux_data.headers.error_code; 5767 } 5768 } 5769 break; 5770 case FrameType.PUSH_PROMISE: 5771 opened_stream_id = item.frame.push_promise.promised_stream_id; 5772 break; 5773 5774 default: break; 5775 } 5776 if (opened_stream_id) { 5777 /* careful not to override rv */ 5778 ErrorCode rv2; 5779 rv2 = closeStream(opened_stream_id, error_code); 5780 5781 if (isFatal(rv2)) { 5782 return rv2; 5783 } 5784 } 5785 5786 item.free(); 5787 Mem.free(item); 5788 aob.reset(); 5789 5790 if (rv == ErrorCode.HEADER_COMP) { 5791 /* If header compression error occurred, should terminiate connection. */ 5792 rv = terminateSession(FrameError.INTERNAL_ERROR); 5793 } 5794 if (isFatal(rv)) { 5795 return rv; 5796 } 5797 break; 5798 } 5799 5800 aob.item = item; 5801 5802 framebufs.rewind(); 5803 5804 if (item.frame.hd.type != FrameType.DATA) { 5805 Frame* frame = &item.frame; 5806 5807 LOGF("send: next frame: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", 5808 frame.hd.length, frame.hd.type, frame.hd.flags, 5809 frame.hd.stream_id); 5810 5811 bool ok = callOnFrameReady(*frame); 5812 if (!ok) { 5813 return ErrorCode.CALLBACK_FAILURE; 5814 } 5815 } else { 5816 LOGF("send: next frame: DATA"); 5817 5818 if (item.aux_data.data.no_copy) 5819 { 5820 aob.state = OutboundState.SEND_NO_COPY; 5821 break; 5822 } 5823 } 5824 5825 LOGF("send: start transmitting frame type=%u, length=%d", 5826 framebufs.cur.buf.pos[3], 5827 framebufs.cur.buf.last - framebufs.cur.buf.pos); 5828 5829 aob.state = OutboundState.SEND_DATA; 5830 5831 break; 5832 } 5833 case OutboundState.SEND_DATA: { 5834 size_t datalen; 5835 Buffer* buf = &framebufs.cur.buf; 5836 5837 if (buf.pos is buf.last) { 5838 LOGF("send: end transmission of a frame"); 5839 5840 /* Frame has completely sent */ 
5841 if (fast_cb) { 5842 rv = resetActiveOutboundItem(); 5843 } else { 5844 rv = afterFrameSent(); 5845 if (rv < 0) { 5846 /* FATAL */ 5847 assert(isFatal(rv)); 5848 return rv; 5849 } 5850 rv = resetActiveOutboundItem(); 5851 } 5852 if (rv < 0) { 5853 /* FATAL */ 5854 assert(isFatal(rv)); 5855 return rv; 5856 } 5857 /* We have already adjusted the next state */ 5858 break; 5859 } 5860 5861 datalen = buf.length; 5862 data_arr = buf.pos[0 .. datalen]; 5863 5864 /* We increment the offset here. If write() does not send everything, we will adjust it. */ 5865 buf.pos += datalen; 5866 5867 return ErrorCode.OK; 5868 } 5869 case OutboundState.SEND_NO_COPY: 5870 { 5871 LOGF("send: no copy DATA\n"); 5872 5873 Frame* frame = &aob.item.frame; 5874 Stream stream = getStream(frame.hd.stream_id); 5875 5876 5877 if (!stream) { 5878 LOGF("send: no copy DATA cancelled because stream was closed\n"); 5879 aob.reset(); 5880 break; 5881 } 5882 5883 rv = callWriteData(aob.item, framebufs); 5884 if (isFatal(rv)) { 5885 return rv; 5886 } 5887 5888 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 5889 stream.detachItem(this); 5890 5891 addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR); 5892 5893 aob.reset(); 5894 5895 break; 5896 } 5897 5898 if (rv == ErrorCode.WOULDBLOCK) { 5899 return ErrorCode.OK; 5900 } 5901 5902 assert(rv == ErrorCode.OK); 5903 5904 rv = afterFrameSent(); 5905 if (rv < 0) { 5906 assert(isFatal(rv)); 5907 return rv; 5908 } 5909 rv = resetActiveOutboundItem(); 5910 if (rv < 0) { 5911 assert(isFatal(rv)); 5912 return rv; 5913 } 5914 5915 /* We have already adjusted the next state */ 5916 5917 break; 5918 } 5919 } 5920 } 5921 } 5922 5923 /* 5924 * Inflates header block in the memory pointed by |input| with |input.length| 5925 * bytes. If this function returns ErrorCode.PAUSE, the caller must 5926 * call this function again, until it returns 0 or one of negative 5927 * error code. 
	 * If |call_header_cb| is false, the header field callback is
	 * not invoked and the function never returns ErrorCode.PAUSE.  If
	 * the given |input| is the last chunk of header block, |is_final| must
	 * be true.  If header block is successfully processed (which is
	 * indicated by the return value 0, ErrorCode.PAUSE or
	 * ErrorCode.TEMPORAL_CALLBACK_FAILURE), the number of processed
	 * input bytes is assigned to |readlen_ref|.
	 *
	 * This function return 0 if it succeeds, or one of the negative error
	 * codes:
	 *
	 * ErrorCode.CALLBACK_FAILURE
	 *     The callback function failed.
	 * ErrorCode.TEMPORAL_CALLBACK_FAILURE
	 *     The callback returns this error code, indicating that this
	 *     stream should be RST_STREAMed.
	 * ErrorCode.PAUSE
	 *     The callback function returned ErrorCode.PAUSE
	 * ErrorCode.HEADER_COMP
	 *     Header decompression failed
	 */
	ErrorCode inflateHeaderBlock(Frame frame, ref size_t readlen_ref, ubyte[] input, bool is_final, bool call_header_cb)
	{
		int proclen;
		ErrorCode rv;
		InflateFlag inflate_flag;
		HeaderField hf;
		Stream stream;
		Stream subject_stream;
		bool trailer;

		readlen_ref = 0;
		stream = getStream(frame.hd.stream_id);

		/* For PUSH_PROMISE the header fields describe the promised stream;
		   otherwise they belong to the stream the frame was received on. */
		if (frame.hd.type == FrameType.PUSH_PROMISE) {
			subject_stream = getStream(frame.push_promise.promised_stream_id);
		} else {
			subject_stream = stream;
			trailer = isTrailerHeaders(stream, frame);
		}

		LOGF("recv: decoding header block %d bytes", input.length);
		size_t inlen = input.length;
		ubyte* inptr = input.ptr;
		/* Each iteration inflates the remaining input; hf is only passed on
		   when the inflater sets InflateFlag.EMIT. */
		for (;;) {
			inflate_flag = InflateFlag.NONE;
			proclen = hd_inflater.inflate(hf, inflate_flag, inptr[0 .. inlen], is_final);

			if (isFatal(cast(int)proclen)) {
				return cast(ErrorCode)proclen;
			}

			/* Non-fatal negative result: reported as ErrorCode.HEADER_COMP after
			   the session has been asked to terminate. */
			if (proclen < 0) {
				if (iframe.state == InboundState.READ_HEADER_BLOCK)
				{
					if (stream && stream.state != StreamState.CLOSING)
					{
						/* Adding RST_STREAM here is very important. It prevents
						   from invoking subsequent callbacks for the same stream ID. */
						addRstStream(frame.hd.stream_id, FrameError.COMPRESSION_ERROR);

					}
				}
				rv = terminateSession(FrameError.COMPRESSION_ERROR);
				if (isFatal(rv)) {
					return rv;
				}

				return ErrorCode.HEADER_COMP;
			}

			/* Advance past the consumed bytes and report them to the caller. */
			inptr += proclen;
			inlen -= proclen;
			readlen_ref += proclen;

			LOGF("recv: proclen=%d", proclen);

			if (call_header_cb && (inflate_flag & InflateFlag.EMIT)) {
				rv = ErrorCode.OK;
				if (subject_stream && isHTTPMessagingEnabled()) {
					rv = validateHeaderField(subject_stream, frame, hf, trailer);
					if (rv == ErrorCode.HTTP_HEADER) {
						LOGF("recv: HTTP error: type=%d, id=%d, header %.*s: %.*s",
							frame.hd.type, subject_stream.id, cast(int)hf.name.length,
							hf.name, cast(int)hf.value.length, hf.value);

						handleInvalidStream2(subject_stream.id, frame, FrameError.PROTOCOL_ERROR);
						return ErrorCode.TEMPORAL_CALLBACK_FAILURE;
					}
					else if (rv == ErrorCode.IGN_HTTP_HEADER) {
						/* header is ignored */
						LOGF("recv: HTTP ignored: type=%d, id=%d, header %s: %s", frame.hd.type, subject_stream.id, hf.name, hf.value);
					}

				}

				/* The callback is skipped when validation returned anything but OK
				   (e.g. ErrorCode.IGN_HTTP_HEADER above). */
				if (rv == ErrorCode.OK) {
					bool pause;
					bool close;
					bool ok = callOnHeaderField(frame, hf, pause, close);
					if (!ok)
						return ErrorCode.CALLBACK_FAILURE;
					if (close)
						return ErrorCode.TEMPORAL_CALLBACK_FAILURE;
					if (pause)
						return ErrorCode.PAUSE;
				}
			}

			if (inflate_flag & InflateFlag.FINAL) {
				hd_inflater.endHeaders();
				break;
			}
			/* Nothing emitted and no input left: wait for more data. */
			if ((inflate_flag & InflateFlag.EMIT) == 0 && inlen == 0) {
				break;
			}
		}
		return ErrorCode.OK;
	}

package: /* Used only for tests */
	/*
	 * Returns top of outbound frame queue.  This function returns null if
	 * queue is empty.
	 */
	@property OutboundItem ob_pq_top() {
		return ob_pq.top;
	}

package:
	HashMap!(int, Stream) streams;

	StreamRoots roots;

	/// Priority Queue for outbound frames other than stream-starting HEADERS and DATA
	PriorityQueue ob_pq;

	/// Priority Queue for outbound stream-starting HEADERS frame
	PriorityQueue ob_ss_pq;

	/// Priority Queue for DATA frame
	PriorityQueue ob_da_pq;

	ActiveOutboundItem aob;
	InboundFrame iframe;
	Deflater hd_deflater;
	Inflater hd_inflater;
	Connector connector;

	/// Sequence number of outbound frame to maintain the order of enqueue if priority is equal.
	long next_seq;

	/** Reset count of OutboundItem's weight.  We decrement
	    weight each time DATA is sent to simulate resource sharing.  We
	    use priority queue and larger weight has the precedence.  If
	    weight is reached to lowest weight, it resets to its initial
	    weight.  If this happens, other items which have the lower weight
	    currently but same initial weight cannot send DATA until item
	    having large weight is decreased.  To avoid this, we use this
	    cycle variable.  Initially, this is set to 1.  If weight gets
	    lowest weight, and if item's cycle == last_cycle, we increment
	    last_cycle and assign it to item's cycle.  Otherwise, just
	    assign last_cycle.  In priority queue comparator, we first
	    compare items' cycle value.  Lower cycle value has the
	    precedence. */
	ulong last_cycle = 1;

	/// Points to the latest closed stream.  null if there is no closed stream.
	/// Notes: Only used when session is initialized as server.
	Stream closed_stream_head;

	/// Points to the oldest closed stream.  null if there is no closed stream.
	/// Notes: Only used when session is initialized as server.
	Stream closed_stream_tail;

	/// Points to the latest idle stream.  null if there is no idle stream.
	/// Notes: Only used when session is initialized as server.
	Stream idle_stream_head;

	/// Points to the oldest idle stream.  null if there is no idle stream.
	/// Notes: Only used when session is initialized as server.
	Stream idle_stream_tail;

	/// In-flight SETTINGS values. null for no in-flight SETTINGS.
	Setting[] inflight_iva;

	/// The number of outgoing streams. This will be capped by remote_settings.max_concurrent_streams.
	size_t num_outgoing_streams;

	/// The number of incoming streams. This will be capped by local_settings.max_concurrent_streams.
	size_t num_incoming_streams;

	/// The number of closed streams still kept in |streams| hash.  The closed streams can be accessed
	/// through the linked list starting at |closed_stream_head|.
	/// Notes: The current implementation only keeps incoming streams if session is initialized as server.
	size_t num_closed_streams;

	/// The number of idle streams kept in |streams| hash. The idle streams can be accessed through doubly linked list
	/// |idle_stream_head|.
	/// Notes: The current implementation only keeps idle streams if session is initialized as server.
	size_t num_idle_streams;

	/// Next Stream ID. Made unsigned int to detect >= (1 << 31).
	uint next_stream_id;

	/// The largest stream ID received so far
	int last_recv_stream_id;

	/// The largest stream ID which has been processed in some way.
	/// Notes: This value will be used as last-stream-id when sending GOAWAY frame.
	int last_proc_stream_id;

	/// Counter of unique ID of PING. Wraps when it exceeds max_UNIQUE_ID
	uint next_unique_id;

	/// This is the last-stream-ID we have sent in GOAWAY
	int local_last_stream_id = (1u << 31) - 1;

	/// This is the value in GOAWAY frame received from remote endpoint.
	int remote_last_stream_id = (1u << 31) - 1;

	/// Current sender window size. This value is computed against the current initial window size of remote endpoint.
	int remote_window_size = INITIAL_CONNECTION_WINDOW_SIZE;

	/// Keep track of the number of bytes received without WINDOW_UPDATE. This could be negative after
	/// submitting negative value to WINDOW_UPDATE.
	int recv_window_size;

	/// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE.
	/// Notes: This is only used when auto WINDOW_UPDATE is turned off.
	int consumed_size;

	/// The amount of recv_window_size cut by submitting a negative value to WINDOW_UPDATE
	int recv_reduction;

	/// window size for local flow control. It is initially set to INITIAL_CONNECTION_WINDOW_SIZE and could be
	/// increased/decreased by submitting WINDOW_UPDATE. See submitWindowUpdate().
	int local_window_size = INITIAL_CONNECTION_WINDOW_SIZE;

	/// Settings value received from the remote endpoint. We just use ID as index. The index = 0 is unused.
	SettingsStorage remote_settings;

	/// Settings value of the local endpoint.
	SettingsStorage local_settings;

	/// Option flags. This is bitwise-OR of 0 or more of OptionsMask.
	OptionsMask opt_flags;

	/// Unacked local Setting.MAX_CONCURRENT_STREAMS value. We use this to refuse the incoming stream if it exceeds this value.
	uint pending_local_max_concurrent_stream = INITIAL_MAX_CONCURRENT_STREAMS;

	/// Unacked local ENABLE_PUSH value. We use this to refuse PUSH_PROMISE before SETTINGS ACK is received.
6179 bool pending_enable_push = true; 6180 6181 /// true if the session is server side. 6182 bool is_server; 6183 6184 /// Flags indicating GOAWAY is sent and/or recieved. 6185 GoAwayFlags goaway_flags = GoAwayFlags.NONE; 6186 6187 6188 } 6189 /** 6190 * @function 6191 * 6192 * Serializes the SETTINGS values |iv| in the |buf|. The size of the 6193 * |buf| is specified by |buflen|. The number of entries in the |iv| 6194 * array is given by |niv|. The required space in |buf| for the |niv| 6195 * entries is `8*niv` bytes and if the given buffer is too small, an 6196 * error is returned. This function is used mainly for creating a 6197 * SETTINGS payload to be sent with the `HTTP2-Settings` header 6198 * field in an HTTP Upgrade request. The data written in |buf| is NOT 6199 * base64url encoded and the application is responsible for encoding. 6200 * 6201 * This function returns the number of bytes written in |buf|, or one 6202 * of the following negative error codes: 6203 * 6204 * $(D ErrorCode.INVALID_ARGUMENT) 6205 * The |iv| contains duplicate settings ID or invalid value. 6206 * 6207 * $(D ErrorCode.INSUFF_BUFSIZE) 6208 * The provided |buflen| size is too small to hold the output. 6209 */ 6210 int packSettingsPayload(ubyte[] buf, in Setting[] iva) 6211 { 6212 if (!iva.check()) { 6213 return ErrorCode.INVALID_ARGUMENT; 6214 } 6215 6216 if (buf.length < (iva.length * FRAME_SETTINGS_ENTRY_LENGTH)) { 6217 return ErrorCode.INSUFF_BUFSIZE; 6218 } 6219 6220 return Settings.pack(buf, iva); 6221 } 6222 6223 /** 6224 * Submits HEADERS frame and optionally one or more DATA frames. 6225 * 6226 * The |pri_spec| is priority specification of this request. 6227 * To specify the priority, use `PrioritySpec()`. 6228 * 6229 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT), 6230 * $(D MAX_WEIGHT)], inclusive. If `pri_spec.weight` is 6231 * strictly less than $(D MIN_WEIGHT), it becomes 6232 * $(D MIN_WEIGHT). 
If it is strictly greater than 6233 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT). 6234 * 6235 * The |hfa| is an array of header fields $(D HeaderField) with 6236 * |hfa.length| elements. The application is responsible to include 6237 * required pseudo-header fields (header field whose name starts with 6238 * ":") in |hfa| and must place pseudo-headers before regular header 6239 * fields. 6240 * 6241 * This function creates copies of all header fields in |hfa|. It 6242 * also lower-cases all names in |hfa|. The order of elements in 6243 * |hfa| is preserved. 6244 * 6245 * HTTP/2 specification has requirement about header fields in the 6246 * request HEADERS. See the specification for more details. 6247 * 6248 * If |data_prd| is not `null`, it provides data which will be sent 6249 * in subsequent DATA frames. In this case, a method that allows 6250 * request message bodies 6251 * (http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9) must 6252 * be specified with `:method` key in |hfa| (e.g. `POST`). This 6253 * function does not take ownership of the |data_prd|. The function 6254 * copies the members of the |data_prd|. If |data_prd| is `null`, 6255 * HEADERS have END_STREAM set. The |stream_user_data| is data 6256 * associated to the stream opened by this request and can be an 6257 * arbitrary pointer, which can be retrieved later by 6258 * `getStreamUserData()`. 6259 * 6260 * This function returns assigned stream ID if it succeeds, or one of 6261 * the following negative error codes: 6262 * 6263 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE) 6264 * No stream ID is available because maximum stream ID was 6265 * reached. 6266 * 6267 * .. warning:: 6268 * 6269 * This function returns assigned stream ID if it succeeds. But 6270 * that stream is not opened yet. The application must not submit 6271 * a frame to that stream ID before $(D Connector.onFrameReady) is called for this 6272 * frame. 
6273 * 6274 */ 6275 int submitRequest(Session session, in PrioritySpec pri_spec, in HeaderField[] hfa, in DataProvider data_prd, void* stream_user_data = null) 6276 { 6277 FrameFlags flags = setRequestFlags(pri_spec, data_prd); 6278 6279 return submitHeadersSharedHfa(session, flags, -1, pri_spec, hfa, data_prd, stream_user_data, false); 6280 } 6281 6282 /** 6283 * Submits response HEADERS frame and optionally one or more DATA 6284 * frames against the stream |stream_id|. 6285 * 6286 * The |hfa| is an array of $(D HeaderField) with 6287 * |hfa.length| elements. The application is responsible to include 6288 * required pseudo-header fields (header field whose name starts with 6289 * ":") in |hfa| and must place pseudo-headers before regular header 6290 * fields. 6291 * 6292 * This function creates copies of all header fields in |hfa|. It 6293 * also lower-cases all names in |hfa|. The order of elements in 6294 * |hfa| is preserved. 6295 * 6296 * HTTP/2 specification has requirement about header fields in the 6297 * response HEADERS. See the specification for more details. 6298 * 6299 * If |data_prd| is not `null`, it provides data which will be sent 6300 * in subsequent DATA frames. If |data_prd| is `null`, HEADERS will have 6301 * END_STREAM flag set. 6302 * 6303 * This method can be used as normal HTTP response and push response. 6304 * When pushing a resource using this function, the $(D Session) must be 6305 * configured using `new Session()` or its variants and 6306 * the target stream denoted by the |stream_id| must be reserved using 6307 * `submitPushPromise()`. 6308 * 6309 * To send non-final response headers (e.g., HTTP status 101), don't 6310 * use this function because this function half-closes the outbound 6311 * stream. Instead, use `submitHeaders()` for this purpose. 6312 * 6313 * This function returns 0 if it succeeds, or one of the following 6314 * negative error codes: 6315 * 6316 * $(D ErrorCode.INVALID_ARGUMENT) 6317 * The |stream_id| is 0. 
6318 * 6319 * .. warning:: 6320 * 6321 * Calling this function twice for the same stream ID may lead to 6322 * program crash. It is generally considered to a programming error 6323 * to commit response twice. 6324 */ 6325 ErrorCode submitResponse(Session session, int stream_id, in HeaderField[] hfa, in DataProvider data_prd) 6326 { 6327 FrameFlags flags = setResponseFlags(data_prd); 6328 return cast(ErrorCode)submitHeadersSharedHfa(session, flags, stream_id, PrioritySpec.init, hfa, data_prd, null, true); 6329 } 6330 6331 /** 6332 * Submits HEADERS frame. The |flags| is bitwise OR of the 6333 * following values: 6334 * 6335 * * $(D FrameFlags.END_STREAM) 6336 * 6337 * If |flags| includes $(D FrameFlags.END_STREAM), this frame has 6338 * END_STREAM flag set. 6339 * 6340 * The library handles the CONTINUATION frame internally and it 6341 * correctly sets END_HEADERS to the last sequence of the PUSH_PROMISE 6342 * or CONTINUATION frame. 6343 * 6344 * If the |stream_id| is -1, this frame is assumed as request (i.e., 6345 * request HEADERS frame which opens new stream). In this case, the 6346 * assigned stream ID will be returned. Otherwise, specify stream ID 6347 * in |stream_id|. 6348 * 6349 * The |pri_spec| is priority specification of this request. init 6350 * means the default priority. To specify the priority, 6351 * use $(D PrioritySpec) constructor. 6352 * 6353 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT), 6354 * $(D MAX_WEIGHT)], inclusive. If `pri_spec.weight` is 6355 * strictly less than $(D MIN_WEIGHT), it becomes 6356 * $(D MIN_WEIGHT). If it is strictly greater than 6357 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT). 6358 * 6359 * The |hfa| is an array of header fields $(D HeaderField) with 6360 * |hfa.length| elements. The application is responsible to include 6361 * required pseudo-header fields (header field whose name starts with 6362 * ":") in |hfa| and must place pseudo-headers before regular header 6363 * fields. 
6364 * 6365 * This function creates copies of all header fields in |hfa|. It 6366 * also lower-cases all names in |hfa|. The order of elements in 6367 * |hfa| is preserved. 6368 * 6369 * The |stream_user_data| is a pointer to an arbitrary data which is 6370 * associated to the stream this frame will open. Therefore it is 6371 * only used if this frame opens streams, in other words, it changes 6372 * stream state from idle or reserved to open. 6373 * 6374 * This function is low-level in a sense that the application code can 6375 * specify flags directly. For usual HTTP request, 6376 * `submitRequest()` is useful. 6377 * 6378 * This function returns newly assigned stream ID if it succeeds and 6379 * |stream_id| is -1. Otherwise, this function returns 0 if it 6380 * succeeds, or one of the following negative error codes: 6381 * 6382 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE) 6383 * No stream ID is available because maximum stream ID was 6384 * reached. 6385 * $(D ErrorCode.INVALID_ARGUMENT) 6386 * The |stream_id| is 0. 6387 * 6388 * .. warning:: 6389 * 6390 * This function returns assigned stream ID if it succeeds and 6391 * |stream_id| is -1. But that stream is not opened yet. The 6392 * application must not submit frame to that stream ID before 6393 * $(D Connector.onFrameHeader) is called for this 6394 * frame. 6395 * 6396 */ 6397 int submitHeaders(Session session, FrameFlags flags, int stream_id = -1, in PrioritySpec pri_spec = PrioritySpec.init, in HeaderField[] hfa = null, void *stream_user_data = null) 6398 { 6399 flags &= FrameFlags.END_STREAM; 6400 6401 if (pri_spec != PrioritySpec.init) 6402 flags |= FrameFlags.PRIORITY; 6403 6404 return submitHeadersSharedHfa(session, flags, stream_id, pri_spec, hfa, DataProvider.init, stream_user_data, false); 6405 } 6406 6407 /** 6408 * Submits one or more DATA frames to the stream |stream_id|. The 6409 * data to be sent are provided by |data_prd|. 
 * If |flags| contains
 * $(D FrameFlags.END_STREAM), the last DATA frame has END_STREAM
 * flag set.
 *
 * This function does not take ownership of the |data_prd|.  The
 * function copies the members of the |data_prd|.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.DATA_EXIST)
 *     DATA has been already submitted and not fully processed yet.
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0.
 * $(D ErrorCode.STREAM_CLOSED)
 *     The stream was already closed; or the |stream_id| is invalid.
 *
 * .. note::
 *
 *   Currently, only one data is allowed for a stream at a time.
 *   Submitting data more than once before first data is finished
 *   results in $(D ErrorCode.DATA_EXIST) error code.  The
 *   earliest callback which tells that previous data is done is
 *   $(D Connector.onFrameSent).  Inside that callback,
 *   new data can be submitted using `submitData()`.  Of
 *   course, all data except for last one must not have
 *   $(D FrameFlags.END_STREAM) flag set in |flags|.
6436 */ 6437 ErrorCode submitData(Session session, FrameFlags flags, int stream_id, in DataProvider data_prd) 6438 { 6439 OutboundItem item; 6440 Frame* frame; 6441 DataAuxData* aux_data; 6442 DataFlags nflags = cast(DataFlags)(flags & FrameFlags.END_STREAM); 6443 6444 if (stream_id == 0) 6445 return ErrorCode.INVALID_ARGUMENT; 6446 6447 item = Mem.alloc!OutboundItem(session); 6448 scope(failure) Mem.free(item); 6449 6450 frame = &item.frame; 6451 aux_data = &item.aux_data.data; 6452 aux_data.data_prd = data_prd; 6453 aux_data.eof = false; 6454 aux_data.flags = nflags; 6455 6456 /* flags are sent on transmission */ 6457 frame.data = Data(FrameFlags.NONE, stream_id); 6458 scope(failure) frame.data.free(); 6459 ErrorCode rv = session.addItem(item); 6460 if (rv != 0) { 6461 frame.data.free(); 6462 Mem.free(item); 6463 return rv; 6464 } 6465 return ErrorCode.OK; 6466 } 6467 6468 /** 6469 * Submits PRIORITY frame to change the priority of stream |stream_id| 6470 * to the priority specification |pri_spec|. 6471 * 6472 * 6473 * The |pri_spec| is priority specification of this request. `null` 6474 * is not allowed for this function. To specify the priority, use 6475 * `PrioritySpec.init`. This function will copy its data 6476 * members. 6477 * 6478 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT), 6479 * $(D MAX_WEIGHT)], inclusive. If `pri_spec.weight` is 6480 * strictly less than $(D MIN_WEIGHT), it becomes 6481 * $(D MIN_WEIGHT). If it is strictly greater than 6482 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT). 6483 * 6484 * This function returns 0 if it succeeds, or one of the following 6485 * negative error codes: 6486 * 6487 * $(D ErrorCode.INVALID_ARGUMENT) 6488 * The |stream_id| is 0; or the |pri_spec| is null; or trying to 6489 * depend on itself. 
6490 */ 6491 ErrorCode submitPriority(Session session, int stream_id, in PrioritySpec pri_spec) 6492 { 6493 OutboundItem item; 6494 Frame* frame; 6495 PrioritySpec copy_pri_spec; 6496 6497 if (stream_id == 0 || pri_spec == PrioritySpec.init) 6498 return ErrorCode.INVALID_ARGUMENT; 6499 6500 if (stream_id == pri_spec.stream_id) 6501 return ErrorCode.INVALID_ARGUMENT; 6502 6503 copy_pri_spec = pri_spec; 6504 6505 copy_pri_spec.adjustWeight(); 6506 6507 item = Mem.alloc!OutboundItem(session); 6508 scope(failure) Mem.free(item); 6509 frame = &item.frame; 6510 6511 frame.priority = Priority(stream_id, copy_pri_spec); 6512 scope(failure) frame.priority.free(); 6513 6514 session.addItem(item); 6515 return ErrorCode.OK; 6516 } 6517 6518 6519 /** 6520 * @function 6521 * 6522 * Submits RST_STREAM frame to cancel/reject the stream |stream_id| 6523 * with the error code |error_code|. 6524 * 6525 * The pre-defined error code is one of $(D FrameError). 6526 * 6527 * This function returns 0 if it succeeds, or one of the following 6528 * negative error codes: 6529 * 6530 * $(D ErrorCode.NOMEM) 6531 * Out of memory. 6532 * $(D ErrorCode.INVALID_ARGUMENT) 6533 * The |stream_id| is 0. 6534 */ 6535 ErrorCode submitRstStream(Session session, int stream_id, FrameError error_code) 6536 { 6537 if (stream_id == 0) 6538 return ErrorCode.INVALID_ARGUMENT; 6539 6540 session.addRstStream(stream_id, error_code); 6541 return ErrorCode.OK; 6542 } 6543 6544 /** 6545 * @function 6546 * 6547 * Stores local settings and submits SETTINGS frame. The |iva| is the 6548 * pointer to the array of $(D Setting). The |iv.length| 6549 * indicates the number of settings. 6550 * 6551 * This function does not take ownership of the |iva|. This function 6552 * copies all the elements in the |iva|. 6553 * 6554 * While updating individual stream's local window size, if the window 6555 * size becomes strictly larger than max_WINDOW_SIZE, 6556 * RST_STREAM is issued against such a stream. 
6557 * 6558 * SETTINGS with $(D FrameFlags.ACK) is automatically submitted 6559 * by the library and application could not send it at its will. 6560 * 6561 * This function returns 0 if it succeeds, or one of the following 6562 * negative error codes: 6563 * 6564 * $(D ErrorCode.INVALID_ARGUMENT) 6565 * The |iv| contains invalid value (e.g., initial window size 6566 * strictly greater than (1 << 31) - 1. 6567 * $(D ErrorCode.TOO_MANY_INFLIGHT_SETTINGS) 6568 * There is already another in-flight SETTINGS. Note that the 6569 * current implementation only allows 1 in-flight SETTINGS frame 6570 * without ACK flag set. 6571 * $(D ErrorCode.NOMEM) 6572 * Out of memory. 6573 */ 6574 ErrorCode submitSettings(Session session, in Setting[] iva) 6575 { 6576 return session.addSettings(FrameFlags.NONE, iva); 6577 } 6578 6579 /** 6580 * Submits PUSH_PROMISE frame. 6581 * 6582 * The |stream_id| must be client initiated stream ID. 6583 * 6584 * The |hfa| is an array of $(D HeaderField) with 6585 * |hfa.length| elements. The application is responsible to include 6586 * required pseudo-header fields (header field whose name starts with 6587 * ":") in |hfa| and must place pseudo-headers before regular header 6588 * fields. 6589 * 6590 * This function creates copies of all header fieldss in |hfa|. It 6591 * also lower-cases all names in |hfa|. The order of elements in 6592 * |hfa| is preserved. 6593 * 6594 * The |promised_stream_user_data| is a pointer to an arbitrary data 6595 * which is associated to the promised stream this frame will open and 6596 * make it in reserved state. It is available using $(D Session.getStreamUserData). 6597 * The application can access it in $(D Connector.onFrameHeader) and 6598 * $(D Connector.onFrameSent) of this frame. 6599 * 6600 * The client side is not allowed to use this function. 6601 * 6602 * To submit response headers and data, use 6603 * `submitResponse()`. 
*
 * This function returns assigned promised stream ID if it succeeds,
 * or one of the following negative error codes:
 *
 * $(D ErrorCode.PROTO)
 *     This function was invoked when $(D Session) is initialized as
 *     client.
 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE)
 *     No stream ID is available because maximum stream ID was
 *     reached.
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |stream_id| is 0; The |stream_id| does not designate stream
 *     that peer initiated.
 *
 * .. warning::
 *
 *   This function returns assigned promised stream ID if it succeeds.
 *   But that stream is not opened yet.  The application must not
 *   submit frame to that stream ID before
 *   $(D Connector.onFrameHeader) is called for this
 *   frame.
 *
 */
int submitPushPromise(Session session, int stream_id, in HeaderField[] hfa, void* promised_stream_user_data)
{
	// The associated stream must be non-zero and must not be one of ours.
	if (stream_id == 0 || session.isMyStreamId(stream_id))
		return ErrorCode.INVALID_ARGUMENT;

	if (!session.is_server)
		return ErrorCode.PROTO;

	/* All 32bit signed stream IDs are spent. */
	if (session.next_stream_id > int.max)
		return ErrorCode.STREAM_ID_NOT_AVAILABLE;

	OutboundItem queued = Mem.alloc!OutboundItem(session);
	scope(failure)
		Mem.free(queued);

	queued.aux_data.headers.stream_user_data = promised_stream_user_data;

	// The copied header list belongs to this function only until it is
	// handed to the PUSH_PROMISE frame; the flag below tracks that window.
	HeaderField[] fields = hfa.copy();
	bool fields_are_ours = true;
	scope(failure) if (fields_are_ours) Mem.free(fields);

	// Reserve the next locally initiated stream ID for the promised stream.
	int promised_id = session.next_stream_id;
	session.next_stream_id += 2;

	fields_are_ours = false;
	queued.frame.push_promise = PushPromise(FrameFlags.END_HEADERS, stream_id, promised_id, fields);
	scope(failure) queued.frame.push_promise.free();

	session.addItem(queued);

	return promised_id;
}

/**
 * Submits PING frame by forwarding |opaque_data| to `Session.addPing`
 * with $(D FrameFlags.NONE).
 *
 * You don't have to send PING back when you received PING frame.  The
 * library automatically submits PING frame in this case.
 *
 * If the |opaque_data| is non `null`, then it should point to the 8
 * bytes array of memory to specify opaque data to send with PING
 * frame.  If the |opaque_data| is `null`, zero-cleared 8 bytes will
 * be sent as opaque data.
 */
void submitPing(Session session, in ubyte[] opaque_data)
{
	session.addPing(FrameFlags.NONE, opaque_data);
}

/**
 * @function
 *
 * Submits GOAWAY frame with the last stream ID |last_stream_id| and
 * the error code |error_code|.
 *
 * The pre-defined error code is one of $(D FrameError).
 *
 * The |flags| is currently ignored and should be
 * $(D FrameFlags.NONE).
 *
 * The |last_stream_id| is peer's stream ID or 0.  So if $(D Session) is
 * initialized as client, |last_stream_id| must be even or 0.  If
 * $(D Session) is initialized as server, |last_stream_id| must be odd or
 * 0.
*
 * The HTTP/2 specification says last_stream_id must not be increased
 * from the value previously sent.  So the actual value sent as
 * last_stream_id is the minimum value between the given
 * |last_stream_id| and the last_stream_id previously sent to the
 * peer.
 *
 * If the |opaque_data| is not `null` and |opaque_data_len| is not
 * zero, those data will be sent as additional debug data.  The
 * library makes a copy of the memory region pointed by |opaque_data|
 * with the length |opaque_data_len|, so the caller does not need to
 * keep this memory after the return of this function.  If the
 * |opaque_data_len| is 0, the |opaque_data| could be `null`.
 *
 * After successful transmission of GOAWAY, following things happen.
 * All incoming streams having strictly more than |last_stream_id| are
 * closed.  All incoming HEADERS which starts new stream are simply
 * ignored.  After all active streams are handled, both
 * `wantRead()` and `wantWrite()` return 0 and the application can close session.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.INVALID_ARGUMENT)
 *     The |opaque_data.length| is too large; the |last_stream_id| is invalid.
 */
ErrorCode submitGoAway(Session session, int last_stream_id, FrameError error_code, in string opaque_data)
{
	// Only queue a GOAWAY while TERM_ON_SEND is not set on the session;
	// once it is set the call is a successful no-op.
	if (!(session.goaway_flags & GoAwayFlags.TERM_ON_SEND))
		return session.addGoAway(last_stream_id, error_code, opaque_data, GoAwayAuxFlags.NONE);

	return ErrorCode.OK;
}

/**
 * Submits WINDOW_UPDATE frame.
 *
 * The |flags| is currently ignored and should be
 * $(D FrameFlags.NONE).
 *
 * If the |window_size_increment| is positive, the WINDOW_UPDATE with
 * that value as window_size_increment is queued.
* If the |window_size_increment| is larger than the received bytes from the
 * remote endpoint, the local window size is increased by that
 * difference.
 *
 * If the |window_size_increment| is negative, the local window size
 * is decreased by -|window_size_increment|.  If automatic
 * WINDOW_UPDATE is enabled
 * $(D Options.setNoAutoWindowUpdate), and the library
 * decided that the WINDOW_UPDATE should be submitted, then
 * WINDOW_UPDATE is queued with the current received bytes count.
 *
 * If the |window_size_increment| is 0, the function does nothing and
 * returns 0.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.FLOW_CONTROL)
 *     The local window size overflow or gets negative.
 */
ErrorCode submitWindowUpdate(Session session, int stream_id, int window_size_increment)
{
	if (window_size_increment == 0)
		return ErrorCode.OK;

	immutable bool connection_level = stream_id == 0;
	FrameFlags no_flags;   // left default-initialised, as no flags are supplied by the caller
	Stream target;         // stays null when the connection-level window is addressed
	ErrorCode status;

	if (connection_level) {
		status = adjustLocalWindowSize(session.local_window_size, session.recv_window_size, session.recv_reduction, window_size_increment);
	} else {
		target = session.getStream(stream_id);
		// An unknown stream is silently ignored.
		if (!target)
			return ErrorCode.OK;

		status = adjustLocalWindowSize(target.localWindowSize, target.recvWindowSize, target.recvReduction, window_size_increment);
	}

	if (status != ErrorCode.OK)
		return status;

	// A negative increment only adjusts the local window; no frame is queued.
	if (window_size_increment <= 0)
		return ErrorCode.OK;

	// Deduct the increment from the consumed-byte counter, clamping at zero.
	if (connection_level)
		session.consumed_size = max(0, session.consumed_size - window_size_increment);
	else
		target.consumedSize = max(0, target.consumedSize - window_size_increment);

	session.addWindowUpdate(no_flags, stream_id, window_size_increment);
	return ErrorCode.OK;
}


/**
* Signals to the client that the server has started the graceful shutdown
 * procedure.
 *
 * This function is only usable by a server.  If it is called on a
 * client-side session, it returns $(D ErrorCode.INVALID_STATE).
 *
 * To gracefully shut down an HTTP/2 session, the server should call this
 * function to send GOAWAY with last_stream_id (1u << 31) - 1.  Then,
 * after some delay (e.g., 1 RTT), it should send another GOAWAY using
 * `submitGoAway()`, carrying the ID of the last stream the server is
 * processing.  See also `getLastProcStreamID()`.
 *
 * Unlike `submitGoAway()`, this function just sends GOAWAY
 * and does nothing more.  It is a mere indication to the client
 * that session shutdown is imminent.  The application should call
 * `submitGoAway()` with an appropriate last_stream_id after
 * this call.
 *
 * If one or more GOAWAY frames have already been sent by either
 * `submitGoAway()` or `terminateSession()`, this function has no effect.
 *
 * This function returns 0 if it succeeds, or one of the following
 * negative error codes:
 *
 * $(D ErrorCode.NOMEM)
 *     Out of memory.
 * $(D ErrorCode.INVALID_STATE)
 *     The $(D Session) is initialized as client.
*/
ErrorCode submitShutdownNotice(Session session)
{
	if (!session.is_server)
		return ErrorCode.INVALID_STATE;

	// Any GOAWAY flag already recorded on the session turns this call into
	// a successful no-op; otherwise queue the notice with the largest
	// possible last_stream_id.
	return session.goaway_flags
		? ErrorCode.OK
		: session.addGoAway((1u << 31) - 1, FrameError.NO_ERROR, null, GoAwayAuxFlags.SHUTDOWN_NOTICE);
}

private:

/// Flags for a response HEADERS frame: END_STREAM is added when no
/// |data_prd| is supplied.
FrameFlags setResponseFlags(in DataProvider data_prd)
{
	FrameFlags result = FrameFlags.NONE;

	if (data_prd)
		return result;

	result |= FrameFlags.END_STREAM;
	return result;
}

/// Flags for a request HEADERS frame: END_STREAM is added when no
/// |data_prd| is supplied, PRIORITY when |pri_spec| differs from
/// $(D PrioritySpec.init).
FrameFlags setRequestFlags(in PrioritySpec pri_spec, in DataProvider data_prd)
{
	immutable bool has_priority = pri_spec != PrioritySpec.init;
	FrameFlags result = FrameFlags.NONE;

	if (!data_prd)
		result |= FrameFlags.END_STREAM;

	if (has_priority)
		result |= FrameFlags.PRIORITY;

	return result;
}

/* This function takes ownership of |hfa_copy|.  Regardless of the
   return value, the caller must not free |hfa_copy| after this
   function returns.
*/
int submitHeadersShared(Session session, FrameFlags flags, int stream_id,
	const ref PrioritySpec pri_spec, HeaderField[] hfa_copy,
	in DataProvider data_prd, void *stream_user_data, bool attach_stream)
{
	FrameFlags flags_copy;
	OutboundItem item;
	Frame* frame;
	HeadersCategory hcat;

	// |hfa_copy| is released here on failure only while this function still
	// owns it, i.e. until it is handed to the HEADERS frame below.
	bool owns_hfa = true;
	scope(failure)
		if (owns_hfa && hfa_copy)
			hfa_copy.free();

	if (stream_id == 0) {
		// Same null guard as every other release of |hfa_copy| in this function.
		if (hfa_copy)
			hfa_copy.free();
		return ErrorCode.INVALID_ARGUMENT;
	}

	item = Mem.alloc!OutboundItem(session);
	scope(failure) {
		if (item) Mem.free(item);
	}

	if (data_prd) {
		item.aux_data.headers.data_prd = data_prd;
	}

	item.aux_data.headers.stream_user_data = stream_user_data;
	item.aux_data.headers.attach_stream = attach_stream;

	// Only END_STREAM and PRIORITY are taken from the caller; END_HEADERS is
	// always set.
	flags_copy = cast(FrameFlags)((flags & (FrameFlags.END_STREAM | FrameFlags.PRIORITY)) | FrameFlags.END_HEADERS);

	if (stream_id == -1) {
		// -1 asks for a new stream: allocate the next local stream ID.
		if (session.next_stream_id > int.max) {
			if (item) Mem.free(item);
			if (hfa_copy)
				hfa_copy.free();
			return ErrorCode.STREAM_ID_NOT_AVAILABLE;
		}

		stream_id = session.next_stream_id;
		session.next_stream_id += 2;

		hcat = HeadersCategory.REQUEST;
	} else {
		/* More specific categorization will be done later. */
		hcat = HeadersCategory.HEADERS;
	}

	frame = &item.frame;

	owns_hfa = false;
	frame.headers = Headers(flags_copy, stream_id, hcat, pri_spec, hfa_copy);

	// addItem() has no status to inspect; failures surface as exceptions and
	// are handled by the scope(failure) guards above.
	// NOTE(review): unlike submitPriority/submitPushPromise, no guard releases
	// the frame contents if addItem() throws - confirm whether the header
	// list should be freed on that path.
	session.addItem(item);

	// A newly opened request stream reports its assigned ID.
	if (hcat == HeadersCategory.REQUEST)
		return stream_id;

	return ErrorCode.OK;
}



/// Copies |hfa| and |pri_spec| (adjusting the weight of the copy) and
/// forwards everything to submitHeadersShared(), which takes ownership of
/// the copied header list.  Returns whatever submitHeadersShared() returns.
int submitHeadersSharedHfa(Session session, FrameFlags flags, int stream_id, in PrioritySpec pri_spec, in HeaderField[] hfa,
	in DataProvider data_prd, void *stream_user_data, bool attach_stream)
{
	HeaderField[] hfa_copy = hfa.copy();
	PrioritySpec copy_pri_spec = pri_spec;
	copy_pri_spec.adjustWeight();

	return submitHeadersShared(session, flags, stream_id, copy_pri_spec, hfa_copy, data_prd, stream_user_data, attach_stream);
}

public:

/**
 * A helper function for dealing with NPN in client side or ALPN in
 * server side.  The |input| contains peer's protocol list in preferable
 * order.  The format of |input| is length-prefixed and not
 * null-terminated.  For example, `HTTP-draft-04/2.0` and
 * `http/1.1` stored in |input| like this::
 *
 *     in[0] = 17
 *     in[1..17] = "HTTP-draft-04/2.0"
 *     in[18] = 8
 *     in[19..26] = "http/1.1"
 *     inlen = 27
 *
 * The selection algorithm is as follows:
 *
 * 1. If peer's list contains HTTP/2 protocol the library supports,
 *    it is selected and returns 1. The following step is not taken.
 *
 * 2. If peer's list contains `http/1.1`, this function selects
 *    `http/1.1` and returns 0.  The following step is not taken.
 *
 * 3. This function selects nothing and returns -1 (So called
 *    non-overlap case).  In this case, |output| is left
 *    untouched.
*
 * Selecting `HTTP-draft-04/2.0` means that `HTTP-draft-04/2.0` is
 * written into |*out| and its length (which is 17) is assigned to
 * |*outlen|.
 *
 * For ALPN, refer to
 * https://tools.ietf.org/html/draft-ietf-tls-applayerprotoneg-05
 *
 * See http://technotes.googlecode.com/git/nextprotoneg.html for more
 * details about NPN.
 *
 * For NPN, to use this method you should do something like::
 *
 *     static int select_next_proto_cb(SSL* ssl,
 *                                     unsigned char **out,
 *                                     unsigned char *outlen,
 *                                     const unsigned char *in,
 *                                     unsigned int inlen,
 *                                     void *arg)
 *     {
 *         int rv;
 *         rv = selectNextProtocol(out, outlen, in, inlen);
 *         if(rv == 1) {
 *             (cast(MyType*)arg).http2_selected = 1;
 *         }
 *         return SSL_TLSEXT_ERR_OK;
 *     }
 *     ...
 *     SSL_CTX_set_next_proto_select_cb(ssl_ctx, select_next_proto_cb, my_obj);
 *
 */
int selectNextProtocol(ref ubyte[] output, in ubyte[] input, ubyte[] other_proto = null)
{
	// First `http/1.1` entry seen.  It is only used once the whole list has
	// been scanned without finding |other_proto| or the HTTP/2 identifier,
	// so an HTTP/2 entry listed after `http/1.1` still wins.
	ubyte[] http1_entry;
	bool has_http1;
	size_t i;

	while (i < input.length)
	{
		immutable size_t len = input[i];
		++i;

		// |input| comes from the peer: a length prefix that runs past the end
		// of the list is malformed, so stop scanning instead of slicing out
		// of bounds.
		if (len > input.length - i)
			break;

		ubyte[] proto = cast(ubyte[]) input[i .. i + len];
		i += len;

		if ((other_proto && other_proto == proto) || proto == PROTOCOL_ALPN) {
			output = proto;
			return 1;
		}

		if (!has_http1 && proto == HTTP_1_1_ALPN) {
			http1_entry = proto;
			has_http1 = true;
		}
	}

	if (has_http1) {
		output = http1_entry;
		return ErrorCode.OK;
	}

	// Non-overlap case: |output| is left untouched.
	return -1;
}



/**
 * Returns true if the library error code |lib_error| is fatal, i.e.
 * strictly less than $(D ErrorCode.FATAL).
 */
bool isFatal(int lib_error) { return lib_error < ErrorCode.FATAL; }


/// Configuration options
enum OptionFlags {
	/**
	 * This option prevents the library from sending WINDOW_UPDATE for a
	 * connection automatically.  If this option is set to nonzero, the
	 * library won't send WINDOW_UPDATE for DATA until application calls
	 * $(D Session.consume) to indicate the amount of consumed
	 * DATA.  By default, this option is set to zero.
	 */
	NO_AUTO_WINDOW_UPDATE = 1,
	/**
	 * This option sets the Setting.MAX_CONCURRENT_STREAMS value of
	 * remote endpoint as if it is received in SETTINGS frame.  Without
	 * specifying this option, before the local endpoint receives
	 * Setting.MAX_CONCURRENT_STREAMS in SETTINGS frame from remote
	 * endpoint, Setting.MAX_CONCURRENT_STREAMS is unlimited.  This may
	 * cause problem if local endpoint submits lots of requests
	 * initially and sending them at once to the remote peer may lead to
	 * the rejection of some requests.  Specifying this option to the
	 * sensible value, say 100, may avoid this kind of issue.  This value
	 * will be overwritten if the local endpoint receives
	 * Setting.MAX_CONCURRENT_STREAMS from the remote endpoint.
	 */
	PEER_MAX_CONCURRENT_STREAMS = 1 << 1,
	/// See $(D Options.setRecvClientPreface).
	RECV_CLIENT_PREFACE = 1 << 2,
	/// See $(D Options.setNoHTTPMessaging).
	NO_HTTP_MESSAGING = 1 << 3,
}

/// Struct to store option values for http2_session.
struct Options {
private:
	/// Bitwise OR of $(D OptionFlags) to determine which fields are specified.
	uint m_opt_set_mask;

	uint m_peer_max_concurrent_streams;

	bool m_no_auto_window_update;

	bool m_recv_client_preface;

	bool m_no_http_messaging;
public:
	@property uint peer_max_concurrent_streams() const { return m_peer_max_concurrent_streams; }
	@property uint opt_set_mask() const { return m_opt_set_mask; }
	@property bool no_auto_window_update() const { return m_no_auto_window_update; }
	@property bool recv_client_preface() const { return m_recv_client_preface; }
	@property bool no_http_messaging() const { return m_no_http_messaging; }

	/**
	 * This option prevents the library from sending WINDOW_UPDATE for a
	 * connection automatically.  If this option is set to nonzero, the
	 * library won't send WINDOW_UPDATE for DATA until application calls
	 * `consume()` to indicate the consumed amount of
	 * data.  Don't use $(D submitWindowUpdate) for this purpose.
	 * By default, this option is set to zero.
	 *
	 * Passing `true` sets the $(D OptionFlags.NO_AUTO_WINDOW_UPDATE) bit in
	 * the option mask; passing `false` clears that bit and leaves every
	 * other option bit as it was.
	 */
	@property void setNoAutoWindowUpdate(bool val)
	{
		if (val) m_opt_set_mask |= OptionFlags.NO_AUTO_WINDOW_UPDATE;
		// Clear only this bit.  OR-ing with the complement would instead
		// mark every other option as specified.
		else m_opt_set_mask &= ~OptionFlags.NO_AUTO_WINDOW_UPDATE;
		m_no_auto_window_update = val;
	}

	/**
	 * This option sets the Setting.MAX_CONCURRENT_STREAMS value of
	 * remote endpoint as if it is received in SETTINGS frame.  Without
	 * specifying this option, before the local endpoint receives
	 * Setting.MAX_CONCURRENT_STREAMS in SETTINGS frame from remote
	 * endpoint, Setting.MAX_CONCURRENT_STREAMS is unlimited.  This may
	 * cause problem if local endpoint submits lots of requests initially
	 * and sending them at once to the remote peer may lead to the
	 * rejection of some requests.  Specifying this option to the sensible
	 * value, say 100, may avoid this kind of issue.  This value will be
	 * overwritten if the local endpoint receives
	 * Setting.MAX_CONCURRENT_STREAMS from the remote endpoint.
	 */
	void setPeerMaxConcurrentStreams(uint val)
	{
		m_opt_set_mask |= OptionFlags.PEER_MAX_CONCURRENT_STREAMS;
		m_peer_max_concurrent_streams = val;
	}

	/**
	 * By default, libhttp2 library only handles HTTP/2 frames and does not
	 * recognize first 24 bytes of client connection preface.  This design
	 * choice is done due to the fact that server may want to detect the
	 * application protocol based on first few bytes on clear text
	 * communication.  But for simple servers which only speak HTTP/2, it
	 * is easier for developers if libhttp2 library takes care of client
	 * connection preface.
	 *
	 * If this option is used with nonzero |val|, libhttp2 library checks
	 * first 24 bytes client connection preface.  If it is not a valid
	 * one, $(D Session.recv) and $(D Session.memRecv) will
	 * return error $(D ErrorCode.BAD_PREFACE), which is fatal error.
	 */
	void setRecvClientPreface(bool val)
	{
		m_opt_set_mask |= OptionFlags.RECV_CLIENT_PREFACE;
		m_recv_client_preface = val;
	}

	/**
	 * By default, libhttp2 library enforces subset of HTTP Messaging rules
	 * described in `HTTP/2 specification, section 8
	 * <https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8>`_.
	 * See `HTTP Messaging`_ section for details.  For those applications
	 * who use libhttp2 library as non-HTTP use, give nonzero to |val| to
	 * disable this enforcement.
	 */
	void setNoHTTPMessaging(bool val)
	{
		m_opt_set_mask |= OptionFlags.NO_HTTP_MESSAGING;
		m_no_http_messaging = val;
	}

}