1 /** 2 * Session 3 * 4 * Copyright: 5 * (C) 2012-2015 Tatsuhiro Tsujikawa 6 * (C) 2014-2015 Etienne Cimon 7 * 8 * License: 9 * Distributed under the terms of the MIT license with an additional section 1.2 of the curl/libcurl project. 10 * Consult the provided LICENSE.md file for details 11 */ 12 module libhttp2.session; 13 14 import libhttp2.constants; 15 import libhttp2.types; 16 import libhttp2.frame; 17 import libhttp2.stream; 18 import libhttp2.connector; 19 import libhttp2.deflater; 20 import libhttp2.inflater; 21 import libhttp2.buffers; 22 import libhttp2.priority_queue; 23 import libhttp2.helpers; 24 import libhttp2.huffman; 25 import core.exception : RangeError; 26 27 import memutils.circularbuffer; 28 import memutils.vector; 29 import memutils.hashmap; 30 31 import std.algorithm : min, max; 32 33 enum OptionsMask { 34 NONE = 0, 35 NO_AUTO_WINDOW_UPDATE = 1 << 0, 36 RECV_CLIENT_PREFACE = 1 << 1, 37 NO_HTTP_MESSAGING = 1 << 2, 38 } 39 40 enum OutboundState { 41 POP_ITEM, 42 SEND_DATA, 43 SEND_NO_COPY 44 } 45 46 struct ActiveOutboundItem { 47 OutboundItem item; 48 Buffers framebufs; 49 OutboundState state = OutboundState.POP_ITEM; 50 51 void reset() { 52 LOGF("send: reset http2_active_outbound_item"); 53 LOGF("send: aob.item = %s", item); 54 if(item) { 55 item.free(); 56 Mem.free(item); 57 item = null; 58 } 59 framebufs.reset(); 60 state = OutboundState.POP_ITEM; 61 } 62 } 63 64 /// Internal state when receiving incoming frame 65 enum InboundState : ubyte { 66 /* Receiving frame header */ 67 READ_CLIENT_PREFACE, 68 READ_FIRST_SETTINGS, 69 READ_HEAD, 70 READ_NBYTE, 71 READ_HEADER_BLOCK, 72 IGN_HEADER_BLOCK, 73 IGN_PAYLOAD, 74 FRAME_SIZE_ERROR, 75 READ_SETTINGS, 76 READ_GOAWAY_DEBUG, 77 EXPECT_CONTINUATION, 78 IGN_CONTINUATION, 79 READ_PAD_DATA, 80 READ_DATA, 81 IGN_DATA, 82 IGN_ALL, 83 } 84 85 struct InboundFrame { 86 Frame frame; 87 88 /* The received SETTINGS entry. The protocol says that we only cares 89 about the defined settings ID. If unknown ID is received, it is 90 ignored. We use last entry to hold minimum header table size if 91 same settings are seen multiple times. */ 92 Setting[INBOUND_NUM_IV] iva; 93 94 /// buffer pointers to small buffer, raw_sbuf 95 Buffer sbuf; 96 97 /// buffer pointers to large buffer, raw_lbuf 98 Buffer lbuf; 99 100 /// Large buffer, malloced on demand 101 ubyte[] raw_lbuf; 102 103 /* The number of entry filled in |iva| */ 104 size_t niv; 105 106 /* How many bytes we still need to receive for current frame */ 107 size_t payloadleft; 108 109 /* padding length for the current frame */ 110 size_t padlen; 111 112 InboundState state; 113 114 /* Small buffer. Currently the largest contiguous chunk to buffer 115 is frame header. We buffer part of payload, but they are smaller 116 than frame header. */ 117 ubyte[FRAME_HDLEN] raw_sbuf; 118 119 /// Returns the amount of bytes that are required by this frame 120 size_t readLength(const ubyte* input, const ubyte* last) 121 { 122 return min(cast(size_t)(last - input), payloadleft); 123 } 124 125 /* 126 * Resets iframe.sbuf and advance its mark pointer by |left| bytes. 127 */ 128 void setMark(size_t left) 129 { 130 sbuf.reset; 131 sbuf.mark += left; 132 } 133 134 size_t read(in ubyte* input, in ubyte* last) 135 { 136 import std.c.string : memcpy; 137 138 size_t readlen; 139 140 readlen = min(last - input, sbuf.markAvailable); 141 142 memcpy(sbuf.last, input, readlen); 143 sbuf.last += readlen; 144 145 return readlen; 146 } 147 148 /* 149 * Unpacks SETTINGS entry in iframe.sbuf. 150 */ 151 void unpackSetting() 152 { 153 Setting _iv; 154 _iv.unpack(sbuf[]); 155 156 size_t i; 157 158 with(Setting) switch (_iv.id) { 159 case HEADER_TABLE_SIZE: 160 case ENABLE_PUSH: 161 case MAX_CONCURRENT_STREAMS: 162 case INITIAL_WINDOW_SIZE: 163 case MAX_FRAME_SIZE: 164 case MAX_HEADER_LIST_SIZE: 165 break; 166 default: 167 LOGF("recv: ignore unknown settings id=0x%02x", _iv.id); 168 return; 169 } 170 171 for(i = 0; i < niv; ++i) { 172 if (iva[i].id == _iv.id) { 173 iva[i] = _iv; 174 break; 175 } 176 } 177 178 if (i == niv) { 179 iva[niv] = _iv; 180 niv++; 181 } 182 183 if (_iv.id == Setting.HEADER_TABLE_SIZE && _iv.value < iva[INBOUND_NUM_IV - 1].value) 184 { 185 iva[INBOUND_NUM_IV - 1] = _iv; 186 } 187 } 188 private: 189 /* 190 * Checks PADDED flags and set iframe.sbuf to read them accordingly. 191 * If padding is set, this function returns 1. If no padding is set, 192 * this function returns 0. On error, returns -1. 193 */ 194 int handlePad() 195 { 196 if (frame.hd.flags & FrameFlags.PADDED) { 197 if (frame.hd.length < 1) { 198 return -1; 199 } 200 setMark(1); 201 return 1; 202 } 203 LOGF("recv: no padding in payload"); 204 return ErrorCode.OK; 205 } 206 207 /* 208 * Computes number of padding based on flags. This function returns 209 * padlen if it succeeds, or -1. 210 */ 211 int computePad() 212 { 213 /* 1 for Pad Length field */ 214 int _padlen = sbuf.pos[0] + 1; 215 216 LOGF("recv: padlen=%d", padlen); 217 218 /* We cannot use iframe.frame.hd.length because of CONTINUATION */ 219 if (_padlen - 1 > payloadleft) { 220 return -1; 221 } 222 223 padlen = _padlen; 224 225 return _padlen; 226 } 227 228 /* 229 * This function returns the effective payload length in the data of 230 * length |readlen| when the remaning payload is |payloadleft|. The 231 * |payloadleft| does not include |readlen|. If padding was started 232 * strictly before this data chunk, this function returns -1. 233 */ 234 int effectiveReadLength(size_t _payloadleft, size_t readlen) 235 { 236 size_t trail_padlen = frame.trailPadlen(padlen); 237 238 if (trail_padlen > _payloadleft) { 239 size_t padlen; 240 padlen = trail_padlen - _payloadleft; 241 if (readlen < padlen) { 242 return -1; 243 } else { 244 return cast(int)(readlen - padlen); 245 } 246 } 247 return cast(int)readlen; 248 } 249 250 void reset() 251 { 252 /* A bit risky code, since if this function is called from Session(), we rely on the fact that 253 frame.hd.type is 0, so that no free is performed. */ 254 with (FrameType) switch (frame.hd.type) { 255 case HEADERS: 256 frame.headers.free(); 257 break; 258 case PRIORITY: 259 frame.priority.free(); 260 break; 261 case RST_STREAM: 262 frame.rst_stream.free(); 263 break; 264 case SETTINGS: 265 frame.settings.free(); 266 break; 267 case PUSH_PROMISE: 268 frame.push_promise.free(); 269 break; 270 case PING: 271 frame.ping.free(); 272 break; 273 case GOAWAY: 274 frame.goaway.free(); 275 break; 276 case WINDOW_UPDATE: 277 frame.window_update.free(); 278 break; 279 default: break; 280 } 281 282 destroy(frame); 283 284 state = InboundState.READ_HEAD; 285 286 sbuf = Buffer(raw_sbuf.ptr[0 .. raw_sbuf.sizeof]); 287 sbuf.mark += FRAME_HDLEN; 288 289 lbuf.free(); 290 lbuf = Buffer(); 291 destroy(iva); 292 payloadleft = 0; 293 padlen = 0; 294 iva[INBOUND_NUM_IV - 1].id = Setting.HEADER_TABLE_SIZE; 295 iva[INBOUND_NUM_IV - 1].value = uint.max; 296 niv = 0; 297 } 298 } 299 300 struct SettingsStorage { 301 uint header_table_size = HD_DEFAULT_MAX_BUFFER_SIZE; 302 uint enable_push = 1; 303 uint max_concurrent_streams = INITIAL_MAX_CONCURRENT_STREAMS; 304 uint initial_window_size = INITIAL_WINDOW_SIZE; 305 uint max_frame_size = MAX_FRAME_SIZE_MIN; 306 uint max_header_list_size = uint.max; 307 } 308 309 enum GoAwayFlags { 310 NONE = 0, 311 /* Flag means that connection should be terminated after sending GOAWAY. */ 312 TERM_ON_SEND = 0x1, 313 /* Flag means GOAWAY to terminate session has been sent */ 314 TERM_SENT = 0x2, 315 /* Flag means GOAWAY was sent */ 316 SENT = 0x4, 317 /* Flag means GOAWAY was received */ 318 RECV = 0x8, 319 } 320 321 enum { 322 CLIENT = false, 323 SERVER = true 324 } 325 326 class Session { 327 328 this(bool server, Connector callbacks, in Options options = Options.init) 329 { 330 if (server) { 331 is_server = true; 332 next_stream_id = 2; // server IDs always pair 333 } 334 else 335 next_stream_id = 1; // client IDs always impair 336 337 roots = Mem.alloc!StreamRoots(); 338 scope(failure) Mem.free(roots); 339 340 hd_inflater = Inflater(true); 341 scope(failure) hd_inflater.free(); 342 343 hd_deflater = Deflater(DEFAULT_MAX_DEFLATE_BUFFER_SIZE); 344 scope(failure) hd_deflater.free(); 345 346 ob_pq = PriorityQueue(128); 347 scope(failure) ob_pq.free(); 348 349 ob_ss_pq = PriorityQueue(128); 350 scope(failure) ob_ss_pq.free(); 351 352 ob_da_pq = PriorityQueue(128); 353 scope(failure) ob_da_pq.free(); 354 355 /* 1 for Pad Field. */ 356 aob.framebufs = Mem.alloc!Buffers(FRAMEBUF_CHUNKLEN, FRAMEBUF_MAX_NUM, 1, FRAME_HDLEN + 1); 357 scope(failure) { aob.framebufs.free(); Mem.free(aob.framebufs); } 358 359 aob.reset(); 360 361 if (options != Options.init) { 362 if ((options.opt_set_mask & OptionFlags.NO_AUTO_WINDOW_UPDATE) && options.no_auto_window_update) 363 { 364 opt_flags |= OptionsMask.NO_AUTO_WINDOW_UPDATE; 365 } 366 367 if (options.opt_set_mask & OptionFlags.PEER_MAX_CONCURRENT_STREAMS) 368 { 369 remote_settings.max_concurrent_streams = options.peer_max_concurrent_streams; 370 } 371 372 if ((options.opt_set_mask & OptionFlags.RECV_CLIENT_PREFACE) && options.recv_client_preface) 373 { 374 opt_flags |= OptionsMask.RECV_CLIENT_PREFACE; 375 } 376 377 if ((options.opt_set_mask & OptionFlags.NO_HTTP_MESSAGING) && options.no_http_messaging) 378 { 379 opt_flags |= OptionsMask.NO_HTTP_MESSAGING; 380 } 381 } 382 383 connector = callbacks; 384 385 iframe.reset(); 386 387 if (is_server && opt_flags & OptionsMask.RECV_CLIENT_PREFACE) 388 { 389 iframe.state = InboundState.READ_CLIENT_PREFACE; 390 iframe.payloadleft = CLIENT_CONNECTION_PREFACE.length; 391 } else static if (ENABLE_FIRST_SETTING_CHECK) 392 { 393 iframe.state = InboundState.READ_FIRST_SETTINGS; 394 } 395 } 396 397 /** 398 * Frees any resources allocated for $(D Session). If $(D Session) is 399 * `null`, this function does nothing. 400 */ 401 void free() { 402 if (inflight_iva) 403 Mem.free(inflight_iva); 404 roots.free(); 405 Mem.free(roots); 406 freeAllStreams(); 407 iframe.reset(); 408 ob_pq.free(); 409 ob_ss_pq.free(); 410 ob_da_pq.free(); 411 aob.reset(); 412 hd_deflater.free(); 413 hd_inflater.free(); 414 aob.framebufs.free(); 415 if (aob.framebufs) 416 Mem.free(aob.framebufs); 417 destroy(streams); 418 } 419 420 /** 421 * Sends pending frames to the remote peer. 422 * 423 * This function retrieves the highest prioritized frame from the 424 * outbound queue and sends it to the remote peer. It does this as 425 * many as possible until the user callback $(D Connector.write) returns 426 * $(D ErrorCode.WOULDBLOCK) or the outbound queue becomes empty. 427 * 428 * This function calls several $(D Connector) functions which are passed 429 * when initializing the $(D Session). Here is the simple time chart 430 * which tells when each callback is invoked: 431 * 432 * 1. Get the next frame to be sent from a priority sorted outbound queue. 433 * 434 * 2. Prepare transmission of the frame. 435 * 436 * 3. $(D Connector.onFrameFailure) may be invoked if the control frame cannot 437 * be sent because some preconditions are not met (e.g., request HEADERS 438 * cannot be sent after GOAWAY). This then aborts the following steps. 439 * 440 * 4. $(D Connector.selectPaddingLength) is invoked if the frame is HEADERS, 441 * PUSH_PROMISE or DATA. 442 * 443 * 5. If the frame is request HEADERS, the stream is opened here. 444 * 445 * 6. $(D Connector.onFrameReady) is invoked. 446 * 447 * 7. $(D Connector.write) is invoked one or more times to send the frame. 448 * 449 * 8. $(D Connector.onFrameSent) is invoked after all data is transmitted. 450 * 451 * 9. $(D Connector.onStreamExit) may be invoked if the transmission of the frame 452 * triggers closure of the stream, it is destroyed afterwards. 453 * 454 * This function returns 0 if it succeeds, or one of the following 455 * negative error codes: 456 * 457 * $(D ErrorCode.CALLBACK_FAILURE) 458 * The callback function failed. 459 */ 460 ErrorCode send() { 461 ErrorCode rv; 462 ubyte[] data; 463 int sentlen; 464 Buffers framebufs = aob.framebufs; 465 466 for (;;) { 467 rv = memSendInternal(data, false); 468 if (rv < 0) 469 return rv; 470 else if (data.length == 0) 471 return ErrorCode.OK; 472 try sentlen = connector.write(data); 473 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 474 475 if (sentlen < 0) { 476 if (cast(ErrorCode) sentlen == ErrorCode.WOULDBLOCK) { 477 /* Transmission canceled. Rewind the offset */ 478 framebufs.cur.buf.pos -= data.length; 479 return ErrorCode.OK; 480 } 481 482 return ErrorCode.CALLBACK_FAILURE; 483 } 484 485 /* Rewind the offset to the amount of unsent bytes */ 486 framebufs.cur.buf.pos -= (data.length - sentlen); 487 } 488 489 assert(false); 490 } 491 492 /** 493 * @function 494 * 495 * Returns the serialized data to send. 496 * 497 * This function behaves like `send()` except that it 498 * does not use $(D Connector.write) to transmit data. 499 * Instead, it assigns the serialized data to the given $(D ubyte[]) 500 * |data_arr|. The other callbacks are called in the same way as they are 501 * in `send()`. 502 * 503 * This function may not return all serialized data in one invocation. 504 * To get all data, call this function repeatedly until it returns an 505 * array of 0 length or one of negative error codes. 506 * 507 * The assigned |data_ar| is valid until the next call of 508 * `memSend()` or `send()`. 509 * 510 * The caller must send all data before sending the next chunk of 511 * data. 512 * 513 * This function returns an error code on failure or 0 on success 514 */ 515 ErrorCode memSend(ref ubyte[] data_arr) 516 { 517 ErrorCode rv; 518 519 rv = memSendInternal(data_arr, true); 520 if (rv < 0) { 521 return rv; 522 } 523 524 /* We have to call afterFrameSent here to handle stream 525 closure upon transmission of frames. Otherwise, END_STREAM may 526 be reached to client before we call memSend 527 again and we may get exceeding number of incoming streams. */ 528 rv = afterFrameSent(); 529 if (rv < 0) { 530 /* FATAL */ 531 assert(isFatal(rv)); 532 return rv; 533 } 534 535 return ErrorCode.OK; 536 } 537 538 /** 539 * Receives frames from the remote peer. 540 * 541 * This function receives as many frames as possible until the user 542 * callback $(D Connector.read) returns $(D ErrorCode.WOULDBLOCK). 543 * This function calls several $(D Connector) functions which are passed 544 * when initializing the $(D Session). 545 * 546 * Here is the simple time chart which tells when each callback is invoked: 547 * 548 * 1. $(D Connector.read) is invoked one or more times to receive the frame header. 549 * 550 * 2. $(D Connector.onFrameHeader) is invoked after the frame header is received. 551 * 552 * 3. If the frame is DATA frame: 553 * 554 * 1. $(D Connector.read) is invoked one or more times to receive the DATA payload. 555 * 556 * 2. $(D Connector.onDataChunk) is invoked alternatively with $(D Connector.read) 557 * for each chunk of data. 558 * 559 * 2. $(D Connector.onFrame) may be invoked if one DATA frame is completely received. 560 * 561 * 3. $(D Connector.onStreamExit) may be invoked if the reception of the frame triggers 562 * closure of the stream. 563 * 564 * 4. If the frame is the control frame: 565 * 566 * 1. $(D Connector.read) is invoked one or more times to receive the whole frame. 567 * 568 * 2. If the received frame is valid, then following actions are 569 * taken. 570 * - If the frame is either HEADERS or PUSH_PROMISE: 571 * - $(D Connector.onHeaders) is invoked first. 572 * - $(D Connector.onHeaderField) is invoked for each header fields. 573 * - $(D Connector.onFrame) is invoked after all header fields. 574 * - For other frames: 575 * - $(D Connector.onFrame) is invoked. 576 * - $(D Connector.onStreamExit) may be invoked if the reception of the frame 577 * triggers the closure of the stream. 578 * 579 * 3. $(D Connector.onInvalidFrame) may be invoked if the received frame is unpacked 580 * but is interpreted as invalid. 581 * 582 * This function returns 0 if it succeeds, or one of the following 583 * negative error codes: 584 * 585 * $(D ErrorCode.EOF) 586 * The remote peer did shutdown on the connection. 587 * $(D ErrorCode.CALLBACK_FAILURE) 588 * The callback function failed. 589 * $(D ErrorCode.BAD_PREFACE) 590 * Invalid client preface was detected. This error only returns 591 * when $(D Session) was configured as server and 592 * `setRecvClientPreface()` is used. 593 */ 594 ErrorCode recv() { 595 ubyte[INBOUND_BUFFER_LENGTH] buf; 596 while (1) { 597 int readlen; 598 readlen = callRead(buf.ptr[0 .. buf.sizeof]); 599 if (readlen > 0) { 600 // process the received data 601 int proclen = memRecv(buf[0 .. readlen]); 602 if (proclen < 0) { 603 return cast(ErrorCode)proclen; 604 } 605 assert(proclen == readlen); 606 } else if (readlen == 0 || readlen == ErrorCode.WOULDBLOCK) { 607 return ErrorCode.OK; 608 } else if (readlen == ErrorCode.EOF) { 609 return ErrorCode.EOF; 610 } else if (readlen < 0) { 611 return ErrorCode.CALLBACK_FAILURE; 612 } 613 } 614 } 615 616 /** 617 * Processes data |input| as an input from the remote endpoint. The 618 * |inlen| indicates the number of bytes in the |in|. 619 * 620 * This function behaves like $(D Session.recv) except that it 621 * does not use $(D Connector.read) to receive data; the 622 * |input| is the only data for the invocation of this function. If all 623 * bytes are processed, this function returns. The other connector 624 * are called in the same way as they are in $(D Session.recv). 625 * 626 * In the current implementation, this function always tries to 627 * process all input data unless either an error occurs or 628 * $(D ErrorCode.PAUSE) is returned from $(D Connector.onHeaderField) or 629 * $(D Connector.onDataChunk). If $(D ErrorCode.PAUSE) is used, 630 * the return value includes the number of bytes which was used to 631 * produce the data or frame for the callback. 632 * 633 * This function returns the number of processed bytes, or one of the 634 * following negative error codes: 635 * 636 * $(D ErrorCode.CALLBACK_FAILURE) 637 * The callback function failed. 638 * $(D ErrorCode.BAD_PREFACE) 639 * Invalid client preface was detected. This error only returns 640 * when $(D Session) was configured as server and 641 * `setRecvClientPreface()` is used. 642 */ 643 int memRecv(in ubyte[] input) 644 { 645 const(ubyte)* pos = input.ptr; 646 const ubyte* first = input.ptr; 647 const ubyte* last = input.ptr + input.length; 648 size_t readlen; 649 int padlen; 650 ErrorCode rv; 651 bool busy; 652 FrameHeader cont_hd; 653 Stream stream; 654 size_t pri_fieldlen; 655 656 LOGF("recv: connection recv_window_size=%d, local_window=%d", recv_window_size, local_window_size); 657 658 for (;;) { 659 with(InboundState) final switch (iframe.state) { 660 case READ_CLIENT_PREFACE: 661 readlen = min(input.length, iframe.payloadleft); 662 663 if (CLIENT_CONNECTION_PREFACE[$ - iframe.payloadleft .. $ - iframe.payloadleft + readlen] != pos[0 .. readlen]) 664 { 665 return ErrorCode.BAD_PREFACE; 666 } 667 668 iframe.payloadleft -= readlen; 669 pos += readlen; 670 671 if (iframe.payloadleft == 0) { 672 iframe.reset(); 673 iframe.state = READ_FIRST_SETTINGS; 674 } 675 676 break; 677 case READ_FIRST_SETTINGS: 678 LOGF("recv: [READ_FIRST_SETTINGS]"); 679 680 readlen = iframe.read(pos, last); 681 pos += readlen; 682 683 if (iframe.sbuf.markAvailable) { 684 return cast(int)(pos - first); 685 } 686 687 if (iframe.sbuf.pos[3] != FrameType.SETTINGS || (iframe.sbuf.pos[4] & FrameFlags.ACK)) 688 { 689 690 iframe.state = IGN_ALL; 691 692 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "SETTINGS expected"); 693 694 if (isFatal(rv)) { 695 return rv; 696 } 697 698 return cast(int)input.length; 699 } 700 701 iframe.state = READ_HEAD; 702 703 goto case READ_HEAD; 704 case READ_HEAD: { 705 bool on_frame_header_called; 706 707 LOGF("recv: [READ_HEAD]"); 708 709 readlen = iframe.read(pos, last); 710 pos += readlen; 711 712 if (iframe.sbuf.markAvailable) { 713 return cast(int)(pos - first); 714 } 715 716 iframe.frame.hd.unpack(iframe.sbuf[]); 717 iframe.payloadleft = iframe.frame.hd.length; 718 719 LOGF("recv: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", 720 iframe.frame.hd.length, iframe.frame.hd.type, iframe.frame.hd.flags, iframe.frame.hd.stream_id); 721 722 if (iframe.frame.hd.length > local_settings.max_frame_size) { 723 LOGF("recv: length is too large %d > %u", iframe.frame.hd.length, local_settings.max_frame_size); 724 725 busy = true; 726 727 iframe.state = IGN_PAYLOAD; 728 729 rv = terminateSessionWithReason(FrameError.FRAME_SIZE_ERROR, "too large frame size"); 730 731 if (isFatal(rv)) { 732 return rv; 733 } 734 735 break; 736 } 737 738 switch (iframe.frame.hd.type) { 739 case FrameType.DATA: { 740 LOGF("recv: DATA"); 741 742 iframe.frame.hd.flags &= (FrameFlags.END_STREAM | FrameFlags.PADDED); 743 /* Check stream is open. If it is not open or closing, ignore payload. */ 744 busy = true; 745 746 rv = onDataFailFast(); 747 if (rv == ErrorCode.IGN_PAYLOAD) { 748 LOGF("recv: DATA not allowed stream_id=%d", iframe.frame.hd.stream_id); 749 iframe.state = IGN_DATA; 750 break; 751 } 752 753 if (isFatal(rv)) { 754 return rv; 755 } 756 757 rv = cast(ErrorCode)iframe.handlePad(); 758 if (rv < 0) { 759 iframe.state = IGN_DATA; 760 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "DATA: insufficient padding space"); 761 762 if (isFatal(rv)) { 763 return rv; 764 } 765 break; 766 } 767 768 if (rv == 1) { 769 iframe.state = READ_PAD_DATA; 770 break; 771 } 772 773 iframe.state = READ_DATA; 774 break; 775 } 776 case FrameType.HEADERS: 777 778 LOGF("recv: HEADERS"); 779 780 iframe.frame.hd.flags &= (FrameFlags.END_STREAM | FrameFlags.END_HEADERS | FrameFlags.PADDED | FrameFlags.PRIORITY); 781 782 rv = cast(ErrorCode)iframe.handlePad(); 783 if (rv < 0) { 784 busy = true; 785 786 iframe.state = IGN_PAYLOAD; 787 788 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "HEADERS: insufficient padding space"); 789 if (isFatal(rv)) { 790 return rv; 791 } 792 break; 793 } 794 795 if (rv == 1) { 796 iframe.state = READ_NBYTE; 797 break; 798 } 799 800 pri_fieldlen = priorityLength(iframe.frame.hd.flags); 801 802 if (pri_fieldlen > 0) { 803 if (iframe.payloadleft < pri_fieldlen) { 804 busy = true; 805 iframe.state = FRAME_SIZE_ERROR; 806 break; 807 } 808 809 iframe.state = READ_NBYTE; 810 811 iframe.setMark(pri_fieldlen); 812 break; 813 } 814 815 /* Call onFrameHeader here because processHeadersFrame() may call onHeaders callback */ 816 bool ok = callOnFrameHeader(iframe.frame.hd); 817 818 if (!ok) { 819 return ErrorCode.CALLBACK_FAILURE; 820 } 821 822 on_frame_header_called = true; 823 824 rv = processHeadersFrame(); 825 826 if (isFatal(rv)) { 827 return rv; 828 } 829 830 busy = true; 831 832 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 833 iframe.state = IGN_HEADER_BLOCK; 834 break; 835 } 836 837 iframe.state = READ_HEADER_BLOCK; 838 839 break; 840 case FrameType.PRIORITY: 841 LOGF("recv: PRIORITY"); 842 843 iframe.frame.hd.flags = FrameFlags.NONE; 844 845 if (iframe.payloadleft != PRIORITY_SPECLEN) { 846 busy = true; 847 848 iframe.state = FRAME_SIZE_ERROR; 849 850 break; 851 } 852 853 iframe.state = READ_NBYTE; 854 855 iframe.setMark(PRIORITY_SPECLEN); 856 857 break; 858 case FrameType.RST_STREAM: 859 case FrameType.WINDOW_UPDATE: 860 static if (DEBUG) { 861 switch (iframe.frame.hd.type) { 862 case FrameType.RST_STREAM: 863 LOGF("recv: RST_STREAM"); 864 break; 865 case FrameType.WINDOW_UPDATE: 866 LOGF("recv: WINDOW_UPDATE"); 867 break; 868 default: break; 869 } 870 } 871 872 iframe.frame.hd.flags = FrameFlags.NONE; 873 874 if (iframe.payloadleft != 4) { 875 busy = true; 876 iframe.state = FRAME_SIZE_ERROR; 877 break; 878 } 879 880 iframe.state = READ_NBYTE; 881 882 iframe.setMark(4); 883 884 break; 885 case FrameType.SETTINGS: 886 LOGF("recv: SETTINGS"); 887 888 iframe.frame.hd.flags &= FrameFlags.ACK; 889 890 if ((iframe.frame.hd.length % FRAME_SETTINGS_ENTRY_LENGTH) || 891 ((iframe.frame.hd.flags & FrameFlags.ACK) && iframe.payloadleft > 0)) { 892 busy = true; 893 iframe.state = FRAME_SIZE_ERROR; 894 break; 895 } 896 897 iframe.state = READ_SETTINGS; 898 899 if (iframe.payloadleft) { 900 iframe.setMark(FRAME_SETTINGS_ENTRY_LENGTH); 901 break; 902 } 903 904 busy = true; 905 906 iframe.setMark(0); 907 908 break; 909 case FrameType.PUSH_PROMISE: 910 LOGF("recv: PUSH_PROMISE"); 911 912 iframe.frame.hd.flags &= (FrameFlags.END_HEADERS | FrameFlags.PADDED); 913 914 rv = cast(ErrorCode)iframe.handlePad(); 915 if (rv < 0) { 916 busy = true; 917 iframe.state = IGN_PAYLOAD; 918 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: insufficient padding space"); 919 if (isFatal(rv)) { 920 return rv; 921 } 922 break; 923 } 924 925 if (rv == 1) { 926 iframe.state = READ_NBYTE; 927 break; 928 } 929 930 if (iframe.payloadleft < 4) { 931 busy = true; 932 iframe.state = FRAME_SIZE_ERROR; 933 break; 934 } 935 936 iframe.state = READ_NBYTE; 937 938 iframe.setMark(4); 939 940 break; 941 case FrameType.PING: 942 LOGF("recv: PING"); 943 944 iframe.frame.hd.flags &= FrameFlags.ACK; 945 946 if (iframe.payloadleft != 8) { 947 busy = true; 948 iframe.state = FRAME_SIZE_ERROR; 949 break; 950 } 951 952 iframe.state = READ_NBYTE; 953 iframe.setMark(8); 954 955 break; 956 case FrameType.GOAWAY: 957 LOGF("recv: GOAWAY"); 958 959 iframe.frame.hd.flags = FrameFlags.NONE; 960 961 if (iframe.payloadleft < 8) { 962 busy = true; 963 iframe.state = FRAME_SIZE_ERROR; 964 break; 965 } 966 967 iframe.state = READ_NBYTE; 968 iframe.setMark(8); 969 970 break; 971 case FrameType.CONTINUATION: 972 LOGF("recv: unexpected CONTINUATION"); 973 974 /* Receiving CONTINUATION in this state are subject to connection error of type PROTOCOL_ERROR */ 975 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "CONTINUATION: unexpected"); 976 if (isFatal(rv)) 977 { 978 return rv; 979 } 980 981 busy = true; 982 983 iframe.state = IGN_PAYLOAD; 984 985 break; 986 default: 987 LOGF("recv: unknown frame"); 988 989 /* Silently ignore unknown frame type. */ 990 991 busy = true; 992 993 iframe.state = IGN_PAYLOAD; 994 995 break; 996 } 997 998 if (!on_frame_header_called) { 999 switch (iframe.state) { 1000 case IGN_HEADER_BLOCK: 1001 case IGN_PAYLOAD: 1002 case FRAME_SIZE_ERROR: 1003 case IGN_DATA: 1004 break; 1005 default: 1006 bool ok = callOnFrameHeader(iframe.frame.hd); 1007 1008 if (!ok) { 1009 return ErrorCode.CALLBACK_FAILURE; 1010 } 1011 } 1012 } 1013 1014 break; 1015 } 1016 case READ_NBYTE: 1017 LOGF("recv: [READ_NBYTE]"); 1018 1019 readlen = iframe.read(pos, last); 1020 pos += readlen; 1021 iframe.payloadleft -= readlen; 1022 1023 LOGF("recv: readlen=%d, payloadleft=%d, left=%d, type=%s", readlen, iframe.payloadleft, iframe.sbuf.markAvailable, iframe.frame.hd.type); 1024 1025 if (iframe.sbuf.markAvailable) { 1026 return cast(int)(pos - first); 1027 } 1028 1029 switch (iframe.frame.hd.type) { 1030 case FrameType.HEADERS: 1031 if (iframe.padlen == 0 && (iframe.frame.hd.flags & FrameFlags.PADDED)) { 1032 padlen = iframe.computePad(); 1033 if (padlen < 0) { 1034 busy = true; 1035 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "HEADERS: invalid padding"); 1036 if (isFatal(rv)) { 1037 return rv; 1038 } 1039 iframe.state = IGN_PAYLOAD; 1040 break; 1041 } 1042 iframe.frame.headers.padlen = padlen; 1043 1044 pri_fieldlen = priorityLength(iframe.frame.hd.flags); 1045 if (pri_fieldlen > 0) { 1046 if (iframe.payloadleft < pri_fieldlen) { 1047 busy = true; 1048 iframe.state = FRAME_SIZE_ERROR; 1049 break; 1050 } 1051 iframe.state = READ_NBYTE; 1052 iframe.setMark(pri_fieldlen); 1053 break; 1054 } else { 1055 /* Truncate buffers used for padding spec */ 1056 iframe.setMark(0); 1057 } 1058 } 1059 1060 rv = processHeadersFrame(); 1061 if (isFatal(rv)) { 1062 return rv; 1063 } 1064 1065 busy = true; 1066 1067 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 1068 iframe.state = IGN_HEADER_BLOCK; 1069 break; 1070 } 1071 1072 iframe.state = READ_HEADER_BLOCK; 1073 1074 break; 1075 case FrameType.PRIORITY: 1076 rv = processPriorityFrame(); 1077 if (isFatal(rv)) { 1078 return rv; 1079 } 1080 1081 iframe.reset(); 1082 1083 break; 1084 case FrameType.RST_STREAM: 1085 rv = processRstStreamFrame(); 1086 if (isFatal(rv)) { 1087 return rv; 1088 } 1089 1090 iframe.reset(); 1091 1092 break; 1093 case FrameType.PUSH_PROMISE: 1094 if (iframe.padlen == 0 && (iframe.frame.hd.flags & FrameFlags.PADDED)) { 1095 padlen = iframe.computePad(); 1096 if (padlen < 0) { 1097 busy = true; 1098 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid padding"); 1099 if (isFatal(rv)) { 1100 return rv; 1101 } 1102 iframe.state = IGN_PAYLOAD; 1103 break; 1104 } 1105 1106 iframe.frame.push_promise.padlen = padlen; 1107 1108 if (iframe.payloadleft < 4) { 1109 busy = true; 1110 iframe.state = FRAME_SIZE_ERROR; 1111 break; 1112 } 1113 1114 iframe.state = READ_NBYTE; 1115 1116 iframe.setMark(4); 1117 1118 break; 1119 } 1120 1121 rv = processPushPromiseFrame(); 1122 if (isFatal(rv)) { 1123 return rv; 1124 } 1125 1126 busy = true; 1127 1128 if (rv == ErrorCode.IGN_HEADER_BLOCK) { 1129 iframe.state = IGN_HEADER_BLOCK; 1130 break; 1131 } 1132 1133 iframe.state = READ_HEADER_BLOCK; 1134 1135 break; 1136 case FrameType.PING: 1137 rv = processPingFrame(); 1138 if (isFatal(rv)) { 1139 return rv; 1140 } 1141 1142 iframe.reset(); 1143 1144 break; 1145 case FrameType.GOAWAY: { 1146 size_t debuglen; 1147 1148 /* 8 is Last-stream-ID + Error Code */ 1149 debuglen = iframe.frame.hd.length - 8; 1150 1151 if (debuglen > 0) { 1152 iframe.raw_lbuf = Mem.alloc!(ubyte[])(debuglen); 1153 iframe.lbuf = Buffer(iframe.raw_lbuf); 1154 } 1155 1156 busy = true; 1157 1158 iframe.state = READ_GOAWAY_DEBUG; 1159 1160 break; 1161 } 1162 case FrameType.WINDOW_UPDATE: 1163 rv = processWindowUpdateFrame(); 1164 if (isFatal(rv)) { 1165 return rv; 1166 } 1167 1168 iframe.reset(); 1169 1170 break; 1171 default: 1172 /* This is unknown frame */ 1173 iframe.reset(); 1174 1175 break; 1176 } 1177 break; 1178 case READ_HEADER_BLOCK: 1179 case IGN_HEADER_BLOCK: { 1180 int data_readlen; 1181 static if (DEBUG) { 1182 if (iframe.state == READ_HEADER_BLOCK) { 1183 LOGF("recv: [READ_HEADER_BLOCK]"); 1184 } else { 1185 LOGF("recv: [IGN_HEADER_BLOCK]"); 1186 } 1187 } 1188 1189 readlen = iframe.readLength(pos, last); 1190 1191 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft - readlen); 1192 1193 data_readlen = iframe.effectiveReadLength(iframe.payloadleft - readlen, readlen); 1194 1195 if (data_readlen >= 0) { 1196 size_t trail_padlen; 1197 size_t hd_proclen = 0; 1198 trail_padlen = iframe.frame.trailPadlen(iframe.padlen); 1199 LOGF("recv: block final=%d", (iframe.frame.hd.flags & FrameFlags.END_HEADERS) && iframe.payloadleft - data_readlen == trail_padlen); 1200 1201 rv = inflateHeaderBlock(iframe.frame, hd_proclen, cast(ubyte[])pos[0 .. data_readlen], 1202 (iframe.frame.hd.flags & FrameFlags.END_HEADERS) && iframe.payloadleft - data_readlen == trail_padlen, 1203 iframe.state == READ_HEADER_BLOCK); 1204 1205 if (isFatal(rv)) { 1206 return rv; 1207 } 1208 1209 if (rv == ErrorCode.PAUSE) { 1210 pos += hd_proclen; 1211 iframe.payloadleft -= hd_proclen; 1212 1213 return cast(int)(pos - first); 1214 } 1215 1216 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 1217 /* The application says no more headers. We decompress the 1218 rest of the header block but not invoke on_header_callback 1219 and on_frame_recv_callback. */ 1220 pos += hd_proclen; 1221 iframe.payloadleft -= hd_proclen; 1222 1223 addRstStream(iframe.frame.hd.stream_id, FrameError.INTERNAL_ERROR); 1224 busy = true; 1225 iframe.state = IGN_HEADER_BLOCK; 1226 break; 1227 } 1228 1229 pos += readlen; 1230 iframe.payloadleft -= readlen; 1231 1232 if (rv == ErrorCode.HEADER_COMP) { 1233 /* GOAWAY is already issued */ 1234 if (iframe.payloadleft == 0) { 1235 iframe.reset(); 1236 } else { 1237 busy = true; 1238 iframe.state = IGN_PAYLOAD; 1239 } 1240 break; 1241 } 1242 } else { 1243 pos += readlen; 1244 iframe.payloadleft -= readlen; 1245 } 1246 1247 if (iframe.payloadleft) { 1248 break; 1249 } 1250 1251 if ((iframe.frame.hd.flags & FrameFlags.END_HEADERS) == 0) { 1252 1253 iframe.setMark(FRAME_HDLEN); 1254 1255 iframe.padlen = 0; 1256 1257 if (iframe.state == READ_HEADER_BLOCK) 1258 iframe.state = EXPECT_CONTINUATION; 1259 else 1260 iframe.state = IGN_CONTINUATION; 1261 1262 } else { 1263 if (iframe.state == READ_HEADER_BLOCK) { 1264 rv = afterHeaderBlockReceived(); 1265 if (isFatal(rv)) { 1266 return rv; 1267 } 1268 } 1269 iframe.reset(); 1270 } 1271 break; 1272 } 1273 case IGN_PAYLOAD: 1274 LOGF("recv: [IGN_PAYLOAD]"); 1275 1276 readlen = iframe.readLength(pos, last); 1277 iframe.payloadleft -= readlen; 1278 pos += readlen; 1279 1280 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1281 1282 if (iframe.payloadleft) { 1283 break; 1284 } 1285 1286 switch (iframe.frame.hd.type) { 1287 case FrameType.HEADERS: 1288 case FrameType.PUSH_PROMISE: 1289 case FrameType.CONTINUATION: 1290 /* Mark inflater bad so that we won't perform further decoding */ 1291 hd_inflater.ctx.bad = 1; 1292 break; 1293 default: 1294 break; 1295 } 1296 1297 iframe.reset(); 1298 1299 break; 1300 case FRAME_SIZE_ERROR: 1301 LOGF("recv: [FRAME_SIZE_ERROR]"); 1302 1303 rv = terminateSession(FrameError.FRAME_SIZE_ERROR); 1304 if (isFatal(rv)) { 1305 return rv; 1306 } 1307 1308 busy = true; 1309 1310 iframe.state = IGN_PAYLOAD; 1311 1312 break; 1313 case READ_SETTINGS: 1314 LOGF("recv: [READ_SETTINGS]"); 1315 1316 readlen = iframe.read(pos, last); 1317 iframe.payloadleft -= readlen; 1318 pos += readlen; 1319 1320 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1321 1322 if (iframe.sbuf.markAvailable) { 1323 break; 1324 } 1325 1326 if (readlen > 0) 1327 iframe.unpackSetting(); 1328 1329 if (iframe.payloadleft) { 1330 iframe.setMark(FRAME_SETTINGS_ENTRY_LENGTH); 1331 break; 1332 } 1333 1334 rv = processSettingsFrame(); 1335 1336 if (isFatal(rv)) { 1337 return rv; 1338 } 1339 1340 iframe.reset(); 1341 1342 break; 1343 case READ_GOAWAY_DEBUG: 1344 LOGF("recv: [READ_GOAWAY_DEBUG]"); 1345 1346 readlen = iframe.readLength(pos, last); 1347 1348 iframe.lbuf.last[0 .. readlen] = pos[0 .. readlen]; 1349 iframe.lbuf.last += readlen; 1350 1351 iframe.payloadleft -= readlen; 1352 pos += readlen; 1353 1354 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1355 1356 if (iframe.payloadleft) { 1357 assert(iframe.lbuf.available > 0); 1358 1359 break; 1360 } 1361 1362 rv = processGoAwayFrame(); 1363 1364 if (isFatal(rv)) { 1365 return rv; 1366 } 1367 1368 iframe.reset(); 1369 1370 break; 1371 case EXPECT_CONTINUATION: 1372 case IGN_CONTINUATION: 1373 static if (DEBUG) { 1374 if (iframe.state == EXPECT_CONTINUATION) { 1375 LOGF("recv: [EXPECT_CONTINUATION]"); 1376 } else { 1377 LOGF("recv: [IGN_CONTINUATION]"); 1378 } 1379 } 1380 1381 readlen = iframe.read(pos, last); 1382 pos += readlen; 1383 1384 if (iframe.sbuf.markAvailable) { 1385 return cast(int)(pos - first); 1386 } 1387 1388 cont_hd.unpack(iframe.sbuf.pos); 1389 iframe.payloadleft = cont_hd.length; 1390 1391 LOGF("recv: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", cont_hd.length, cont_hd.type, cont_hd.flags, cont_hd.stream_id); 1392 1393 if (cont_hd.type != FrameType.CONTINUATION || 1394 cont_hd.stream_id != iframe.frame.hd.stream_id) { 1395 LOGF("recv: expected stream_id=%d, type=%d, but got stream_id=%d, type=%d", 1396 iframe.frame.hd.stream_id, FrameType.CONTINUATION, cont_hd.stream_id, cont_hd.type); 1397 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "unexpected non-CONTINUATION frame or stream_id is invalid"); 1398 if (isFatal(rv)) { 1399 return rv; 1400 } 1401 1402 busy = true; 1403 1404 iframe.state = IGN_PAYLOAD; 1405 1406 break; 1407 } 1408 1409 /* CONTINUATION won't bear FrameFlags.PADDED flag */ 1410 iframe.frame.hd.flags |= cont_hd.flags & FrameFlags.END_HEADERS; 1411 iframe.frame.hd.length += cont_hd.length; 1412 1413 busy = true; 1414 1415 if (iframe.state == EXPECT_CONTINUATION) { 1416 iframe.state = READ_HEADER_BLOCK; 1417 1418 bool ok = callOnFrameHeader(cont_hd); 1419 1420 if (!ok) { 1421 return ErrorCode.CALLBACK_FAILURE; 1422 } 1423 } else { 1424 iframe.state = IGN_HEADER_BLOCK; 1425 } 1426 1427 break; 1428 case READ_PAD_DATA: 1429 LOGF("recv: [READ_PAD_DATA]"); 1430 1431 readlen = iframe.read(pos, last); 1432 pos += readlen; 1433 iframe.payloadleft -= readlen; 1434 1435 LOGF("recv: readlen=%d, payloadleft=%d, left=%d", readlen, iframe.payloadleft, iframe.sbuf.markAvailable); 1436 1437 if (iframe.sbuf.markAvailable) { 1438 return cast(int)(pos - first); 1439 } 1440 1441 /* Pad Length field is subject to flow control */ 1442 rv = updateRecvConnectionWindowSize(readlen); 1443 if (isFatal(rv)) { 1444 return rv; 1445 } 1446 1447 /* Pad Length field is consumed immediately */ 1448 rv = consume(iframe.frame.hd.stream_id, readlen); 1449 1450 if (isFatal(rv)) { 1451 return rv; 1452 } 1453 1454 stream = getStream(iframe.frame.hd.stream_id); 1455 if (stream) 1456 updateRecvStreamWindowSize(stream, readlen, iframe.payloadleft || (iframe.frame.hd.flags & FrameFlags.END_STREAM) == 0); 1457 1458 busy = true; 1459 1460 padlen = iframe.computePad(); 1461 if (padlen < 0) { 1462 rv = terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "DATA: invalid padding"); 1463 if (isFatal(rv)) { 1464 return rv; 1465 } 1466 iframe.state = IGN_DATA; 1467 break; 1468 } 1469 1470 iframe.frame.data.padlen = padlen; 1471 1472 iframe.state = READ_DATA; 1473 1474 break; 1475 case READ_DATA: 1476 LOGF("recv: [READ_DATA]"); 1477 1478 readlen = iframe.readLength(pos, last); 1479 iframe.payloadleft -= readlen; 1480 pos += readlen; 1481 1482 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1483 1484 if (readlen > 0) { 1485 int data_readlen; 1486 1487 rv = updateRecvConnectionWindowSize(readlen); 1488 if (isFatal(rv)) { 1489 return rv; 1490 } 1491 1492 stream = getStream(iframe.frame.hd.stream_id); 1493 if (stream) 1494 updateRecvStreamWindowSize(stream, readlen, iframe.payloadleft || (iframe.frame.hd.flags & FrameFlags.END_STREAM) == 0); 1495 1496 data_readlen = iframe.effectiveReadLength(iframe.payloadleft, readlen); 1497 1498 padlen = cast(int)(readlen - data_readlen); 1499 1500 if (padlen > 0) { 1501 /* Padding is considered as "consumed" immediately */ 1502 rv = consume(iframe.frame.hd.stream_id, padlen); 1503 1504 if (isFatal(rv)) { 1505 return rv; 1506 } 1507 } 1508 1509 LOGF("recv: data_readlen=%d", data_readlen); 1510 1511 if (stream && data_readlen > 0) { 1512 if (isHTTPMessagingEnabled()) { 1513 if (!stream.onDataChunk(data_readlen)) { 1514 addRstStream(iframe.frame.hd.stream_id, FrameError.PROTOCOL_ERROR); 1515 busy = true; 1516 iframe.state = IGN_DATA; 1517 break; 1518 } 1519 } 1520 1521 ubyte[] data_nopad = cast(ubyte[])(pos - readlen)[0 .. data_readlen]; 1522 FrameFlags flags = iframe.frame.hd.flags; 1523 int stream_id = iframe.frame.hd.stream_id; 1524 bool pause; 1525 bool ok; 1526 try ok = connector.onDataChunk(flags, stream_id, data_nopad, pause); 1527 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 1528 1529 if (pause) { 1530 return cast(int)(pos - first); 1531 } 1532 1533 if (!ok) { 1534 return ErrorCode.CALLBACK_FAILURE; 1535 } 1536 1537 } 1538 } 1539 1540 if (iframe.payloadleft) { 1541 break; 1542 } 1543 1544 rv = processDataFrame(); 1545 if (isFatal(rv)) { 1546 return rv; 1547 } 1548 1549 iframe.reset(); 1550 1551 break; 1552 case IGN_DATA: 1553 LOGF("recv: [IGN_DATA]"); 1554 1555 readlen = iframe.readLength(pos, last); 1556 iframe.payloadleft -= readlen; 1557 pos += readlen; 1558 1559 LOGF("recv: readlen=%d, payloadleft=%d", readlen, iframe.payloadleft); 1560 1561 if (readlen > 0) { 1562 /* Update connection-level flow control window for ignored DATA frame too */ 1563 rv = updateRecvConnectionWindowSize(readlen); 1564 if (isFatal(rv)) { 1565 return rv; 1566 } 1567 1568 if (opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE) { 1569 1570 /* Ignored DATA is considered as "consumed" immediately. */ 1571 rv = updateConnectionConsumedSize(readlen); 1572 1573 if (isFatal(rv)) { 1574 return rv; 1575 } 1576 } 1577 } 1578 1579 if (iframe.payloadleft) { 1580 break; 1581 } 1582 1583 iframe.reset(); 1584 1585 break; 1586 case IGN_ALL: 1587 return cast(int)input.length; 1588 } 1589 1590 if (!busy && pos == last) { 1591 break; 1592 } 1593 1594 busy = false; 1595 } 1596 1597 assert(pos == last); 1598 1599 return cast(int)(pos - first); 1600 } 1601 1602 /** 1603 * Puts back previously deferred DATA frame in the stream |stream_id| 1604 * to the outbound queue. 1605 * 1606 * This function returns 0 if it succeeds, or one of the following 1607 * negative error codes: 1608 * 1609 * $(D ErrorCode.INVALID_ARGUMENT) 1610 * The stream does not exist; or no deferred data exist. 1611 */ 1612 ErrorCode resumeData(int stream_id) 1613 { 1614 Stream stream = getStream(stream_id); 1615 1616 if (!stream || !stream.isItemDeferred()) 1617 return ErrorCode.INVALID_ARGUMENT; 1618 1619 stream.resumeDeferredItem(StreamFlags.DEFERRED_USER, this); 1620 return ErrorCode.OK; 1621 } 1622 1623 /** 1624 * Returns true value if $(D Session) wants to receive data from the 1625 * remote peer. 1626 * 1627 * If both `wantRead()` and `wantWrite()` return false, the application should 1628 * drop the connection. 1629 */ 1630 bool wantRead() 1631 { 1632 size_t num_active_streams; 1633 1634 /* If this flag is set, we don't want to read. The application should drop the connection. */ 1635 if (goaway_flags & GoAwayFlags.TERM_SENT) { 1636 return false; 1637 } 1638 1639 num_active_streams = getNumActiveStreams(); 1640 1641 /* Unless termination GOAWAY is sent or received, we always want to read incoming frames. */ 1642 if (num_active_streams > 0) { 1643 return true; 1644 } 1645 1646 /* If there is no active streams and GOAWAY has been sent or received, we are done with this session. */ 1647 return (goaway_flags & (GoAwayFlags.SENT | GoAwayFlags.RECV)) == 0; 1648 } 1649 1650 /** 1651 * Returns true value if $(D Session) wants to send data to the remote 1652 * peer. 1653 * 1654 * If both `wantRead()` and `wantWrite()` return false, the application should 1655 * drop the connection. 1656 */ 1657 bool wantWrite() 1658 { 1659 /* If these flag is set, we don't want to write any data. The application should drop the connection. */ 1660 if (goaway_flags & GoAwayFlags.TERM_SENT) 1661 { 1662 return false; 1663 } 1664 1665 /* 1666 * Unless termination GOAWAY is sent or received, we want to write 1667 * frames if there is pending ones. If pending frame is request/push 1668 * response HEADERS and concurrent stream limit is reached, we don't 1669 * want to write them. 1670 */ 1671 1672 if (!aob.item && ob_pq.empty && 1673 (ob_da_pq.empty || remote_window_size == 0) && 1674 (ob_ss_pq.empty || isOutgoingConcurrentStreamsMax())) 1675 { 1676 return false; 1677 } 1678 1679 /* If there is no active streams and GOAWAY has been sent or received, we are done with this session. */ 1680 return (goaway_flags & (GoAwayFlags.SENT | GoAwayFlags.RECV)) == 0; 1681 } 1682 1683 /** 1684 * Returns stream_user_data for the stream |stream_id|. The 1685 * stream_user_data is provided by `submitRequest()`, 1686 * `submitHeaders()` or `setStreamUserData()`. 1687 * Unless it is set using `setStreamUserData()`, if the stream is 1688 * initiated by the remote endpoint, stream_user_data is always 1689 * `null`. If the stream does not exist, this function returns 1690 * `null`. 1691 */ 1692 void* getStreamUserData(int stream_id) { 1693 Stream stream = getStream(stream_id); 1694 if (stream) { 1695 return stream.userData; 1696 } else { 1697 return null; 1698 } 1699 } 1700 1701 /** 1702 * Sets the |stream_user_data| to the stream denoted by the 1703 * |stream_id|. If a stream user data is already set to the stream, 1704 * it is replaced with the |stream_user_data|. It is valid to specify 1705 * `null` in the |stream_user_data|, which nullifies the associated 1706 * data pointer. 1707 * 1708 * It is valid to set the |stream_user_data| to the stream reserved by 1709 * PUSH_PROMISE frame. 1710 * 1711 * This function returns 0 if it succeeds, or one of following 1712 * negative error codes: 1713 * 1714 * $(D ErrorCode.INVALID_ARGUMENT) 1715 * The stream does not exist 1716 */ 1717 ErrorCode setStreamUserData(int stream_id, void* stream_user_data){ 1718 Stream stream = getStream(stream_id); 1719 if (!stream) 1720 return ErrorCode.INVALID_ARGUMENT; 1721 stream.userData = stream_user_data; 1722 return ErrorCode.OK; 1723 } 1724 1725 /** 1726 * Returns the number of frames in the outbound queue. This does not 1727 * include the deferred DATA frames. 1728 */ 1729 size_t getOutboundQueueSize() { 1730 return ob_pq.length + ob_ss_pq.length + ob_da_pq.length; 1731 } 1732 1733 /** 1734 * Returns the number of DATA payload in bytes received without 1735 * WINDOW_UPDATE transmission for the stream |stream_id|. The local 1736 * (receive) window size can be adjusted by 1737 * $(D submitWindowUpdate). This function takes into account 1738 * that and returns effective data length. In particular, if the 1739 * local window size is reduced by submitting negative 1740 * window_size_increment with $(D submitWindowUpdate), this 1741 * function returns the number of bytes less than actually received. 1742 * 1743 * This function returns -1 if it fails. 1744 */ 1745 int getStreamEffectiveRecvDataLength(int stream_id) 1746 { 1747 Stream stream = getStream(stream_id); 1748 if (!stream) 1749 return -1; 1750 return stream.recvWindowSize < 0 ? 0 : stream.recvWindowSize; 1751 } 1752 1753 /** 1754 * Returns the local (receive) window size for the stream |stream_id|. 1755 * The local window size can be adjusted by 1756 * $(D submitWindowUpdate). This function takes into account 1757 * that and returns effective window size. 1758 * 1759 * This function returns -1 if it fails. 1760 */ 1761 int getStreamEffectiveLocalWindowSize(int stream_id) 1762 { 1763 Stream stream = getStream(stream_id); 1764 if (!stream) 1765 return -1; 1766 return stream.localWindowSize; 1767 } 1768 1769 /** 1770 * Returns the number of DATA payload in bytes received without 1771 * WINDOW_UPDATE transmission for a connection. The local (receive) 1772 * window size can be adjusted by $(D submitWindowUpdate). 1773 * This function takes into account that and returns effective data 1774 * length. In particular, if the local window size is reduced by 1775 * submitting negative window_size_increment with 1776 * $(D submitWindowUpdate), this function returns the number 1777 * of bytes less than actually received. 1778 * 1779 * This function returns -1 if it fails. 1780 */ 1781 int getEffectiveRecvDataLength() 1782 { 1783 return recv_window_size < 0 ? 0 : recv_window_size; 1784 } 1785 1786 /** 1787 * Returns the local (receive) window size for a connection. The 1788 * local window size can be adjusted by 1789 * $(D submitWindowUpdate). This function takes into account 1790 * that and returns effective window size. 1791 * 1792 * This function returns -1 if it fails. 1793 */ 1794 int getEffectiveLocalWindowSize() 1795 { 1796 return local_window_size; 1797 } 1798 1799 /** 1800 * Returns the remote window size for a given stream |stream_id|. 1801 * 1802 * This is the amount of flow-controlled payload (e.g., DATA) that the 1803 * local endpoint can send without stream level WINDOW_UPDATE. There 1804 * is also connection level flow control, so the effective size of 1805 * payload that the local endpoint can actually send is 1806 * min(getStreamRemoteWindowSize(), getRemoteWindowSize()). 1807 * 1808 * This function returns -1 if it fails. 1809 */ 1810 int getStreamRemoteWindowSize(int stream_id) 1811 { 1812 Stream stream = getStream(stream_id); 1813 if (!stream) 1814 return -1; 1815 1816 /* stream.remoteWindowSize can be negative when Setting.INITIAL_WINDOW_SIZE is changed. */ 1817 return max(0, stream.remoteWindowSize); 1818 } 1819 1820 /** 1821 * Returns the remote window size for a connection. 1822 * 1823 * This function always succeeds. 1824 */ 1825 int getRemoteWindowSize() { 1826 return remote_window_size; 1827 } 1828 1829 /** 1830 * Returns 1 if local peer half closed the given stream |stream_id|. 1831 * Returns 0 if it did not. Returns -1 if no such stream exists. 1832 */ 1833 int getStreamLocalClose(int stream_id) 1834 { 1835 Stream stream = getStream(stream_id); 1836 1837 if (!stream) 1838 return -1; 1839 1840 return (stream.shutFlags & ShutdownFlag.WR) != 0; 1841 } 1842 1843 /** 1844 * Returns 1 if remote peer half closed the given stream |stream_id|. 1845 * Returns 0 if it did not. Returns -1 if no such stream exists. 1846 */ 1847 int getStreamRemoteClose(int stream_id) 1848 { 1849 Stream stream = getStream(stream_id); 1850 1851 if (!stream) 1852 return -1; 1853 1854 return (stream.shutFlags & ShutdownFlag.RD) != 0; 1855 } 1856 1857 /** 1858 * Signals the session so that the connection should be terminated. 1859 * 1860 * The last stream ID is the minimum value between the stream ID of a 1861 * stream for which $(D Connector.onFrame) was called 1862 * most recently and the last stream ID we have sent to the peer 1863 * previously. 1864 * 1865 * The |error_code| is the error code of this GOAWAY frame. The 1866 * pre-defined error code is one of $(D FrameError). 1867 * 1868 * After the transmission, both `wantRead()` and 1869 * `wantWrite()` return 0. 1870 * 1871 * This function should be called when the connection should be 1872 * terminated after sending GOAWAY. If the remaining streams should 1873 * be processed after GOAWAY, use `submitGoAway()` instead. 1874 */ 1875 ErrorCode terminateSession(FrameError error_code) 1876 { 1877 return terminateSession(last_proc_stream_id, error_code, null); 1878 } 1879 1880 1881 /** 1882 * Signals the session so that the connection should be terminated. 1883 * 1884 * This function behaves like $(D Session.terminateSession), 1885 * but the last stream ID can be specified by the application for fine 1886 * grained control of stream. The HTTP/2 specification does not allow 1887 * last_stream_id to be increased. So the actual value sent as 1888 * last_stream_id is the minimum value between the given 1889 * |last_stream_id| and the last_stream_id we have previously sent to 1890 * the peer. 1891 * 1892 * The |last_stream_id| is peer's stream ID or 0. So if $(D Session) is 1893 * initialized as client, |last_stream_id| must be even or 0. If 1894 * $(D Session) is initialized as server, |last_stream_id| must be odd or 1895 * 0. 1896 * 1897 * This function returns 0 if it succeeds, or one of the following 1898 * negative error codes: 1899 * 1900 * $(D ErrorCode.INVALID_ARGUMENT) 1901 * The |last_stream_id| is invalid. 1902 */ 1903 ErrorCode terminateSession(int last_stream_id, FrameError error_code) { 1904 return terminateSession(last_stream_id, error_code, null); 1905 } 1906 1907 /** 1908 * Returns the value of SETTINGS |id| notified by a remote endpoint. 1909 * The |id| must be one of values defined in $(D SettingsID). 1910 */ 1911 uint getRemoteSettings(SettingsID id) { 1912 switch (id) { 1913 case Setting.HEADER_TABLE_SIZE: 1914 return remote_settings.header_table_size; 1915 case Setting.ENABLE_PUSH: 1916 return remote_settings.enable_push; 1917 case Setting.MAX_CONCURRENT_STREAMS: 1918 return remote_settings.max_concurrent_streams; 1919 case Setting.INITIAL_WINDOW_SIZE: 1920 return remote_settings.initial_window_size; 1921 case Setting.MAX_FRAME_SIZE: 1922 return remote_settings.max_frame_size; 1923 case Setting.MAX_HEADER_LIST_SIZE: 1924 return remote_settings.max_header_list_size; 1925 default: return -1; 1926 } 1927 } 1928 1929 /** 1930 * Tells the $(D Session) that next stream ID is |next_stream_id|. The 1931 * |next_stream_id| must be equal or greater than the value returned 1932 * by `getNextStreamID()`. 1933 * 1934 * This function returns 0 if it succeeds, or one of the following 1935 * negative error codes: 1936 * 1937 * $(D ErrorCode.INVALID_ARGUMENT) 1938 * The |next_stream_id| is strictly less than the value 1939 * `getNextStreamID()` returns. 1940 */ 1941 ErrorCode setNextStreamID(int _next_stream_id) 1942 { 1943 if (_next_stream_id < 0 || next_stream_id > cast(uint)next_stream_id) { 1944 return ErrorCode.INVALID_ARGUMENT; 1945 } 1946 1947 next_stream_id = _next_stream_id; 1948 1949 return ErrorCode.OK; 1950 } 1951 1952 /** 1953 * Returns the next outgoing stream ID. Notice that return type is 1954 * uint. If we run out of stream ID for this session, this 1955 * function returns 1 << 31. 1956 */ 1957 uint getNextStreamID() 1958 { 1959 return next_stream_id; 1960 } 1961 1962 /** 1963 * Tells the $(D Session) that |size| bytes for a stream denoted by 1964 * |stream_id| were consumed by application and are ready to 1965 * WINDOW_UPDATE. The consumed bytes are counted towards both connection and 1966 * stream level WINDOW_UPDATE (see `consumeConnection` and `consumeStream`). 1967 * This function is intended to be used without 1968 * automatic window update (see $(D Options.setNoAutoWindowUpdate). 1969 * 1970 * This function returns 0 if it succeeds, or one of the following 1971 * negative error codes: 1972 * 1973 * $(D ErrorCode.INVALID_ARGUMENT) 1974 * The |stream_id| is 0. 1975 * $(D ErrorCode.INVALID_STATE) 1976 * Automatic WINDOW_UPDATE is not disabled. 1977 */ 1978 ErrorCode consume(int stream_id, size_t size) { 1979 ErrorCode rv; 1980 Stream stream; 1981 1982 if (stream_id == 0) { 1983 return ErrorCode.INVALID_ARGUMENT; 1984 } 1985 1986 if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)) { 1987 return ErrorCode.INVALID_STATE; 1988 } 1989 1990 rv = updateConnectionConsumedSize(size); 1991 1992 if (isFatal(rv)) { 1993 return rv; 1994 } 1995 1996 stream = getStream(stream_id); 1997 1998 if (!stream) 1999 return ErrorCode.OK; 2000 2001 rv = updateStreamConsumedSize(stream, size); 2002 2003 if (isFatal(rv)) { 2004 return rv; 2005 } 2006 2007 return ErrorCode.OK; 2008 } 2009 2010 /** 2011 * Like $(D consume), but this only tells library that 2012 * |size| bytes were consumed only for connection level. Note that 2013 * HTTP/2 maintains connection and stream level flow control windows 2014 * independently. 2015 * 2016 * This function returns 0 if it succeeds, or one of the following 2017 * negative error codes: 2018 * 2019 * $(D ErrorCode.INVALID_STATE) 2020 * Automatic WINDOW_UPDATE is not disabled. 2021 */ 2022 ErrorCode consumeConnection(size_t size) 2023 { 2024 ErrorCode rv; 2025 2026 if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)) 2027 return ErrorCode.INVALID_STATE; 2028 rv = updateConnectionConsumedSize(size); 2029 if (isFatal(rv)) 2030 return rv; 2031 return ErrorCode.OK; 2032 } 2033 2034 /** 2035 * @function 2036 * 2037 * Like $(D consume), but this only tells library that 2038 * |size| bytes were consumed only for stream denoted by |stream_id|. 2039 * Note that HTTP/2 maintains connection and stream level flow control 2040 * windows independently. 2041 * 2042 * This function returns 0 if it succeeds, or one of the following 2043 * negative error codes: 2044 * 2045 * $(D ErrorCode.INVALID_ARGUMENT) 2046 * The |stream_id| is 0. 2047 * $(D ErrorCode.INVALID_STATE) 2048 * Automatic WINDOW_UPDATE is not disabled. 2049 */ 2050 ErrorCode consumeStream(int stream_id, size_t size) 2051 { 2052 ErrorCode rv; 2053 Stream stream; 2054 2055 if (stream_id == 0) 2056 return ErrorCode.INVALID_ARGUMENT; 2057 2058 if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)) 2059 return ErrorCode.INVALID_STATE; 2060 2061 stream = getStream(stream_id); 2062 2063 if (!stream) 2064 return ErrorCode.OK; 2065 2066 rv = updateStreamConsumedSize(stream, size); 2067 2068 if (isFatal(rv)) { 2069 return rv; 2070 } 2071 2072 return ErrorCode.OK; 2073 2074 } 2075 2076 /** 2077 * Performs post-process of HTTP Upgrade request. This function can 2078 * be called from both client and server, but the behavior is very 2079 * different in each other. 2080 * 2081 * If called from client side, the |settings_payload| must be the 2082 * value sent in `HTTP2-Settings` header field and must be decoded 2083 * by base64url decoder. The |settings_payload| is unpacked and its 2084 * setting values will be submitted using $(D submitSettings). 2085 * This means that the client application code does not need to submit 2086 * SETTINGS by itself. The stream with stream ID=1 is opened and the 2087 * |stream_user_data| is used for its stream_user_data. The opened 2088 * stream becomes half-closed (local) state. 2089 * 2090 * If called from server side, the |settings_payload| must be the 2091 * value received in `HTTP2-Settings` header field and must be 2092 * decoded by base64url decoder. It is treated as if the SETTINGS 2093 * frame with that payload is received. Thus, callback functions for 2094 * the reception of SETTINGS frame will be invoked. The stream with 2095 * stream ID=1 is opened. The |stream_user_data| is ignored. The 2096 * opened stream becomes half-closed (remote). 2097 * 2098 * This function returns 0 if it succeeds, or one of the following 2099 * negative error codes: 2100 * 2101 * $(D ErrorCode.INVALID_ARGUMENT) 2102 * The |settings_payload| is badly formed. 2103 * $(D ErrorCode.PROTO) 2104 * The stream ID 1 is already used or closed; or is not available. 2105 */ 2106 ErrorCode upgrade(in ubyte[] settings_payload, void* stream_user_data = null) 2107 { 2108 Stream stream; 2109 Frame frame; 2110 Setting[] iva; 2111 ErrorCode rv; 2112 PrioritySpec pri_spec; 2113 2114 if ((!is_server && next_stream_id != 1) || (is_server && last_recv_stream_id >= 1)) { 2115 return ErrorCode.PROTO; 2116 } 2117 2118 if (settings_payload.length % FRAME_SETTINGS_ENTRY_LENGTH) { 2119 return ErrorCode.INVALID_ARGUMENT; 2120 } 2121 2122 Settings.unpack(iva, settings_payload); 2123 2124 if (is_server) { 2125 frame.hd = FrameHeader(cast(uint)settings_payload.length, FrameType.SETTINGS, FrameFlags.NONE, 0); 2126 frame.settings.iva = iva; 2127 rv = onSettings(frame, true /* No ACK */); 2128 } else { 2129 rv = submitSettings(this, iva); 2130 } 2131 2132 if (iva) Mem.free(iva); 2133 2134 stream = openStream(1, StreamFlags.NONE, pri_spec, StreamState.OPENING, is_server ? null : stream_user_data); 2135 2136 if (is_server) 2137 { 2138 stream.shutdown(ShutdownFlag.RD); 2139 last_recv_stream_id = 1; 2140 last_proc_stream_id = 1; 2141 } else { 2142 stream.shutdown(ShutdownFlag.WR); 2143 next_stream_id += 2; 2144 } 2145 2146 return ErrorCode.OK; 2147 } 2148 2149 /* 2150 * Returns true if |stream_id| is initiated by local endpoint. 2151 */ 2152 bool isMyStreamId(int stream_id) 2153 { 2154 int rem; 2155 if (stream_id == 0) { 2156 return false; 2157 } 2158 rem = stream_id & 0x1; 2159 if (is_server) { 2160 return rem == 0; 2161 } 2162 return rem == 1; 2163 } 2164 2165 /* 2166 * Adds |item| to the outbound queue in $(D Session). When this function 2167 * succeeds, it takes ownership of |item|. So caller must not free it 2168 * on success. 2169 * 2170 * This function returns 0 if it succeeds, or one of the following 2171 * negative error codes: 2172 * 2173 * ErrorCode.STREAM_CLOSED 2174 * Stream already closed (DATA frame only) 2175 * 2176 * ErrorCode.DATA_EXIST 2177 */ 2178 ErrorCode addItem(OutboundItem item) 2179 { 2180 /* TODO: Return error if stream is not found for the frame requiring stream presence. */ 2181 Stream stream = getStream(item.frame.hd.stream_id); 2182 Frame* frame = &item.frame; 2183 2184 if (frame.hd.type != FrameType.DATA) { 2185 switch (frame.hd.type) { 2186 case FrameType.RST_STREAM: 2187 if (stream) 2188 stream.state = StreamState.CLOSING; 2189 break; 2190 case FrameType.SETTINGS: 2191 item.weight = OB_SETTINGS_WEIGHT; 2192 break; 2193 case FrameType.PING: 2194 /* Ping has highest priority. */ 2195 item.weight = OB_PING_WEIGHT; 2196 break; 2197 default: 2198 break; 2199 } 2200 2201 if (frame.hd.type == FrameType.HEADERS) { 2202 /* We push request HEADERS and push response HEADERS to 2203 dedicated queue because their transmission is affected by 2204 Setting.MAX_CONCURRENT_STREAMS */ 2205 /* TODO: If 2 HEADERS are submitted for reserved stream, then 2206 both of them are queued into ob_ss_pq, which is not 2207 desirable. */ 2208 if (frame.headers.cat == HeadersCategory.REQUEST) { 2209 ob_ss_pq.push(item); 2210 item.queued = 1; 2211 } else if (stream && (stream.state == StreamState.RESERVED || item.aux_data.headers.attach_stream)) { 2212 item.weight = stream.effectiveWeight; 2213 item.cycle = last_cycle; 2214 stream.attachItem(item, this); 2215 } else { 2216 ob_pq.push(item); 2217 item.queued = 1; 2218 } 2219 } else { 2220 ob_pq.push(item); 2221 item.queued = 1; 2222 } 2223 2224 return ErrorCode.OK; 2225 } 2226 2227 if (!stream) { 2228 return ErrorCode.STREAM_CLOSED; 2229 } 2230 2231 if (stream.item) { 2232 return ErrorCode.DATA_EXIST; 2233 } 2234 2235 item.weight = stream.effectiveWeight; 2236 item.cycle = last_cycle; 2237 2238 stream.attachItem(item, this); 2239 2240 return ErrorCode.OK; 2241 } 2242 2243 /* 2244 * Adds RST_STREAM frame for the stream |stream_id| with the error 2245 * code |error_code|. This is a convenient function built on top of 2246 * $(D Session.addFrame) to add RST_STREAM easily. 2247 * 2248 * This function simply returns without adding RST_STREAM frame if 2249 * given stream is in StreamState.CLOSING state, because multiple 2250 * RST_STREAM for a stream is redundant. 2251 */ 2252 void addRstStream(int stream_id, FrameError error_code) 2253 { 2254 ErrorCode rv; 2255 OutboundItem item; 2256 Frame* frame; 2257 Stream stream; 2258 2259 stream = getStream(stream_id); 2260 if (stream && stream.state == StreamState.CLOSING) 2261 return; 2262 2263 /* Cancel pending request HEADERS in ob_ss_pq if this RST_STREAM refers to that stream. */ 2264 if (!is_server && isMyStreamId(stream_id) && ob_ss_pq.top) 2265 { 2266 OutboundItem top; 2267 Frame* headers_frame; 2268 2269 top = ob_ss_pq.top; 2270 headers_frame = &top.frame; 2271 2272 assert(headers_frame.hd.type == FrameType.HEADERS); 2273 2274 if (headers_frame.hd.stream_id <= stream_id && cast(uint)stream_id < next_stream_id) 2275 { 2276 foreach (OutboundItem item; ob_ss_pq) { 2277 2278 HeadersAuxData* aux_data = &item.aux_data.headers; 2279 2280 if (item.frame.hd.stream_id != stream_id || aux_data.canceled) 2281 { 2282 continue; 2283 } 2284 2285 aux_data.error_code = error_code; 2286 aux_data.canceled = 1; 2287 return; 2288 } 2289 } 2290 } 2291 2292 item = Mem.alloc!OutboundItem(this); 2293 frame = &item.frame; 2294 2295 frame.rst_stream = RstStream(stream_id, error_code); 2296 addItem(item); 2297 } 2298 2299 /* 2300 * Adds PING frame. This is a convenient functin built on top of 2301 * $(D Session.addFrame) to add PING easily. 2302 * 2303 * If the |opaque_data| is not null, it must point to 8 bytes memory 2304 * region of data. The data pointed by |opaque_data| is copied. It can 2305 * be null. In this case, 8 bytes null is used. 2306 * 2307 */ 2308 void addPing(FrameFlags flags, in ubyte[] opaque_data) 2309 { 2310 ErrorCode rv; 2311 OutboundItem item; 2312 Frame* frame; 2313 2314 item = Mem.alloc!OutboundItem(this); 2315 2316 frame = &item.frame; 2317 2318 frame.ping = Ping(flags, opaque_data); 2319 2320 addItem(item); 2321 } 2322 2323 /* 2324 * Adds GOAWAY frame with the last-stream-ID |last_stream_id| and the 2325 * error code |error_code|. This is a convenient function built on top 2326 * of $(D Session.addFrame) to add GOAWAY easily. The 2327 * |aux_flags| are bitwise-OR of one or more of 2328 * GoAwayAuxFlags. 2329 * 2330 * This function returns 0 if it succeeds, or one of the following 2331 * negative error codes: 2332 * 2333 * ErrorCode.INVALID_ARGUMENT 2334 * The |opaque_data_len| is too large. 2335 */ 2336 ErrorCode addGoAway(int last_stream_id, FrameError error_code, in string opaque_data, GoAwayAuxFlags aux_flags) { 2337 ErrorCode rv; 2338 OutboundItem item; 2339 Frame* frame; 2340 string opaque_data_copy; 2341 GoAwayAuxData* aux_data; 2342 2343 if (isMyStreamId(last_stream_id)) { 2344 return ErrorCode.INVALID_ARGUMENT; 2345 } 2346 2347 if (opaque_data) { 2348 if (opaque_data.length + 8 > MAX_PAYLOADLEN) { 2349 return ErrorCode.INVALID_ARGUMENT; 2350 } 2351 opaque_data_copy = cast(string)Mem.copy(opaque_data); 2352 } 2353 2354 item = Mem.alloc!OutboundItem(this); 2355 2356 frame = &item.frame; 2357 2358 /* last_stream_id must not be increased from the value previously sent */ 2359 last_stream_id = min(last_stream_id, local_last_stream_id); 2360 2361 frame.goaway = GoAway(last_stream_id, error_code, opaque_data_copy); 2362 2363 aux_data = &item.aux_data.goaway; 2364 aux_data.flags = aux_flags; 2365 2366 addItem(item); 2367 return ErrorCode.OK; 2368 } 2369 /* 2370 * Adds WINDOW_UPDATE frame with stream ID |stream_id| and 2371 * window-size-increment |window_size_increment|. This is a convenient 2372 * function built on top of $(D Session.addFrame) to add 2373 * WINDOW_UPDATE easily. 2374 */ 2375 void addWindowUpdate(FrameFlags flags, int stream_id, int window_size_increment) { 2376 ErrorCode rv; 2377 OutboundItem item; 2378 Frame* frame; 2379 2380 item = Mem.alloc!OutboundItem(this); 2381 frame = &item.frame; 2382 2383 frame.window_update = WindowUpdate(flags, stream_id, window_size_increment); 2384 2385 addItem(item); 2386 } 2387 2388 /* 2389 * Adds SETTINGS frame. 2390 */ 2391 ErrorCode addSettings(FrameFlags flags, in Setting[] iva) 2392 { 2393 OutboundItem item; 2394 Frame* frame; 2395 Setting[] iva_copy; 2396 size_t i; 2397 2398 if (flags & FrameFlags.ACK) { 2399 if (iva.length != 0) { 2400 return ErrorCode.INVALID_ARGUMENT; 2401 } 2402 } 2403 else if (inflight_iva.length != 0) 2404 return ErrorCode.TOO_MANY_INFLIGHT_SETTINGS; 2405 2406 if (!iva.check()) 2407 return ErrorCode.INVALID_ARGUMENT; 2408 2409 item = Mem.alloc!OutboundItem(this); 2410 scope(failure) Mem.free(item); 2411 2412 if (iva.length > 0) 2413 iva_copy = iva.copy(); 2414 else 2415 iva_copy = null; 2416 2417 scope(failure) if(iva_copy) Mem.free(iva_copy); 2418 2419 if ((flags & FrameFlags.ACK) == 0) { 2420 if (iva.length > 0) 2421 inflight_iva = iva.copy(); 2422 else 2423 inflight_iva = null; 2424 2425 } 2426 2427 frame = &item.frame; 2428 2429 frame.settings = Settings(flags, iva_copy); 2430 2431 addItem(item); 2432 2433 /* Extract Setting.MAX_CONCURRENT_STREAMS and ENABLE_PUSH here. We use it to refuse the 2434 * incoming stream and PUSH_PROMISE with RST_STREAM. */ 2435 foreach_reverse(ref iv; iva) 2436 { 2437 if (iv.id == Setting.MAX_CONCURRENT_STREAMS) { 2438 pending_local_max_concurrent_stream = iv.value; 2439 break; 2440 } 2441 } 2442 foreach_reverse(ref iv; iva) 2443 { 2444 if (iv.id == Setting.ENABLE_PUSH) { 2445 pending_enable_push = iv.value>0; 2446 break; 2447 } 2448 } 2449 2450 return ErrorCode.OK; 2451 } 2452 2453 /** 2454 * Creates new stream in $(D Session) with stream ID |stream_id|, 2455 * priority |pri_spec| and flags |flags|. The |flags| is bitwise OR 2456 * of StreamFlags. Since this function is called when initial 2457 * HEADERS is sent or received, these flags are taken from it. The 2458 * state of stream is set to |initial_state|. The |stream_user_data| 2459 * is a pointer to the arbitrary user supplied data to be associated 2460 * to this stream. 2461 * 2462 * If |initial_state| is StreamState.RESERVED, this function sets the 2463 * StreamFlags.PUSH flag. 2464 * 2465 * This function returns a pointer to created new stream object. 2466 */ 2467 Stream openStream(int stream_id, StreamFlags flags, PrioritySpec pri_spec_in, StreamState initial_state, void *stream_user_data = null) 2468 { 2469 ErrorCode rv; 2470 Stream stream; 2471 Stream dep_stream; 2472 Stream root_stream; 2473 bool stream_alloc; 2474 PrioritySpec pri_spec_default; 2475 PrioritySpec pri_spec = pri_spec_in; 2476 stream = getStreamRaw(stream_id); 2477 2478 if (stream) { 2479 assert(stream.state == StreamState.IDLE); 2480 assert(stream.inDepTree()); 2481 detachIdleStream(stream); 2482 stream.remove(); 2483 } else { 2484 if (is_server && initial_state != StreamState.IDLE && !isMyStreamId(stream_id)) 2485 adjustClosedStream(1); 2486 stream_alloc = true; 2487 } 2488 2489 if (pri_spec.stream_id != 0) { 2490 dep_stream = getStreamRaw(pri_spec.stream_id); 2491 if (is_server && !dep_stream && idleStreamDetect(pri_spec.stream_id)) 2492 { 2493 /* Depends on idle stream, which does not exist in memory. Assign default priority for it. */ 2494 dep_stream = openStream(pri_spec.stream_id, StreamFlags.NONE, pri_spec_default, StreamState.IDLE, null); 2495 } else if (!dep_stream || !dep_stream.inDepTree()) { 2496 /* If dep_stream is not part of dependency tree, stream will get default priority. */ 2497 pri_spec = pri_spec_default; 2498 } 2499 } 2500 2501 if (initial_state == StreamState.RESERVED) 2502 flags |= StreamFlags.PUSH; 2503 2504 if (stream_alloc) 2505 stream = Mem.alloc!Stream(stream_id, flags, initial_state, pri_spec.weight, roots, 2506 remote_settings.initial_window_size, local_settings.initial_window_size, stream_user_data); 2507 else 2508 stream.initialize(stream_id, flags, initial_state, pri_spec.weight, roots, 2509 remote_settings.initial_window_size, local_settings.initial_window_size, stream_user_data); 2510 scope(failure) if (stream_alloc) Mem.free(stream); 2511 2512 if (stream_alloc) 2513 streams[stream_id] = stream; 2514 scope(failure) if (stream_alloc) streams.remove(stream_id); 2515 2516 switch (initial_state) { 2517 case StreamState.RESERVED: 2518 if (isMyStreamId(stream_id)) { 2519 /* half closed (remote) */ 2520 stream.shutdown(ShutdownFlag.RD); 2521 } else { 2522 /* half closed (local) */ 2523 stream.shutdown(ShutdownFlag.WR); 2524 } 2525 /* Reserved stream does not count in the concurrent streams limit. That is one of the DOS vector. */ 2526 break; 2527 case StreamState.IDLE: 2528 /* Idle stream does not count toward the concurrent streams limit. This is used as anchor node in dependency tree. */ 2529 assert(is_server); 2530 keepIdleStream(stream); 2531 break; 2532 default: 2533 if (isMyStreamId(stream_id)) { 2534 ++num_outgoing_streams; 2535 } else { 2536 ++num_incoming_streams; 2537 } 2538 } 2539 2540 /* We don't have to track dependency of received reserved stream */ 2541 if (stream.shutFlags & ShutdownFlag.WR) 2542 return stream; 2543 2544 if (pri_spec.stream_id == 0) 2545 { 2546 2547 ++roots.num_streams; 2548 2549 if (pri_spec.exclusive && roots.num_streams <= MAX_DEP_TREE_LENGTH) 2550 stream.makeTopmostRoot(this); 2551 else 2552 roots.add(stream); 2553 2554 return stream; 2555 } 2556 2557 2558 /* TODO Client does not have to track dependencies of streams except 2559 for those which have upload data. Currently, we just track 2560 everything. */ 2561 assert(dep_stream); 2562 2563 root_stream = dep_stream.getRoot(); 2564 2565 if (root_stream.subStreams < MAX_DEP_TREE_LENGTH) { 2566 if (pri_spec.exclusive) { 2567 dep_stream.insert(stream); 2568 } else { 2569 dep_stream.add(stream); 2570 } 2571 } else { 2572 stream.weight = DEFAULT_WEIGHT; 2573 roots.add(stream); 2574 } 2575 2576 return stream; 2577 } 2578 2579 /* 2580 * Closes stream whose stream ID is |stream_id|. The reason of closure 2581 * is indicated by the |error_code|. When closing the stream, 2582 * $(D Connector.onStreamExit) will be called. 2583 * 2584 * If the session is initialized as server and |stream| is incoming 2585 * stream, stream is just marked closed and this function calls 2586 * keepClosedStream() with |stream|. Otherwise, 2587 * |stream| will be deleted from memory. 2588 * 2589 * This function returns 0 if it succeeds, or one the following 2590 * negative error codes: 2591 * 2592 * ErrorCode.INVALID_ARGUMENT 2593 * The specified stream does not exist. 2594 * ErrorCode.CALLBACK_FAILURE 2595 * The callback function failed. 2596 */ 2597 ErrorCode closeStream(int stream_id, FrameError error_code) 2598 { 2599 Stream stream = getStream(stream_id); 2600 2601 if (!stream) { 2602 return ErrorCode.INVALID_ARGUMENT; 2603 } 2604 2605 LOGF("stream: stream(%s=%d close", stream, stream.id); 2606 2607 if (stream.item) { 2608 OutboundItem item = stream.item; 2609 2610 stream.detachItem(this); 2611 2612 /* If item is queued, it will be deleted when it is popped 2613 (prepareFrame() will fail). If aob.item 2614 points to this item, let active_outbound_item_reset() 2615 free the item. */ 2616 if (!item.queued && item != aob.item) { 2617 item.free(); 2618 Mem.free(item); 2619 } 2620 } 2621 2622 /* We call $(D Connector.onStreamExit) even if stream.state is 2623 StreamState.INITIAL. This will happen while sending request 2624 HEADERS, a local endpoint receives RST_STREAM for that stream. It 2625 may be PROTOCOL_ERROR, but without notifying stream closure will 2626 hang the stream in a local endpoint. 2627 */ 2628 try 2629 if (!connector.onStreamExit(stream_id, error_code)) 2630 return ErrorCode.CALLBACK_FAILURE; 2631 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 2632 2633 /* pushed streams which is not opened yet is not counted toward max concurrent limits */ 2634 if ((stream.flags & StreamFlags.PUSH) == 0) { 2635 if (isMyStreamId(stream_id)) { 2636 --num_outgoing_streams; 2637 } else { 2638 --num_incoming_streams; 2639 } 2640 } 2641 2642 /* Closes both directions just in case they are not closed yet */ 2643 stream.flags = cast(StreamFlags)(stream.flags | StreamFlags.CLOSED); 2644 2645 if (is_server && stream.inDepTree()) 2646 { 2647 /* On server side, retain stream at most MAX_CONCURRENT_STREAMS 2648 combined with the current active incoming streams to make 2649 dependency tree work better. */ 2650 keepClosedStream(stream); 2651 } else { 2652 destroyStream(stream); 2653 } 2654 return ErrorCode.OK; 2655 } 2656 2657 /* 2658 * Deletes |stream| from memory. After this function returns, stream 2659 * cannot be accessed. 2660 * 2661 */ 2662 void destroyStream(Stream stream) 2663 { 2664 LOGF("stream: destroy closed stream(%s=%d", stream, stream.id); 2665 2666 stream.remove(); 2667 2668 streams.remove(stream.id); 2669 stream.free(); 2670 Mem.free(stream); 2671 } 2672 2673 /* 2674 * Tries to keep incoming closed stream |stream|. Due to the 2675 * limitation of maximum number of streams in memory, |stream| is not 2676 * closed and just deleted from memory (see destroyStream). 2677 */ 2678 void keepClosedStream(Stream stream) 2679 { 2680 LOGF("stream: keep closed stream(%s=%d, state=%d", stream, stream.id, stream.state); 2681 2682 if (closed_stream_tail) { 2683 closed_stream_tail.closedNext = stream; 2684 stream.closedPrev = closed_stream_tail; 2685 } else { 2686 closed_stream_head = stream; 2687 } 2688 closed_stream_tail = stream; 2689 2690 ++num_closed_streams; 2691 2692 adjustClosedStream(0); 2693 } 2694 2695 /* 2696 * Appends |stream| to linked list |session.idle_stream_head|. We 2697 * apply fixed limit for list size. To fit into that limit, one or 2698 * more oldest streams are removed from list as necessary. 2699 */ 2700 void keepIdleStream(Stream stream) 2701 { 2702 LOGF("stream: keep idle stream(%s=%d, state=%d", stream, stream.id, stream.state); 2703 2704 if (idle_stream_tail) { 2705 idle_stream_tail.closedNext = stream; 2706 stream.closedPrev = idle_stream_tail; 2707 } else { 2708 idle_stream_head = stream; 2709 } 2710 idle_stream_tail = stream; 2711 2712 ++num_idle_streams; 2713 2714 adjustIdleStream(); 2715 } 2716 2717 /* 2718 * Detaches |stream| from idle streams linked list. 2719 */ 2720 2721 void detachIdleStream(Stream stream) 2722 { 2723 Stream prev_stream; 2724 Stream next_stream; 2725 2726 LOGF("stream: detach idle stream(%s=%d, state=%d", stream, stream.id, stream.state); 2727 2728 prev_stream = stream.closedPrev; 2729 next_stream = stream.closedNext; 2730 2731 if (prev_stream) { 2732 prev_stream.closedNext = next_stream; 2733 } else { 2734 idle_stream_head = next_stream; 2735 } 2736 2737 if (next_stream) { 2738 next_stream.closedPrev = prev_stream; 2739 } else { 2740 idle_stream_tail = prev_stream; 2741 } 2742 2743 stream.closedPrev = null; 2744 stream.closedNext = null; 2745 2746 --num_idle_streams; 2747 } 2748 2749 /* 2750 * Deletes closed stream to ensure that number of incoming streams 2751 * including active and closed is in the maximum number of allowed 2752 * stream. If |offset| is nonzero, it is decreased from the maximum 2753 * number of allowed stream when comparing number of active and closed 2754 * stream and the maximum number. 2755 */ 2756 void adjustClosedStream(int offset) 2757 { 2758 size_t num_stream_max; 2759 2760 num_stream_max = min(local_settings.max_concurrent_streams, pending_local_max_concurrent_stream); 2761 2762 LOGF("stream: adjusting kept closed streams num_closed_streams=%d, num_incoming_streams=%d, max_concurrent_streams=%d", 2763 num_closed_streams, num_incoming_streams, num_stream_max); 2764 2765 while (num_closed_streams > 0 && num_closed_streams + num_incoming_streams + offset > num_stream_max) 2766 { 2767 Stream head_stream; 2768 2769 head_stream = closed_stream_head; 2770 2771 assert(head_stream); 2772 2773 closed_stream_head = head_stream.closedNext; 2774 2775 if (closed_stream_head) 2776 closed_stream_head.closedPrev = null; 2777 else 2778 closed_stream_tail = null; 2779 2780 destroyStream(head_stream); 2781 /* head_stream is now freed */ 2782 --num_closed_streams; 2783 } 2784 } 2785 2786 /* 2787 * Deletes idle stream to ensure that number of idle streams is in 2788 * certain limit. 2789 */ 2790 void adjustIdleStream() 2791 { 2792 size_t _max; 2793 2794 /* Make minimum number of idle streams 2 so that allocating 2 2795 streams at once is easy. This happens when PRIORITY frame to 2796 idle stream, which depends on idle stream which does not 2797 exist. */ 2798 _max = max(2, min(local_settings.max_concurrent_streams, pending_local_max_concurrent_stream)); 2799 2800 LOGF("stream: adjusting kept idle streams num_idle_streams=%d, max=%d", num_idle_streams, _max); 2801 2802 while (num_idle_streams > _max) { 2803 Stream head; 2804 2805 head = idle_stream_head; 2806 assert(head); 2807 2808 idle_stream_head = head.closedNext; 2809 2810 if (idle_stream_head) { 2811 idle_stream_head.closedPrev = null; 2812 } else { 2813 idle_stream_tail = null; 2814 } 2815 2816 destroyStream(head); 2817 /* head is now destroyed */ 2818 --num_idle_streams; 2819 } 2820 } 2821 2822 /* 2823 * Closes stream with stream ID |stream_id| if both transmission and 2824 * reception of the stream were disallowed. The |error_code| indicates 2825 * the reason of the closure. 2826 * 2827 * This function returns 0 if it succeeds, or one of the following 2828 * negative error codes: 2829 * 2830 * ErrorCode.INVALID_ARGUMENT 2831 * The stream is not found. 2832 * ErrorCode.CALLBACK_FAILURE 2833 * The callback function failed. 2834 */ 2835 ErrorCode closeStreamIfShutRdWr(Stream stream) 2836 { 2837 if ((stream.shutFlags & ShutdownFlag.RDWR) == ShutdownFlag.RDWR) 2838 return closeStream(stream.id, FrameError.NO_ERROR); 2839 return ErrorCode.OK; 2840 } 2841 2842 void endRequestHeadersReceived(Frame frame, Stream stream) 2843 { 2844 if (frame.hd.flags & FrameFlags.END_STREAM) { 2845 stream.shutdown(ShutdownFlag.RD); 2846 } 2847 /* Here we assume that stream is not shutdown in ShutdownFlag.WR */ 2848 } 2849 2850 ErrorCode endResponseHeadersReceived(Frame frame, Stream stream) 2851 { 2852 if (frame.hd.flags & FrameFlags.END_STREAM) { 2853 /* This is the last frame of this stream, so disallow further receptions. */ 2854 stream.shutdown(ShutdownFlag.RD); 2855 return closeStreamIfShutRdWr(stream); 2856 } 2857 2858 return ErrorCode.OK; 2859 } 2860 2861 ErrorCode endHeadersReceived(Frame frame, Stream stream) 2862 { 2863 if (frame.hd.flags & FrameFlags.END_STREAM) { 2864 stream.shutdown(ShutdownFlag.RD); 2865 return closeStreamIfShutRdWr(stream); 2866 } 2867 return ErrorCode.OK; 2868 } 2869 2870 2871 ErrorCode onRequestHeaders(Frame frame) 2872 { 2873 ErrorCode rv; 2874 Stream stream; 2875 if (frame.hd.stream_id == 0) { 2876 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: stream_id == 0"); 2877 } 2878 2879 /* If client recieves idle stream from server, it is invalid 2880 regardless stream ID is even or odd. This is because client is 2881 not expected to receive request from server. */ 2882 if (!is_server) { 2883 if (idleStreamDetect(frame.hd.stream_id)) { 2884 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: client received request"); 2885 } 2886 2887 return ErrorCode.IGN_HEADER_BLOCK; 2888 } 2889 2890 if (!isNewPeerStreamId(frame.hd.stream_id)) 2891 { 2892 /* The spec says if an endpoint receives a HEADERS with invalid 2893 stream ID, it MUST issue connection error with error code 2894 PROTOCOL_ERROR. But we could get trailer HEADERS after we have 2895 sent RST_STREAM to this stream and peer have not received it. 2896 Then connection error is too harsh. It means that we only use 2897 connection error if stream ID refers idle stream. OTherwise we 2898 just ignore HEADERS for now. */ 2899 if (idleStreamDetect(frame.hd.stream_id)) { 2900 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: invalid stream_id"); 2901 } 2902 2903 return ErrorCode.IGN_HEADER_BLOCK; 2904 } 2905 2906 last_recv_stream_id = frame.hd.stream_id; 2907 2908 if (goaway_flags & GoAwayFlags.SENT) { 2909 /* We just ignore stream after GOAWAY was queued */ 2910 return ErrorCode.IGN_HEADER_BLOCK; 2911 } 2912 2913 if (isIncomingConcurrentStreamsMax()) { 2914 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: max concurrent streams exceeded"); 2915 } 2916 2917 if (frame.headers.pri_spec.stream_id == frame.hd.stream_id) { 2918 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "request HEADERS: depend on itself"); 2919 } 2920 2921 if (isIncomingConcurrentStreamsPendingMax()) { 2922 return handleInflateInvalidStream(frame, FrameError.REFUSED_STREAM); 2923 } 2924 2925 stream = openStream(frame.hd.stream_id, StreamFlags.NONE, frame.headers.pri_spec, StreamState.OPENING, null); 2926 last_proc_stream_id = last_recv_stream_id; 2927 2928 if (!callOnHeaders(frame)) 2929 return ErrorCode.CALLBACK_FAILURE; 2930 2931 return ErrorCode.OK; 2932 } 2933 2934 ErrorCode onResponseHeaders(Frame frame, Stream stream) 2935 { 2936 ErrorCode rv; 2937 /* This function is only called if stream.state == StreamState.OPENING and stream_id is local side initiated. */ 2938 assert(stream.state == StreamState.OPENING && isMyStreamId(frame.hd.stream_id)); 2939 if (frame.hd.stream_id == 0) { 2940 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "response HEADERS: stream_id == 0"); 2941 } 2942 if (stream.shutFlags & ShutdownFlag.RD) { 2943 /* half closed (remote): from the spec: 2944 2945 If an endpoint receives additional frames for a stream that is 2946 in this state it MUST respond with a stream error (Section 2947 5.4.2) of type STREAM_CLOSED. 2948 */ 2949 return handleInflateInvalidStream(frame, FrameError.STREAM_CLOSED); 2950 } 2951 stream.state = StreamState.OPENED; 2952 if (!callOnHeaders(frame)) 2953 return ErrorCode.CALLBACK_FAILURE; 2954 return ErrorCode.OK; 2955 } 2956 2957 ErrorCode onPushResponseHeaders(Frame frame, Stream stream) 2958 { 2959 ErrorCode rv; 2960 assert(stream.state == StreamState.RESERVED); 2961 if (frame.hd.stream_id == 0) { 2962 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "push response HEADERS: stream_id == 0"); 2963 } 2964 if (goaway_flags) { 2965 /* We don't accept new stream after GOAWAY is sent or received. */ 2966 return ErrorCode.IGN_HEADER_BLOCK; 2967 } 2968 2969 if (isIncomingConcurrentStreamsMax()) { 2970 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "push response HEADERS: max concurrent streams exceeded"); 2971 } 2972 if (isIncomingConcurrentStreamsPendingMax()) { 2973 return handleInflateInvalidStream(frame, FrameError.REFUSED_STREAM); 2974 } 2975 2976 stream.promiseFulfilled(); 2977 ++num_incoming_streams; 2978 if (!callOnHeaders(frame)) 2979 return ErrorCode.CALLBACK_FAILURE; 2980 return ErrorCode.OK; 2981 } 2982 2983 /* 2984 * Called when HEADERS is received, assuming |frame| is properly 2985 * initialized. This function will first validate received frame and 2986 * then open stream sending it through callback functions. 2987 * 2988 * This function returns 0 if it succeeds, or one of the following 2989 * negative error codes: 2990 * 2991 * ErrorCode.IGN_HEADER_BLOCK 2992 * Frame was rejected and header block must be decoded but 2993 * result must be ignored. 2994 * ErrorCode.CALLBACK_FAILURE 2995 * The DataProvider failed 2996 */ 2997 ErrorCode onHeaders(Frame frame, Stream stream) 2998 { 2999 ErrorCode rv; 3000 if (frame.hd.stream_id == 0) { 3001 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "HEADERS: stream_id == 0"); 3002 } 3003 if (stream.state == StreamState.RESERVED) 3004 { 3005 /* reserved. The valid push response HEADERS is processed by 3006 onPushResponseHeaders(). This 3007 generic HEADERS is called invalid cases for HEADERS against 3008 reserved state. */ 3009 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "HEADERS: stream in reserved"); 3010 } 3011 if ((stream.shutFlags & ShutdownFlag.RD)) { 3012 /* half closed (remote): from the spec: 3013 3014 If an endpoint receives additional frames for a stream that is 3015 in this state it MUST respond with a stream error (Section 3016 5.4.2) of type STREAM_CLOSED. 3017 */ 3018 return handleInflateInvalidStream(frame, FrameError.STREAM_CLOSED); 3019 } 3020 if (isMyStreamId(frame.hd.stream_id)) { 3021 if (stream.state == StreamState.OPENED) { 3022 if (!callOnHeaders(frame)) 3023 return ErrorCode.CALLBACK_FAILURE; 3024 return ErrorCode.OK; 3025 } else if (stream.state == StreamState.CLOSING) { 3026 /* This is race condition. StreamState.CLOSING indicates 3027 that we queued RST_STREAM but it has not been sent. It will 3028 eventually sent, so we just ignore this frame. */ 3029 return ErrorCode.IGN_HEADER_BLOCK; 3030 } else { 3031 return handleInflateInvalidStream(frame, FrameError.PROTOCOL_ERROR); 3032 } 3033 } 3034 /* If this is remote peer initiated stream, it is OK unless it 3035 has sent END_STREAM frame already. But if stream is in 3036 StreamState.CLOSING, we discard the frame. This is a race 3037 condition. */ 3038 if (stream.state != StreamState.CLOSING) 3039 { 3040 if (!callOnHeaders(frame)) 3041 return ErrorCode.CALLBACK_FAILURE; 3042 return ErrorCode.OK; 3043 } 3044 return ErrorCode.IGN_HEADER_BLOCK; 3045 } 3046 3047 /* 3048 * Called when PRIORITY is received, assuming |frame| is properly 3049 * initialized. 3050 * 3051 * This function returns 0 if it succeeds, or one of the following 3052 * negative error codes: 3053 * 3054 * ErrorCode.CALLBACK_FAILURE 3055 * The DataProvider failed 3056 */ 3057 ErrorCode onPriority(Frame frame) 3058 { 3059 ErrorCode rv; 3060 Stream stream; 3061 3062 if (frame.hd.stream_id == 0) { 3063 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PRIORITY: stream_id == 0"); 3064 } 3065 3066 if (!is_server) { 3067 /* Re-prioritization works only in server */ 3068 bool ok = callOnFrame(frame); 3069 if (!ok) 3070 return ErrorCode.CALLBACK_FAILURE; 3071 return ErrorCode.OK; 3072 } 3073 3074 stream = getStreamRaw(frame.hd.stream_id); 3075 3076 if (!stream) { 3077 /* PRIORITY against idle stream can create anchor node in dependency tree. */ 3078 if (!idleStreamDetect(frame.hd.stream_id)) { 3079 return ErrorCode.OK; 3080 } 3081 3082 stream = openStream(frame.hd.stream_id, StreamFlags.NONE, frame.priority.pri_spec, StreamState.IDLE, null); 3083 } else 3084 reprioritizeStream(stream, frame.priority.pri_spec); 3085 3086 bool ok = callOnFrame(frame); 3087 if (!ok) 3088 return ErrorCode.CALLBACK_FAILURE; 3089 return ErrorCode.OK; 3090 } 3091 3092 /* 3093 * Called when RST_STREAM is received, assuming |frame| is properly 3094 * initialized. 3095 * 3096 * This function returns 0 if it succeeds, or one the following 3097 * negative error codes: 3098 * 3099 * ErrorCode.CALLBACK_FAILURE 3100 * The DataProvider failed 3101 */ 3102 ErrorCode onRstStream(Frame frame) 3103 { 3104 ErrorCode rv; 3105 Stream stream; 3106 if (frame.hd.stream_id == 0) { 3107 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "RST_STREAM: stream_id == 0"); 3108 } 3109 stream = getStream(frame.hd.stream_id); 3110 if (!stream) { 3111 if (idleStreamDetect(frame.hd.stream_id)) { 3112 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "RST_STREAM: stream in idle"); 3113 } 3114 } 3115 3116 bool ok = callOnFrame(frame); 3117 if (!ok) 3118 return ErrorCode.CALLBACK_FAILURE; 3119 3120 rv = closeStream(frame.hd.stream_id, frame.rst_stream.error_code); 3121 if (isFatal(rv)) { 3122 return rv; 3123 } 3124 return ErrorCode.OK; 3125 } 3126 3127 /* 3128 * Called when SETTINGS is received, assuming |frame| is properly 3129 * initialized. If |noack| is non-zero, SETTINGS with ACK will not be 3130 * submitted. If |frame| has NGFrameFlags.ACK flag set, no SETTINGS 3131 * with ACK will not be submitted regardless of |noack|. 3132 * 3133 * This function returns 0 if it succeeds, or one the following 3134 * negative error codes: 3135 * 3136 * ErrorCode.CALLBACK_FAILURE 3137 * The DataProvider failed 3138 */ 3139 3140 ErrorCode onSettings(Frame frame, bool noack) 3141 { 3142 ErrorCode rv; 3143 size_t i; 3144 3145 if (frame.hd.stream_id != 0) { 3146 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: stream_id != 0"); 3147 } 3148 if (frame.hd.flags & FrameFlags.ACK) { 3149 if (frame.settings.iva.length != 0) { 3150 return handleInvalidConnection(frame, FrameError.FRAME_SIZE_ERROR, "SETTINGS: ACK and payload != 0"); 3151 } 3152 if (!inflight_iva) { 3153 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: unexpected ACK"); 3154 } 3155 rv = updateLocalSettings(inflight_iva); 3156 Mem.free(inflight_iva); 3157 inflight_iva = null; 3158 if (rv != ErrorCode.OK) { 3159 FrameError error_code = FrameError.INTERNAL_ERROR; 3160 if (isFatal(rv)) { 3161 return rv; 3162 } 3163 if (rv == ErrorCode.HEADER_COMP) { 3164 error_code = FrameError.COMPRESSION_ERROR; 3165 } 3166 return handleInvalidConnection(frame, error_code, null); 3167 } 3168 bool ok = callOnFrame(frame); 3169 if (!ok) 3170 return ErrorCode.CALLBACK_FAILURE; 3171 return ErrorCode.OK; 3172 } 3173 3174 for (i = 0; i < frame.settings.iva.length; ++i) { 3175 Setting entry = frame.settings.iva[i]; 3176 3177 with(Setting) switch (entry.id) { 3178 case HEADER_TABLE_SIZE: 3179 3180 if (entry.value > MAX_HEADER_TABLE_SIZE) { 3181 return handleInvalidConnection(frame, FrameError.COMPRESSION_ERROR, "SETTINGS: too large Setting.HEADER_TABLE_SIZE"); 3182 } 3183 3184 hd_deflater.changeTableSize(entry.value); 3185 3186 remote_settings.header_table_size = entry.value; 3187 3188 break; 3189 case ENABLE_PUSH: 3190 3191 if (entry.value != 0 && entry.value != 1) { 3192 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: invalid Setting.ENABLE_PUSH"); 3193 } 3194 3195 if (!is_server && entry.value != 0) { 3196 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: server attempted to enable push"); 3197 } 3198 3199 remote_settings.enable_push = entry.value; 3200 3201 break; 3202 case MAX_CONCURRENT_STREAMS: 3203 3204 remote_settings.max_concurrent_streams = entry.value; 3205 3206 break; 3207 case INITIAL_WINDOW_SIZE: 3208 /* Update the initial window size of the all active streams */ 3209 /* Check that initial_window_size < (1u << 31) */ 3210 if (entry.value > MAX_WINDOW_SIZE) { 3211 return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, "SETTINGS: too large Setting.INITIAL_WINDOW_SIZE"); 3212 } 3213 3214 rv = updateRemoteInitialWindowSize(entry.value); 3215 3216 if (isFatal(rv)) { 3217 return rv; 3218 } 3219 3220 if (rv != ErrorCode.OK) { 3221 return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, null); 3222 } 3223 3224 remote_settings.initial_window_size = entry.value; 3225 3226 break; 3227 case MAX_FRAME_SIZE: 3228 3229 if (entry.value < MAX_FRAME_SIZE_MIN || 3230 entry.value > MAX_FRAME_SIZE_MAX) { 3231 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "SETTINGS: invalid Setting.MAX_FRAME_SIZE"); 3232 } 3233 3234 remote_settings.max_frame_size = entry.value; 3235 3236 break; 3237 case MAX_HEADER_LIST_SIZE: 3238 3239 remote_settings.max_header_list_size = entry.value; 3240 3241 break; 3242 default: break; 3243 } 3244 } 3245 3246 if (!noack && !isClosing()) { 3247 rv = addSettings(FrameFlags.ACK, null); 3248 3249 if (rv != ErrorCode.OK) { 3250 if (isFatal(rv)) { 3251 return rv; 3252 } 3253 3254 return handleInvalidConnection(frame, FrameError.INTERNAL_ERROR, null); 3255 } 3256 } 3257 bool ok = callOnFrame(frame); 3258 if (!ok) 3259 return ErrorCode.CALLBACK_FAILURE; 3260 return ErrorCode.OK; 3261 } 3262 /* 3263 * Called when PUSH_PROMISE is received, assuming |frame| is properly 3264 * initialized. 3265 * 3266 * This function returns 0 if it succeeds, or one of the following 3267 * negative error codes: 3268 * 3269 * ErrorCode.IGN_HEADER_BLOCK 3270 * Frame was rejected and header block must be decoded but 3271 * result must be ignored. 3272 * ErrorCode.CALLBACK_FAILURE 3273 * The DataProvider failed 3274 */ 3275 3276 ErrorCode onPushPromise(Frame frame) 3277 { 3278 ErrorCode rv; 3279 Stream stream; 3280 Stream promised_stream; 3281 PrioritySpec pri_spec; 3282 3283 if (frame.hd.stream_id == 0) { 3284 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: stream_id == 0"); 3285 } 3286 if (is_server || local_settings.enable_push == 0) { 3287 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: push disabled"); 3288 } 3289 if (goaway_flags) { 3290 /* We just dicard PUSH_PROMISE after GOAWAY is sent or received. */ 3291 return ErrorCode.IGN_HEADER_BLOCK; 3292 } 3293 3294 if (!isMyStreamId(frame.hd.stream_id)) { 3295 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid stream_id"); 3296 } 3297 3298 if (!isNewPeerStreamId(frame.push_promise.promised_stream_id)) { 3299 /* The spec says if an endpoint receives a PUSH_PROMISE with 3300 illegal stream ID is subject to a connection error of type 3301 PROTOCOL_ERROR. */ 3302 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: invalid promised_stream_id"); 3303 } 3304 last_recv_stream_id = frame.push_promise.promised_stream_id; 3305 stream = getStream(frame.hd.stream_id); 3306 if (!stream || stream.state == StreamState.CLOSING || !pending_enable_push) { 3307 if (!stream) { 3308 if (idleStreamDetect(frame.hd.stream_id)) { 3309 return handleInflateInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PUSH_PROMISE: stream in idle"); 3310 } 3311 } 3312 addRstStream(frame.push_promise.promised_stream_id, FrameError.REFUSED_STREAM); 3313 return ErrorCode.IGN_HEADER_BLOCK; 3314 } 3315 if (stream.shutFlags & ShutdownFlag.RD) { 3316 try 3317 if (!connector.onInvalidFrame(frame, FrameError.PROTOCOL_ERROR)) 3318 return ErrorCode.CALLBACK_FAILURE; 3319 catch(Exception e) return ErrorCode.CALLBACK_FAILURE; 3320 3321 addRstStream(frame.push_promise.promised_stream_id, FrameError.PROTOCOL_ERROR); 3322 return ErrorCode.IGN_HEADER_BLOCK; 3323 } 3324 3325 /* TODO: It is unclear reserved stream depends on associated stream with or without exclusive flag set */ 3326 pri_spec = PrioritySpec(stream.id, DEFAULT_WEIGHT, 0); 3327 3328 promised_stream = openStream(frame.push_promise.promised_stream_id, StreamFlags.NONE, pri_spec, StreamState.RESERVED, null); 3329 3330 last_proc_stream_id = last_recv_stream_id; 3331 if (!callOnHeaders(frame)) 3332 return ErrorCode.CALLBACK_FAILURE; 3333 return ErrorCode.OK; 3334 } 3335 3336 /* 3337 * Called when PING is received, assuming |frame| is properly 3338 * initialized. 3339 * 3340 * This function returns 0 if it succeeds, or one of the following 3341 * negative error codes: 3342 * 3343 * ErrorCode.CALLBACK_FAILURE 3344 * The callback function failed. 3345 */ 3346 ErrorCode onPing(Frame frame) 3347 { 3348 int rv = 0; 3349 if (frame.hd.stream_id != 0) { 3350 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "PING: stream_id != 0"); 3351 } 3352 if ((frame.hd.flags & FrameFlags.ACK) == 0 && !isClosing()) 3353 { 3354 /* Peer sent ping, so ping it back */ 3355 addPing(FrameFlags.ACK, frame.ping.opaque_data); 3356 } 3357 bool ok = callOnFrame(frame); 3358 if (!ok) 3359 return ErrorCode.CALLBACK_FAILURE; 3360 return ErrorCode.OK; 3361 } 3362 3363 /* 3364 * Called when GOAWAY is received, assuming |frame| is properly 3365 * initialized. 3366 * 3367 * This function returns 0 if it succeeds, or one of the following 3368 * negative error codes: 3369 * 3370 * ErrorCode.CALLBACK_FAILURE 3371 * The callback function failed. 3372 */ 3373 ErrorCode onGoAway(Frame frame) 3374 { 3375 if (frame.hd.stream_id != 0) 3376 { 3377 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "GOAWAY: stream_id != 0"); 3378 } 3379 3380 /* Spec says Endpoints MUST NOT increase the value they send in the last stream identifier. */ 3381 if ((frame.goaway.last_stream_id > 0 && !isMyStreamId(frame.goaway.last_stream_id)) || 3382 remote_last_stream_id < frame.goaway.last_stream_id) 3383 { 3384 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "GOAWAY: invalid last_stream_id"); 3385 } 3386 3387 goaway_flags |= GoAwayFlags.RECV; 3388 3389 remote_last_stream_id = frame.goaway.last_stream_id; 3390 3391 bool ok = callOnFrame(frame); 3392 if (!ok) 3393 return ErrorCode.CALLBACK_FAILURE; 3394 3395 return closeStreamOnGoAway(frame.goaway.last_stream_id, false); 3396 } 3397 3398 /* 3399 * Called when WINDOW_UPDATE is recieved, assuming |frame| is properly 3400 * initialized. 3401 * 3402 * This function returns 0 if it succeeds, or one of the following 3403 * negative error codes: 3404 * ErrorCode.CALLBACK_FAILURE 3405 * The callback function failed. 3406 */ 3407 ErrorCode onWindowUpdate(Frame frame) 3408 { 3409 if (frame.hd.stream_id == 0) 3410 { 3411 /* Handle connection-level flow control */ 3412 if (frame.window_update.window_size_increment == 0) 3413 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, null); 3414 3415 if (MAX_WINDOW_SIZE - frame.window_update.window_size_increment < remote_window_size) 3416 return handleInvalidConnection(frame, FrameError.FLOW_CONTROL_ERROR, null); 3417 3418 remote_window_size += frame.window_update.window_size_increment; 3419 3420 } else { 3421 /* handle stream window update */ 3422 Stream stream = getStream(frame.hd.stream_id); 3423 3424 if (!stream) { 3425 if (idleStreamDetect(frame.hd.stream_id)) 3426 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "WINDOW_UPDATE to idle stream"); 3427 return ErrorCode.OK; 3428 } 3429 3430 if (isReservedRemote(stream)) 3431 return handleInvalidConnection(frame, FrameError.PROTOCOL_ERROR, "WINDOW_UPADATE to reserved stream"); 3432 3433 if (frame.window_update.window_size_increment == 0) 3434 return handleInvalidStream(frame, FrameError.PROTOCOL_ERROR); 3435 3436 if (MAX_WINDOW_SIZE - frame.window_update.window_size_increment < stream.remoteWindowSize) 3437 return handleInvalidStream(frame, FrameError.FLOW_CONTROL_ERROR); 3438 3439 stream.remoteWindowSize = stream.remoteWindowSize + frame.window_update.window_size_increment; 3440 3441 if (stream.remoteWindowSize > 0 && stream.isDeferredByFlowControl()) 3442 stream.resumeDeferredItem(StreamFlags.DEFERRED_FLOW_CONTROL, this); 3443 } 3444 3445 bool ok = callOnFrame(frame); 3446 3447 if (!ok) 3448 return ErrorCode.CALLBACK_FAILURE; 3449 3450 return ErrorCode.OK; 3451 } 3452 3453 /* 3454 * Called when DATA is received, assuming |frame| is properly 3455 * initialized. 3456 * 3457 * This function returns 0 if it succeeds, or one of the following 3458 * negative error codes: 3459 * ErrorCode.CALLBACK_FAILURE 3460 * The callback function failed. 3461 */ 3462 ErrorCode onData(Frame frame) 3463 { 3464 ErrorCode rv; 3465 bool call_cb = true; 3466 Stream stream = getStream(frame.hd.stream_id); 3467 3468 /* We don't call on_frame_recv_callback if stream has been closed already or being closed. */ 3469 if (!stream || stream.state == StreamState.CLOSING) { 3470 /* This should be treated as stream error, but it results in lots 3471 of RST_STREAM. So just ignore frame against nonexistent stream 3472 for now. */ 3473 return ErrorCode.OK; 3474 } 3475 3476 if (isHTTPMessagingEnabled() && (frame.hd.flags & FrameFlags.END_STREAM)) 3477 { 3478 if (!stream.validateRemoteEndStream()) { 3479 call_cb = false; 3480 handleInvalidStream2(stream.id, frame, FrameError.PROTOCOL_ERROR); 3481 } 3482 } 3483 3484 if (call_cb) { 3485 bool ok = callOnFrame(frame); 3486 if (!ok) { 3487 return ErrorCode.CALLBACK_FAILURE; 3488 } 3489 } 3490 3491 if (frame.hd.flags & FrameFlags.END_STREAM) 3492 { 3493 stream.shutdown(ShutdownFlag.RD); 3494 rv = closeStreamIfShutRdWr(stream); 3495 if (isFatal(rv)) { 3496 return rv; 3497 } 3498 } 3499 return ErrorCode.OK; 3500 } 3501 3502 /* 3503 * Packs DATA frame |frame| in wire frame format and stores it in 3504 * |bufs|. Payload will be read using |aux_data.data_prd|. The 3505 * length of payload is at most |datamax| bytes. 3506 * 3507 * This function returns 0 if it succeeds, or one of the following 3508 * negative error codes: 3509 * 3510 * ErrorCode.DEFERRED 3511 * The DATA frame is postponed. 3512 * ErrorCode.TEMPORAL_CALLBACK_FAILURE 3513 * The DataProvider failed (stream error). 3514 * ErrorCode.CALLBACK_FAILURE 3515 * The DataProvider failed (session error). 3516 */ 3517 ErrorCode packData(Buffers bufs, int datamax, ref Frame frame, ref DataAuxData aux_data) { 3518 ErrorCode rv; 3519 DataFlags data_flags; 3520 int payloadlen; 3521 int padded_payloadlen; 3522 Buffer* buf; 3523 size_t max_payloadlen; 3524 3525 assert(bufs.head == bufs.cur); 3526 3527 buf = &bufs.cur.buf; 3528 3529 Stream stream = getStream(frame.hd.stream_id); 3530 3531 if (!stream) 3532 return ErrorCode.INVALID_ARGUMENT; 3533 3534 try payloadlen = connector.maxFrameSize(frame.hd.type, stream.id, remote_window_size, stream.remoteWindowSize, remote_settings.max_frame_size); 3535 catch(Exception e) payloadlen = min(remote_window_size, stream.remoteWindowSize, remote_settings.max_frame_size); 3536 LOGF("send: read_length_callback=%d", payloadlen); 3537 3538 payloadlen = enforceFlowControlLimits(stream, payloadlen); 3539 3540 LOGF("send: read_length_callback after flow control=%d", payloadlen); 3541 3542 if (payloadlen <= 0) 3543 return ErrorCode.CALLBACK_FAILURE; 3544 3545 if (payloadlen > buf.available) { 3546 import core.exception : OutOfMemoryError; 3547 /* Resize the current buffer(s). The reason why we do +1 for buffer size is for possible padding field. */ 3548 try { 3549 aob.framebufs.realloc(FRAME_HDLEN + 1 + payloadlen); 3550 assert(aob.framebufs == bufs); 3551 buf = &bufs.cur.buf; 3552 } catch (OutOfMemoryError oom) { 3553 /* If reallocation failed, old buffers are still in tact. So use safe limit. */ 3554 payloadlen = datamax; 3555 rv = ErrorCode.NOMEM; 3556 } 3557 } 3558 3559 datamax = payloadlen; 3560 3561 /* Current max DATA length is less then buffer chunk size */ 3562 assert(buf.available >= datamax); 3563 3564 data_flags = DataFlags.NONE; 3565 3566 payloadlen = aux_data.data_prd(buf.pos[0 .. datamax], data_flags); 3567 3568 if (payloadlen == ErrorCode.DEFERRED || 3569 payloadlen == ErrorCode.TEMPORAL_CALLBACK_FAILURE) 3570 { 3571 import libhttp2.helpers : toString; 3572 LOGF("send: DATA postponed due to %s", toString(cast(ErrorCode)payloadlen)); 3573 3574 return cast(ErrorCode)payloadlen; 3575 } 3576 3577 if (payloadlen < 0 || datamax < cast(size_t)payloadlen) 3578 { 3579 /* This is the error code when callback is failed. */ 3580 return ErrorCode.CALLBACK_FAILURE; 3581 } 3582 3583 buf.last = buf.pos + payloadlen; 3584 buf.pos -= FRAME_HDLEN; 3585 3586 /* Clear flags, because this may contain previous flags of previous DATA */ 3587 frame.hd.flags = FrameFlags.NONE; 3588 3589 if (data_flags & DataFlags.EOF) { 3590 aux_data.eof = true; 3591 if (aux_data.flags & DataFlags.EOF) 3592 frame.hd.flags |= FrameFlags.END_STREAM; 3593 } 3594 3595 if (data_flags & DataFlags.NO_COPY) 3596 aux_data.no_copy = true; 3597 3598 frame.hd.length = payloadlen; 3599 frame.data.padlen = 0; 3600 3601 max_payloadlen = min(datamax, frame.hd.length + MAX_PADLEN); 3602 3603 padded_payloadlen = callSelectPadding(frame, max_payloadlen); 3604 3605 if (isFatal(cast(int)padded_payloadlen)) { 3606 return cast(ErrorCode)padded_payloadlen; 3607 } 3608 3609 frame.data.padlen = padded_payloadlen - payloadlen; 3610 3611 frame.hd.pack((*buf)[]); 3612 3613 frame.hd.addPad(bufs, frame.data.padlen, aux_data.no_copy); 3614 3615 return ErrorCode.OK; 3616 } 3617 /* 3618 * This function is called when HTTP header field |hf| in |frame| is 3619 * received for |stream|. This function will validate |hf| against 3620 * the current state of stream. 3621 * 3622 * This function returns 0 if it succeeds, or one of the following 3623 * negative error codes: 3624 * 3625 * ErrorCode.HTTP_HEADER 3626 * Invalid HTTP header field was received. 3627 * ErrorCode.IGN_HTTP_HEADER 3628 * Invalid HTTP header field was received but it can be treated as 3629 * if it was not received because of compatibility reasons. 3630 */ 3631 ErrorCode validateHeaderField(Stream stream, in Frame frame, HeaderField hf, bool trailer) 3632 { 3633 /* We are strict for pseudo header field. One bad character 3634 should lead to fail. OTOH, we should be a bit forgiving for 3635 regular headers, since existing public internet has so much 3636 illegal headers floating around and if we kill the stream 3637 because of this, we may disrupt many web sites and/or 3638 libraries. So we become conservative here, and just ignore 3639 those illegal regular headers. */ 3640 if (!hf.validateName()) 3641 { 3642 size_t i; 3643 if (hf.name.length > 0 && hf.name[0] == ':') { 3644 return ErrorCode.HTTP_HEADER; 3645 } 3646 /* header field name must be lower-cased without exception */ 3647 for (i = 0; i < hf.name.length; ++i) { 3648 char c = hf.name[i]; 3649 if ('A' <= c && c <= 'Z') { 3650 return ErrorCode.HTTP_HEADER; 3651 } 3652 } 3653 3654 /* When ignoring regular headers, we set this flag so that we 3655 still enforce header field ordering rule for pseudo header 3656 fields. */ 3657 stream.httpFlags = cast(HTTPFlags)(stream.httpFlags | HTTPFlags.PSEUDO_HEADER_DISALLOWED); 3658 return ErrorCode.IGN_HTTP_HEADER; 3659 } 3660 3661 if (!hf.validateValue()) 3662 { 3663 assert(hf.name.length > 0); 3664 if (hf.name[0] == ':') { 3665 return ErrorCode.HTTP_HEADER; 3666 } 3667 3668 /* When ignoring regular headers, we set this flag so that we 3669 still enforce header field ordering rule for pseudo header 3670 fields. */ 3671 stream.httpFlags = cast(HTTPFlags)(stream.httpFlags | HTTPFlags.PSEUDO_HEADER_DISALLOWED); 3672 return ErrorCode.IGN_HTTP_HEADER; 3673 } 3674 3675 if (is_server || frame.hd.type == FrameType.PUSH_PROMISE) 3676 return hf.validateRequestHeader(stream, trailer) ? ErrorCode.OK : ErrorCode.HTTP_HEADER; 3677 3678 return hf.validateResponseHeader(stream, trailer) ? ErrorCode.OK : ErrorCode.HTTP_HEADER; 3679 } 3680 3681 /* 3682 * Pops and returns next item to send. If there is no such item, 3683 * returns null. This function takes into account max concurrent 3684 * streams. That means if session.ob_pq is empty but 3685 * session.ob_ss_pq has item and max concurrent streams is reached, 3686 * then this function returns null. 3687 */ 3688 OutboundItem popNextOutboundItem() { 3689 OutboundItem item; 3690 OutboundItem headers_item; 3691 3692 if (ob_pq.empty) { 3693 if (ob_ss_pq.empty) { 3694 if (remote_window_size == 0 || ob_da_pq.empty) 3695 return null; 3696 item = ob_da_pq.top; 3697 ob_da_pq.pop(); 3698 item.queued = 0; 3699 return item; 3700 } 3701 3702 /* Pop item only when concurrent connection limit is not reached */ 3703 if (isOutgoingConcurrentStreamsMax()) { 3704 if (remote_window_size == 0 || ob_da_pq.empty) 3705 return null; 3706 3707 item = ob_da_pq.top; 3708 ob_da_pq.pop(); 3709 item.queued = 0; 3710 return item; 3711 } 3712 3713 item = ob_ss_pq.top; 3714 ob_ss_pq.pop(); 3715 item.queued = 0; 3716 return item; 3717 } 3718 3719 if (ob_ss_pq.empty) { 3720 item = ob_pq.top; 3721 ob_pq.pop(); 3722 item.queued = 0; 3723 return item; 3724 } 3725 3726 item = ob_pq.top; 3727 headers_item = ob_ss_pq.top; 3728 3729 if (isOutgoingConcurrentStreamsMax() || 3730 item.weight > headers_item.weight || 3731 (item.weight == headers_item.weight && item.seq < headers_item.seq)) 3732 { 3733 ob_pq.pop(); 3734 item.queued = 0; 3735 return item; 3736 } 3737 3738 ob_ss_pq.pop(); 3739 headers_item.queued = 0; 3740 return headers_item; 3741 } 3742 3743 /* 3744 * Returns next item to send. If there is no such item, this function 3745 * returns null. This function takes into account max concurrent 3746 * streams. That means if session.ob_pq is empty but 3747 * session.ob_ss_pq has item and max concurrent streams is reached, 3748 * then this function returns null. 3749 */ 3750 OutboundItem getNextOutboundItem() { 3751 OutboundItem item; 3752 OutboundItem headers_item; 3753 3754 if (ob_pq.empty) { 3755 if (ob_ss_pq.empty) { 3756 if (remote_window_size == 0 || ob_da_pq.empty) 3757 return null; 3758 3759 return ob_da_pq.top; 3760 } 3761 3762 /* Return item only when concurrent connection limit is not reached */ 3763 if (isOutgoingConcurrentStreamsMax()) { 3764 if (remote_window_size == 0 || ob_da_pq.empty) 3765 return null; 3766 3767 return ob_da_pq.top; 3768 } 3769 3770 return ob_ss_pq.top; 3771 } 3772 3773 if (ob_ss_pq.empty) { 3774 return ob_pq.top; 3775 } 3776 3777 item = ob_pq.top; 3778 headers_item = ob_ss_pq.top; 3779 3780 if (isOutgoingConcurrentStreamsMax() || item.weight > headers_item.weight || 3781 (item.weight == headers_item.weight && item.seq < headers_item.seq)) 3782 { 3783 return item; 3784 } 3785 3786 return headers_item; 3787 } 3788 3789 /* 3790 * Updates local settings with the |iva|. The number of elements in the 3791 * array pointed by the |iva| is given by the |iva.length|. This function 3792 * assumes that the all settings_id member in |iva| are in range 1 to 3793 * Setting.MAX_HEADER_LIST_SIZE, inclusive. 3794 * 3795 * While updating individual stream's local window size, if the window 3796 * size becomes strictly larger than MAX_WINDOW_SIZE, 3797 * RST_STREAM is issued against such a stream. 3798 * 3799 * This function returns 0 if it succeeds, or one of the following 3800 * negative error codes: 3801 * 3802 * ErrorCode.HEADER_COMP 3803 * The header table size is out of range 3804 */ 3805 ErrorCode updateLocalSettings(in Setting[] iva) 3806 { 3807 ErrorCode rv; 3808 size_t i; 3809 int new_initial_window_size = -1; 3810 int header_table_size = -1; 3811 bool header_table_size_seen; 3812 /* Use the value last seen. */ 3813 foreach(iv; iva) { 3814 switch (iv.id) { 3815 case Setting.HEADER_TABLE_SIZE: 3816 header_table_size_seen = true; 3817 header_table_size = iv.value; 3818 break; 3819 case Setting.INITIAL_WINDOW_SIZE: 3820 new_initial_window_size = iv.value; 3821 break; 3822 default: break; 3823 } 3824 } 3825 if (header_table_size_seen) 3826 hd_inflater.changeTableSize(header_table_size); 3827 3828 if (new_initial_window_size != -1) { 3829 rv = updateLocalInitialWindowSize(new_initial_window_size, local_settings.initial_window_size); 3830 if (rv != ErrorCode.OK) { 3831 return rv; 3832 } 3833 } 3834 3835 foreach(iv; iva) { 3836 with(Setting) switch (iv.id) { 3837 case HEADER_TABLE_SIZE: 3838 local_settings.header_table_size = iv.value; 3839 break; 3840 case ENABLE_PUSH: 3841 local_settings.enable_push = iv.value; 3842 break; 3843 case MAX_CONCURRENT_STREAMS: 3844 local_settings.max_concurrent_streams = iv.value; 3845 break; 3846 case INITIAL_WINDOW_SIZE: 3847 local_settings.initial_window_size = iv.value; 3848 break; 3849 case MAX_FRAME_SIZE: 3850 local_settings.max_frame_size = iv.value; 3851 break; 3852 case MAX_HEADER_LIST_SIZE: 3853 local_settings.max_header_list_size = iv.value; 3854 break; 3855 default: break; 3856 } 3857 } 3858 3859 pending_local_max_concurrent_stream = INITIAL_MAX_CONCURRENT_STREAMS; 3860 pending_enable_push = true; 3861 3862 return ErrorCode.OK; 3863 } 3864 3865 /* 3866 * Re-prioritize |stream|. The new priority specification is |pri_spec|. 3867 */ 3868 void reprioritizeStream(Stream stream, ref PrioritySpec pri_spec) 3869 { 3870 Stream dep_stream; 3871 Stream root_stream; 3872 PrioritySpec pri_spec_default; 3873 3874 if (!stream.inDepTree()) 3875 return; 3876 3877 if (pri_spec.stream_id == stream.id) { 3878 terminateSessionWithReason(FrameError.PROTOCOL_ERROR, "depend on itself"); 3879 return; 3880 } 3881 3882 if (pri_spec.stream_id != 0) { 3883 dep_stream = getStreamRaw(pri_spec.stream_id); 3884 3885 if (is_server && !dep_stream && idleStreamDetect(pri_spec.stream_id)) 3886 { 3887 dep_stream = openStream(pri_spec.stream_id, StreamFlags.NONE, pri_spec_default, StreamState.IDLE, null); 3888 3889 } else if (!dep_stream || !dep_stream.inDepTree()) { 3890 pri_spec = pri_spec_default; 3891 } 3892 } 3893 3894 if (pri_spec.stream_id == 0) { 3895 stream.removeSubtree(); 3896 3897 /* We have to update weight after removing stream from tree */ 3898 stream.weight = pri_spec.weight; 3899 3900 if (pri_spec.exclusive && 3901 roots.num_streams <= MAX_DEP_TREE_LENGTH) { 3902 3903 stream.makeTopmostRoot(this); 3904 } else { 3905 stream.makeRoot(this); 3906 } 3907 3908 return; 3909 } 3910 3911 assert(dep_stream); 3912 3913 if (stream.subtreeContains(dep_stream)) { 3914 LOGF("stream: cycle detected, dep_stream(%s=%d stream(%s)=%d", dep_stream, dep_stream.id, stream, stream.id); 3915 3916 dep_stream.removeSubtree(); 3917 dep_stream.makeRoot(this); 3918 } 3919 3920 stream.removeSubtree(); 3921 3922 /* We have to update weight after removing stream from tree */ 3923 stream.weight = pri_spec.weight; 3924 3925 root_stream = dep_stream.getRoot(); 3926 3927 if (root_stream.subStreams + stream.subStreams > MAX_DEP_TREE_LENGTH) 3928 { 3929 stream.weight = DEFAULT_WEIGHT; 3930 3931 stream.makeRoot(this); 3932 } else { 3933 if (pri_spec.exclusive) 3934 dep_stream.insertSubtree(stream, this); 3935 else 3936 dep_stream.addSubtree(stream, this); 3937 } 3938 } 3939 3940 /* 3941 * Terminates current $(D Session) with the |error_code|. The |reason| 3942 * is null-terminated debug string. 3943 * 3944 * This function returns 0 if it succeeds, or one of the following 3945 * negative error codes: 3946 * 3947 * ErrorCode.INVALID_ARGUMENT 3948 * The |reason| is too long. 3949 */ 3950 ErrorCode terminateSessionWithReason(FrameError error_code, string reason) 3951 { 3952 return terminateSession(last_proc_stream_id, error_code, reason); 3953 } 3954 3955 /* 3956 * Returns true if the number of outgoing opened streams is larger than or equal to 3957 * remote_settings.max_concurrent_streams. 3958 */ 3959 bool isOutgoingConcurrentStreamsMax() 3960 { 3961 return remote_settings.max_concurrent_streams <= num_outgoing_streams; 3962 } 3963 3964 /* 3965 * Returns true if the number of incoming opened streams is larger 3966 * than or equal to local_settings.max_concurrent_streams. 3967 */ 3968 bool isIncomingConcurrentStreamsMax() 3969 { 3970 return local_settings.max_concurrent_streams <= num_incoming_streams; 3971 } 3972 3973 /* 3974 * Returns true if the number of incoming opened streams is larger 3975 * than or equal to session.pending_local_max_concurrent_stream. 3976 */ 3977 bool isIncomingConcurrentStreamsPendingMax() 3978 { 3979 return pending_local_max_concurrent_stream <= num_incoming_streams; 3980 } 3981 3982 bool isHTTPMessagingEnabled() 3983 { 3984 return (opt_flags & OptionsMask.NO_HTTP_MESSAGING) == 0; 3985 } 3986 3987 /* 3988 * Returns true if |frame| is trailer headers. 3989 */ 3990 bool isTrailerHeaders(Stream stream, in Frame frame) 3991 { 3992 if (!stream || frame.hd.type != FrameType.HEADERS) { 3993 return false; 3994 } 3995 if (is_server) { 3996 return frame.headers.cat == HeadersCategory.HEADERS; 3997 } 3998 3999 return frame.headers.cat == HeadersCategory.HEADERS && (stream.httpFlags & HTTPFlags.EXPECT_FINAL_RESPONSE) == 0; 4000 } 4001 4002 /* Returns true if the |stream| is in reserved(remote) state */ 4003 bool isReservedRemote(Stream stream) 4004 { 4005 return stream.state == StreamState.RESERVED && !isMyStreamId(stream.id); 4006 } 4007 4008 /* Returns true if the |stream| is in reserved(local) state */ 4009 bool isReservedLocal(Stream stream) { 4010 return stream.state == StreamState.RESERVED && isMyStreamId(stream.id); 4011 } 4012 4013 /* 4014 * Checks whether received stream_id is valid. 4015 */ 4016 bool isNewPeerStreamId(int stream_id) 4017 { 4018 return stream_id != 0 && !isMyStreamId(stream_id) && last_recv_stream_id < stream_id; 4019 } 4020 4021 4022 /** 4023 * @function 4024 * 4025 * Returns the last stream ID of a stream for which 4026 * $(D Connector.onFrame) was invoked most recently. 4027 * The returned value can be used as last_stream_id parameter for 4028 * `submitGoAway()` and `terminateSession()`. 4029 * 4030 * This function always succeeds. 4031 */ 4032 int getLastProcStreamID() 4033 { 4034 return last_proc_stream_id; 4035 } 4036 4037 bool idleStreamDetect(int stream_id) 4038 { 4039 /* Assume that stream object with stream_id does not exist */ 4040 if (isMyStreamId(stream_id)) { 4041 if (next_stream_id <= cast(uint)stream_id) 4042 return true; 4043 return false; 4044 } 4045 if (isNewPeerStreamId(stream_id)) 4046 return true; 4047 4048 return false; 4049 } 4050 4051 void freeAllStreams() { 4052 foreach(stream; streams) 4053 { 4054 OutboundItem item = stream.item; 4055 4056 if (item && !item.queued && item != aob.item) 4057 { 4058 item.free(); 4059 Mem.free(item); 4060 } 4061 4062 stream.free(); 4063 Mem.free(stream); 4064 } 4065 } 4066 4067 /* 4068 * Returns Stream object whose stream ID is |stream_id|. It 4069 * could be null if such stream does not exist. This function returns 4070 * null if stream is marked as closed. 4071 */ 4072 Stream getStream(int stream_id) 4073 { 4074 Stream stream; 4075 4076 stream = streams.get(stream_id); 4077 4078 if (!stream || (stream.flags & StreamFlags.CLOSED) || stream.state == StreamState.IDLE) 4079 { 4080 return null; 4081 } 4082 4083 return stream; 4084 } 4085 /* 4086 * This function behaves like getStream(), but it 4087 * returns stream object even if it is marked as closed or in 4088 * StreamState.IDLE state. 4089 */ 4090 Stream getStreamRaw(int stream_id) 4091 { 4092 return streams.get(stream_id); 4093 } 4094 4095 // terminates the session 4096 ErrorCode terminateSession(int last_stream_id, FrameError error_code, string reason) 4097 { 4098 ErrorCode rv; 4099 string debug_data; 4100 4101 if (goaway_flags & GoAwayFlags.TERM_ON_SEND) { 4102 return ErrorCode.OK; 4103 } 4104 4105 if (!reason) { 4106 debug_data = null; 4107 } else { 4108 debug_data = reason; 4109 } 4110 4111 rv = addGoAway(last_stream_id, error_code, debug_data, GoAwayAuxFlags.TERM_ON_SEND); 4112 4113 if (rv != ErrorCode.OK) { 4114 return rv; 4115 } 4116 4117 goaway_flags |= GoAwayFlags.TERM_ON_SEND; 4118 4119 return ErrorCode.OK; 4120 } 4121 4122 /* 4123 * This function returns nonzero if session is closing. 4124 */ 4125 bool isClosing() 4126 { 4127 return (goaway_flags & GoAwayFlags.TERM_ON_SEND) != 0; 4128 } 4129 4130 /* 4131 * Check that we can send a frame to the |stream|. This function 4132 * returns 0 if we can send a frame to the |frame|, or one of the 4133 * following negative error codes: 4134 * 4135 * ErrorCode.STREAM_CLOSED 4136 * The stream is already closed. 4137 * ErrorCode.STREAM_SHUT_WR 4138 * The stream is half-closed for transmission. 4139 * ErrorCode.SESSION_CLOSING 4140 * This session is closing. 4141 */ 4142 ErrorCode predicateForStreamSend(Stream stream) 4143 { 4144 if (!stream) { 4145 return ErrorCode.STREAM_CLOSED; 4146 } 4147 if (isClosing()) { 4148 return ErrorCode.SESSION_CLOSING; 4149 } 4150 if (stream.shutFlags & ShutdownFlag.WR) { 4151 return ErrorCode.STREAM_SHUT_WR; 4152 } 4153 return ErrorCode.OK; 4154 } 4155 4156 /* 4157 * This function checks request HEADERS frame, which opens stream, can 4158 * be sent at this time. 4159 * 4160 * This function returns 0 if it succeeds, or one of the following 4161 * negative error codes: 4162 * 4163 * ErrorCode.START_STREAM_NOT_ALLOWED 4164 * New stream cannot be created because of GOAWAY: session is 4165 * going down or received last_stream_id is strictly less than 4166 * frame.hd.stream_id. 4167 * ErrorCode.STREAM_CLOSING 4168 * request HEADERS was canceled by RST_STREAM while it is in queue. 4169 */ 4170 ErrorCode predicateRequestHeadersSend(OutboundItem item) 4171 { 4172 if (item.aux_data.headers.canceled) { 4173 return ErrorCode.STREAM_CLOSING; 4174 } 4175 /* If we are terminating session (GoAwayFlags.TERM_ON_SEND) or 4176 * GOAWAY was received from peer, new request is not allowed. */ 4177 4178 if (goaway_flags & (GoAwayFlags.TERM_ON_SEND | GoAwayFlags.RECV)) 4179 { 4180 return ErrorCode.START_STREAM_NOT_ALLOWED; 4181 } 4182 return ErrorCode.OK; 4183 } 4184 4185 /* 4186 * This function checks HEADERS, which is the first frame from the 4187 * server, with the |stream| can be sent at this time. The |stream| 4188 * can be null. 4189 * 4190 * This function returns 0 if it succeeds, or one of the following 4191 * negative error codes: 4192 * 4193 * ErrorCode.STREAM_CLOSED 4194 * The stream is already closed or does not exist. 4195 * ErrorCode.STREAM_SHUT_WR 4196 * The transmission is not allowed for this stream (e.g., a frame 4197 * with END_STREAM flag set has already sent) 4198 * ErrorCode.INVALID_STREAM_ID 4199 * The stream ID is invalid. 4200 * ErrorCode.STREAM_CLOSING 4201 * RST_STREAM was queued for this stream. 4202 * ErrorCode.INVALID_STREAM_STATE 4203 * The state of the stream is not valid. 4204 * ErrorCode.SESSION_CLOSING 4205 * This session is closing. 4206 */ 4207 ErrorCode predicateResponseHeadersSend(Stream stream) 4208 { 4209 ErrorCode rv; 4210 rv = predicateForStreamSend(stream); 4211 if (rv != ErrorCode.OK) { 4212 return rv; 4213 } 4214 assert(stream); 4215 if (isMyStreamId(stream.id)) { 4216 return ErrorCode.INVALID_STREAM_ID; 4217 } 4218 if (stream.state == StreamState.OPENING) { 4219 return ErrorCode.OK; 4220 } 4221 if (stream.state == StreamState.CLOSING) { 4222 return ErrorCode.STREAM_CLOSING; 4223 } 4224 return ErrorCode.INVALID_STREAM_STATE; 4225 } 4226 4227 /* 4228 * This function checks HEADERS for reserved stream can be sent. The 4229 * |stream| must be reserved state and the |session| is server side. 4230 * The |stream| can be null. 4231 * 4232 * This function returns 0 if it succeeds, or one of the following 4233 * error codes: 4234 * 4235 * ErrorCode.STREAM_CLOSED 4236 * The stream is already closed. 4237 * ErrorCode.STREAM_SHUT_WR 4238 * The stream is half-closed for transmission. 4239 * ErrorCode.PROTO 4240 * The stream is not reserved state 4241 * ErrorCode.STREAM_CLOSED 4242 * RST_STREAM was queued for this stream. 4243 * ErrorCode.SESSION_CLOSING 4244 * This session is closing. 4245 */ 4246 ErrorCode predicatePushResponseHeadersSend(Stream stream) 4247 { 4248 ErrorCode rv; 4249 /* TODO Should disallow HEADERS if GOAWAY has already been issued? */ 4250 rv = predicateForStreamSend(stream); 4251 if (rv != ErrorCode.OK) { 4252 return rv; 4253 } 4254 assert(stream); 4255 if (stream.state != StreamState.RESERVED) { 4256 return ErrorCode.PROTO; 4257 } 4258 if (stream.state == StreamState.CLOSING) { 4259 return ErrorCode.STREAM_CLOSING; 4260 } 4261 return ErrorCode.OK; 4262 } 4263 4264 /* 4265 * This function checks HEADERS, which is neither stream-opening nor 4266 * first response header, with the |stream| can be sent at this time. 4267 * The |stream| can be null. 4268 * 4269 * This function returns 0 if it succeeds, or one of the following 4270 * negative error codes: 4271 * 4272 * ErrorCode.STREAM_CLOSED 4273 * The stream is already closed or does not exist. 4274 * ErrorCode.STREAM_SHUT_WR 4275 * The transmission is not allowed for this stream (e.g., a frame 4276 * with END_STREAM flag set has already sent) 4277 * ErrorCode.STREAM_CLOSING 4278 * RST_STREAM was queued for this stream. 4279 * ErrorCode.INVALID_STREAM_STATE 4280 * The state of the stream is not valid. 4281 * ErrorCode.SESSION_CLOSING 4282 * This session is closing. 4283 */ 4284 ErrorCode predicateHeadersSend(Stream stream) 4285 { 4286 ErrorCode rv; 4287 rv = predicateForStreamSend(stream); 4288 if (rv != ErrorCode.OK) { 4289 return rv; 4290 } 4291 assert(stream); 4292 if (isMyStreamId(stream.id)) 4293 { 4294 if (stream.state == StreamState.CLOSING) { 4295 return ErrorCode.STREAM_CLOSING; 4296 } 4297 return ErrorCode.OK; 4298 } 4299 if (stream.state == StreamState.OPENED) { 4300 return ErrorCode.OK; 4301 } 4302 if (stream.state == StreamState.CLOSING) { 4303 return ErrorCode.STREAM_CLOSING; 4304 } 4305 return ErrorCode.INVALID_STREAM_STATE; 4306 } 4307 4308 /* 4309 * This function checks PUSH_PROMISE frame |frame| with the |stream| 4310 * can be sent at this time. The |stream| can be null. 4311 * 4312 * This function returns 0 if it succeeds, or one of the following 4313 * negative error codes: 4314 * 4315 * ErrorCode.START_STREAM_NOT_ALLOWED 4316 * New stream cannot be created because GOAWAY is already sent or 4317 * received. 4318 * ErrorCode.PROTO 4319 * The client side attempts to send PUSH_PROMISE, or the server 4320 * sends PUSH_PROMISE for the stream not initiated by the client. 4321 * ErrorCode.STREAM_CLOSED 4322 * The stream is already closed or does not exist. 4323 * ErrorCode.STREAM_CLOSING 4324 * RST_STREAM was queued for this stream. 4325 * ErrorCode.STREAM_SHUT_WR 4326 * The transmission is not allowed for this stream (e.g., a frame 4327 * with END_STREAM flag set has already sent) 4328 * ErrorCode.PUSH_DISABLED 4329 * The remote peer disabled reception of PUSH_PROMISE. 4330 * ErrorCode.SESSION_CLOSING 4331 * This session is closing. 4332 */ 4333 ErrorCode predicatePushPromiseSend(Stream stream) 4334 { 4335 ErrorCode rv; 4336 4337 if (!is_server) { 4338 return ErrorCode.PROTO; 4339 } 4340 4341 rv = predicateForStreamSend(stream); 4342 if (rv != ErrorCode.OK) { 4343 return rv; 4344 } 4345 4346 assert(stream); 4347 4348 if (remote_settings.enable_push == 0) { 4349 return ErrorCode.PUSH_DISABLED; 4350 } 4351 if (stream.state == StreamState.CLOSING) { 4352 return ErrorCode.STREAM_CLOSING; 4353 } 4354 if (goaway_flags & GoAwayFlags.RECV) { 4355 return ErrorCode.START_STREAM_NOT_ALLOWED; 4356 } 4357 return ErrorCode.OK; 4358 } 4359 4360 /* 4361 * This function checks WINDOW_UPDATE with the stream ID |stream_id| 4362 * can be sent at this time. Note that END_STREAM flag of the previous 4363 * frame does not affect the transmission of the WINDOW_UPDATE frame. 4364 * 4365 * This function returns 0 if it succeeds, or one of the following 4366 * negative error codes: 4367 * 4368 * ErrorCode.STREAM_CLOSED 4369 * The stream is already closed or does not exist. 4370 * ErrorCode.STREAM_CLOSING 4371 * RST_STREAM was queued for this stream. 4372 * ErrorCode.INVALID_STREAM_STATE 4373 * The state of the stream is not valid. 4374 * ErrorCode.SESSION_CLOSING 4375 * This session is closing. 4376 */ 4377 ErrorCode predicateWindowUpdateSend(int stream_id) 4378 { 4379 Stream stream; 4380 if (stream_id == 0) { 4381 /* Connection-level window update */ 4382 return ErrorCode.OK; 4383 } 4384 stream = getStream(stream_id); 4385 if (!stream) { 4386 return ErrorCode.STREAM_CLOSED; 4387 } 4388 if (isClosing()) { 4389 return ErrorCode.SESSION_CLOSING; 4390 } 4391 if (stream.state == StreamState.CLOSING) { 4392 return ErrorCode.STREAM_CLOSING; 4393 } 4394 if (isReservedLocal(stream)) { 4395 return ErrorCode.INVALID_STREAM_STATE; 4396 } 4397 return ErrorCode.OK; 4398 } 4399 4400 /* 4401 * This function checks DATA with the |stream| can be sent at this 4402 * time. The |stream| can be null. 4403 * 4404 * This function returns 0 if it succeeds, or one of the following 4405 * negative error codes: 4406 * 4407 * ErrorCode.STREAM_CLOSED 4408 * The stream is already closed or does not exist. 4409 * ErrorCode.STREAM_SHUT_WR 4410 * The transmission is not allowed for this stream (e.g., a frame 4411 * with END_STREAM flag set has already sent) 4412 * ErrorCode.STREAM_CLOSING 4413 * RST_STREAM was queued for this stream. 4414 * ErrorCode.INVALID_STREAM_STATE 4415 * The state of the stream is not valid. 4416 * ErrorCode.SESSION_CLOSING 4417 * This session is closing. 4418 */ 4419 ErrorCode predicateDataSend(Stream stream) 4420 { 4421 ErrorCode rv; 4422 rv = predicateForStreamSend(stream); 4423 if (rv != ErrorCode.OK) { 4424 return rv; 4425 } 4426 assert(stream); 4427 if (isMyStreamId(stream.id)) { 4428 /* Request body data */ 4429 /* If stream.state is StreamState.CLOSING, RST_STREAM was queued but not yet sent. In this case, we won't send DATA frames. */ 4430 if (stream.state == StreamState.CLOSING) { 4431 return ErrorCode.STREAM_CLOSING; 4432 } 4433 if (stream.state == StreamState.RESERVED) { 4434 return ErrorCode.INVALID_STREAM_STATE; 4435 } 4436 return ErrorCode.OK; 4437 } 4438 /* Response body data */ 4439 if (stream.state == StreamState.OPENED) { 4440 return ErrorCode.OK; 4441 } 4442 if (stream.state == StreamState.CLOSING) { 4443 return ErrorCode.STREAM_CLOSING; 4444 } 4445 return ErrorCode.INVALID_STREAM_STATE; 4446 } 4447 4448 4449 /* Take into account settings max frame size and both connection-level flow control here */ 4450 int enforceFlowControlLimits(Stream stream, int requested_window_size) 4451 { 4452 LOGF("send: remote windowsize connection=%d, remote maxframsize=%u, stream(id %d=%d", 4453 remote_window_size, 4454 remote_settings.max_frame_size, stream.id, 4455 stream.remoteWindowSize); 4456 4457 return min(min(min(requested_window_size, stream.remoteWindowSize), remote_window_size), cast(int)remote_settings.max_frame_size); 4458 } 4459 4460 /* 4461 * Now we have SETTINGS synchronization, flow control error can be 4462 * detected strictly. If DATA frame is received with length > 0 and 4463 * current received window size + delta length is strictly larger than 4464 * local window size, it is subject to FLOW_CONTROL_ERROR, so return 4465 * false. Note that local_window_size is calculated after SETTINGS ACK is 4466 * received from peer, so peer must honor this limit. If the resulting 4467 * recv_window_size is strictly larger than MAX_WINDOW_SIZE, 4468 * return false too. 4469 */ 4470 bool adjustRecvWindowSize(ref int _recv_window_size, size_t delta, int local_window_size) 4471 { 4472 if (_recv_window_size > local_window_size - cast(int)delta || 4473 _recv_window_size > MAX_WINDOW_SIZE - cast(int)delta) 4474 { 4475 return false; 4476 } 4477 _recv_window_size += delta; 4478 return true; 4479 } 4480 /* 4481 * Accumulates received bytes |delta_size| for stream-level flow 4482 * control and decides whether to send WINDOW_UPDATE to that stream. 4483 * If OptionFlags.NO_AUTO_WINDOW_UPDATE is set, WINDOW_UPDATE will not 4484 * be sent. 4485 */ 4486 void updateRecvStreamWindowSize(Stream stream, size_t delta_size, int send_window_update) 4487 { 4488 bool ok = adjustRecvWindowSize(stream.recvWindowSize, delta_size, stream.localWindowSize); 4489 if (!ok) { 4490 addRstStream(stream.id, FrameError.FLOW_CONTROL_ERROR); 4491 return; 4492 } 4493 /* We don't have to send WINDOW_UPDATE if the data received is the last chunk in the incoming stream. */ 4494 if (send_window_update && !(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)) { 4495 /* We have to use local_settings here because it is the constraint the remote endpoint should honor. */ 4496 if (shouldSendWindowUpdate(stream.localWindowSize, stream.recvWindowSize)) { 4497 addWindowUpdate(FrameFlags.NONE, stream.id, stream.recvWindowSize); 4498 stream.recvWindowSize = 0; 4499 } 4500 } 4501 } 4502 4503 /* 4504 * Accumulates received bytes |delta_size| for connection-level flow 4505 * control and decides whether to send WINDOW_UPDATE to the 4506 * connection. If OptionFlags.NO_AUTO_WINDOW_UPDATE is set, 4507 * WINDOW_UPDATE will not be sent. 4508 */ 4509 ErrorCode updateRecvConnectionWindowSize(size_t delta_size) 4510 { 4511 ErrorCode rv; 4512 bool ok = adjustRecvWindowSize(recv_window_size, delta_size, local_window_size); 4513 if (!ok) { 4514 return terminateSession(FrameError.FLOW_CONTROL_ERROR); 4515 } 4516 if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)) 4517 { 4518 4519 if (shouldSendWindowUpdate(local_window_size, recv_window_size)) 4520 { 4521 /* Use stream ID 0 to update connection-level flow control window */ 4522 addWindowUpdate(FrameFlags.NONE, 0, recv_window_size); 4523 recv_window_size = 0; 4524 } 4525 } 4526 return ErrorCode.OK; 4527 } 4528 4529 ErrorCode updateConsumedSize(ref int consumed_size, ref int recv_window_size, int stream_id, size_t delta_size, int local_window_size) 4530 { 4531 int recv_size; 4532 ErrorCode rv; 4533 4534 if (cast(size_t)consumed_size > MAX_WINDOW_SIZE - delta_size) 4535 { 4536 return terminateSession(FrameError.FLOW_CONTROL_ERROR); 4537 } 4538 4539 consumed_size += delta_size; 4540 4541 /* recv_window_size may be smaller than consumed_size, because it may be decreased by negative value with http2_submit_window_update(). */ 4542 recv_size = min(consumed_size, recv_window_size); 4543 4544 if (shouldSendWindowUpdate(local_window_size, recv_size)) 4545 { 4546 addWindowUpdate(FrameFlags.NONE, stream_id, recv_size); 4547 recv_window_size -= recv_size; 4548 consumed_size -= recv_size; 4549 } 4550 4551 return ErrorCode.OK; 4552 } 4553 4554 ErrorCode updateStreamConsumedSize(Stream stream, size_t delta_size) 4555 { 4556 return updateConsumedSize(stream.consumedSize, stream.recvWindowSize, stream.id, delta_size, stream.localWindowSize); 4557 } 4558 4559 ErrorCode updateConnectionConsumedSize(size_t delta_size) 4560 { 4561 return updateConsumedSize(consumed_size, recv_window_size, 0, delta_size, local_window_size); 4562 } 4563 4564 4565 /* 4566 * Returns the maximum length of next data read. If the 4567 * connection-level and/or stream-wise flow control are enabled, the 4568 * return value takes into account those current window sizes. The remote 4569 * settings for max frame size is also taken into account. 4570 */ 4571 int nextDataRead(Stream stream) 4572 { 4573 int window_size; 4574 window_size = enforceFlowControlLimits(stream, DATA_PAYLOADLEN); 4575 4576 LOGF("send: available window=%d", window_size); 4577 4578 return window_size > 0 ? window_size : 0; 4579 } 4580 4581 int callSelectPadding(in Frame frame, size_t max_payloadlen) 4582 { 4583 int rv; 4584 4585 if (frame.hd.length >= max_payloadlen) { 4586 return frame.hd.length; 4587 } 4588 4589 int max_paddedlen = cast(int) min(frame.hd.length + MAX_PADLEN, max_payloadlen); 4590 4591 try rv = connector.selectPaddingLength(frame, max_paddedlen); 4592 catch (Exception e) return cast(int) ErrorCode.CALLBACK_FAILURE; 4593 if (rv < cast(int)frame.hd.length || rv > cast(int)max_paddedlen) { 4594 return cast(int) ErrorCode.CALLBACK_FAILURE; 4595 } 4596 return rv; 4597 } 4598 4599 ErrorCode callWriteData(OutboundItem item, Buffers framebufs) 4600 { 4601 Buffer* buf = &framebufs.cur.buf; 4602 Frame* frame = &item.frame; 4603 uint length = frame.hd.length - frame.data.padlen; 4604 FrameHeader hd; 4605 hd.unpack(buf.pos[0 .. FRAME_HDLEN]); 4606 ErrorCode rv = connector.writeData(*frame, buf.pos[0 .. FRAME_HDLEN], length); 4607 4608 if (rv == ErrorCode.OK || rv == ErrorCode.WOULDBLOCK || rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) 4609 return rv; 4610 return ErrorCode.CALLBACK_FAILURE; 4611 4612 } 4613 4614 bool callOnFrameReady(in Frame frame) 4615 { 4616 try return connector.onFrameReady(frame); 4617 catch(Exception e) return false; 4618 } 4619 4620 bool callOnFrameSent(in Frame frame) 4621 { 4622 try return connector.onFrameSent(frame); 4623 catch(Exception e) return false; 4624 } 4625 4626 bool callOnFrameHeader(in FrameHeader hd) 4627 { 4628 try return connector.onFrameHeader(hd); 4629 catch(Exception e) return false; 4630 } 4631 4632 bool callOnHeaders(in Frame frame) 4633 { 4634 LOGF("recv: call onHeaders callback stream_id=%d", frame.hd.stream_id); 4635 try return connector.onHeaders(frame); 4636 catch(Exception e) return false; 4637 } 4638 4639 bool callOnHeaderField(in Frame frame, in HeaderField hf, ref bool pause, ref bool close) 4640 { 4641 try return connector.onHeaderField(frame, hf, pause, close); 4642 catch(Exception e) return false; 4643 } 4644 4645 int callRead(ubyte[] buf) 4646 { 4647 int len; 4648 try len = connector.read(buf); 4649 catch(Exception e) return ErrorCode.CALLBACK_FAILURE; 4650 4651 if (len > 0) { 4652 if (cast(size_t) len > buf.length) 4653 return ErrorCode.CALLBACK_FAILURE; 4654 } else if (len < 0 && len != cast(int) ErrorCode.WOULDBLOCK && len != cast(int)ErrorCode.EOF) 4655 return ErrorCode.CALLBACK_FAILURE; 4656 4657 return len; 4658 } 4659 4660 bool callOnFrame(in Frame frame) 4661 { 4662 try return connector.onFrame(frame); 4663 catch(Exception e) return false; 4664 } 4665 4666 /* 4667 * Checks that we can receive the DATA frame for stream, which is 4668 * indicated by |session.iframe.frame.hd.stream_id|. If it is a 4669 * connection error situation, GOAWAY frame will be issued by this 4670 * function. 4671 * 4672 * If the DATA frame is allowed, returns 0. 4673 * 4674 * This function returns 0 if it succeeds, or one of the following 4675 * negative error codes: 4676 * 4677 * ErrorCode.IGN_PAYLOAD 4678 * The reception of DATA frame is connection error; or should be 4679 * ignored. 4680 */ 4681 ErrorCode onDataFailFast() 4682 { 4683 ErrorCode rv; 4684 Stream stream; 4685 int stream_id; 4686 string failure_reason; 4687 FrameError error_code = FrameError.PROTOCOL_ERROR; 4688 4689 stream_id = iframe.frame.hd.stream_id; 4690 4691 if (stream_id == 0) { 4692 /* The spec says that if a DATA frame is received whose stream ID 4693 is 0, the recipient MUST respond with a connection error of 4694 type PROTOCOL_ERROR. */ 4695 failure_reason = "DATA: stream_id == 0"; 4696 goto fail; 4697 } 4698 stream = getStream(stream_id); 4699 if (!stream) { 4700 if (idleStreamDetect(stream_id)) 4701 { 4702 failure_reason = "DATA: stream in idle"; 4703 error_code = FrameError.STREAM_CLOSED; 4704 goto fail; 4705 } 4706 return ErrorCode.IGN_PAYLOAD; 4707 } 4708 if (stream.shutFlags & ShutdownFlag.RD) { 4709 failure_reason = "DATA: stream in half-closed(remote)"; 4710 error_code = FrameError.STREAM_CLOSED; 4711 goto fail; 4712 } 4713 4714 if (isMyStreamId(stream_id)) { 4715 if (stream.state == StreamState.CLOSING) { 4716 return ErrorCode.IGN_PAYLOAD; 4717 } 4718 if (stream.state != StreamState.OPENED) { 4719 failure_reason = "DATA: stream not opened"; 4720 goto fail; 4721 } 4722 return ErrorCode.OK; 4723 } 4724 if (stream.state == StreamState.RESERVED) { 4725 failure_reason = "DATA: stream in reserved"; 4726 goto fail; 4727 } 4728 if (stream.state == StreamState.CLOSING) { 4729 return ErrorCode.IGN_PAYLOAD; 4730 } 4731 return ErrorCode.OK; 4732 fail: 4733 rv = terminateSessionWithReason(error_code, failure_reason); 4734 if (isFatal(rv)) { 4735 return rv; 4736 } 4737 return ErrorCode.IGN_PAYLOAD; 4738 } 4739 4740 4741 ErrorCode afterHeaderBlockReceived() 4742 { 4743 ErrorCode rv; 4744 bool call_cb = true; 4745 Frame* frame = &iframe.frame; 4746 Stream stream; 4747 4748 /* We don't call Connector.onFrame if stream has been closed already or being closed. */ 4749 stream = getStream(frame.hd.stream_id); 4750 if (!stream || stream.state == StreamState.CLOSING) 4751 { 4752 return ErrorCode.OK; 4753 } 4754 4755 if (isHTTPMessagingEnabled()) { 4756 if (frame.hd.type == FrameType.PUSH_PROMISE) { 4757 Stream subject_stream; 4758 4759 subject_stream = getStream(frame.push_promise.promised_stream_id); 4760 if (subject_stream) { 4761 if (!subject_stream.onRequestHeaders(*frame)) 4762 rv = ErrorCode.ERROR; 4763 } 4764 } else { 4765 assert(frame.hd.type == FrameType.HEADERS); 4766 with(HeadersCategory) switch (frame.headers.cat) { 4767 case REQUEST: 4768 if (!stream.onRequestHeaders(*frame)) 4769 rv = ErrorCode.ERROR; 4770 break; 4771 case RESPONSE: 4772 case PUSH_RESPONSE: 4773 if (!stream.onResponseHeaders()) 4774 rv = ErrorCode.ERROR; 4775 break; 4776 case HEADERS: 4777 if (stream.httpFlags & HTTPFlags.EXPECT_FINAL_RESPONSE) { 4778 assert(!is_server); 4779 if (!stream.onResponseHeaders()) 4780 rv = ErrorCode.ERROR; 4781 } else { 4782 if (!stream.validateTrailerHeaders(*frame)) 4783 rv = ErrorCode.ERROR; 4784 } 4785 break; 4786 default: 4787 assert(0); 4788 } 4789 if (rv == 0 && (frame.hd.flags & FrameFlags.END_STREAM)) { 4790 if (!stream.validateRemoteEndStream()) 4791 rv = ErrorCode.ERROR; 4792 } 4793 } 4794 if (rv != ErrorCode.OK) { 4795 int stream_id; 4796 4797 if (frame.hd.type == FrameType.PUSH_PROMISE) { 4798 stream_id = frame.push_promise.promised_stream_id; 4799 } else { 4800 stream_id = frame.hd.stream_id; 4801 } 4802 4803 call_cb = false; 4804 4805 handleInvalidStream2(stream_id, *frame, FrameError.PROTOCOL_ERROR); 4806 } 4807 } 4808 4809 if (call_cb) { 4810 bool ok = callOnFrame(*frame); 4811 if (!ok) 4812 return ErrorCode.CALLBACK_FAILURE; 4813 } 4814 4815 if (frame.hd.type != FrameType.HEADERS) { 4816 return ErrorCode.OK; 4817 } 4818 4819 switch (frame.headers.cat) { 4820 case HeadersCategory.REQUEST: 4821 endRequestHeadersReceived(*frame, stream); 4822 return ErrorCode.OK; 4823 case HeadersCategory.RESPONSE: 4824 case HeadersCategory.PUSH_RESPONSE: 4825 return endResponseHeadersReceived(*frame, stream); 4826 case HeadersCategory.HEADERS: 4827 return endHeadersReceived(*frame, stream); 4828 default: 4829 assert(0); 4830 } 4831 } 4832 4833 ErrorCode processHeadersFrame() 4834 { 4835 Frame* frame = &iframe.frame; 4836 Stream stream; 4837 4838 frame.headers.unpack(iframe.sbuf[]); 4839 4840 stream = getStream(frame.hd.stream_id); 4841 if (!stream) { 4842 frame.headers.cat = HeadersCategory.REQUEST; 4843 return onRequestHeaders(*frame); 4844 } 4845 4846 if (isMyStreamId(frame.hd.stream_id)) 4847 { 4848 if (stream.state == StreamState.OPENING) { 4849 frame.headers.cat = HeadersCategory.RESPONSE; 4850 return onResponseHeaders(*frame, stream); 4851 } 4852 frame.headers.cat = HeadersCategory.HEADERS; 4853 return onHeaders(*frame, stream); 4854 } 4855 if (stream.state == StreamState.RESERVED) { 4856 frame.headers.cat = HeadersCategory.PUSH_RESPONSE; 4857 return onPushResponseHeaders(*frame, stream); 4858 } 4859 frame.headers.cat = HeadersCategory.HEADERS; 4860 return onHeaders(*frame, stream); 4861 } 4862 4863 ErrorCode processPriorityFrame() 4864 { 4865 Frame* frame = &iframe.frame; 4866 4867 frame.priority.unpack(iframe.sbuf[]); 4868 4869 return onPriority(*frame); 4870 } 4871 4872 ErrorCode processRstStreamFrame() 4873 { 4874 Frame* frame = &iframe.frame; 4875 4876 frame.rst_stream.unpack(iframe.sbuf[]); 4877 4878 return onRstStream(*frame); 4879 } 4880 4881 4882 ErrorCode processSettingsFrame() 4883 { 4884 Frame* frame = &iframe.frame; 4885 size_t i; 4886 Setting min_header_size_entry; 4887 min_header_size_entry = iframe.iva[INBOUND_NUM_IV - 1]; 4888 4889 if (min_header_size_entry.value < uint.max) { 4890 /* If we have less value, then we must have Setting.HEADER_TABLE_SIZE in i < iframe.niv */ 4891 for (i = 0; i < iframe.niv; ++i) { 4892 if (iframe.iva[i].id == Setting.HEADER_TABLE_SIZE) { 4893 break; 4894 } 4895 } 4896 4897 assert(i < iframe.niv); 4898 4899 if (min_header_size_entry.value != iframe.iva[i].value) { 4900 iframe.iva[iframe.niv++] = iframe.iva[i]; 4901 iframe.iva[i] = min_header_size_entry; 4902 } 4903 } 4904 4905 frame.settings.unpack(iframe.iva[0 .. iframe.niv]); 4906 4907 return onSettings(*frame, false /* ACK */); 4908 } 4909 4910 ErrorCode processPushPromiseFrame() 4911 { 4912 Frame* frame = &iframe.frame; 4913 4914 frame.push_promise.unpack(iframe.sbuf[]); 4915 4916 return onPushPromise(*frame); 4917 } 4918 4919 ErrorCode processPingFrame() 4920 { 4921 Frame* frame = &iframe.frame; 4922 4923 frame.ping.unpack(iframe.sbuf[]); 4924 4925 return onPing(*frame); 4926 } 4927 4928 ErrorCode processGoAwayFrame() 4929 { 4930 Frame* frame = &iframe.frame; 4931 4932 frame.goaway.unpack(iframe.sbuf[], iframe.lbuf[]); 4933 4934 iframe.lbuf = Buffer(null); 4935 4936 return onGoAway(*frame); 4937 } 4938 4939 ErrorCode processWindowUpdateFrame() 4940 { 4941 Frame* frame = &iframe.frame; 4942 4943 frame.window_update.unpack(iframe.sbuf[]); 4944 4945 return onWindowUpdate(*frame); 4946 } 4947 4948 /* For errors, this function only returns FATAL error. */ 4949 ErrorCode processDataFrame() 4950 { 4951 ErrorCode rv; 4952 rv = onData(iframe.frame); 4953 if (isFatal(rv)) { 4954 return rv; 4955 } 4956 return ErrorCode.OK; 4957 } 4958 4959 ErrorCode handleInvalidStream(Frame frame, FrameError error_code) { 4960 4961 return handleInvalidStream2(frame.hd.stream_id, frame, error_code); 4962 } 4963 4964 ErrorCode handleInvalidStream2(int stream_id, Frame frame, FrameError error_code) { 4965 4966 addRstStream(stream_id, error_code); 4967 4968 try 4969 if (!connector.onInvalidFrame(frame, error_code)) 4970 return ErrorCode.CALLBACK_FAILURE; 4971 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 4972 4973 return ErrorCode.OK; 4974 } 4975 4976 ErrorCode handleInflateInvalidStream(Frame frame, FrameError error_code) { 4977 ErrorCode rv; 4978 rv = handleInvalidStream(frame, error_code); 4979 if (isFatal(rv)) { 4980 return rv; 4981 } 4982 return ErrorCode.IGN_HEADER_BLOCK; 4983 } 4984 4985 /* 4986 * Handles invalid frame which causes connection error. 4987 */ 4988 ErrorCode handleInvalidConnection(Frame frame, FrameError error_code, string reason) 4989 { 4990 try 4991 if (!connector.onInvalidFrame(frame, error_code)) 4992 return ErrorCode.CALLBACK_FAILURE; 4993 catch (Exception e) return ErrorCode.CALLBACK_FAILURE; 4994 4995 return terminateSessionWithReason(error_code, reason); 4996 } 4997 4998 ErrorCode handleInflateInvalidConnection(Frame frame, FrameError error_code, string reason) { 4999 ErrorCode rv; 5000 rv = handleInvalidConnection(frame, error_code, reason); 5001 if (isFatal(rv)) { 5002 return rv; 5003 } 5004 return ErrorCode.IGN_HEADER_BLOCK; 5005 } 5006 5007 5008 /* Add padding to HEADERS or PUSH_PROMISE. We use frame.headers.padlen in this function 5009 * to use the fact that frame.push_promise has also padlen in the same position. */ 5010 ErrorCode headersAddPad(Frame frame) 5011 { 5012 ErrorCode rv; 5013 int padded_payloadlen; 5014 Buffers framebufs = aob.framebufs; 5015 int padlen; 5016 int max_payloadlen; 5017 5018 max_payloadlen = min(MAX_PAYLOADLEN, frame.hd.length + MAX_PADLEN); 5019 5020 padded_payloadlen = callSelectPadding(frame, max_payloadlen); 5021 5022 if (isFatal(padded_payloadlen)) { 5023 return cast(ErrorCode)padded_payloadlen; 5024 } 5025 5026 padlen = padded_payloadlen - frame.hd.length; 5027 5028 LOGF("send: padding selected: payloadlen=%d, padlen=%d", padded_payloadlen, padlen); 5029 5030 frame.hd.addPad(framebufs, padlen, false); 5031 5032 frame.headers.padlen = padlen; 5033 5034 return ErrorCode.OK; 5035 } 5036 5037 size_t estimateHeadersPayload(in HeaderField[] hfa, size_t additional) 5038 { 5039 return hd_deflater.upperBound(hfa) + additional; 5040 } 5041 5042 /* 5043 * Updates the remote initial window size of all active streams. If 5044 * error occurs, all streams may not be updated. 5045 * 5046 */ 5047 ErrorCode updateRemoteInitialWindowSize(int new_initial_window_size) 5048 { 5049 ErrorCode rv; 5050 auto new_window_size = new_initial_window_size; 5051 auto old_window_size = remote_settings.initial_window_size; 5052 5053 foreach (stream; streams) 5054 { 5055 5056 bool ok = stream.updateRemoteInitialWindowSize(new_window_size, old_window_size); 5057 if (!ok) 5058 return terminateSession(FrameError.FLOW_CONTROL_ERROR); 5059 5060 /* If window size gets positive, push deferred DATA frame to outbound queue. */ 5061 if (stream.remoteWindowSize > 0 && stream.isDeferredByFlowControl()) 5062 stream.resumeDeferredItem(StreamFlags.DEFERRED_FLOW_CONTROL, this); 5063 5064 } 5065 5066 return rv; 5067 } 5068 5069 5070 /* 5071 * Updates the local initial window size of all active streams. If 5072 * error occurs, all streams may not be updated. 5073 */ 5074 ErrorCode updateLocalInitialWindowSize(int new_initial_window_size, int old_initial_window_size) 5075 { 5076 ErrorCode rv; 5077 auto new_window_size = new_initial_window_size; 5078 auto old_window_size = old_initial_window_size; 5079 5080 foreach(stream; streams) { 5081 if (!stream.updateLocalInitialWindowSize(new_window_size, old_window_size)) 5082 return terminateSession(FrameError.FLOW_CONTROL_ERROR); 5083 5084 if (!(opt_flags & OptionsMask.NO_AUTO_WINDOW_UPDATE)) { 5085 5086 if (shouldSendWindowUpdate(stream.localWindowSize, stream.recvWindowSize)) { 5087 5088 addWindowUpdate(FrameFlags.NONE, stream.id, stream.recvWindowSize); 5089 stream.recvWindowSize = 0; 5090 } 5091 } 5092 } 5093 return rv; 5094 } 5095 5096 /* 5097 * Returns the number of active streams, which includes streams in 5098 * reserved state. 5099 */ 5100 size_t getNumActiveStreams() { 5101 return streams.length - num_closed_streams - num_idle_streams; 5102 } 5103 5104 /* Closes non-idle and non-closed streams whose stream ID > last_stream_id. 5105 * If incoming is nonzero, we are going to close incoming streams. 5106 * Otherwise, close outgoing streams. */ 5107 ErrorCode closeStreamOnGoAway(int last_stream_id, bool incoming) 5108 { 5109 ErrorCode rv; 5110 5111 foreach(stream; streams) { 5112 5113 if ((!isMyStreamId(stream.id) && !incoming) || (isMyStreamId(stream.id) && incoming)) 5114 continue; 5115 5116 if (stream.state != StreamState.IDLE && !(stream.flags & StreamFlags.CLOSED) && stream.id > last_stream_id) 5117 { 5118 rv = closeStream(stream.id, FrameError.REFUSED_STREAM); 5119 if (isFatal(rv)) 5120 return rv; 5121 } 5122 } 5123 5124 return rv; 5125 } 5126 5127 void cycleWeightOutboundItem(OutboundItem item, int ini_weight) 5128 { 5129 if (item.weight == MIN_WEIGHT || item.weight > ini_weight) { 5130 5131 item.weight = ini_weight; 5132 5133 if (item.cycle == last_cycle) { 5134 item.cycle = ++last_cycle; 5135 } else { 5136 item.cycle = last_cycle; 5137 } 5138 } else { 5139 --item.weight; 5140 } 5141 } 5142 5143 /* 5144 * This function serializes frame for transmission. 5145 * 5146 * This function returns 0 if it succeeds, or one of negative error 5147 * codes, including both fatal and non-fatal ones. 5148 */ 5149 ErrorCode prepareFrame(OutboundItem item) 5150 { 5151 ErrorCode rv; 5152 Frame* frame = &item.frame; 5153 if (frame.hd.type != FrameType.DATA) { 5154 with(FrameType) switch (frame.hd.type) { 5155 case HEADERS: { 5156 HeadersAuxData *aux_data; 5157 size_t estimated_payloadlen; 5158 5159 aux_data = &item.aux_data.headers; 5160 5161 estimated_payloadlen = estimateHeadersPayload(frame.headers.hfa, PRIORITY_SPECLEN); 5162 5163 if (estimated_payloadlen > MAX_HEADERSLEN) { 5164 return ErrorCode.FRAME_SIZE_ERROR; 5165 } 5166 5167 if (frame.headers.cat == HeadersCategory.REQUEST) { 5168 /* initial HEADERS, which opens stream */ 5169 Stream stream = openStream(frame.hd.stream_id, StreamFlags.NONE, frame.headers.pri_spec, StreamState.INITIAL, aux_data.stream_user_data); 5170 5171 rv = predicateRequestHeadersSend(item); 5172 if (rv != ErrorCode.OK) { 5173 return rv; 5174 } 5175 5176 if (isHTTPMessagingEnabled()) { 5177 stream.setRequestMethod(*frame); 5178 } 5179 } else { 5180 Stream stream = getStream(frame.hd.stream_id); 5181 5182 if (predicatePushResponseHeadersSend(stream) == 0) 5183 { 5184 frame.headers.cat = HeadersCategory.PUSH_RESPONSE; 5185 if (aux_data.stream_user_data) 5186 stream.userData = aux_data.stream_user_data; 5187 } else if (predicateResponseHeadersSend(stream) == 0) { 5188 frame.headers.cat = HeadersCategory.RESPONSE; 5189 } else { 5190 frame.headers.cat = HeadersCategory.HEADERS; 5191 5192 rv = predicateHeadersSend(stream); 5193 5194 if (rv != ErrorCode.OK) { 5195 if (stream && stream.item == item) 5196 stream.detachItem(this); 5197 return rv; 5198 } 5199 } 5200 } 5201 5202 rv = frame.headers.pack(aob.framebufs, hd_deflater); 5203 5204 if (rv != ErrorCode.OK) { 5205 return rv; 5206 } 5207 5208 LOGF("send: before padding, HEADERS serialized in %d bytes", aob.framebufs.length); 5209 5210 rv = headersAddPad(*frame); 5211 5212 if (rv != ErrorCode.OK) { 5213 return rv; 5214 } 5215 5216 LOGF("send: HEADERS finally serialized in %d bytes", aob.framebufs.length); 5217 5218 break; 5219 } 5220 case PRIORITY: { 5221 if (isClosing()) { 5222 return ErrorCode.SESSION_CLOSING; 5223 } 5224 /* PRIORITY frame can be sent at any time and to any stream ID. */ 5225 frame.priority.pack(aob.framebufs); 5226 5227 /* Peer can send PRIORITY frame against idle stream to create 5228 "anchor" in dependency tree. Only client can do this in 5229 libhttp2. In libhttp2, only server retains non-active (closed 5230 or idle) streams in memory, so we don't open stream here. */ 5231 break; 5232 } 5233 case RST_STREAM: 5234 if (isClosing()) { 5235 return ErrorCode.SESSION_CLOSING; 5236 } 5237 frame.rst_stream.pack(aob.framebufs); 5238 break; 5239 case SETTINGS: { 5240 rv = frame.settings.pack(aob.framebufs); 5241 if (rv != ErrorCode.OK) { 5242 return rv; 5243 } 5244 break; 5245 } 5246 case PUSH_PROMISE: { 5247 Stream stream; 5248 HeadersAuxData *aux_data; 5249 PrioritySpec pri_spec; 5250 size_t estimated_payloadlen; 5251 5252 aux_data = &item.aux_data.headers; 5253 5254 stream = getStream(frame.hd.stream_id); 5255 5256 /* stream could be null if associated stream was already closed. */ 5257 if (stream) 5258 pri_spec = PrioritySpec(stream.id, DEFAULT_WEIGHT, 0); 5259 5260 openStream(frame.push_promise.promised_stream_id, StreamFlags.NONE, pri_spec, StreamState.RESERVED, aux_data.stream_user_data); 5261 5262 estimated_payloadlen = estimateHeadersPayload(frame.push_promise.hfa, 0); 5263 5264 if (estimated_payloadlen > MAX_HEADERSLEN) 5265 return ErrorCode.FRAME_SIZE_ERROR; 5266 5267 /* predicte should fail if stream is null. */ 5268 rv = predicatePushPromiseSend(stream); 5269 if (rv != ErrorCode.OK) { 5270 return rv; 5271 } 5272 5273 assert(stream); 5274 5275 rv = frame.push_promise.pack(aob.framebufs, hd_deflater); 5276 if (rv != 0) 5277 return rv; 5278 5279 rv = headersAddPad(*frame); 5280 if (rv != 0) 5281 return rv; 5282 5283 break; 5284 } 5285 case PING: 5286 if (isClosing()) { 5287 return ErrorCode.SESSION_CLOSING; 5288 } 5289 frame.ping.pack(aob.framebufs); 5290 break; 5291 case WINDOW_UPDATE: { 5292 rv = predicateWindowUpdateSend(frame.hd.stream_id); 5293 if (rv != ErrorCode.OK) { 5294 return rv; 5295 } 5296 frame.window_update.pack(aob.framebufs); 5297 break; 5298 } 5299 case GOAWAY: 5300 rv = frame.goaway.pack(aob.framebufs); 5301 if (rv != ErrorCode.OK) { 5302 return rv; 5303 } 5304 local_last_stream_id = frame.goaway.last_stream_id; 5305 5306 break; 5307 default: 5308 return ErrorCode.INVALID_ARGUMENT; 5309 } 5310 return ErrorCode.OK; 5311 } else { 5312 int next_readmax; 5313 Stream stream = getStream(frame.hd.stream_id); 5314 5315 if (stream) { 5316 assert(stream.item == item); 5317 } 5318 5319 rv = predicateDataSend(stream); 5320 if (rv != ErrorCode.OK) { 5321 if (stream) 5322 stream.detachItem(this); 5323 5324 return rv; 5325 } 5326 /* Assuming stream is not null */ 5327 assert(stream); 5328 next_readmax = nextDataRead(stream); 5329 5330 if (next_readmax == 0) { 5331 5332 /* This must be true since we only pop DATA frame item from queue when session.remote_window_size > 0 */ 5333 assert(remote_window_size > 0); 5334 5335 stream.deferItem(StreamFlags.DEFERRED_FLOW_CONTROL, this); 5336 aob.item = null; 5337 aob.reset(); 5338 return ErrorCode.DEFERRED; 5339 } 5340 5341 rv = packData(aob.framebufs, next_readmax, *frame, item.aux_data.data); 5342 if (rv == ErrorCode.DEFERRED) { 5343 stream.deferItem(StreamFlags.DEFERRED_USER, this); 5344 aob.item = null; 5345 aob.reset(); 5346 return ErrorCode.DEFERRED; 5347 } 5348 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 5349 stream.detachItem(this); 5350 addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR); 5351 return ErrorCode.TEMPORAL_CALLBACK_FAILURE; 5352 } 5353 if (rv != 0) { 5354 stream.detachItem(this); 5355 return rv; 5356 } 5357 return ErrorCode.OK; 5358 } 5359 } 5360 5361 /* 5362 * Called after a frame is sent. This function runs 5363 * $(D Connector.onFrameSent) and handles stream closure upon END_STREAM 5364 * or RST_STREAM. This function does not reset aob. It is a 5365 * responsibility of $(D resetActiveOutboundItem). 5366 * 5367 * This function returns 0 if it succeeds, or one of the following 5368 * negative error codes: 5369 * 5370 * ErrorCode.CALLBACK_FAILURE 5371 * The callback function failed. 5372 */ 5373 ErrorCode afterFrameSent() 5374 { 5375 ErrorCode rv; 5376 OutboundItem item = aob.item; 5377 Buffers framebufs = aob.framebufs; 5378 Frame* frame = &item.frame; 5379 5380 if (frame.hd.type != FrameType.DATA) { 5381 5382 if (frame.hd.type == FrameType.HEADERS || frame.hd.type == FrameType.PUSH_PROMISE) { 5383 5384 if (framebufs.nextPresent()) { 5385 LOGF("send: CONTINUATION exists, just return"); 5386 return ErrorCode.OK; 5387 } 5388 } 5389 bool ok = callOnFrameSent(*frame); 5390 if (!ok) { 5391 return ErrorCode.CALLBACK_FAILURE; 5392 } 5393 with(FrameType) switch (frame.hd.type) { 5394 case HEADERS: { 5395 HeadersAuxData *aux_data; 5396 Stream stream = getStream(frame.hd.stream_id); 5397 if (!stream) 5398 break; 5399 if (stream.item == item) 5400 stream.detachItem(this); 5401 5402 final switch (frame.headers.cat) { 5403 case HeadersCategory.REQUEST: { 5404 stream.state = StreamState.OPENING; 5405 if (frame.hd.flags & FrameFlags.END_STREAM) { 5406 stream.shutdown(ShutdownFlag.WR); 5407 } 5408 rv = closeStreamIfShutRdWr(stream); 5409 if (isFatal(rv)) { 5410 return rv; 5411 } 5412 /* We assume aux_data is a pointer to HeadersAuxData */ 5413 aux_data = &item.aux_data.headers; 5414 if (aux_data.data_prd) { 5415 /* submitData() makes a copy of aux_data.data_prd */ 5416 rv = submitData(this, FrameFlags.END_STREAM, frame.hd.stream_id, aux_data.data_prd); 5417 if (isFatal(rv)) { 5418 return rv; 5419 } 5420 /* TODO: submitData() may fail if stream has already DATA frame item. We might have to handle it here. */ 5421 } 5422 break; 5423 } 5424 case HeadersCategory.PUSH_RESPONSE: 5425 stream.flags = cast(StreamFlags)(stream.flags & ~StreamFlags.PUSH); 5426 ++num_outgoing_streams; 5427 goto case HeadersCategory.RESPONSE; 5428 case HeadersCategory.RESPONSE: 5429 stream.state = StreamState.OPENED; 5430 goto case HeadersCategory.HEADERS; 5431 case HeadersCategory.HEADERS: 5432 if (frame.hd.flags & FrameFlags.END_STREAM) { 5433 stream.shutdown(ShutdownFlag.WR); 5434 } 5435 rv = closeStreamIfShutRdWr(stream); 5436 if (isFatal(rv)) { 5437 return rv; 5438 } 5439 /* We assume aux_data is a pointer to HeadersAuxData */ 5440 aux_data = &item.aux_data.headers; 5441 if (aux_data.data_prd) { 5442 rv = submitData(this, FrameFlags.END_STREAM, frame.hd.stream_id, aux_data.data_prd); 5443 if (isFatal(rv)) { 5444 return rv; 5445 } 5446 /* TODO submitData() may fail if stream has already DATA frame item. 5447 * We might have to handle it here. */ 5448 } 5449 break; 5450 } 5451 break; 5452 } 5453 case PRIORITY: { 5454 Stream stream; 5455 5456 if (is_server) { 5457 break; 5458 } 5459 5460 stream = getStreamRaw(frame.hd.stream_id); 5461 5462 if (!stream) { 5463 break; 5464 } 5465 5466 reprioritizeStream(stream, frame.priority.pri_spec); 5467 5468 break; 5469 } 5470 case RST_STREAM: 5471 rv = closeStream(frame.hd.stream_id, frame.rst_stream.error_code); 5472 if (isFatal(rv)) { 5473 return rv; 5474 } 5475 break; 5476 case GOAWAY: { 5477 GoAwayAuxData aux_data = item.aux_data.goaway; 5478 5479 if ((aux_data.flags & GoAwayAuxFlags.SHUTDOWN_NOTICE) == 0) { 5480 5481 if (aux_data.flags & GoAwayAuxFlags.TERM_ON_SEND) { 5482 goaway_flags |= GoAwayFlags.TERM_SENT; 5483 } 5484 5485 goaway_flags |= GoAwayFlags.SENT; 5486 5487 rv = closeStreamOnGoAway(frame.goaway.last_stream_id, true); 5488 5489 if (isFatal(rv)) { 5490 return rv; 5491 } 5492 } 5493 5494 break; 5495 } 5496 default: 5497 break; 5498 } 5499 5500 return ErrorCode.OK; 5501 } 5502 5503 Stream stream = getStream(frame.hd.stream_id); 5504 DataAuxData *aux_data = &item.aux_data.data; 5505 /* We update flow control window after a frame was completely 5506 sent. This is possible because we choose payload length not to 5507 exceed the window */ 5508 remote_window_size -= frame.hd.length; 5509 if (stream) { 5510 stream.remoteWindowSize = stream.remoteWindowSize - frame.hd.length; 5511 } 5512 5513 if (stream && aux_data.eof) { 5514 stream.detachItem(this); 5515 5516 /* Call onFrameSent after detachItem(), so that application can issue submitData() in the callback. */ 5517 bool ok = callOnFrameSent(*frame); 5518 if (!ok) { 5519 return ErrorCode.CALLBACK_FAILURE; 5520 } 5521 5522 if (frame.hd.flags & FrameFlags.END_STREAM) { 5523 int stream_closed; 5524 5525 stream_closed = (stream.shutFlags & ShutdownFlag.RDWR) == ShutdownFlag.RDWR; 5526 5527 stream.shutdown(ShutdownFlag.WR); 5528 5529 rv = closeStreamIfShutRdWr(stream); 5530 if (isFatal(rv)) { 5531 return rv; 5532 } 5533 /* stream may be null if it was closed */ 5534 if (stream_closed) 5535 stream = null; 5536 } 5537 return ErrorCode.OK; 5538 } 5539 5540 bool ok = callOnFrameSent(*frame); 5541 5542 if (!ok) { 5543 return ErrorCode.CALLBACK_FAILURE; 5544 } 5545 5546 return ErrorCode.OK; 5547 } 5548 5549 /* 5550 * Called after a frame is sent and after $(D afterFrameSent). 5551 * This function is responsible for resetting aob. 5552 * 5553 * This function returns 0 if it succeeds, or one of the following 5554 * negative error codes: 5555 * 5556 * ErrorCode.CALLBACK_FAILURE 5557 * The callback function failed. 5558 */ 5559 ErrorCode resetActiveOutboundItem() 5560 { 5561 ErrorCode rv; 5562 OutboundItem item = aob.item; 5563 Buffers framebufs = aob.framebufs; 5564 Frame* frame = &item.frame; 5565 5566 if (frame.hd.type != FrameType.DATA) { 5567 5568 if (frame.hd.type == FrameType.HEADERS || 5569 frame.hd.type == FrameType.PUSH_PROMISE) { 5570 5571 if (framebufs.nextPresent()) { 5572 framebufs.cur = framebufs.cur.next; 5573 5574 LOGF("send: next CONTINUATION frame, %d bytes", framebufs.cur.buf.length); 5575 5576 return ErrorCode.OK; 5577 } 5578 } 5579 5580 aob.reset(); 5581 5582 return ErrorCode.OK; 5583 5584 } 5585 5586 OutboundItem next_item; 5587 Stream stream; 5588 DataAuxData* aux_data = &item.aux_data.data; 5589 5590 /* On EOF, we have already detached data. Please note that 5591 application may issue submitData() in 5592 $(D Connector.onFrameSent) (call from afterFrameSent), 5593 which attach data to stream. We don't want to detach it. */ 5594 if (aux_data.eof) { 5595 aob.reset(); 5596 return ErrorCode.OK; 5597 } 5598 5599 aux_data.no_copy = false; 5600 5601 stream = getStream(frame.hd.stream_id); 5602 5603 /* If Session is closed or RST_STREAM was queued, we won't send further data. */ 5604 if (predicateDataSend(stream) != 0) { 5605 if (stream) 5606 stream.detachItem(this); 5607 aob.reset(); 5608 5609 return ErrorCode.OK; 5610 } 5611 5612 /* Assuming stream is not null */ 5613 assert(stream); 5614 next_item = getNextOutboundItem(); 5615 5616 /* Imagine we hit connection window size limit while sending DATA 5617 frame. If we decrement weight here, its stream might get 5618 inferior share because the other streams' weight is not 5619 decremented because of flow control. */ 5620 if (remote_window_size > 0 || stream.remoteWindowSize <= 0) { 5621 cycleWeightOutboundItem(aob.item, stream.effectiveWeight); 5622 } 5623 5624 /* If priority of this stream is higher or equal to other stream 5625 waiting at the top of the queue, we continue to send this 5626 data. */ 5627 if (stream.dpri == StreamDPRI.TOP && (!next_item || PriorityQueue.compare(item, next_item) < 0)) 5628 { 5629 int next_readmax = nextDataRead(stream); 5630 5631 if (next_readmax == 0) { 5632 5633 if (remote_window_size == 0 && stream.remoteWindowSize > 0) { 5634 5635 /* If DATA cannot be sent solely due to connection level 5636 window size, just push item to queue again. We never pop 5637 DATA item while connection level window size is 0. */ 5638 ob_da_pq.push(aob.item); 5639 5640 if (isFatal(rv)) { 5641 return rv; 5642 } 5643 5644 aob.item.queued = 1; 5645 } else 5646 stream.deferItem(StreamFlags.DEFERRED_FLOW_CONTROL, this); 5647 5648 aob.item = null; 5649 aob.reset(); 5650 5651 return ErrorCode.OK; 5652 } 5653 5654 framebufs.reset(); 5655 5656 rv = packData(framebufs, next_readmax, *frame, item.aux_data.data); 5657 if (isFatal(rv)) { 5658 return rv; 5659 } 5660 5661 if (rv == ErrorCode.DEFERRED) { 5662 stream.deferItem(StreamFlags.DEFERRED_USER, this); 5663 5664 aob.item = null; 5665 aob.reset(); 5666 5667 return ErrorCode.OK; 5668 } 5669 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) 5670 { 5671 /* Stop DATA frame chain and issue RST_STREAM to close the stream. We don't return ErrorCode.TEMPORAL_CALLBACK_FAILURE intentionally. */ 5672 addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR); 5673 stream.detachItem(this); 5674 aob.reset(); 5675 return ErrorCode.OK; 5676 } 5677 5678 assert(rv == 0); 5679 5680 if (aux_data.no_copy) 5681 aob.state = OutboundState.SEND_NO_COPY; 5682 else 5683 aob.state = OutboundState.SEND_DATA; 5684 return ErrorCode.OK; 5685 } 5686 5687 if (stream.dpri == StreamDPRI.TOP) { 5688 ob_da_pq.push(aob.item); 5689 5690 aob.item.queued = true; 5691 } 5692 5693 aob.item = null; 5694 aob.reset(); 5695 return ErrorCode.OK; 5696 } 5697 5698 // fetch data and feed it to data_arr 5699 ErrorCode memSendInternal(ref ubyte[] data_arr, bool fast_cb) 5700 { 5701 ErrorCode rv; 5702 Buffers framebufs = aob.framebufs; 5703 5704 data_arr = null; 5705 5706 for (;;) { 5707 final switch (aob.state) { 5708 case OutboundState.POP_ITEM: { 5709 OutboundItem item; 5710 5711 item = popNextOutboundItem(); 5712 if (!item) { 5713 return ErrorCode.OK; 5714 } 5715 5716 if (item.frame.hd.type == FrameType.DATA || item.frame.hd.type == FrameType.HEADERS) { 5717 Frame* frame = &item.frame; 5718 Stream stream = getStream(frame.hd.stream_id); 5719 5720 if (stream && item == stream.item && stream.dpri != StreamDPRI.TOP) { 5721 /* We have DATA with higher priority in queue within the same dependency tree. */ 5722 break; 5723 } 5724 } 5725 rv = prepareFrame(item); 5726 if (rv == ErrorCode.DEFERRED) { 5727 LOGF("send: frame transmission deferred"); 5728 break; 5729 } 5730 5731 if (rv < 0) { 5732 int opened_stream_id; 5733 FrameError error_code = FrameError.INTERNAL_ERROR; 5734 import libhttp2.helpers : toString; 5735 LOGF("send: frame preparation failed with %s", toString(cast(ErrorCode)rv)); 5736 /* TODO: If the error comes from compressor, the connection must be closed. */ 5737 if (item.frame.hd.type != FrameType.DATA && !isFatal(rv)) { 5738 Frame* frame = &item.frame; 5739 /* The library is responsible for the transmission of WINDOW_UPDATE frame, so we don't call error callback for it. */ 5740 try if (frame.hd.type != FrameType.WINDOW_UPDATE && !connector.onFrameFailure(*frame, rv)) 5741 { 5742 item.free(); 5743 Mem.free(item); 5744 return ErrorCode.CALLBACK_FAILURE; 5745 } catch (Exception e) { 5746 item.free(); 5747 Mem.free(item); 5748 return ErrorCode.CALLBACK_FAILURE; 5749 } 5750 } 5751 5752 /* We have to close stream opened by failed request HEADERS or PUSH_PROMISE. */ 5753 switch (item.frame.hd.type) { 5754 case FrameType.HEADERS: 5755 if (item.frame.headers.cat == HeadersCategory.REQUEST) { 5756 opened_stream_id = item.frame.hd.stream_id; 5757 if (item.aux_data.headers.canceled) { 5758 error_code = item.aux_data.headers.error_code; 5759 } 5760 } 5761 break; 5762 case FrameType.PUSH_PROMISE: 5763 opened_stream_id = item.frame.push_promise.promised_stream_id; 5764 break; 5765 5766 default: break; 5767 } 5768 if (opened_stream_id) { 5769 /* careful not to override rv */ 5770 ErrorCode rv2; 5771 rv2 = closeStream(opened_stream_id, error_code); 5772 5773 if (isFatal(rv2)) { 5774 return rv2; 5775 } 5776 } 5777 5778 item.free(); 5779 Mem.free(item); 5780 aob.reset(); 5781 5782 if (rv == ErrorCode.HEADER_COMP) { 5783 /* If header compression error occurred, should terminiate connection. */ 5784 rv = terminateSession(FrameError.INTERNAL_ERROR); 5785 } 5786 if (isFatal(rv)) { 5787 return rv; 5788 } 5789 break; 5790 } 5791 5792 aob.item = item; 5793 5794 framebufs.rewind(); 5795 5796 if (item.frame.hd.type != FrameType.DATA) { 5797 Frame* frame = &item.frame; 5798 5799 LOGF("send: next frame: payloadlen=%d, type=%u, flags=0x%02x, stream_id=%d", 5800 frame.hd.length, frame.hd.type, frame.hd.flags, 5801 frame.hd.stream_id); 5802 5803 bool ok = callOnFrameReady(*frame); 5804 if (!ok) { 5805 return ErrorCode.CALLBACK_FAILURE; 5806 } 5807 } else { 5808 LOGF("send: next frame: DATA"); 5809 5810 if (item.aux_data.data.no_copy) 5811 { 5812 aob.state = OutboundState.SEND_NO_COPY; 5813 break; 5814 } 5815 } 5816 5817 LOGF("send: start transmitting frame type=%u, length=%d", 5818 framebufs.cur.buf.pos[3], 5819 framebufs.cur.buf.last - framebufs.cur.buf.pos); 5820 5821 aob.state = OutboundState.SEND_DATA; 5822 5823 break; 5824 } 5825 case OutboundState.SEND_DATA: { 5826 size_t datalen; 5827 Buffer* buf = &framebufs.cur.buf; 5828 5829 if (buf.pos is buf.last) { 5830 LOGF("send: end transmission of a frame"); 5831 5832 /* Frame has completely sent */ 5833 if (fast_cb) { 5834 rv = resetActiveOutboundItem(); 5835 } else { 5836 rv = afterFrameSent(); 5837 if (rv < 0) { 5838 /* FATAL */ 5839 assert(isFatal(rv)); 5840 return rv; 5841 } 5842 rv = resetActiveOutboundItem(); 5843 } 5844 if (rv < 0) { 5845 /* FATAL */ 5846 assert(isFatal(rv)); 5847 return rv; 5848 } 5849 /* We have already adjusted the next state */ 5850 break; 5851 } 5852 5853 datalen = buf.length; 5854 data_arr = buf.pos[0 .. datalen]; 5855 5856 /* We increment the offset here. If write() does not send everything, we will adjust it. */ 5857 buf.pos += datalen; 5858 5859 return ErrorCode.OK; 5860 } 5861 case OutboundState.SEND_NO_COPY: 5862 { 5863 LOGF("send: no copy DATA\n"); 5864 5865 Frame* frame = &aob.item.frame; 5866 Stream stream = getStream(frame.hd.stream_id); 5867 5868 5869 if (!stream) { 5870 LOGF("send: no copy DATA cancelled because stream was closed\n"); 5871 aob.reset(); 5872 break; 5873 } 5874 5875 rv = callWriteData(aob.item, framebufs); 5876 if (isFatal(rv)) { 5877 return rv; 5878 } 5879 5880 if (rv == ErrorCode.TEMPORAL_CALLBACK_FAILURE) { 5881 stream.detachItem(this); 5882 5883 addRstStream(frame.hd.stream_id, FrameError.INTERNAL_ERROR); 5884 5885 aob.reset(); 5886 5887 break; 5888 } 5889 5890 if (rv == ErrorCode.WOULDBLOCK) { 5891 return ErrorCode.OK; 5892 } 5893 5894 assert(rv == ErrorCode.OK); 5895 5896 rv = afterFrameSent(); 5897 if (rv < 0) { 5898 assert(isFatal(rv)); 5899 return rv; 5900 } 5901 rv = resetActiveOutboundItem(); 5902 if (rv < 0) { 5903 assert(isFatal(rv)); 5904 return rv; 5905 } 5906 5907 /* We have already adjusted the next state */ 5908 5909 break; 5910 } 5911 } 5912 } 5913 } 5914 5915 /* 5916 * Inflates header block in the memory pointed by |input| with |input.length| 5917 * bytes. If this function returns ErrorCode.PAUSE, the caller must 5918 * call this function again, until it returns 0 or one of negative 5919 * error code. If |call_header_cb| is zero, the on_header_callback 5920 * are not invoked and the function never return ErrorCode.PAUSE. If 5921 * the given |in| is the last chunk of header block, the |final| must 5922 * be nonzero. If header block is successfully processed (which is 5923 * indicated by the return value 0, ErrorCode.PAUSE or 5924 * ErrorCode.TEMPORAL_CALLBACK_FAILURE), the number of processed 5925 * input bytes is assigned to the |*readlen_ptr|. 5926 * 5927 * This function return 0 if it succeeds, or one of the negative error 5928 * codes: 5929 * 5930 * ErrorCode.CALLBACK_FAILURE 5931 * The callback function failed. 5932 * ErrorCode.TEMPORAL_CALLBACK_FAILURE 5933 * The callback returns this error code, indicating that this 5934 * stream should be RST_STREAMed.. 5935 * ErrorCode.PAUSE 5936 * The callback function returned ErrorCode.PAUSE 5937 * ErrorCode.HEADER_COMP 5938 * Header decompression failed 5939 */ 5940 ErrorCode inflateHeaderBlock(Frame frame, ref size_t readlen_ref, ubyte[] input, bool is_final, bool call_header_cb) 5941 { 5942 int proclen; 5943 ErrorCode rv; 5944 InflateFlag inflate_flag; 5945 HeaderField hf; 5946 Stream stream; 5947 Stream subject_stream; 5948 bool trailer; 5949 5950 readlen_ref = 0; 5951 stream = getStream(frame.hd.stream_id); 5952 5953 if (frame.hd.type == FrameType.PUSH_PROMISE) { 5954 subject_stream = getStream(frame.push_promise.promised_stream_id); 5955 } else { 5956 subject_stream = stream; 5957 trailer = isTrailerHeaders(stream, frame); 5958 } 5959 5960 LOGF("recv: decoding header block %d bytes", input.length); 5961 size_t inlen = input.length; 5962 ubyte* inptr = input.ptr; 5963 for (;;) { 5964 inflate_flag = InflateFlag.NONE; 5965 proclen = hd_inflater.inflate(hf, inflate_flag, inptr[0 .. inlen], is_final); 5966 5967 if (isFatal(cast(int)proclen)) { 5968 return cast(ErrorCode)proclen; 5969 } 5970 5971 if (proclen < 0) { 5972 if (iframe.state == InboundState.READ_HEADER_BLOCK) 5973 { 5974 if (stream && stream.state != StreamState.CLOSING) 5975 { 5976 /* Adding RST_STREAM here is very important. It prevents 5977 from invoking subsequent callbacks for the same stream ID. */ 5978 addRstStream(frame.hd.stream_id, FrameError.COMPRESSION_ERROR); 5979 5980 } 5981 } 5982 rv = terminateSession(FrameError.COMPRESSION_ERROR); 5983 if (isFatal(rv)) { 5984 return rv; 5985 } 5986 5987 return ErrorCode.HEADER_COMP; 5988 } 5989 5990 inptr += proclen; 5991 inlen -= proclen; 5992 readlen_ref += proclen; 5993 5994 LOGF("recv: proclen=%d", proclen); 5995 5996 if (call_header_cb && (inflate_flag & InflateFlag.EMIT)) { 5997 rv = ErrorCode.OK; 5998 if (subject_stream && isHTTPMessagingEnabled()) { 5999 rv = validateHeaderField(subject_stream, frame, hf, trailer); 6000 if (rv == ErrorCode.HTTP_HEADER) { 6001 LOGF("recv: HTTP error: type=%d, id=%d, header %.*s: %.*s", 6002 frame.hd.type, subject_stream.id, cast(int)hf.name.length, 6003 hf.name, cast(int)hf.value.length, hf.value); 6004 6005 handleInvalidStream2(subject_stream.id, frame, FrameError.PROTOCOL_ERROR); 6006 return ErrorCode.TEMPORAL_CALLBACK_FAILURE; 6007 } 6008 else if (rv == ErrorCode.IGN_HTTP_HEADER) { 6009 /* header is ignored */ 6010 LOGF("recv: HTTP ignored: type=%d, id=%d, header %s: %s", frame.hd.type, subject_stream.id, hf.name, hf.value); 6011 } 6012 6013 } 6014 6015 if (rv == ErrorCode.OK) { 6016 bool pause; 6017 bool close; 6018 bool ok = callOnHeaderField(frame, hf, pause, close); 6019 if (!ok) 6020 return ErrorCode.CALLBACK_FAILURE; 6021 if (close) 6022 return ErrorCode.TEMPORAL_CALLBACK_FAILURE; 6023 if (pause) 6024 return ErrorCode.PAUSE; 6025 } 6026 } 6027 6028 if (inflate_flag & InflateFlag.FINAL) { 6029 hd_inflater.endHeaders(); 6030 break; 6031 } 6032 if ((inflate_flag & InflateFlag.EMIT) == 0 && inlen == 0) { 6033 break; 6034 } 6035 } 6036 return ErrorCode.OK; 6037 } 6038 6039 package: /* Used only for tests */ 6040 /* 6041 * Returns top of outbound frame queue. This function returns null if 6042 * queue is empty. 6043 */ 6044 @property OutboundItem ob_pq_top() { 6045 return ob_pq.top; 6046 } 6047 6048 package: 6049 HashMap!(int, Stream) streams; 6050 6051 StreamRoots roots; 6052 6053 /// Priority Queue for outbound frames other than stream-starting HEADERS and DATA 6054 PriorityQueue ob_pq; 6055 6056 /// Priority Queue for outbound stream-starting HEADERS frame 6057 PriorityQueue ob_ss_pq; 6058 6059 /// Priority Queue for DATA frame 6060 PriorityQueue ob_da_pq; 6061 6062 ActiveOutboundItem aob; 6063 InboundFrame iframe; 6064 Deflater hd_deflater; 6065 Inflater hd_inflater; 6066 Connector connector; 6067 6068 /// Sequence number of outbound frame to maintain the order of enqueue if priority is equal. 6069 long next_seq; 6070 6071 /** Reset count of OutboundItem's weight. We decrements 6072 weight each time DATA is sent to simulate resource sharing. We 6073 use priority queue and larger weight has the precedence. If 6074 weight is reached to lowest weight, it resets to its initial 6075 weight. If this happens, other items which have the lower weight 6076 currently but same initial weight cannot send DATA until item 6077 having large weight is decreased. To avoid this, we use this 6078 cycle variable. Initally, this is set to 1. If weight gets 6079 lowest weight, and if item's cycle == last_cycle, we increments 6080 last_cycle and assigns it to item's cycle. Otherwise, just 6081 assign last_cycle. In priority queue comparator, we first 6082 compare items' cycle value. Lower cycle value has the 6083 precedence. */ 6084 ulong last_cycle = 1; 6085 6086 /// Points to the latest closed stream. null if there is no closed stream. 6087 /// Notes: Only used when session is initialized as server. 6088 Stream closed_stream_head; 6089 6090 /// Points to the oldest closed stream. null if there is no closed stream. 6091 /// Notes: Only used when session is initialized as server. 6092 Stream closed_stream_tail; 6093 6094 /// Points to the latest idle stream. null if there is no idle stream. 6095 /// Notes: Only used when session is initialized as server . 6096 Stream idle_stream_head; 6097 6098 /// Points to the oldest idle stream. null if there is no idle stream. 6099 /// Notes: Only used when session is initialized as server. 6100 Stream idle_stream_tail; 6101 6102 /// In-flight SETTINGS values. null for no in-flight SETTINGS. 6103 Setting[] inflight_iva; 6104 6105 /// The number of outgoing streams. This will be capped by remote_settings.max_concurrent_streams. 6106 size_t num_outgoing_streams; 6107 6108 /// The number of incoming streams. This will be capped by local_settings.max_concurrent_streams. 6109 size_t num_incoming_streams; 6110 6111 /// The number of closed streams still kept in |streams| hash. The closed streams can be accessed 6112 /// through single linked list |closed_stream_head|. 6113 /// Notes: The current implementation only keeps incoming streams if session is initialized as server. 6114 size_t num_closed_streams; 6115 6116 /// The number of idle streams kept in |streams| hash. The idle streams can be accessed through doubly linked list 6117 /// |idle_stream_head|. 6118 /// Notes: The current implementation only keeps idle streams if session is initialized as server. 6119 size_t num_idle_streams; 6120 6121 /// Next Stream ID. Made unsigned int to detect >= (1 << 31). 6122 uint next_stream_id; 6123 6124 /// The largest stream ID received so far 6125 int last_recv_stream_id; 6126 6127 /// The largest stream ID which has been processed in some way. 6128 /// Notes: This value will be used as last-stream-id when sending GOAWAY frame. 6129 int last_proc_stream_id; 6130 6131 /// Counter of unique ID of PING. Wraps when it exceeds max_UNIQUE_ID */ 6132 uint next_unique_id; 6133 6134 /// This is the last-stream-ID we have sent in GOAWAY 6135 int local_last_stream_id = (1u << 31) - 1; 6136 6137 /// This is the value in GOAWAY frame received from remote endpoint. 6138 int remote_last_stream_id = (1u << 31) - 1; 6139 6140 /// Current sender window size. This value is computed against the current initial window size of remote endpoint. 6141 int remote_window_size = INITIAL_CONNECTION_WINDOW_SIZE; 6142 6143 /// Keep track of the number of bytes received without WINDOW_UPDATE. This could be negative after 6144 /// submitting negative value to WINDOW_UPDATE. 6145 int recv_window_size; 6146 6147 /// The number of bytes consumed by the application and now is subject to WINDOW_UPDATE. 6148 /// Notes: This is only used when auto WINDOW_UPDATE is turned off. 6149 int consumed_size; 6150 6151 /// The amount of recv_window_size cut using submitting negative value to WINDOW_UPDATE 6152 int recv_reduction; 6153 6154 /// window size for local flow control. It is initially set to INITIAL_CONNECTION_WINDOW_SIZE and could be 6155 /// increased/decreased by submitting WINDOW_UPDATE. See submitWindowUpdate(). 6156 int local_window_size = INITIAL_CONNECTION_WINDOW_SIZE; 6157 6158 /// Settings value received from the remote endpoint. We just use ID as index. The index = 0 is unused. 6159 SettingsStorage remote_settings; 6160 6161 /// Settings value of the local endpoint. 6162 SettingsStorage local_settings; 6163 6164 /// Option flags. This is bitwise-OR of 0 or more of OptionsMask. 6165 OptionsMask opt_flags; 6166 6167 /// Unacked local Setting.MAX_CONCURRENT_STREAMS value. We use this to refuse the incoming stream if it exceeds this value. 6168 uint pending_local_max_concurrent_stream = INITIAL_MAX_CONCURRENT_STREAMS; 6169 6170 /// Unacked local ENABLE_PUSH value. We use this to refuse PUSH_PROMISE before SETTINGS ACK is received. 6171 bool pending_enable_push = true; 6172 6173 /// true if the session is server side. 6174 bool is_server; 6175 6176 /// Flags indicating GOAWAY is sent and/or recieved. 6177 GoAwayFlags goaway_flags = GoAwayFlags.NONE; 6178 6179 6180 } 6181 /** 6182 * @function 6183 * 6184 * Serializes the SETTINGS values |iv| in the |buf|. The size of the 6185 * |buf| is specified by |buflen|. The number of entries in the |iv| 6186 * array is given by |niv|. The required space in |buf| for the |niv| 6187 * entries is `8*niv` bytes and if the given buffer is too small, an 6188 * error is returned. This function is used mainly for creating a 6189 * SETTINGS payload to be sent with the `HTTP2-Settings` header 6190 * field in an HTTP Upgrade request. The data written in |buf| is NOT 6191 * base64url encoded and the application is responsible for encoding. 6192 * 6193 * This function returns the number of bytes written in |buf|, or one 6194 * of the following negative error codes: 6195 * 6196 * $(D ErrorCode.INVALID_ARGUMENT) 6197 * The |iv| contains duplicate settings ID or invalid value. 6198 * 6199 * $(D ErrorCode.INSUFF_BUFSIZE) 6200 * The provided |buflen| size is too small to hold the output. 6201 */ 6202 int packSettingsPayload(ubyte[] buf, in Setting[] iva) 6203 { 6204 if (!iva.check()) { 6205 return ErrorCode.INVALID_ARGUMENT; 6206 } 6207 6208 if (buf.length < (iva.length * FRAME_SETTINGS_ENTRY_LENGTH)) { 6209 return ErrorCode.INSUFF_BUFSIZE; 6210 } 6211 6212 return Settings.pack(buf, iva); 6213 } 6214 6215 /** 6216 * Submits HEADERS frame and optionally one or more DATA frames. 6217 * 6218 * The |pri_spec| is priority specification of this request. 6219 * To specify the priority, use `PrioritySpec()`. 6220 * 6221 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT), 6222 * $(D MAX_WEIGHT)], inclusive. If `pri_spec.weight` is 6223 * strictly less than $(D MIN_WEIGHT), it becomes 6224 * $(D MIN_WEIGHT). If it is strictly greater than 6225 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT). 6226 * 6227 * The |hfa| is an array of header fields $(D HeaderField) with 6228 * |hfa.length| elements. The application is responsible to include 6229 * required pseudo-header fields (header field whose name starts with 6230 * ":") in |hfa| and must place pseudo-headers before regular header 6231 * fields. 6232 * 6233 * This function creates copies of all header fields in |hfa|. It 6234 * also lower-cases all names in |hfa|. The order of elements in 6235 * |hfa| is preserved. 6236 * 6237 * HTTP/2 specification has requirement about header fields in the 6238 * request HEADERS. See the specification for more details. 6239 * 6240 * If |data_prd| is not `null`, it provides data which will be sent 6241 * in subsequent DATA frames. In this case, a method that allows 6242 * request message bodies 6243 * (http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9) must 6244 * be specified with `:method` key in |hfa| (e.g. `POST`). This 6245 * function does not take ownership of the |data_prd|. The function 6246 * copies the members of the |data_prd|. If |data_prd| is `null`, 6247 * HEADERS have END_STREAM set. The |stream_user_data| is data 6248 * associated to the stream opened by this request and can be an 6249 * arbitrary pointer, which can be retrieved later by 6250 * `getStreamUserData()`. 6251 * 6252 * This function returns assigned stream ID if it succeeds, or one of 6253 * the following negative error codes: 6254 * 6255 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE) 6256 * No stream ID is available because maximum stream ID was 6257 * reached. 6258 * 6259 * .. warning:: 6260 * 6261 * This function returns assigned stream ID if it succeeds. But 6262 * that stream is not opened yet. The application must not submit 6263 * a frame to that stream ID before $(D Connector.onFrameReady) is called for this 6264 * frame. 6265 * 6266 */ 6267 int submitRequest(Session session, in PrioritySpec pri_spec, in HeaderField[] hfa, in DataProvider data_prd, void* stream_user_data = null) 6268 { 6269 FrameFlags flags = setRequestFlags(pri_spec, data_prd); 6270 6271 return submitHeadersSharedHfa(session, flags, -1, pri_spec, hfa, data_prd, stream_user_data, false); 6272 } 6273 6274 /** 6275 * Submits response HEADERS frame and optionally one or more DATA 6276 * frames against the stream |stream_id|. 6277 * 6278 * The |hfa| is an array of $(D HeaderField) with 6279 * |hfa.length| elements. The application is responsible to include 6280 * required pseudo-header fields (header field whose name starts with 6281 * ":") in |hfa| and must place pseudo-headers before regular header 6282 * fields. 6283 * 6284 * This function creates copies of all header fields in |hfa|. It 6285 * also lower-cases all names in |hfa|. The order of elements in 6286 * |hfa| is preserved. 6287 * 6288 * HTTP/2 specification has requirement about header fields in the 6289 * response HEADERS. See the specification for more details. 6290 * 6291 * If |data_prd| is not `null`, it provides data which will be sent 6292 * in subsequent DATA frames. If |data_prd| is `null`, HEADERS will have 6293 * END_STREAM flag set. 6294 * 6295 * This method can be used as normal HTTP response and push response. 6296 * When pushing a resource using this function, the $(D Session) must be 6297 * configured using `new Session()` or its variants and 6298 * the target stream denoted by the |stream_id| must be reserved using 6299 * `submitPushPromise()`. 6300 * 6301 * To send non-final response headers (e.g., HTTP status 101), don't 6302 * use this function because this function half-closes the outbound 6303 * stream. Instead, use `submitHeaders()` for this purpose. 6304 * 6305 * This function returns 0 if it succeeds, or one of the following 6306 * negative error codes: 6307 * 6308 * $(D ErrorCode.INVALID_ARGUMENT) 6309 * The |stream_id| is 0. 6310 * 6311 * .. warning:: 6312 * 6313 * Calling this function twice for the same stream ID may lead to 6314 * program crash. It is generally considered to a programming error 6315 * to commit response twice. 6316 */ 6317 ErrorCode submitResponse(Session session, int stream_id, in HeaderField[] hfa, in DataProvider data_prd) 6318 { 6319 FrameFlags flags = setResponseFlags(data_prd); 6320 return cast(ErrorCode)submitHeadersSharedHfa(session, flags, stream_id, PrioritySpec.init, hfa, data_prd, null, true); 6321 } 6322 6323 /** 6324 * Submits HEADERS frame. The |flags| is bitwise OR of the 6325 * following values: 6326 * 6327 * * $(D FrameFlags.END_STREAM) 6328 * 6329 * If |flags| includes $(D FrameFlags.END_STREAM), this frame has 6330 * END_STREAM flag set. 6331 * 6332 * The library handles the CONTINUATION frame internally and it 6333 * correctly sets END_HEADERS to the last sequence of the PUSH_PROMISE 6334 * or CONTINUATION frame. 6335 * 6336 * If the |stream_id| is -1, this frame is assumed as request (i.e., 6337 * request HEADERS frame which opens new stream). In this case, the 6338 * assigned stream ID will be returned. Otherwise, specify stream ID 6339 * in |stream_id|. 6340 * 6341 * The |pri_spec| is priority specification of this request. init 6342 * means the default priority. To specify the priority, 6343 * use $(D PrioritySpec) constructor. 6344 * 6345 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT), 6346 * $(D MAX_WEIGHT)], inclusive. If `pri_spec.weight` is 6347 * strictly less than $(D MIN_WEIGHT), it becomes 6348 * $(D MIN_WEIGHT). If it is strictly greater than 6349 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT). 6350 * 6351 * The |hfa| is an array of header fields $(D HeaderField) with 6352 * |hfa.length| elements. The application is responsible to include 6353 * required pseudo-header fields (header field whose name starts with 6354 * ":") in |hfa| and must place pseudo-headers before regular header 6355 * fields. 6356 * 6357 * This function creates copies of all header fields in |hfa|. It 6358 * also lower-cases all names in |hfa|. The order of elements in 6359 * |hfa| is preserved. 6360 * 6361 * The |stream_user_data| is a pointer to an arbitrary data which is 6362 * associated to the stream this frame will open. Therefore it is 6363 * only used if this frame opens streams, in other words, it changes 6364 * stream state from idle or reserved to open. 6365 * 6366 * This function is low-level in a sense that the application code can 6367 * specify flags directly. For usual HTTP request, 6368 * `submitRequest()` is useful. 6369 * 6370 * This function returns newly assigned stream ID if it succeeds and 6371 * |stream_id| is -1. Otherwise, this function returns 0 if it 6372 * succeeds, or one of the following negative error codes: 6373 * 6374 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE) 6375 * No stream ID is available because maximum stream ID was 6376 * reached. 6377 * $(D ErrorCode.INVALID_ARGUMENT) 6378 * The |stream_id| is 0. 6379 * 6380 * .. warning:: 6381 * 6382 * This function returns assigned stream ID if it succeeds and 6383 * |stream_id| is -1. But that stream is not opened yet. The 6384 * application must not submit frame to that stream ID before 6385 * $(D Connector.onFrameHeader) is called for this 6386 * frame. 6387 * 6388 */ 6389 int submitHeaders(Session session, FrameFlags flags, int stream_id = -1, in PrioritySpec pri_spec = PrioritySpec.init, in HeaderField[] hfa = null, void *stream_user_data = null) 6390 { 6391 flags &= FrameFlags.END_STREAM; 6392 6393 if (pri_spec != PrioritySpec.init) 6394 flags |= FrameFlags.PRIORITY; 6395 6396 return submitHeadersSharedHfa(session, flags, stream_id, pri_spec, hfa, DataProvider.init, stream_user_data, false); 6397 } 6398 6399 /** 6400 * Submits one or more DATA frames to the stream |stream_id|. The 6401 * data to be sent are provided by |data_prd|. If |flags| contains 6402 * $(D FrameFlags.END_STREAM), the last DATA frame has END_STREAM 6403 * flag set. 6404 * 6405 * This function does not take ownership of the |data_prd|. The 6406 * function copies the members of the |data_prd|. 6407 * 6408 * This function returns 0 if it succeeds, or one of the following 6409 * negative error codes: 6410 * 6411 * $(D ErrorCode.DATA_EXIST) 6412 * DATA has been already submitted and not fully processed yet. 6413 * $(D ErrorCode.INVALID_ARGUMENT) 6414 * The |stream_id| is 0. 6415 * $(D ErrorCode.STREAM_CLOSED) 6416 * The stream was alreay closed; or the |stream_id| is invalid. 6417 * 6418 * .. note:: 6419 * 6420 * Currently, only one data is allowed for a stream at a time. 6421 * Submitting data more than once before first data is finished 6422 * results in $(D ErrorCode.DATA_EXIST) error code. The 6423 * earliest callback which tells that previous data is done is 6424 * $(D Connector.onFrameSent). In side that callback, 6425 * new data can be submitted using `submitData()`. Of 6426 * course, all data except for last one must not have 6427 * $(D FrameFlags.END_STREAM) flag set in |flags|. 6428 */ 6429 ErrorCode submitData(Session session, FrameFlags flags, int stream_id, in DataProvider data_prd) 6430 { 6431 OutboundItem item; 6432 Frame* frame; 6433 DataAuxData* aux_data; 6434 DataFlags nflags = cast(DataFlags)(flags & FrameFlags.END_STREAM); 6435 6436 if (stream_id == 0) 6437 return ErrorCode.INVALID_ARGUMENT; 6438 6439 item = Mem.alloc!OutboundItem(session); 6440 scope(failure) Mem.free(item); 6441 6442 frame = &item.frame; 6443 aux_data = &item.aux_data.data; 6444 aux_data.data_prd = data_prd; 6445 aux_data.eof = false; 6446 aux_data.flags = nflags; 6447 6448 /* flags are sent on transmission */ 6449 frame.data = Data(FrameFlags.NONE, stream_id); 6450 scope(failure) frame.data.free(); 6451 session.addItem(item); 6452 return ErrorCode.OK; 6453 } 6454 6455 /** 6456 * Submits PRIORITY frame to change the priority of stream |stream_id| 6457 * to the priority specification |pri_spec|. 6458 * 6459 * 6460 * The |pri_spec| is priority specification of this request. `null` 6461 * is not allowed for this function. To specify the priority, use 6462 * `PrioritySpec.init`. This function will copy its data 6463 * members. 6464 * 6465 * The `pri_spec.weight` must be in [$(D MIN_WEIGHT), 6466 * $(D MAX_WEIGHT)], inclusive. If `pri_spec.weight` is 6467 * strictly less than $(D MIN_WEIGHT), it becomes 6468 * $(D MIN_WEIGHT). If it is strictly greater than 6469 * $(D MAX_WEIGHT), it becomes $(D MAX_WEIGHT). 6470 * 6471 * This function returns 0 if it succeeds, or one of the following 6472 * negative error codes: 6473 * 6474 * $(D ErrorCode.INVALID_ARGUMENT) 6475 * The |stream_id| is 0; or the |pri_spec| is null; or trying to 6476 * depend on itself. 6477 */ 6478 ErrorCode submitPriority(Session session, int stream_id, in PrioritySpec pri_spec) 6479 { 6480 OutboundItem item; 6481 Frame* frame; 6482 PrioritySpec copy_pri_spec; 6483 6484 if (stream_id == 0 || pri_spec == PrioritySpec.init) 6485 return ErrorCode.INVALID_ARGUMENT; 6486 6487 if (stream_id == pri_spec.stream_id) 6488 return ErrorCode.INVALID_ARGUMENT; 6489 6490 copy_pri_spec = pri_spec; 6491 6492 copy_pri_spec.adjustWeight(); 6493 6494 item = Mem.alloc!OutboundItem(session); 6495 scope(failure) Mem.free(item); 6496 frame = &item.frame; 6497 6498 frame.priority = Priority(stream_id, copy_pri_spec); 6499 scope(failure) frame.priority.free(); 6500 6501 session.addItem(item); 6502 return ErrorCode.OK; 6503 } 6504 6505 6506 /** 6507 * @function 6508 * 6509 * Submits RST_STREAM frame to cancel/reject the stream |stream_id| 6510 * with the error code |error_code|. 6511 * 6512 * The pre-defined error code is one of $(D FrameError). 6513 * 6514 * This function returns 0 if it succeeds, or one of the following 6515 * negative error codes: 6516 * 6517 * $(D ErrorCode.NOMEM) 6518 * Out of memory. 6519 * $(D ErrorCode.INVALID_ARGUMENT) 6520 * The |stream_id| is 0. 6521 */ 6522 ErrorCode submitRstStream(Session session, int stream_id, FrameError error_code) 6523 { 6524 if (stream_id == 0) 6525 return ErrorCode.INVALID_ARGUMENT; 6526 6527 session.addRstStream(stream_id, error_code); 6528 return ErrorCode.OK; 6529 } 6530 6531 /** 6532 * @function 6533 * 6534 * Stores local settings and submits SETTINGS frame. The |iva| is the 6535 * pointer to the array of $(D Setting). The |iv.length| 6536 * indicates the number of settings. 6537 * 6538 * This function does not take ownership of the |iva|. This function 6539 * copies all the elements in the |iva|. 6540 * 6541 * While updating individual stream's local window size, if the window 6542 * size becomes strictly larger than max_WINDOW_SIZE, 6543 * RST_STREAM is issued against such a stream. 6544 * 6545 * SETTINGS with $(D FrameFlags.ACK) is automatically submitted 6546 * by the library and application could not send it at its will. 6547 * 6548 * This function returns 0 if it succeeds, or one of the following 6549 * negative error codes: 6550 * 6551 * $(D ErrorCode.INVALID_ARGUMENT) 6552 * The |iv| contains invalid value (e.g., initial window size 6553 * strictly greater than (1 << 31) - 1. 6554 * $(D ErrorCode.TOO_MANY_INFLIGHT_SETTINGS) 6555 * There is already another in-flight SETTINGS. Note that the 6556 * current implementation only allows 1 in-flight SETTINGS frame 6557 * without ACK flag set. 6558 * $(D ErrorCode.NOMEM) 6559 * Out of memory. 6560 */ 6561 ErrorCode submitSettings(Session session, in Setting[] iva) 6562 { 6563 return session.addSettings(FrameFlags.NONE, iva); 6564 } 6565 6566 /** 6567 * Submits PUSH_PROMISE frame. 6568 * 6569 * The |stream_id| must be client initiated stream ID. 6570 * 6571 * The |hfa| is an array of $(D HeaderField) with 6572 * |hfa.length| elements. The application is responsible to include 6573 * required pseudo-header fields (header field whose name starts with 6574 * ":") in |hfa| and must place pseudo-headers before regular header 6575 * fields. 6576 * 6577 * This function creates copies of all header fieldss in |hfa|. It 6578 * also lower-cases all names in |hfa|. The order of elements in 6579 * |hfa| is preserved. 6580 * 6581 * The |promised_stream_user_data| is a pointer to an arbitrary data 6582 * which is associated to the promised stream this frame will open and 6583 * make it in reserved state. It is available using $(D Session.getStreamUserData). 6584 * The application can access it in $(D Connector.onFrameHeader) and 6585 * $(D Connector.onFrameSent) of this frame. 6586 * 6587 * The client side is not allowed to use this function. 6588 * 6589 * To submit response headers and data, use 6590 * `submitResponse()`. 6591 * 6592 * This function returns assigned promised stream ID if it succeeds, 6593 * or one of the following negative error codes: 6594 * 6595 * $(D ErrorCode.PROTO) 6596 * This function was invoked when $(D Session) is initialized as 6597 * client. 6598 * $(D ErrorCode.STREAM_ID_NOT_AVAILABLE) 6599 * No stream ID is available because maximum stream ID was 6600 * reached. 6601 * $(D ErrorCode.INVALID_ARGUMENT) 6602 * The |stream_id| is 0; The |stream_id| does not designate stream 6603 * that peer initiated. 6604 * 6605 * .. warning:: 6606 * 6607 * This function returns assigned promised stream ID if it succeeds. 6608 * But that stream is not opened yet. The application must not 6609 * submit frame to that stream ID before 6610 * $(D Connector.onFrameHeader) is called for this 6611 * frame. 6612 * 6613 */ 6614 int submitPushPromise(Session session, int stream_id, in HeaderField[] hfa, void* promised_stream_user_data) 6615 { 6616 OutboundItem item; 6617 Frame* frame; 6618 HeaderField[] hfa_copy; 6619 FrameFlags flags_copy; 6620 int promised_stream_id; 6621 6622 if (stream_id == 0 || session.isMyStreamId(stream_id)) { 6623 return ErrorCode.INVALID_ARGUMENT; 6624 } 6625 6626 if (!session.is_server) 6627 return ErrorCode.PROTO; 6628 6629 /* All 32bit signed stream IDs are spent. */ 6630 if (session.next_stream_id > int.max) { 6631 return ErrorCode.STREAM_ID_NOT_AVAILABLE; 6632 } 6633 6634 item = Mem.alloc!OutboundItem(session); 6635 scope(failure) 6636 Mem.free(item); 6637 6638 item.aux_data.headers.stream_user_data = promised_stream_user_data; 6639 6640 frame = &item.frame; 6641 bool is_owner; 6642 hfa_copy = hfa.copy(); 6643 is_owner = true; 6644 scope(failure) if (is_owner) Mem.free(hfa_copy); 6645 flags_copy = FrameFlags.END_HEADERS; 6646 6647 promised_stream_id = session.next_stream_id; 6648 session.next_stream_id += 2; 6649 6650 is_owner = false; 6651 frame.push_promise = PushPromise(flags_copy, stream_id, promised_stream_id, hfa_copy); 6652 scope(failure) frame.push_promise.free(); 6653 6654 session.addItem(item); 6655 6656 return promised_stream_id; 6657 } 6658 6659 /** 6660 * Submits PING frame. You don't have to send PING back when you 6661 * received PING frame. The library automatically submits PING frame 6662 * in this case. 6663 * 6664 * 6665 * If the |opaque_data| is non `null`, then it should point to the 8 6666 * bytes array of memory to specify opaque data to send with PING 6667 * frame. If the |opaque_data| is `null`, zero-cleared 8 bytes will 6668 * be sent as opaque data. 6669 */ 6670 void submitPing(Session session, in ubyte[] opaque_data) 6671 { 6672 return session.addPing(FrameFlags.NONE, opaque_data); 6673 } 6674 6675 /** 6676 * @function 6677 * 6678 * Submits GOAWAY frame with the last stream ID |last_stream_id| and 6679 * the error code |error_code|. 6680 * 6681 * The pre-defined error code is one of $(D FrameError). 6682 * 6683 * The |flags| is currently ignored and should be 6684 * $(D FrameFlags.NONE). 6685 * 6686 * The |last_stream_id| is peer's stream ID or 0. So if $(D Session) is 6687 * initialized as client, |last_stream_id| must be even or 0. If 6688 * $(D Session) is initialized as server, |last_stream_id| must be odd or 6689 * 0. 6690 * 6691 * The HTTP/2 specification says last_stream_id must not be increased 6692 * from the value previously sent. So the actual value sent as 6693 * last_stream_id is the minimum value between the given 6694 * |last_stream_id| and the last_stream_id previously sent to the 6695 * peer. 6696 * 6697 * If the |opaque_data| is not `null` and |opaque_data_len| is not 6698 * zero, those data will be sent as additional debug data. The 6699 * library makes a copy of the memory region pointed by |opaque_data| 6700 * with the length |opaque_data_len|, so the caller does not need to 6701 * keep this memory after the return of this function. If the 6702 * |opaque_data_len| is 0, the |opaque_data| could be `null`. 6703 * 6704 * After successful transmission of GOAWAY, following things happen. 6705 * All incoming streams having strictly more than |last_stream_id| are 6706 * closed. All incoming HEADERS which starts new stream are simply 6707 * ignored. After all active streams are handled, both 6708 * `wantRead()` and `wantWrite()` return 0 and the application can close session. 6709 * 6710 * This function returns 0 if it succeeds, or one of the following 6711 * negative error codes: 6712 * 6713 * $(D ErrorCode.INVALID_ARGUMENT) 6714 * The |opaque_data.length| is too large; the |last_stream_id| is invalid. 6715 */ 6716 ErrorCode submitGoAway(Session session, int last_stream_id, FrameError error_code, in string opaque_data) 6717 { 6718 if (session.goaway_flags & GoAwayFlags.TERM_ON_SEND) { 6719 return ErrorCode.OK; 6720 } 6721 return session.addGoAway(last_stream_id, error_code, opaque_data, GoAwayAuxFlags.NONE); 6722 } 6723 6724 /** 6725 * Submits WINDOW_UPDATE frame. 6726 * 6727 * The |flags| is currently ignored and should be 6728 * $(D FrameFlags.NONE). 6729 * 6730 * If the |window_size_increment| is positive, the WINDOW_UPDATE with 6731 * that value as window_size_increment is queued. If the 6732 * |window_size_increment| is larger than the received bytes from the 6733 * remote endpoint, the local window size is increased by that 6734 * difference. 6735 * 6736 * If the |window_size_increment| is negative, the local window size 6737 * is decreased by -|window_size_increment|. If automatic 6738 * WINDOW_UPDATE is enabled 6739 * $(D Options.setNoAutoWindowUpdate), and the library 6740 * decided that the WINDOW_UPDATE should be submitted, then 6741 * WINDOW_UPDATE is queued with the current received bytes count. 6742 * 6743 * If the |window_size_increment| is 0, the function does nothing and 6744 * returns 0. 6745 * 6746 * This function returns 0 if it succeeds, or one of the following 6747 * negative error codes: 6748 * 6749 * $(D ErrorCode.FLOW_CONTROL) 6750 * The local window size overflow or gets negative. 6751 */ 6752 ErrorCode submitWindowUpdate(Session session, int stream_id, int window_size_increment) 6753 { 6754 ErrorCode rv; 6755 Stream stream; 6756 if (window_size_increment == 0) { 6757 return ErrorCode.OK; 6758 } 6759 FrameFlags flags; 6760 if (stream_id == 0) { 6761 rv = adjustLocalWindowSize(session.local_window_size, session.recv_window_size, session.recv_reduction, window_size_increment); 6762 if (rv != ErrorCode.OK) { 6763 return rv; 6764 } 6765 } else { 6766 stream = session.getStream(stream_id); 6767 if (!stream) { 6768 return ErrorCode.OK; 6769 } 6770 6771 rv = adjustLocalWindowSize(stream.localWindowSize, stream.recvWindowSize, stream.recvReduction, window_size_increment); 6772 if (rv != ErrorCode.OK) { 6773 return rv; 6774 } 6775 } 6776 6777 if (window_size_increment > 0) { 6778 if (stream_id == 0) { 6779 session.consumed_size = max(0, session.consumed_size - window_size_increment); 6780 } else { 6781 stream.consumedSize = max(0, stream.consumedSize - window_size_increment); 6782 } 6783 6784 session.addWindowUpdate(flags, stream_id, window_size_increment); 6785 } 6786 return ErrorCode.OK; 6787 } 6788 6789 6790 /** 6791 * Signals to the client that the server started graceful shutdown 6792 * procedure. 6793 * 6794 * This function is only usable for server. If this function is 6795 * called with client side session, this function returns 6796 * $(D ErrorCode.INVALID_STATE). 6797 * 6798 * To gracefully shutdown HTTP/2 session, server should call this 6799 * function to send GOAWAY with last_stream_id (1u << 31) - 1. And 6800 * after some delay (e.g., 1 RTT), send another GOAWAY with the stream 6801 * ID that the server has some processing using 6802 * `submitGoAway()`. See also `getLastProcStreamID()`. 6803 * 6804 * Unlike `submitGoAway()`, this function just sends GOAWAY 6805 * and does nothing more. This is a mere indication to the client 6806 * that session shutdown is imminent. The application should call 6807 * `submitGoAway()` with appropriate last_stream_id after 6808 * this call. 6809 * 6810 * If one or more GOAWAY frame have been already sent by either 6811 * `submitGoAway()` or `terminateSession()`, this function has no effect. 6812 * 6813 * This function returns 0 if it succeeds, or one of the following 6814 * negative error codes: 6815 * 6816 * $(D ErrorCode.NOMEM) 6817 * Out of memory. 6818 * $(D ErrorCode.INVALID_STATE) 6819 * The $(D Session) is initialized as client. 6820 */ 6821 ErrorCode submitShutdownNotice(Session session) 6822 { 6823 if (!session.is_server) { 6824 return ErrorCode.INVALID_STATE; 6825 } 6826 if (session.goaway_flags) 6827 return ErrorCode.OK; 6828 6829 return session.addGoAway((1u << 31) - 1, FrameError.NO_ERROR, null, GoAwayAuxFlags.SHUTDOWN_NOTICE); 6830 } 6831 6832 private: 6833 6834 FrameFlags setResponseFlags(in DataProvider data_prd) 6835 { 6836 FrameFlags flags = FrameFlags.NONE; 6837 6838 if (!data_prd) 6839 flags |= FrameFlags.END_STREAM; 6840 6841 return flags; 6842 } 6843 6844 FrameFlags setRequestFlags(in PrioritySpec pri_spec, in DataProvider data_prd) 6845 { 6846 FrameFlags flags = FrameFlags.NONE; 6847 if (!data_prd) 6848 flags |= FrameFlags.END_STREAM; 6849 6850 if (pri_spec != PrioritySpec.init) 6851 flags |= FrameFlags.PRIORITY; 6852 6853 return flags; 6854 } 6855 6856 /* This function takes ownership of |hfa_copy|. Regardless of the 6857 return value, the caller must not free |hfa_copy| after this 6858 function returns. */ 6859 int submitHeadersShared(Session session, FrameFlags flags, int stream_id, 6860 const ref PrioritySpec pri_spec, HeaderField[] hfa_copy, 6861 in DataProvider data_prd, void *stream_user_data, bool attach_stream) 6862 { 6863 ErrorCode rv; 6864 FrameFlags flags_copy; 6865 OutboundItem item; 6866 Frame* frame; 6867 HeadersCategory hcat; 6868 bool owns_hfa = true; 6869 scope(failure) 6870 if (owns_hfa && hfa_copy) 6871 hfa_copy.free(); 6872 6873 if (stream_id == 0) { 6874 hfa_copy.free(); 6875 return ErrorCode.INVALID_ARGUMENT; 6876 } 6877 6878 item = Mem.alloc!OutboundItem(session); 6879 scope(failure) { 6880 if (item) Mem.free(item); 6881 } 6882 if (data_prd) { 6883 item.aux_data.headers.data_prd = data_prd; 6884 } 6885 6886 item.aux_data.headers.stream_user_data = stream_user_data; 6887 item.aux_data.headers.attach_stream = attach_stream; 6888 6889 flags_copy = cast(FrameFlags)((flags & (FrameFlags.END_STREAM | FrameFlags.PRIORITY)) | FrameFlags.END_HEADERS); 6890 6891 if (stream_id == -1) { 6892 if (session.next_stream_id > int.max) { 6893 if (item) Mem.free(item); 6894 if (hfa_copy) 6895 hfa_copy.free(); 6896 return ErrorCode.STREAM_ID_NOT_AVAILABLE; 6897 } 6898 6899 stream_id = session.next_stream_id; 6900 session.next_stream_id += 2; 6901 6902 hcat = HeadersCategory.REQUEST; 6903 } else { 6904 /* More specific categorization will be done later. */ 6905 hcat = HeadersCategory.HEADERS; 6906 } 6907 6908 frame = &item.frame; 6909 6910 owns_hfa = false; 6911 frame.headers = Headers(flags_copy, stream_id, hcat, pri_spec, hfa_copy); 6912 session.addItem(item); 6913 6914 if (rv != ErrorCode.OK) { 6915 if (item) { Mem.free(item); } 6916 if (hfa_copy) 6917 hfa_copy.free(); 6918 return rv; 6919 } 6920 6921 if (hcat == HeadersCategory.REQUEST) 6922 return stream_id; 6923 6924 return ErrorCode.OK; 6925 } 6926 6927 6928 6929 int submitHeadersSharedHfa(Session session, FrameFlags flags, int stream_id, in PrioritySpec pri_spec, in HeaderField[] hfa, 6930 in DataProvider data_prd, void *stream_user_data, bool attach_stream) 6931 { 6932 HeaderField[] hfa_copy = hfa.copy(); 6933 PrioritySpec copy_pri_spec = pri_spec; 6934 copy_pri_spec.adjustWeight(); 6935 6936 return submitHeadersShared(session, flags, stream_id, copy_pri_spec, hfa_copy, data_prd, stream_user_data, attach_stream); 6937 } 6938 6939 public: 6940 6941 /** 6942 * A helper function for dealing with NPN in client side or ALPN in 6943 * server side. The |input| contains peer's protocol list in preferable 6944 * order. The format of |input| is length-prefixed and not 6945 * null-terminated. For example, `HTTP-draft-04/2.0` and 6946 * `http/1.1` stored in |input| like this:: 6947 * 6948 * in[0] = 17 6949 * in[1..17] = "HTTP-draft-04/2.0" 6950 * in[18] = 8 6951 * in[19..26] = "http/1.1" 6952 * inlen = 27 6953 * 6954 * The selection algorithm is as follows: 6955 * 6956 * 1. If peer's list contains HTTP/2 protocol the library supports, 6957 * it is selected and returns 1. The following step is not taken. 6958 * 6959 * 2. If peer's list contains `http/1.1`, this function selects 6960 * `http/1.1` and returns 0. The following step is not taken. 6961 * 6962 * 3. This function selects nothing and returns -1 (So called 6963 * non-overlap case). In this case, |output| is left 6964 * untouched. 6965 * 6966 * Selecting `HTTP-draft-04/2.0` means that `HTTP-draft-04/2.0` is 6967 * written into |*out| and its length (which is 17) is assigned to 6968 * |*outlen|. 6969 * 6970 * For ALPN, refer to 6971 * https://tools.ietf.org/html/draft-ietf-tls-applayerprotoneg-05 6972 * 6973 * See http://technotes.googlecode.com/git/nextprotoneg.html for more 6974 * details about NPN. 6975 * 6976 * For NPN, to use this method you should do something like:: 6977 * 6978 * static int select_next_proto_cb(SSL* ssl, 6979 * unsigned char **out, 6980 * unsigned char *outlen, 6981 * const unsigned char *in, 6982 * unsigned int inlen, 6983 * void *arg) 6984 * { 6985 * int rv; 6986 * rv = selectNextProtocol(out, outlen, in, inlen); 6987 * if(rv == 1) { 6988 * (cast(MyType*)arg).http2_selected = 1; 6989 * } 6990 * return SSL_TLSEXT_ERR_OK; 6991 * } 6992 * ... 6993 * SSL_CTX_set_next_proto_select_cb(ssl_ctx, select_next_proto_cb, my_obj); 6994 * 6995 */ 6996 int selectNextProtocol(ref ubyte[] output, in ubyte[] input, ubyte[] other_proto = null) 6997 { 6998 size_t i; 6999 size_t len; 7000 while (i < input.length) 7001 { 7002 len = input[i]; 7003 ++i; 7004 ubyte[] proto = cast(ubyte[]) input[i .. i+len]; 7005 i += len; 7006 if (other_proto && other_proto == proto) 7007 { 7008 output = proto; 7009 return 1; 7010 } 7011 7012 if (proto == PROTOCOL_ALPN) { 7013 output = proto; 7014 return 1; 7015 } 7016 if (proto == HTTP_1_1_ALPN) { 7017 output = proto; 7018 return ErrorCode.OK; 7019 } 7020 } 7021 return -1; 7022 } 7023 7024 7025 7026 /** 7027 * Returns true if the $(D RV) library error code 7028 * |lib_error| is fatal. 7029 */ 7030 bool isFatal(int lib_error) { return lib_error < ErrorCode.FATAL; } 7031 7032 7033 /// Configuration options 7034 enum OptionFlags { 7035 /** 7036 * This option prevents the library from sending WINDOW_UPDATE for a 7037 * connection automatically. If this option is set to nonzero, the 7038 * library won't send WINDOW_UPDATE for DATA until application calls 7039 * $(D Session.consume) to indicate the amount of consumed 7040 * DATA. By default, this option is set to zero. 7041 */ 7042 NO_AUTO_WINDOW_UPDATE = 1, 7043 /** 7044 * This option sets the Setting.MAX_CONCURRENT_STREAMS value of 7045 * remote endpoint as if it is received in SETTINGS frame. Without 7046 * specifying this option, before the local endpoint receives 7047 * Setting.MAX_CONCURRENT_STREAMS in SETTINGS frame from remote 7048 * endpoint, Setting.MAX_CONCURRENT_STREAMS is unlimited. This may 7049 * cause problem if local endpoint submits lots of requests 7050 * initially and sending them at once to the remote peer may lead to 7051 * the rejection of some requests. Specifying this option to the 7052 * sensible value, say 100, may avoid this kind of issue. This value 7053 * will be overwritten if the local endpoint receives 7054 * Setting.MAX_CONCURRENT_STREAMS from the remote endpoint. 7055 */ 7056 PEER_MAX_CONCURRENT_STREAMS = 1 << 1, 7057 RECV_CLIENT_PREFACE = 1 << 2, 7058 NO_HTTP_MESSAGING = 1 << 3, 7059 } 7060 7061 /// Struct to store option values for http2_session. 7062 struct Options { 7063 private: 7064 /// Bitwise OR of http2_option_flag to determine which fields are specified. 7065 uint m_opt_set_mask; 7066 7067 uint m_peer_max_concurrent_streams; 7068 7069 bool m_no_auto_window_update; 7070 7071 bool m_recv_client_preface; 7072 7073 bool m_no_http_messaging; 7074 public: 7075 @property uint peer_max_concurrent_streams() const { return m_peer_max_concurrent_streams; } 7076 @property uint opt_set_mask() const { return m_opt_set_mask; } 7077 @property bool no_auto_window_update() const { return m_no_auto_window_update; } 7078 @property bool recv_client_preface() const { return m_recv_client_preface; } 7079 @property bool no_http_messaging() const { return m_no_http_messaging; } 7080 7081 /** 7082 * This option prevents the library from sending WINDOW_UPDATE for a 7083 * connection automatically. If this option is set to nonzero, the 7084 * library won't send WINDOW_UPDATE for DATA until application calls 7085 * `consume()` to indicate the consumed amount of 7086 * data. Don't use $(D submitWindowUpdate) for this purpose. 7087 * By default, this option is set to zero. 7088 */ 7089 @property void setNoAutoWindowUpdate(bool val) 7090 { 7091 if (val) m_opt_set_mask |= OptionFlags.NO_AUTO_WINDOW_UPDATE; 7092 else m_opt_set_mask |= ~OptionFlags.NO_AUTO_WINDOW_UPDATE; 7093 m_no_auto_window_update = val; 7094 } 7095 7096 /** 7097 * This option sets the Setting.MAX_CONCURRENT_STREAMS value of 7098 * remote endpoint as if it is received in SETTINGS frame. Without 7099 * specifying this option, before the local endpoint receives 7100 * Setting.MAX_CONCURRENT_STREAMS in SETTINGS frame from remote 7101 * endpoint, Setting.MAX_CONCURRENT_STREAMS is unlimited. This may 7102 * cause problem if local endpoint submits lots of requests initially 7103 * and sending them at once to the remote peer may lead to the 7104 * rejection of some requests. Specifying this option to the sensible 7105 * value, say 100, may avoid this kind of issue. This value will be 7106 * overwritten if the local endpoint receives 7107 * Setting.MAX_CONCURRENT_STREAMS from the remote endpoint. 7108 */ 7109 void setPeerMaxConcurrentStreams(uint val) 7110 { 7111 m_opt_set_mask |= OptionFlags.PEER_MAX_CONCURRENT_STREAMS; 7112 m_peer_max_concurrent_streams = val; 7113 } 7114 7115 /** 7116 * By default, libhttp2 library only handles HTTP/2 frames and does not 7117 * recognize first 24 bytes of client connection preface. This design 7118 * choice is done due to the fact that server may want to detect the 7119 * application protocol based on first few bytes on clear text 7120 * communication. But for simple servers which only speak HTTP/2, it 7121 * is easier for developers if libhttp2 library takes care of client 7122 * connection preface. 7123 * 7124 * If this option is used with nonzero |val|, libhttp2 library checks 7125 * first 24 bytes client connection preface. If it is not a valid 7126 * one, $(D Session.recv) and $(D Session.memRecv) will 7127 * return error $(D ErrorCode.BAD_PREFACE), which is fatal error. 7128 */ 7129 void setRecvClientPreface(bool val) 7130 { 7131 m_opt_set_mask |= OptionFlags.RECV_CLIENT_PREFACE; 7132 m_recv_client_preface = val; 7133 } 7134 7135 /** 7136 * By default, libhttp2 library enforces subset of HTTP Messaging rules 7137 * described in `HTTP/2 specification, section 8 7138 * <https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8>`_. 7139 * See `HTTP Messaging`_ section for details. For those applications 7140 * who use libhttp2 library as non-HTTP use, give nonzero to |val| to 7141 * disable this enforcement. 7142 */ 7143 void setNoHTTPMessaging(bool val) 7144 { 7145 m_opt_set_mask |= OptionFlags.NO_HTTP_MESSAGING; 7146 m_no_http_messaging = val; 7147 } 7148 7149 }