Skip to content

Commit a2d4863

Browse files
author
Andrew Jeffery
committed
Refactor call and rpc client handlers
This makes sure that the rpc can send requests before having received anything.
1 parent 6df3d90 commit a2d4863

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

lib/grpc-lwt/client.ml

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,19 @@ let make_request ~scheme ~service ~rpc =
2121

2222
let call ~service ~rpc ?(scheme = "https") ~handler ~do_request () =
2323
let request = make_request ~service ~rpc ~scheme in
24-
let write_body, write_body_notify = Lwt.wait () in
25-
let out, notify_out = Lwt.wait () in
24+
let read_body, read_body_notify = Lwt.wait () in
25+
let handler_res, handler_res_notify = Lwt.wait () in
26+
let out, out_notify = Lwt.wait () in
2627
let response_handler (response : H2.Response.t) body =
28+
Lwt.wakeup_later read_body_notify body;
2729
Lwt.async (fun () ->
2830
if response.status <> `OK then (
29-
Lwt.wakeup_later notify_out
31+
Lwt.wakeup_later out_notify
3032
(Error (Grpc.Status.v Grpc.Status.Unknown));
3133
Lwt.return_unit )
3234
else
33-
let+ out = handler write_body body in
34-
Lwt.wakeup_later notify_out (Ok out))
35+
let+ handler_res = handler_res in
36+
Lwt.wakeup_later out_notify (Ok handler_res))
3537
in
3638
let status, status_notify = Lwt.wait () in
3739
let trailers_handler headers =
@@ -50,23 +52,26 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~do_request () =
5052
let status = Grpc.Status.v ?message code in
5153
Lwt.wakeup_later status_notify status
5254
in
53-
let body =
55+
let write_body =
5456
do_request ?trailers_handler:(Some trailers_handler) request
5557
~response_handler
5658
in
57-
Lwt.wakeup_later write_body_notify body;
59+
Lwt.async (fun () ->
60+
let+ handler_res = handler write_body read_body in
61+
Lwt.wakeup_later handler_res_notify handler_res);
5862
let* out = out in
5963
let+ status = status in
6064
match out with Error _ as e -> e | Ok out -> Ok (out, status)
6165

6266
module Rpc = struct
6367
type 'a handler =
64-
[ `write ] H2.Body.t Lwt.t -> [ `read ] H2.Body.t -> 'a Lwt.t
68+
[ `write ] H2.Body.t -> [ `read ] H2.Body.t Lwt.t -> 'a Lwt.t
6569

6670
let bidirectional_streaming ~f write_body read_body =
67-
let* write_body = write_body in
6871
let decoder_stream, decoder_push = Lwt_stream.create () in
69-
Connection.grpc_recv_streaming read_body decoder_push;
72+
Lwt.async (fun () ->
73+
let+ read_body = read_body in
74+
Connection.grpc_recv_streaming read_body decoder_push);
7075
let encoder_stream, encoder_push = Lwt_stream.create () in
7176
Lwt.async (fun () ->
7277
Connection.grpc_send_streaming_client write_body encoder_stream);

lib/grpc-lwt/client.mli

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module Rpc : sig
22
type 'a handler =
3-
[ `write ] H2.Body.t Lwt.t -> [ `read ] H2.Body.t -> 'a Lwt.t
3+
[ `write ] H2.Body.t -> [ `read ] H2.Body.t Lwt.t -> 'a Lwt.t
44

55
val bidirectional_streaming :
66
f:((string -> unit) -> Grpc.Buffer.t Lwt_stream.t -> 'a Lwt.t) -> 'a handler

0 commit comments

Comments
 (0)