Mailing List Archive

[PATCH v2 8/8] tools/ocaml/xenstored: Implement live update for socket connections
From: Edvin Torok <edvint@eddie2.eng.citrite.net>

Signed-off-by: Edwin Török <edvin.torok@citrix.com>
Reviewed-by: Pau Ruiz Safont <pau.safont@citrix.com>
Reviewed-by: Christian Lindig <christian.lindig@citrix.com>

---
Changed since V1
* post publicly now that the XSA is out
---
tools/ocaml/xenstored/connection.ml | 25 +++++---
tools/ocaml/xenstored/parse_arg.ml | 4 ++
tools/ocaml/xenstored/process.ml | 51 ++++++++++++-----
tools/ocaml/xenstored/store.ml | 2 +-
tools/ocaml/xenstored/utils.ml | 12 ++++
tools/ocaml/xenstored/xenstored.ml | 88 +++++++++++++++++++++--------
6 files changed, 138 insertions(+), 44 deletions(-)

diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml
index bd02060cd0..eb23c3af7a 100644
--- a/tools/ocaml/xenstored/connection.ml
+++ b/tools/ocaml/xenstored/connection.ml
@@ -281,6 +281,9 @@ let get_transaction con tid =

let do_input con = Xenbus.Xb.input con.xb
let has_input con = Xenbus.Xb.has_in_packet con.xb
+let has_partial_input con = match con.xb.Xenbus.Xb.partial_in with
+ | HaveHdr _ -> true
+ | NoHdr (n, _) -> n < Xenbus.Partial.header_size ()
let pop_in con = Xenbus.Xb.get_in_packet con.xb
let has_more_input con = Xenbus.Xb.has_more_input con.xb

@@ -309,12 +312,13 @@ let is_bad con = match con.dom with None -> false | Some dom -> Domain.is_bad_do
Restrictions below can be relaxed once xenstored learns to dump more
of its live state in a safe way *)
let has_extra_connection_data con =
- let has_in = has_input con in
+ let has_in = has_input con || has_partial_input con in
let has_out = has_output con in
let has_socket = con.dom = None in
let has_nondefault_perms = make_perm con.dom <> con.perm in
has_in || has_out
- || has_socket (* dom0 sockets not dumped yet *)
+ (* TODO: what about SIGTERM, should use systemd to store FDS
+ || has_socket (* dom0 sockets not * dumped yet *) *)
|| has_nondefault_perms (* set_target not dumped yet *)

let has_transaction_data con =
@@ -337,16 +341,21 @@ let stats con =
Hashtbl.length con.watches, con.stat_nb_ops

let dump con chan =
- match con.dom with
+ let id = match con.dom with
| Some dom ->
let domid = Domain.get_id dom in
(* dump domain *)
Domain.dump dom chan;
- (* dump watches *)
- List.iter (fun (path, token) ->
- Printf.fprintf chan "watch,%d,%s,%s\n" domid (Utils.hexify path) (Utils.hexify token)
- ) (list_watches con);
- | None -> ()
+ domid
+ | None ->
+ let fd = con |> get_fd |> Utils.FD.to_int in
+ Printf.fprintf chan "socket,%d\n" fd;
+ -fd
+ in
+ (* dump watches *)
+ List.iter (fun (path, token) ->
+ Printf.fprintf chan "watch,%d,%s,%s\n" id (Utils.hexify path) (Utils.hexify token)
+ ) (list_watches con)

let debug con =
let domid = get_domstr con in
diff --git a/tools/ocaml/xenstored/parse_arg.ml b/tools/ocaml/xenstored/parse_arg.ml
index 2c4b5a8528..7c0478e76a 100644
--- a/tools/ocaml/xenstored/parse_arg.ml
+++ b/tools/ocaml/xenstored/parse_arg.ml
@@ -24,6 +24,7 @@ type config =
pidfile: string option; (* old xenstored compatibility *)
tracefile: string option; (* old xenstored compatibility *)
restart: bool;
+ live_reload: bool;
disable_socket: bool;
}

@@ -35,6 +36,7 @@ let do_argv =
and reraise_top_level = ref false
and config_file = ref ""
and restart = ref false
+ and live_reload = ref false
and disable_socket = ref false
in

