When an executor sends an outbound message, it has to decide which tasks of the next component should receive it.
This is where stream grouping comes in.
1. mk-grouper
For every grouping type except direct grouping, mk-grouper returns a grouper function; calling that function yields the list of target tasks.
For direct grouping it simply returns the keyword :direct.

    (defn- mk-grouper
      "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
      [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
      (let [num-tasks (count target-tasks)
            random (Random.)
            target-tasks (vec (sort target-tasks))]
        (condp = (thrift/grouping-type thrift-grouping)
          :fields ;; 1.1 fields-grouping: group by one or more fields
            (if (thrift/global-grouping? thrift-grouping) ;; 1.2 an empty field list means global-grouping: every tuple goes to one task
              (fn [task-id tuple]
                ;; It's possible for target to have multiple tasks if it reads multiple sources
                (first target-tasks)) ;; global-grouping takes the first task of the sorted list, i.e. the smallest task id
              (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))] ;; extract the group-fields
                (mk-fields-grouper out-fields group-fields target-tasks)))
          :all
            (fn [task-id tuple] target-tasks) ;; 1.3 all-grouping: send to every task, so return the whole target-tasks
          :shuffle
            (mk-shuffle-grouper target-tasks) ;; 1.4 shuffle-grouping
          :local-or-shuffle ;; 1.5 local first: if some target tasks live in this worker, shuffle among those only
            (let [same-tasks (set/intersection
                               (set target-tasks)
                               (set (.getThisWorkerTasks context)))]
              (if-not (empty? same-tasks)
                (mk-shuffle-grouper (vec same-tasks))
                (mk-shuffle-grouper target-tasks)))
          :none ;; 1.6 a simple random: pick one task from target-tasks at random
            (fn [task-id tuple]
              (let [i (mod (.nextInt random) num-tasks)]
                (.get target-tasks i)))
          :custom-object
            (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
              (mk-custom-grouper grouping context component-id stream-id target-tasks))
          :custom-serialized
            (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
              (mk-custom-grouper grouping context component-id stream-id target-tasks))
          :direct
            :direct)))
1.1 fields-grouping
.select extracts from the tuple the list of values that correspond to group-fields; you can group on more than one field.
tuple/list-hash-code computes a hash code for that values list, the hash is taken mod num-tasks, and task-getter returns the target task at that index.

    (defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
      (let [num-tasks (count target-tasks)
            task-getter (fn [i] (.get target-tasks i))]
        (fn [task-id ^List values]
          (-> (.select out-fields group-fields values)
              tuple/list-hash-code
              (mod num-tasks)
              task-getter))))
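The select, hash, mod, lookup chain can be sketched in Java roughly as follows. This is a minimal sketch, not Storm code: `FieldsGrouperSketch`, `groupIndexes` and `chooseTask` are hypothetical names, and `List.hashCode()` is only assumed to behave like tuple/list-hash-code. One detail worth noting is that Clojure's `mod` never returns a negative result for a positive divisor, whereas Java's `%` can, so the sketch uses `Math.floorMod`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the function returned by mk-fields-grouper.
class FieldsGrouperSketch {
    private final List<Integer> targetTasks; // task ids of the consuming component
    private final int[] groupIndexes;        // positions of the grouping fields in a tuple

    FieldsGrouperSketch(List<Integer> targetTasks, int... groupIndexes) {
        this.targetTasks = targetTasks;
        this.groupIndexes = groupIndexes;
    }

    int chooseTask(List<Object> values) {
        // Keep only the values of the grouping fields.
        List<Object> selected = new ArrayList<>();
        for (int idx : groupIndexes) {
            selected.add(values.get(idx));
        }
        // Assumption: List.hashCode() plays the role of tuple/list-hash-code.
        // floorMod keeps the slot non-negative even for negative hash codes.
        int slot = Math.floorMod(selected.hashCode(), targetTasks.size());
        return targetTasks.get(slot);
    }
}
```

Two tuples that agree on the grouping fields always land on the same task, whatever their other values are.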
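Besides the list of field names, the Fields class keeps an index for fast field lookup. Building the index is simple: it records each field together with its position in the list.
To use it, call select with the fields whose values you want and the tuple. select reads each field's position from the index and takes the value at that position directly from the tuple. (This of course requires the tuple to have been generated in the same order as the fields.)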

    public class Fields implements Iterable<String>, Serializable {
        private List<String> _fields;
        private Map<String, Integer> _index = new HashMap<String, Integer>();

        private void index() {
            for(int i=0; i<_fields.size(); i++) {
                _index.put(_fields.get(i), i);
            }
        }

        public List
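To make the index-then-select idea concrete, here is a minimal sketch under stated assumptions. `FieldsSketch` is a hypothetical, simplified class written for illustration; it is not the Storm `Fields` source, and its `select` only mirrors the behavior described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the Fields class.
class FieldsSketch {
    private final Map<String, Integer> index = new HashMap<>();

    FieldsSketch(List<String> fields) {
        // Record each field name's position in declaration order.
        for (int i = 0; i < fields.size(); i++) {
            index.put(fields.get(i), i);
        }
    }

    // Return the tuple values for the requested field names, in selector order.
    List<Object> select(List<String> selector, List<Object> tuple) {
        List<Object> out = new ArrayList<>(selector.size());
        for (String name : selector) {
            out.add(tuple.get(index.get(name)));
        }
        return out;
    }
}
```

With fields `user, url, count` and the tuple `["alice", "/home", 7]`, selecting `count, user` reads positions 2 and 0 of the tuple.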
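1.2 global-grouping
This is a fields grouping whose field list is empty, which denotes global grouping: every tuple is sent to a single task.
By default the first task is chosen, i.e. the one with the smallest task id after sorting.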
1.3 all-grouping
The tuple is sent to all of the target tasks.
1.4 shuffle-grouping
This does not take the simple route of picking a value directly with random (which is what none-grouping does).
For the sake of load balancing it uses the pseudo-random scheme below: target-tasks is first shuffled into a random order.
acquire-random-range-id then walks through the shuffled tasks one after another, which guarantees that, although the order is random, every task is selected exactly once per pass. When curr runs past the end, it is reset to 0 and target-tasks is shuffled again.

    (defn- mk-shuffle-grouper [^List target-tasks]
      (let [choices (rotating-random-range target-tasks)]
        (fn [task-id tuple]
          (acquire-random-range-id choices))))

    (defn rotating-random-range [choices]
      (let [rand (Random.)
            choices (ArrayList. choices)]
        (Collections/shuffle choices rand)
        [(MutableInt. -1) choices rand]))

    (defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
      (when (>= (.increment curr) (.size state))
        (.set curr 0)
        (Collections/shuffle state rand))
      (.get state (.get curr)))
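The same rotate-and-reshuffle idea can be sketched in Java as follows. `ShuffleGrouperSketch` and `chooseTask` are hypothetical names used only for illustration, and the class keeps plain mutable state, so like this sketch's assumption it is meant to be called from a single thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical Java rendering of rotating-random-range / acquire-random-range-id.
class ShuffleGrouperSketch {
    private final List<Integer> state;
    private final Random rand = new Random();
    private int curr = -1; // starts before the first element, like (MutableInt. -1)

    ShuffleGrouperSketch(List<Integer> targetTasks) {
        this.state = new ArrayList<>(targetTasks); // private copy, shuffled in place
        Collections.shuffle(state, rand);
    }

    int chooseTask() {
        curr++;
        if (curr >= state.size()) {
            // One full pass is finished: restart with a fresh random order.
            curr = 0;
            Collections.shuffle(state, rand);
        }
        return state.get(curr);
    }
}
```

Over any full pass of `state.size()` calls, each task id is returned exactly once, which is the load-balancing property that a plain random pick does not give.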
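1.5 local-or-shuffle
Target tasks that run in the same worker are preferred and are chosen among by shuffling; if there are none, it shuffles over all target tasks.
1.6 none-grouping
The sender does not care how tuples are grouped; the current implementation is simply a random pick.
1.7 custom-grouping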
You can define your own CustomStreamGrouping; the key part is the chooseTasks logic, which implements your own strategy for choosing tasks.

    (defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
      (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
      (fn [task-id ^List values]
        (.chooseTasks grouping task-id values)))

    public interface CustomStreamGrouping extends Serializable {

        /**
         * Tells the stream grouping at runtime the tasks in the target bolt.
         * This information should be used in chooseTasks to determine the target tasks.
         *
         * It also tells the grouping the metadata on the stream this grouping will be used on.
         */
        void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

        /**
         * This function implements a custom stream grouping. It takes in as input
         * the number of tasks in the target bolt in prepare and returns the
         * tasks to send the tuples to.
         *
         * @param values the values to group on
         */
        List<Integer> chooseTasks(int taskId, List<Object> values);
    }
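As an illustration of the prepare / chooseTasks split, here is a minimal sketch of one possible strategy. To stay compilable without Storm on the classpath it uses `GroupingSketch`, a hypothetical trimmed-down interface that drops the context and stream arguments; `SourceAffinityGrouping` is likewise an invented example that pins each producing task to one consuming task.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for CustomStreamGrouping (context and stream omitted).
interface GroupingSketch {
    void prepare(List<Integer> targetTasks);
    List<Integer> chooseTasks(int taskId, List<Object> values);
}

// Every producing task always sends to the same consuming task.
class SourceAffinityGrouping implements GroupingSketch {
    private List<Integer> targetTasks;

    @Override
    public void prepare(List<Integer> targetTasks) {
        // Remember the candidate tasks; chooseTasks picks among them later.
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // The tuple values are ignored; only the producing task id matters here.
        int slot = Math.floorMod(taskId, targetTasks.size());
        return Collections.singletonList(targetTasks.get(slot));
    }
}
```

A real implementation would implement CustomStreamGrouping itself and receive the WorkerTopologyContext and GlobalStreamId in prepare, as the interface above shows.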
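The only difference between :custom-object and :custom-serialized is whether the grouping carried in thrift-grouping has been serialized.
If it has not, the object can be instantiated directly (thrift/instantiate-java-object); otherwise it first has to be deserialized into an object (Utils/deserialize).
1.8 direct-grouping
The producer of the tuple decides which task of the consumer will receive this tuple.
Direct groupings can only be declared on streams that have been declared as direct streams.
mk-grouper simply returns :direct here: with direct-grouping the destination task is already decided by the producer when it emits the tuple, so no grouping work is needed at this point.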
2 stream->component->grouper
outbound-components
An executor corresponds to exactly one component, so the function is given the current executor's component-id.
getTargets returns all outbound components for it, in the form [streamid, [target-componentid, grouping]]. outbound-groupings is then called for each stream.
The final result is a hashmap of [streamid [component grouper]], which is stored in executor-data as stream->component->grouper.
When a task finally sends a message, it uses stream->component->grouper to produce the actual list of target tasks.

    (defn outbound-components
      "Returns map of stream id to component id to grouper"
      [^WorkerTopologyContext worker-context component-id]
      (->> (.getTargets worker-context component-id) ;; [streamid, [target-componentid, grouping]]
           clojurify-structure
           (map (fn [[stream-id component->grouping]]
                  [stream-id
                   (outbound-groupings
                     worker-context
                     component-id
                     stream-id
                     (.getComponentOutputFields worker-context component-id stream-id)
                     component->grouping)]))
           (into {})
           (HashMap.)))
outbound-groupings
mk-grouper is called for every target component whose task list is not empty. Since mk-grouper returns a grouper fn, the final result is a map of [component, grouper].

    (defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
      (->> component->grouping
           (filter-key #(-> worker-context ;; keep only components that have at least one task
                            (.getComponentTasks %)
                            count
                            pos?))
           (map (fn [[component tgrouping]]
                  [component
                   (mk-grouper worker-context
                               this-component-id
                               stream-id
                               out-fields
                               tgrouping
                               (.getComponentTasks worker-context component))]))
           (into {})
           (HashMap.)))
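The shape of the resulting stream -> component -> grouper map, and the filtering of components without tasks, can be sketched in Java as below. Everything here is hypothetical and heavily simplified: `OutboundSketch.build` takes plain maps instead of a WorkerTopologyContext, groupings are plain strings, and the stand-in for mk-grouper only models "all" (every task) and treats anything else as global (first task).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of building stream -> component -> grouper.
class OutboundSketch {
    // targets: stream id -> (target component id -> grouping name)
    // tasks:   component id -> task ids of that component
    static Map<String, Map<String, Function<List<Object>, List<Integer>>>> build(
            Map<String, Map<String, String>> targets,
            Map<String, List<Integer>> tasks) {
        Map<String, Map<String, Function<List<Object>, List<Integer>>>> result = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> stream : targets.entrySet()) {
            Map<String, Function<List<Object>, List<Integer>>> groupers = new HashMap<>();
            for (Map.Entry<String, String> target : stream.getValue().entrySet()) {
                List<Integer> componentTasks = tasks.get(target.getKey());
                if (componentTasks == null || componentTasks.isEmpty()) {
                    continue; // skip components that have no tasks
                }
                // Stand-in for mk-grouper: "all" returns every task, anything
                // else is treated as global and returns only the first task.
                Function<List<Object>, List<Integer>> grouper =
                        "all".equals(target.getValue())
                                ? values -> componentTasks
                                : values -> componentTasks.subList(0, 1);
                groupers.put(target.getKey(), grouper);
            }
            result.put(stream.getKey(), groupers);
        }
        return result;
    }
}
```

At send time the lookup is two map reads followed by one call: pick the stream, pick the target component, then apply the grouper to the tuple values to get the task ids.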