(**************************************************************************)
(*                                                                        *)
(*                                 OCaml                                  *)
(*                                                                        *)
(*      KC Sivaramakrishnan, Indian Institute of Technology, Madras       *)
(*                 Stephen Dolan, University of Cambridge                 *)
(*                   Tom Kelly, OCaml Labs Consultancy                    *)
(*                                                                        *)
(*   Copyright 2019 Indian Institute of Technology, Madras                *)
(*   Copyright 2014 University of Cambridge                               *)
(*   Copyright 2021 OCaml Labs Consultancy Ltd                            *)
(*                                                                        *)
(*   All rights reserved.  This file is distributed under the terms of    *)
(*   the GNU Lesser General Public License version 2.1, with the          *)
(*   special exception on linking described in the file LICENSE.          *)
(*                                                                        *)
(**************************************************************************)

module Raw = struct
  (* Low-level primitives provided by the runtime *)
  type t = private int

  (* The layouts of [state] and [term_sync] are hard-coded in
     [runtime/domain.c] *)

  type 'a state =
    | Running
    | Finished of ('a, exn) result [@warning "-unused-constructor"]

  type 'a term_sync = {
    (* protected by [mut] *)
    mutable state : 'a state [@warning "-unused-field"] ;
    mut : Mutex.t ;
    cond : Condition.t ;
  }

  (* Bindings to the runtime primitives named on the right-hand sides.
     Nothing in this file writes [Finished] into [state]; presumably the
     runtime does so when the spawned function returns (see the layout
     note above) -- to be confirmed against [runtime/domain.c]. *)
  external spawn : (unit -> 'a) -> 'a term_sync -> t
    = "caml_domain_spawn"
  external self : unit -> t
    = "caml_ml_domain_id" [@@noalloc]
  external cpu_relax : unit -> unit
    = "caml_ml_domain_cpu_relax"
  external get_recommended_domain_count: unit -> int
    = "caml_recommended_domain_count" [@@noalloc]
end

(* Thin wrapper over the runtime primitive [Raw.cpu_relax]. *)
let cpu_relax () = Raw.cpu_relax ()

(* Domain identifiers are the private integers handed out by the runtime. *)
type id = Raw.t

(* A domain handle: its identifier plus the record used by [join] to wait
   for, and read, the result of the domain computation. *)
type 'a t = {
  domain : Raw.t;
  term_sync : 'a Raw.term_sync;
}