@@ -52,6 +54,7 @@ let do_argv =
("--pid-file", Arg.Set_string pidfile, ""); (* for compatibility *)
("-T", Arg.Set_string tracefile, ""); (* for compatibility *)
("--restart", Arg.Set restart, "Read database on starting");
+ ("--live", Arg.Set live_reload, "Read live dump on startup");
("--disable-socket", Arg.Unit (fun () -> disable_socket := true), "Disable socket");
] in
let usage_msg = "usage : xenstored [--config-file <filename>] [--no-domain-init] [--help] [--no-fork] [--reraise-top-level] [--restart] [--disable-socket]" in
@@ -65,5 +68,6 @@ let do_argv =
pidfile = if !pidfile <> "" then Some !pidfile else None;
tracefile = if !tracefile <> "" then Some !tracefile else None;
restart = !restart;
+ live_reload = !live_reload;
disable_socket = !disable_socket;
}
diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml
index 3174d8ede5..dd50456ad5 100644
--- a/tools/ocaml/xenstored/process.ml
+++ b/tools/ocaml/xenstored/process.ml
@@ -91,19 +91,24 @@ type t =
; cmdline: string list
; deadline: float
; force: bool
+ ; result: string list
; pending: bool }

let state =
ref
{ binary= Sys.executable_name
- ; cmdline= []
+ ; cmdline= (Sys.argv |> Array.to_list |> List.tl)
; deadline= 0.
; force= false
+ ; result = []
; pending= false }

let debug = Printf.eprintf

-let args_of_t t = (t.binary, "--restart" :: t.cmdline)
+let forced_args = ["--live"; "--restart"]
+let args_of_t t =
+ let filtered = List.filter (fun x -> not @@ List.mem x forced_args) t.cmdline in
+ (t.binary, forced_args @ filtered)

let string_of_t t =
let executable, rest = args_of_t t in
@@ -117,12 +122,12 @@ let launch_exn t =

let validate_exn t =
(* --help must be last to check validity of earlier arguments *)
- let t = {t with cmdline= t.cmdline @ ["--help"]} in
- let cmd = string_of_t t in
+ let t' = {t with cmdline= t.cmdline @ ["--help"]} in
+ let cmd = string_of_t t' in
debug "Executing %s" cmd ;
match Unix.fork () with
| 0 ->
- ( try launch_exn t with _ -> exit 2 )
+ ( try launch_exn t' with _ -> exit 2 )
| pid -> (
match Unix.waitpid [] pid with
| _, Unix.WEXITED 0 ->
@@ -146,10 +151,14 @@ let parse_live_update args =
validate_exn {!state with binary= file}
| ["-a"] ->
debug "Live update aborted" ;
- {!state with pending= false}
+ {!state with pending= false; result = []}
| "-c" :: cmdline ->
- validate_exn {!state with cmdline}
+ validate_exn {!state with cmdline = !state.cmdline @ cmdline}
| "-s" :: _ ->
+ (match !state.pending, !state.result with
+ | true, _ -> !state (* no change to state, avoid resetting timeout *)
+ | false, _ :: _ -> !state (* we got a pending result to deliver *)
+ | false, [] ->
let timeout = ref 60 in
let force = ref false in
Arg.parse_argv ~current:(ref 0) (Array.of_list args)
@@ -165,10 +174,16 @@ let parse_live_update args =
"live-update -s" ;
debug "Live update process queued" ;
{!state with deadline = Unix.gettimeofday () +. float !timeout
- ; force= !force; pending= true}
+ ; force= !force; pending= true})
| _ ->
invalid_arg ("Unknown arguments: " ^ String.concat "," args)) ;
- None
+ match !state.pending, !state.result with
+ | true, _ -> Some "BUSY"
+ | false, (_ :: _ as result) ->
+ (* xenstore-control has read the result, clear it *)
+ state := { !state with result = [] };
+ Some (String.concat "\n" result)
+ | false, [] -> None
with
| Arg.Bad s | Arg.Help s | Invalid_argument s ->
Some s
@@ -182,17 +197,27 @@ let parse_live_update args =
| [] -> true
| _ when Unix.gettimeofday () < t.deadline -> false
| l ->
- info "Live update timeout reached: %d active connections" (List.length l);
- List.iter (fun con -> warn "%s prevents live update" (Connection.get_domstr con)) l;
+ warn "timeout reached: have to wait, migrate or shutdown %d domains:" (List.length l);
+ let msgs = List.rev_map (fun con -> Printf.sprintf "%s: %d tx, in: %b, out: %b, perm: %s"
+ (Connection.get_domstr con)
+ (Connection.number_of_transactions con)
+ (Connection.has_input con)
+ (Connection.has_output con)
+ (Connection.get_perm con |> Perms.Connection.to_string)
+ ) l in
+ List.iter (warn "Live-update: %s") msgs;
if t.force then begin
warn "Live update forced, some domain connections may break!";
true
end else begin
- warn "Live update aborted, try migrating or shutting down the domains/toolstack";
- state := { t with pending = false };
+ warn "Live update aborted (see above for domains preventing it)";
+ state := { t with pending = false; result = msgs};
false
end
end else false
+
+ let completed () =
+ state := { !state with result = ["OK"] }
end

