Storm Source Code Analysis - Streaming Grouping (backtype.storm.daemon.executor)
Published: 2019-06-13


When an executor sends an outbound message, it has to decide which tasks of the next component should receive it.

This is the job of stream grouping.

 

1. mk-grouper

For every grouping type except direct grouping, mk-grouper returns a grouper function; calling that function yields the list of target tasks.

For direct grouping it simply returns the keyword :direct.

(defn- mk-grouper
  "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
  (let [num-tasks (count target-tasks)
        random (Random.)
        target-tasks (vec (sort target-tasks))]
    (condp = (thrift/grouping-type thrift-grouping)
      :fields                                          ;; 1.1 fields grouping: group tuples by one or more fields
        (if (thrift/global-grouping? thrift-grouping)  ;; 1.2 an empty field list means global grouping: all tuples go to one task
          (fn [task-id tuple]
            ;; It's possible for target to have multiple tasks if it reads multiple sources
            (first target-tasks))                      ;; global grouping takes the first task of the sorted list, i.e. the smallest task id
          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))] ;; extract the grouping fields
            (mk-fields-grouper out-fields group-fields target-tasks)))
      :all
        (fn [task-id tuple] target-tasks)              ;; 1.3 all grouping: send to every task, so return the whole target-tasks
      :shuffle
        (mk-shuffle-grouper target-tasks)              ;; 1.4 shuffle grouping
      :local-or-shuffle                                ;; 1.5 prefer local: if some target tasks run in this worker, shuffle among those
        (let [same-tasks (set/intersection
                           (set target-tasks)
                           (set (.getThisWorkerTasks context)))]
          (if-not (empty? same-tasks)
            (mk-shuffle-grouper (vec same-tasks))
            (mk-shuffle-grouper target-tasks)))
      :none                                            ;; 1.6 a simpler random: pick one task from target-tasks at random
        (fn [task-id tuple]
          (let [i (mod (.nextInt random) num-tasks)]
            (.get target-tasks i)))
      :custom-object
        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :custom-serialized
        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :direct
        :direct)))

 

1.1 fields-grouping

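Use .select to pull out the values of the group-fields from the tuple; you can group on more than one field.

Use tuple/list-hash-code to compute a hash code of that values list,
take it mod num-tasks, and use task-getter to fetch the corresponding task from target-tasks.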

(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
  (let [num-tasks (count target-tasks)
        task-getter (fn [i] (.get target-tasks i))]
    (fn [task-id ^List values]
      (-> (.select out-fields group-fields values)
          tuple/list-hash-code
          (mod num-tasks)
          task-getter))))
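A minimal Java sketch of the same hash-then-mod idea, assuming that tuple/list-hash-code behaves like java.util.List.hashCode() (an assumption, not taken from the Storm source). FieldsGroupingSketch and chooseTask are hypothetical names for illustration. Math.floorMod is used because, like Clojure's mod with a positive divisor, it never yields a negative index even when the hash code is negative.

```java
import java.util.List;

// Hypothetical sketch of fields grouping: hash the selected group values,
// take the result mod the number of tasks, and index into the sorted task list.
// Assumption: List.hashCode() stands in for tuple/list-hash-code.
class FieldsGroupingSketch {
    static int chooseTask(List<Object> groupValues, List<Integer> sortedTargetTasks) {
        int numTasks = sortedTargetTasks.size();
        // floorMod keeps the index in [0, numTasks) even for negative hash codes.
        int index = Math.floorMod(groupValues.hashCode(), numTasks);
        return sortedTargetTasks.get(index);
    }
}
```

Because the index depends only on the selected values, two tuples with equal group-field values always reach the same task, which is the property fields grouping exists to provide.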

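Besides the list of field names, the Fields class keeps an index for fast field lookup.
Building the index is simple: it records each field name together with its position, in natural order.
To use it, call select with the fields whose values you need and the tuple.
select looks up each field's position in the index and reads the value at that position directly from the tuple (which is why a tuple must be built with its values in the same order as its fields).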

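import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Excerpt: constructors, size(), iterator() and the other members are omitted.
public class Fields implements Iterable<String>, Serializable {
    private List<String> _fields;
    private Map<String, Integer> _index = new HashMap<String, Integer>();

    // Build the field name -> position index
    private void index() {
        for(int i=0; i<_fields.size(); i++) {
            _index.put(_fields.get(i), i);
        }
    }

    // Return the values of the selector's fields, read by position from the tuple
    public List<Object> select(Fields selector, List<Object> tuple) {
        List<Object> ret = new ArrayList<Object>(selector.size());
        for(String s: selector) {
            ret.add(tuple.get(_index.get(s)));
        }
        return ret;
    }
}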
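A self-contained sketch of the index/select mechanism described above. MiniFields is a hypothetical, trimmed-down stand-in for Storm's Fields (not Iterable, not Serializable), kept just large enough to run: the constructor builds the name-to-position index once, and select reads values by position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Storm's Fields: field names plus a name -> position index.
class MiniFields {
    private final List<String> fields;
    private final Map<String, Integer> index = new HashMap<>();

    MiniFields(String... names) {
        this.fields = new ArrayList<>(Arrays.asList(names));
        for (int i = 0; i < fields.size(); i++) {
            index.put(fields.get(i), i);   // remember where each field lives
        }
    }

    List<String> names() {
        return fields;
    }

    // For each field named by the selector, read the value at that field's
    // position in the tuple. Assumes the tuple was built in this field order.
    List<Object> select(MiniFields selector, List<Object> tuple) {
        List<Object> ret = new ArrayList<>(selector.names().size());
        for (String name : selector.names()) {
            ret.add(tuple.get(index.get(name)));
        }
        return ret;
    }
}
```

For example, with output fields (word, count, lang) and the tuple ("storm", 3, "en"), selecting (lang, word) yields ("en", "storm"). Asking for a field that was never declared makes index.get return null, and unboxing it throws a NullPointerException.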

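1.2 global-grouping

A fields grouping with an empty field list means global grouping: every tuple is sent to a single task.

By default the first task is chosen; since mk-grouper sorted target-tasks, that is the task with the smallest id.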
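A tiny sketch of the global-grouping choice, with GlobalGroupingSketch as a hypothetical name. For brevity it sorts a copy on every call, whereas mk-grouper sorts target-tasks once when the grouper is built.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class GlobalGroupingSketch {
    // Sort a copy of the task ids (mk-grouper does this once with
    // (vec (sort target-tasks))) and always return the first, i.e. smallest, id.
    static int chooseTask(List<Integer> targetTasks) {
        List<Integer> sorted = new ArrayList<>(targetTasks);
        Collections.sort(sorted);
        return sorted.get(0);
    }
}
```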

1.3 all-grouping

Every tuple is sent to all tasks: the grouper simply returns the whole target-tasks list.

1.4 shuffle-grouper

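Rather than simply picking a random task each time (as none grouping does),

it uses the following pseudo-random scheme for the sake of load balancing:

first, shuffle target-tasks to put them in random order;

then acquire-random-range-id reads the tasks one after another, which guarantees that although the order is random, every task is chosen exactly once per pass.
When curr goes past the end, it is reset to 0 and target-tasks is reshuffled.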

(defn- mk-shuffle-grouper [^List target-tasks]
  (let [choices (rotating-random-range target-tasks)]
    (fn [task-id tuple]
      (acquire-random-range-id choices))))

(defn rotating-random-range [choices]
  (let [rand (Random.)
        choices (ArrayList. choices)]
    (Collections/shuffle choices rand)
    [(MutableInt. -1) choices rand]))

(defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
  (when (>= (.increment curr) (.size state))
    (.set curr 0)
    (Collections/shuffle state rand))
  (.get state (.get curr)))
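The two Clojure functions can be rendered in Java roughly as follows; RotatingRandomRange is a hypothetical name, and a plain int field replaces the MutableInt. The point to notice is that a reshuffle happens only after every task has been handed out once, which gives the even spread that plain random picking lacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical Java rendering of rotating-random-range / acquire-random-range-id.
class RotatingRandomRange {
    private final List<Integer> state;
    private final Random rand;
    private int curr = -1;   // plays the role of the MutableInt, starting at -1

    RotatingRandomRange(List<Integer> choices, Random rand) {
        this.state = new ArrayList<>(choices);   // private copy, like (ArrayList. choices)
        this.rand = rand;
        Collections.shuffle(state, rand);
    }

    int acquire() {
        curr++;
        if (curr >= state.size()) {   // every task used once: start a new round in a new order
            curr = 0;
            Collections.shuffle(state, rand);
        }
        return state.get(curr);
    }
}
```

Any window of tasks.size() consecutive calls aligned to a round boundary returns each task exactly once.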

1.5 local-or-shuffle

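Tasks running in the local worker are preferred, and the pick among them is made with the shuffle grouper; if none of the target tasks is local, it falls back to shuffling over all target tasks.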
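A sketch of how the local-or-shuffle candidate set is chosen; LocalOrShuffleSketch and the thisWorkerTasks parameter (standing in for getThisWorkerTasks) are hypothetical. It returns the candidates only; the actual pick would then be made by a shuffle grouper over that list. Unlike the Clojure set intersection, the LinkedHashSet here keeps the order of target-tasks, which is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class LocalOrShuffleSketch {
    // Targets that also run in this worker, or every target if none of them is local.
    static List<Integer> candidates(List<Integer> targetTasks, Set<Integer> thisWorkerTasks) {
        Set<Integer> same = new LinkedHashSet<>(targetTasks);
        same.retainAll(thisWorkerTasks);   // set intersection
        return same.isEmpty() ? new ArrayList<>(targetTasks) : new ArrayList<>(same);
    }
}
```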

1.6 none-grouping

Use this when you don't care how tuples are grouped; the current implementation is a simple random pick.
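For contrast with the shuffle grouper, a sketch of the none-grouping pick (NoneGroupingSketch is a hypothetical name). Each tuple is an independent random draw, so over a short window some tasks may be picked repeatedly and others not at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class NoneGroupingSketch {
    private final List<Integer> targetTasks;
    private final Random random;

    NoneGroupingSketch(List<Integer> targetTasks, Random random) {
        this.targetTasks = new ArrayList<>(targetTasks);
        this.random = random;
    }

    // One independent random draw per tuple, like (mod (.nextInt random) num-tasks).
    // nextInt() can be negative, so floorMod keeps the index in range.
    int chooseTask() {
        int i = Math.floorMod(random.nextInt(), targetTasks.size());
        return targetTasks.get(i);
    }
}
```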

1.7 custom-grouping

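You can write your own CustomStreamGrouping. The key part is the chooseTasks logic, which implements your own task-selection strategy. mk-custom-grouper calls prepare once with the target tasks and then returns a function that hands every tuple to chooseTasks.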

(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
  (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
  (fn [task-id ^List values]
    (.chooseTasks grouping task-id values)))
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.List;

public interface CustomStreamGrouping extends Serializable {
    /**
     * Tells the stream grouping at runtime the tasks in the target bolt.
     * This information should be used in chooseTasks to determine the target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input
     * the number of tasks in the target bolt in prepare and returns the
     * tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
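Because the real interface depends on Storm's WorkerTopologyContext and GlobalStreamId, the sketch below uses a hypothetical, simplified mirror (SimpleStreamGrouping) that keeps only the target tasks in prepare and the same chooseTasks shape. ModFirstValueGrouping is an illustrative strategy, not something Storm ships: it routes on the integer in the first value. As in mk-custom-grouper, prepare is called once and chooseTasks once per tuple.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified mirror of CustomStreamGrouping: the context and
// stream-id parameters are dropped so the sketch compiles on its own.
interface SimpleStreamGrouping {
    void prepare(List<Integer> targetTasks);
    List<Integer> chooseTasks(int taskId, List<Object> values);
}

// Illustrative strategy: the first value must be an Integer key; the key mod
// the number of tasks selects a single target task.
class ModFirstValueGrouping implements SimpleStreamGrouping {
    private List<Integer> targetTasks;

    @Override
    public void prepare(List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<>(targetTasks);   // remember the targets once
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int key = (Integer) values.get(0);
        int index = Math.floorMod(key, targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }
}
```

chooseTasks returns a list, so a custom strategy is also free to send one tuple to several tasks.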

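The only difference between :custom-object and :custom-serialized is whether the grouping inside thrift-grouping has been serialized.

If it has not, the grouping object is built directly from its Thrift description with thrift/instantiate-java-object; otherwise it first has to be deserialized into an object with Utils/deserialize.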
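A sketch of the serialized path, assuming the bytes are produced with standard Java serialization (ObjectOutputStream/ObjectInputStream); Utils/deserialize is only assumed to work in a similar way. ConfiguredGrouping and SerializationSketch are hypothetical names. The round trip yields a new instance that carries the same state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical grouping object with some configuration state.
class ConfiguredGrouping implements Serializable {
    private static final long serialVersionUID = 1L;
    final int replicas;

    ConfiguredGrouping(int replicas) {
        this.replicas = replicas;
    }
}

class SerializationSketch {
    // Object -> bytes, the form a :custom-serialized grouping travels in (assumed).
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Bytes -> object, the step that has to happen before the grouping can be used.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```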

1.8 direct-grouping

The producer of the tuple decides which task of the consumer will receive this tuple.

Direct groupings can only be declared on streams that have been declared as direct streams.

Here mk-grouper simply returns :direct: with direct grouping, the producer already decided which task gets the tuple when it emitted it, so no grouping work is needed at this point.
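A hypothetical sketch of what a send path could do with the :direct marker: the Grouper interface, its DIRECT constant and resolveTargets are illustrative names, not Storm APIs. When the marker is seen, no grouping runs and the task id supplied by the producer is used as the only target.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical grouper shape: (sending task id, tuple values) -> target task ids.
interface Grouper {
    List<Integer> targets(int senderTaskId, List<Object> values);

    // Marker instance playing the role of the :direct keyword; never invoked.
    Grouper DIRECT = (senderTaskId, values) -> {
        throw new UnsupportedOperationException("direct streams are routed by the producer");
    };
}

class DirectDispatchSketch {
    static List<Integer> resolveTargets(Grouper grouper, int senderTaskId,
                                        Integer directTask, List<Object> values) {
        if (grouper == Grouper.DIRECT) {
            // No grouping work: the producer already chose the task when it emitted.
            return Collections.singletonList(directTask);
        }
        return grouper.targets(senderTaskId, values);
    }
}
```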

 

2. stream->component->grouper

outbound-components

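An executor corresponds to exactly one component, so the current executor's component-id is passed in.
getTargets returns all outbound components of that component, as [streamid, [target-componentid, grouping]].

outbound-groupings is then called for each stream.

The final result is a HashMap of the form [streamid [component grouper]], which is stored in executor-data as stream->component->grouper.

When a task finally sends a message, it uses stream->component->grouper to produce the actual list of target tasks.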

(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (->> (.getTargets worker-context component-id) ;; [streamid, [target-componentid, grouping]]
       clojurify-structure
       (map (fn [[stream-id component->grouping]]
              [stream-id
               (outbound-groupings
                 worker-context
                 component-id
                 stream-id
                 (.getComponentOutputFields worker-context component-id stream-id)
                 component->grouping)]))
       (into {})
       (HashMap.)))
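A hypothetical sketch of the resulting nested map and how it might be consulted on emit, assuming the targets for one tuple are the union of what every subscribed component's grouper returns. OutboundLookupSketch, TaskGrouper, register and targetTasks are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical grouper shape: (sending task id, tuple values) -> target task ids.
interface TaskGrouper {
    List<Integer> choose(int senderTaskId, List<Object> values);
}

class OutboundLookupSketch {
    // stream id -> target component id -> grouper, mirroring stream->component->grouper
    private final Map<String, Map<String, TaskGrouper>> streamToComponentToGrouper = new HashMap<>();

    void register(String streamId, String componentId, TaskGrouper grouper) {
        streamToComponentToGrouper
            .computeIfAbsent(streamId, k -> new HashMap<>())
            .put(componentId, grouper);
    }

    // Ask every component subscribed to the stream for its targets and collect them all.
    List<Integer> targetTasks(String streamId, int senderTaskId, List<Object> values) {
        Map<String, TaskGrouper> componentToGrouper =
            streamToComponentToGrouper.getOrDefault(streamId, Collections.emptyMap());
        List<Integer> out = new ArrayList<>();
        for (TaskGrouper grouper : componentToGrouper.values()) {
            out.addAll(grouper.choose(senderTaskId, values));
        }
        return out;
    }
}
```

A stream with no subscribers yields an empty list, so nothing is sent.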

 

outbound-groupings

Calls mk-grouper for every target component that has at least one task.
Since mk-grouper returns a grouper fn, the final result maps each component to its grouper: [component, grouper].

(defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
  (->> component->grouping
       (filter-key #(-> worker-context   ;; keep only components that have at least one task
                        (.getComponentTasks %)
                        count
                        pos?))
       (map (fn [[component tgrouping]]
              [component
               (mk-grouper worker-context
                           this-component-id
                           stream-id
                           out-fields
                           tgrouping
                           (.getComponentTasks worker-context component))]))
       (into {})
       (HashMap.)))
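A sketch of the filtering step in Java, with hypothetical names: grouping specs are plain strings and the mkGrouper parameter stands in for mk-grouper. Components whose task list is empty are skipped, matching the (pos? (count ...)) check, so no grouper is ever built over zero tasks (for instance, the none grouping's (mod ... num-tasks) would otherwise divide by zero).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class OutboundGroupingsSketch {
    static <G> Map<String, G> outboundGroupings(
            Map<String, String> componentToGrouping,          // target component -> grouping spec
            Map<String, List<Integer>> componentTasks,        // component -> its task ids
            BiFunction<String, List<Integer>, G> mkGrouper) { // hypothetical stand-in for mk-grouper
        Map<String, G> result = new HashMap<>();
        for (Map.Entry<String, String> e : componentToGrouping.entrySet()) {
            List<Integer> tasks = componentTasks.getOrDefault(e.getKey(), Collections.emptyList());
            if (!tasks.isEmpty()) {   // same filter as (pos? (count tasks))
                result.put(e.getKey(), mkGrouper.apply(e.getValue(), tasks));
            }
        }
        return result;
    }
}
```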

Reposted from: https://www.cnblogs.com/fxjwind/p/3216704.html