module DLS = struct

  (* An untyped option: a slot is either the sentinel [none] or an arbitrary
     value stored as [Obj.t]. *)
  module Obj_opt : sig
    type t
    val none : t
    val some : 'a -> t
    val is_some : t -> bool

    (** [unsafe_get obj] may only be called safely
        if [is_some] is true.

        [unsafe_get (some v)] is equivalent to
        [Obj.obj (Obj.repr v)]. *)
    val unsafe_get : t -> 'a
  end = struct
    type t = Obj.t
    (* [none] is a freshly allocated block; [is_some] recognises it by
       physical (dis)equality, so no stored value is mistaken for it. *)
    let none = Obj.repr (ref 0)
    let some v = Obj.repr v
    let is_some obj = (obj != none)
    let unsafe_get obj = Obj.obj obj
  end

  (* The per-domain state is an array of slots indexed by key index. *)
  type dls_state = Obj_opt.t array

  external get_dls_state : unit -> dls_state = "%dls_get"

  external set_dls_state : dls_state -> unit =
    "caml_domain_dls_set" [@@noalloc]

  external compare_and_set_dls_state : dls_state -> dls_state -> bool =
    "caml_domain_dls_compare_and_set" [@@noalloc]

  (* Install a fresh 8-slot array, every slot empty, as the current DLS
     state. *)
  let create_dls () =
    let st = Array.make 8 Obj_opt.none in
    set_dls_state st

  (* Runs once at module initialisation, for the domain that loads this
     module. *)
  let _ = create_dls ()

  (* A key is its slot index paired with the function used to compute the
     initial value when the slot is found empty (see [get]). *)
  type 'a key = int * (unit -> 'a)

  (* Next free slot index, shared by all domains. *)
  let key_counter = Atomic.make 0

  (* A key together with the function deriving a child value from the
     parent value at spawn time. *)
  type key_initializer =
    KI: 'a key * ('a -> 'a) -> key_initializer

  (* Every key created with [~split_from_parent]. *)
  let parent_keys = Atomic.make ([] : key_initializer list)

  (* Prepend [ki] to [parent_keys], retrying while the compare-and-set
     fails because the list was changed concurrently. *)
  let rec add_parent_key ki =
    let l = Atomic.get parent_keys in
    if not (Atomic.compare_and_set parent_keys l (ki :: l))
    then add_parent_key ki

  (* [new_key ?split_from_parent init_orphan] allocates the next slot index
     and returns the key [(index, init_orphan)].  When [split_from_parent]
     is given, the key is also recorded in [parent_keys] so that [spawn]
     seeds the child value from the parent value. *)
  let new_key ?split_from_parent init_orphan =
    let idx = Atomic.fetch_and_add key_counter 1 in
    let k = (idx, init_orphan) in
    begin match split_from_parent with
    | None -> ()
    | Some split -> add_parent_key (KI(k, split))
    end;
    k

  (* If necessary, grow the current domain's local state array such that [idx]
   * is a valid index in the array. *)
  let rec maybe_grow idx =
    let st = get_dls_state () in
    let sz = Array.length st in
    if idx < sz then st
    else begin
      (* Double the size until [idx] fits. *)
      let rec compute_new_size s =
        if idx < s then s else compute_new_size (2 * s)
      in
      let new_sz = compute_new_size sz in
      let new_st = Array.make new_sz Obj_opt.none in
      Array.blit st 0 new_st 0 sz;
      (* We want an implementation that is safe with respect to
         single-domain multi-threading: retry if the DLS state has
         changed under our feet.
         Note that the number of retries will be very small in
         contended scenarios, as the array only grows, with
         exponential resizing. *)
      if compare_and_set_dls_state st new_st
      then new_st
      else maybe_grow idx
    end

  (* [set key x] stores [x] in the slot of [key] for the current domain,
     growing the state array first if needed.  The initializer of the key
     is not used. *)
  let set (type a) (idx, _init) (x : a) =
    let st = maybe_grow idx in
    (* [Sys.opaque_identity] ensures that flambda does not look at the type of
     * [x], which may be a [float] and conclude that the [st] is a float array.
     * We do not want OCaml's float array optimisation kicking in here. *)
    st.(idx) <- Obj_opt.some (Sys.opaque_identity x)


  (* [array_compare_and_set a i oldval newval] writes [newval] to [a.(i)]
     and returns [true] if the slot is physically equal to [oldval];
     otherwise it leaves the slot untouched and returns [false]. *)
  let[@inline never] array_compare_and_set a i oldval newval =
    (* Note: we cannot use [@poll error] due to the
       allocations on a.(i) in the Double_array case. *)
    let curval = a.(i) in
    if curval == oldval then (
      Array.unsafe_set a i newval;
      true
    ) else false

  (* [get key] returns the value stored for [key] in the current domain.
     If the slot is empty, the initializer of the key is run and its result
     is installed, unless the slot was filled in the meantime, in which case
     the value found there is returned instead. *)
  let get (type a) ((idx, init) : a key) : a =
    let st = maybe_grow idx in
    let obj = st.(idx) in
    if Obj_opt.is_some obj
    then (Obj_opt.unsafe_get obj : a)
    else begin
      let v : a = init () in
      let new_obj = Obj_opt.some (Sys.opaque_identity v) in
      (* At this point, [st] or [st.(idx)] may have been changed
         by another thread on the same domain.

         If [st] changed, it was resized into a larger value,
         we can just reuse the new value.

         If [st.(idx)] changed, we drop the current value to avoid
         letting other threads observe a 'revert' that forgets
         previous modifications. *)
      let st = get_dls_state () in
      if array_compare_and_set st idx obj new_obj
      then v
      else begin
        (* if st.(idx) changed, someone must have initialized
           the key in the meantime. *)
        let updated_obj = st.(idx) in
        if Obj_opt.is_some updated_obj
        then (Obj_opt.unsafe_get updated_obj : a)
        else assert false
      end
    end

  (* A key paired with the value a child domain should start with. *)
  type key_value = KV : 'a key * 'a -> key_value

  (* For every key registered in [parent_keys], apply its split function to
     the value the calling domain currently holds for that key.  [spawn]
     calls this before creating the child, hence in the parent domain. *)
  let get_initial_keys () : key_value list =
    List.map
      (fun (KI (k, split)) -> KV (k, (split (get k))))
      (Atomic.get parent_keys)

  (* Store each precomputed value in the calling domain.  The body run by
     [spawn] calls this right after [create_dls]. *)
  let set_initial_keys (l: key_value list) =
    List.iter (fun (KV (k, v)) -> set k v) l

end

(******** Identity **********)

(* Identifier recorded in a domain handle. *)
let get_id { domain; _ } = domain

(* Identifier of the calling domain, as reported by the runtime. *)
let self () = Raw.self ()

(* True when the identifier of the calling domain is 0. *)
let is_main_domain () = (self () :> int) = 0

(******** Callbacks **********)

(* first spawn, domain startup and at exit functionality *)
let first_domain_spawned = Atomic.make false

(* Composition of the functions registered with [before_first_spawn]. *)
let first_spawn_function = ref (fun () -> ())

(* [before_first_spawn f] registers [f] to run, after the functions
   registered earlier, when [spawn] is called for the first time.
   Raises [Invalid_argument] once a domain has been spawned. *)
let before_first_spawn f =
  if Atomic.get first_domain_spawned then
    raise (Invalid_argument "first domain already spawned")
  else begin
    let old_f = !first_spawn_function in
    let new_f () = old_f (); f () in
    first_spawn_function := new_f
  end

(* Run the registered functions on the first call only.  The flag is set
   before the functions run. *)
let do_before_first_spawn () =
  if not (Atomic.get first_domain_spawned) then begin
    Atomic.set first_domain_spawned true;
    !first_spawn_function();
    (* Release the old function *)
    first_spawn_function := (fun () -> ())
  end

(* Domain-local slot holding the composed exit callback; it starts as a
   no-op in every domain. *)
let at_exit_key = DLS.new_key (fun () -> (fun () -> ()))

(* [at_exit f] registers [f] for the calling domain.  [f] runs before the
   callbacks registered earlier, so callbacks run most-recent first. *)
let at_exit f =
  let old_exit : unit -> unit = DLS.get at_exit_key in
  let new_exit () =
    f (); old_exit ()
  in
  DLS.set at_exit_key new_exit

(* Run the exit callbacks registered in the calling domain. *)
let do_at_exit () =
  let f : unit -> unit = DLS.get at_exit_key in
  f ()

(* Install [do_at_exit] in the hook exposed by [Stdlib]. *)
let _ = Stdlib.do_domain_local_at_exit := do_at_exit

(******* Creation and Termination ********)

(* [spawn f] creates a domain running [f] and returns its handle.
   In the calling domain it first runs the [before_first_spawn] functions
   (first call only) and computes the initial values of the keys created
   with [~split_from_parent].  The new domain sets up its own DLS state,
   installs those values, runs [f], then runs its [at_exit] callbacks. *)
let spawn f =
  do_before_first_spawn ();
  let pk = DLS.get_initial_keys () in

  (* [term_sync] is used to synchronize with the joining domains *)
  let term_sync =
    Raw.{ state = Running ;
          mut = Mutex.create () ;
          cond = Condition.create () }
  in

  let body () =
    match
      DLS.create_dls ();
      DLS.set_initial_keys pk;
      let res = f () in
      res
    with
    (* Run the [at_exit] callbacks when the domain computation either
       terminates normally or exceptionally. *)
    | res ->
        (* If the domain computation terminated normally, but the
           [at_exit] callbacks raised an exception, then return the
           exception. *)
        do_at_exit ();
        res
    | exception exn ->
        (* If both the domain computation and the [at_exit] callbacks
           raise exceptions, then ignore the exception from the
           [at_exit] callbacks and return the original exception. *)
        (try do_at_exit () with _ -> ());
        raise exn
  in
  let domain = Raw.spawn body term_sync in
  { domain ; term_sync }

(* [join d] blocks until the state of [d] is [Finished], then returns the
   value of the domain computation or raises the exception it ended with.
   The state is read, and the condition variable waited on, with
   [term_sync.mut] held. *)
let join { term_sync ; _ } =
  let open Raw in
  let rec loop () =
    match term_sync.state with
    | Running ->
        (* Re-check the state after every wake-up. *)
        Condition.wait term_sync.cond term_sync.mut;
        loop ()
    | Finished res ->
        res
  in
  match Mutex.protect term_sync.mut loop with
  | Ok x -> x
  | Error ex -> raise ex

(* Direct alias of the runtime primitive. *)
let recommended_domain_count = Raw.get_recommended_domain_count