(* packets *)
diff --git a/tools/ocaml/xenstored/store.ml b/tools/ocaml/xenstored/store.ml
index e20767372f..a3be2e6bbe 100644
--- a/tools/ocaml/xenstored/store.ml
+++ b/tools/ocaml/xenstored/store.ml
@@ -366,7 +366,7 @@ let traversal root_node f =
let rec _traversal path node =
f path node;
let node_path = Path.of_path_and_name path (Symbol.to_string node.Node.name) in
- List.iter (_traversal node_path) node.Node.children
+ List.iter (_traversal node_path) (List.rev node.Node.children)
in
_traversal [] root_node

diff --git a/tools/ocaml/xenstored/utils.ml b/tools/ocaml/xenstored/utils.ml
index eb79bf0146..6c1603c276 100644
--- a/tools/ocaml/xenstored/utils.ml
+++ b/tools/ocaml/xenstored/utils.ml
@@ -115,3 +115,15 @@ let path_validate path connection_path =
if len > !Define.path_max then raise Define.Invalid_path;

abs_path
+
+module FD : sig
+ type t = Unix.file_descr
+ val of_int: int -> t
+ val to_int : t -> int
+end = struct
+ type t = Unix.file_descr
+ (* This is like Obj.magic but just for these types,
+ and relies on Unix.file_descr = int *)
+ external to_int : t -> int = "%identity"
+ external of_int : int -> t = "%identity"
+end
diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml
index 22413271fb..5893af2caa 100644
--- a/tools/ocaml/xenstored/xenstored.ml
+++ b/tools/ocaml/xenstored/xenstored.ml
@@ -141,9 +141,12 @@ exception Bad_format of string

let dump_format_header = "$xenstored-dump-format"

-let from_channel_f chan domain_f watch_f store_f =
+let from_channel_f chan global_f socket_f domain_f watch_f store_f =
let unhexify s = Utils.unhexify s in
- let getpath s = Store.Path.of_string (Utils.unhexify s) in
+ let getpath s =
+ let u = Utils.unhexify s in
+ debug "Path: %s" u;
+ Store.Path.of_string u in
let header = input_line chan in
if header <> dump_format_header then
raise (Bad_format "header");
@@ -155,6 +158,12 @@ let from_channel_f chan domain_f watch_f store_f =
let l = String.split ',' line in
try
match l with
+ | "global" :: rw :: _ ->
+ (* there might be more parameters here,
+ e.g. a RO socket from a previous version: ignore it *)
+ global_f ~rw
+ | "socket" :: fd :: [] ->
+ socket_f ~fd:(int_of_string fd)
| "dom" :: domid :: mfn :: port :: []->
domain_f (int_of_string domid)
(Nativeint.of_string mfn)
@@ -175,12 +184,28 @@ let from_channel_f chan domain_f watch_f store_f =
with End_of_file ->
quit := true
done;
- ()
+ info "Completed loading xenstore dump"

let from_channel store cons doms chan =
(* don't let the permission get on our way, full perm ! *)
let op = Store.get_ops store Perms.Connection.full_rights in
-
+ let rwro = ref (None) in
+ let global_f ~rw =
+ let get_listen_sock sockfd =
+ let fd = sockfd |> int_of_string |> Utils.FD.of_int in
+ Unix.listen fd 1;
+ Some fd
+ in
+ rwro := get_listen_sock rw
+ in
+ let socket_f ~fd =
+ let ufd = Utils.FD.of_int fd in
+ let is_valid = try (Unix.fstat ufd).Unix.st_kind = Unix.S_SOCK with _ -> false in
+ if is_valid then
+ Connections.add_anonymous cons ufd
+ else
+ warn "Ignoring invalid socket FD %d" fd
+ in
let domain_f domid mfn port =
let ndom =
if domid > 0 then
@@ -190,28 +215,38 @@ let from_channel store cons doms chan =
in
Connections.add_domain cons ndom;
in
- let watch_f domid path token =
- let con = Connections.find_domain cons domid in
- ignore (Connections.add_watch cons con path token)
+ let get_con id =
+ if id < 0 then Connections.find cons (Utils.FD.of_int (-id))
+ else Connections.find_domain cons id
+ in
+ let watch_f id path token =
+ ignore (Connections.add_watch cons (get_con id) path token)
in
let store_f path perms value =
op.Store.write path value;
op.Store.setperms path perms
in
- from_channel_f chan domain_f watch_f store_f
+ from_channel_f chan global_f socket_f domain_f watch_f store_f;
+ !rwro

let from_file store cons doms file =
+ info "Loading xenstore dump from %s" file;
let channel = open_in file in
finally (fun () -> from_channel store doms cons channel)
(fun () -> close_in channel)

-let to_channel store cons chan =
+let to_channel store cons rw chan =
let hexify s = Utils.hexify s in

fprintf chan "%s\n" dump_format_header;
+ let fdopt = function None -> -1 | Some fd ->
+ (* systemd and utils.ml sets it close on exec *)
+ Unix.clear_close_on_exec fd;
+ Utils.FD.to_int fd in
+ fprintf chan "global,%d\n" (fdopt rw);

- (* dump connections related to domains; domid, mfn, eventchn port, watches *)
- Connections.iter_domains cons (fun con -> Connection.dump con chan);
+ (* dump connections related to domains: domid, mfn, eventchn port/ sockets, and watches *)
+ Connections.iter cons (fun con -> Connection.dump con chan);

(* dump the store *)
Store.dump_fct store (fun path node ->
@@ -224,9 +259,9 @@ let to_channel store cons chan =
()


-let to_file store cons file =
+let to_file store cons fds file =
let channel = open_out_gen [ Open_wronly; Open_creat; Open_trunc; ] 0o600 file in
- finally (fun () -> to_channel store cons channel)
+ finally (fun () -> to_channel store cons fds channel)
(fun () -> close_out channel)
end

@@ -246,13 +281,13 @@ let _ =
);

let rw_sock =
- if cf.disable_socket then
+ if cf.disable_socket || cf.live_reload then
None
else
Some (Unix.handle_unix_error Utils.create_unix_socket Define.xs_daemon_socket)
in

- if cf.daemonize then
+ if cf.daemonize && not cf.live_reload then
Unixext.daemonize ()
else
printf "Xen Storage Daemon, version %d.%d\n%!"
@@ -292,10 +327,15 @@ let _ =
List.iter (fun path ->
Store.write store Perms.Connection.full_rights path "") Store.Path.specials;

+ let rw_sock =
if cf.restart && Sys.file_exists Disk.xs_daemon_database then (
- DB.from_file store domains cons Disk.xs_daemon_database;
- Event.bind_dom_exc_virq eventchn
- ) else (
+ let rwro = DB.from_file store domains cons Disk.xs_daemon_database in
+ info "Live reload: database loaded";
+ Event.bind_dom_exc_virq eventchn;
+ Process.LiveUpdate.completed ();
+ rwro
+ ) else (
+ info "No live reload: regular startup";
if !Disk.enable then (
info "reading store from disk";
Disk.read store
@@ -309,10 +349,13 @@ let _ =
Connections.add_domain cons (Domains.create0 domains);
Event.bind_dom_exc_virq eventchn
);
- );
+ rw_sock
+ ) in

(* required for xenstore-control to detect availability of live-update *)
- Store.mkdir store Perms.Connection.full_rights (Store.Path.of_string "/tool");
+ let toolpath = Store.Path.of_string "/tool" in
+ if not (Store.path_exists store toolpath) then
+ Store.mkdir store Perms.Connection.full_rights toolpath;
Store.write store Perms.Connection.full_rights
(Store.Path.of_string "/tool/xenstored") Sys.executable_name;

@@ -324,7 +367,7 @@ let _ =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;

if cf.activate_access_log then begin
- let post_rotate () = DB.to_file store cons Disk.xs_daemon_database in
+ let post_rotate () = DB.to_file store cons (None) Disk.xs_daemon_database in
Logging.init_access_log post_rotate
end;

@@ -367,6 +410,7 @@ let _ =
let ring_scan_checker dom =
(* no need to scan domains already marked as for processing *)
if not (Domain.get_io_credit dom > 0) then
+ debug "Looking up domid %d" (Domain.get_id dom);
let con = Connections.find_domain cons (Domain.get_id dom) in
if not (Connection.has_more_work con) then (
Process.do_output store cons domains con;
@@ -496,7 +540,7 @@ let _ =
live_update := Process.LiveUpdate.should_run cons;
if !live_update || !quit then begin
(* don't initiate live update if saving state fails *)
- DB.to_file store cons Disk.xs_daemon_database;
+ DB.to_file store cons (rw_sock) Disk.xs_daemon_database;
quit := true;
end
with exc ->
--
2.29.2