1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """C{agg [-g GROUPING_FUNCTION] INITIAL_VALUE AGGREGATOR}
19
20 C{agg [-c GROUPING_FUNCTION] INITIAL_VALUE AGGREGATOR}
21
22 Combines inputs into a smaller number of outputs. For example, if
23 the input is a stream of numbers, then C{agg} can be used to generate
24 output comprising the sum of the inputs.
25
26 If C{GROUPING_FUNCTION} is omitted, then one output object is
27 generated by initializing an accumulator to C{INITIAL_VALUE} and then
28 combining the accumulator with input objects using C{AGGREGATOR}. The
29 inputs to C{AGGREGATOR} are the current values of the accumulator
30 followed by the members of the input object.
31
32 B{Example}: If the input objects are integers C{1, 2, 3}, then the sum of
33 the integers is computed by::
34
35 agg 0 'sum, x: sum + x'
36
37 which yields C{6}.
38
39 B{Example}: The sum and sum of squares of a sequence of input numbers can be computed
40 by::
41
42 agg '(0, 0)' 'sum, sum_squares, input: (sum + input, sum_squares + input ** 2)'
43
44 In this case, the accumulator is a tuple containing two integers, the
45 sum and the sum of squares. The initial value of this accumulator is
46 C{(0, 0)}. Each input consists of a single number. The aggregator
47 function takes as input the two parts of the accumulator followed by
48 the next number of the input, and outputs a tuple containing the
49 updated accumulator value.
50
51 If C{GROUPING_FUNCTION} is specified, then a set of accumulators is
52 maintained, one for each value of C{GROUPING_FUNCTION}. Each output
53 value consists of the group value followed by the accumulated values
54 for the group. ("values" not "value" because the accumulator may be a
55 sequence.)
56
57 B{Example}: If the input objects are C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then
58 the sum of ints for each string is computed by::
59
60 agg -g 'label, number: label' 0 'sum, label, number: sum + number'
61
62 which yields C{('a', 3), ('b', 7)}.
63
64 If C{GROUPING_FUNCTION} is specified with the C{-g} flag, then C{agg}
65 generates its output when the input stream has ended. (It can't
66 generate output earlier, because group members may appear in any
67 order.) In some situations however, group members appear
68 consecutively, and it is useful to get output earlier. If group
69 members are known to be consecutive, then C{GROUPING_FUNCTION} can be
70 specified using the C{-c} flag.
71 """
72
73 import osh.core
74 import osh.args
75
76 _wrap_if_necessary = osh.core.wrap_if_necessary
77 Option = osh.args.Option
78
79
82
83
def agg(initial_value, aggregator, group=None, consecutive=None):
    """Reduce the input stream to a smaller number of outputs.

    An accumulator starts out as C{initial_value}. For each input,
    C{aggregator} is called with the elements of the accumulator followed by
    the elements of that input, and its result becomes the next value of the
    accumulator.

    With neither C{group} nor C{consecutive} there is a single accumulator.
    If C{group} is given, it is applied to each input and a separate
    accumulator is kept for each group value it yields. C{consecutive} behaves
    like C{group}, but assumes that inputs sharing a group value are adjacent
    in the input sequence. Specify at most one of C{group} and C{consecutive}.

    Returns the result of C{_Agg().process_args(...)} called with the two
    positional arguments followed by a C{-g} and/or C{-c} option for each
    grouping function supplied.
    """
    op_args = [initial_value, aggregator]
    # Options are appended in a fixed order: -g first, then -c.
    for flag, grouping in (('-g', group), ('-c', consecutive)):
        if grouping:
            op_args.append(Option(flag, grouping))
    return _Agg().process_args(*op_args)
102
# Operator class behind agg(): agg() instantiates it and hands it the argument list via
# process_args().
# NOTE(review): this listing omits some lines of the class (the embedded numbering skips
# 110-111, 116-117, 119, 152 and 155), so the def lines for the statements below are not visible.
103 -class _Agg(osh.core.Op):
104
# Aggregation strategy selected below (_GroupingAggregate, _ConsecutiveGroupingAggregate or
# _SimpleAggregate); inputs and the end-of-input signal are forwarded to it.
105 _aggregate = None
106
107
108
109
112
113
114
115
118
# Argument handling: read the optional -g / -c grouping functions, then the two positional
# arguments (initial accumulator value, aggregation function).
120 args = self.args()
121 grouping_function = args.function_arg('-g')
122 consecutive_grouping_function = args.function_arg('-c')
123 initial_value = _wrap_if_necessary(args.next_eval())
124 aggregation_function = args.next_function()
# -g and -c may not both be given.
125 if grouping_function and consecutive_grouping_function:
126 self.usage()
# NOTE(review): initial_value has already been passed through _wrap_if_necessary at this point;
# whether it can still be None here depends on that helper -- confirm.
127 if initial_value is None or aggregation_function is None:
128 self.usage()
# Any argument left over after the two positional ones also leads to self.usage().
129 if args.has_next():
130 self.usage()
# NOTE(review): this condition repeats the test made a few lines earlier.
131 if grouping_function and consecutive_grouping_function:
132 self.usage()
# Pick the strategy: -g keeps one accumulator per group, -c handles adjacent groups,
# otherwise a single accumulator is used.
133 elif grouping_function:
134 self._aggregate = _GroupingAggregate(
135 self,
136 grouping_function,
137 initial_value,
138 aggregation_function)
139 elif consecutive_grouping_function:
140 self._aggregate = _ConsecutiveGroupingAggregate(
141 self,
142 consecutive_grouping_function,
143 initial_value,
144 aggregation_function)
145 else:
146 self._aggregate = _SimpleAggregate(
147 self,
148 initial_value,
149 aggregation_function)
150
151
# Hand one input object to the selected strategy (enclosing def line not visible).
153 self._aggregate.receive(object)
154
# Tell the selected strategy that the input has ended (enclosing def line not visible).
156 self._aggregate.receive_complete()
157
158
# Class-level defaults of the grouping (-g) aggregate; __init__ rebinds each of them per instance.
# NOTE(review): the class statement itself (embedded line 159) is missing from this listing;
# presumably this is _GroupingAggregate, which _Agg constructs with a matching argument list -- confirm.
# Object whose send() / send_complete() are used to deliver output.
160 _command = None
# Called with the unpacked input to obtain the input's group value.
161 _group_function = None
# Accumulator value used for a group that has not been seen before.
162 _initial_value = None
# Called with the accumulator elements followed by the input elements.
163 _aggregate_function = None
# Dict mapping group value -> current accumulator (created in __init__).
164 _sum = None
165
def __init__(self, command, group_function, initial_value, aggregate_function):
    """Set up an aggregate that keeps one accumulator per group value.

    C{command} is the object output is sent through, C{group_function}
    computes the group value of an input, C{initial_value} is the accumulator
    used for a group seen for the first time, and C{aggregate_function}
    combines an accumulator with an input.
    """
    # No groups yet; entries are added as inputs are received.
    self._sum = dict()
    self._aggregate_function = aggregate_function
    self._initial_value = initial_value
    self._group_function = group_function
    self._command = command
172
# NOTE(review): the def line for the next three statements (embedded line 173) is missing from
# this listing; presumably they are the body of receive(self, object), the call _Agg forwards
# inputs to -- confirm.
# Determine the input's group, fetch that group's accumulator (the initial value for a new
# group) and store the wrapped result of the aggregation function for the group.
# The local name `sum` shadows the builtin of the same name.
174 group = self._group_function(*object)
175 sum = self._sum.get(group, self._initial_value)
176 self._sum[group] = _wrap_if_necessary(self._aggregate_function(*(tuple(sum) + tuple(object))))
177
# NOTE(review): def line (embedded line 178) missing; presumably receive_complete(self) -- confirm.
# Send one output per group: the wrapped group value concatenated with the accumulator
# elements, in dict iteration order; then signal completion through the command.
# NOTE(review): dict.iteritems() exists only on Python 2.
179 for group, sum in self._sum.iteritems():
180 self._command.send(_wrap_if_necessary(group) + tuple(sum))
181 self._command.send_complete()
182
# Class-level defaults of the consecutive-grouping (-c) aggregate; __init__ rebinds each per instance.
# NOTE(review): the class statement itself (embedded line 183) is missing from this listing;
# presumably this is _ConsecutiveGroupingAggregate, which _Agg constructs with a matching
# argument list -- confirm.
# Object whose send() / send_complete() are used to deliver output.
184 _command = None
# Called with the unpacked input to obtain the input's group value.
185 _group_function = None
# Accumulator value each new group starts from.
186 _initial_value = None
# Called with the accumulator elements followed by the input elements.
187 _aggregate_function = None
# Group value of the run of inputs currently being accumulated (None before the first input).
188 _group = None
# Accumulator for the current group.
189 _sum = None
190
def __init__(self, command, group_function, initial_value, aggregate_function):
    """Set up an aggregate for inputs whose group members arrive adjacently.

    C{command} is the object output is sent through, C{group_function}
    computes the group value of an input, C{initial_value} is the accumulator
    each group starts from, and C{aggregate_function} combines an accumulator
    with an input.
    """
    # Nothing has been received yet: no current group and no accumulator.
    self._group = None
    self._sum = None
    self._aggregate_function = aggregate_function
    self._initial_value = initial_value
    self._group_function = group_function
    self._command = command
198
# NOTE(review): the def line for the statements below (embedded line 199) is missing from this
# listing; presumably they are the body of receive(self, object) -- confirm.
# When the group value changes, send the finished group (wrapped group value concatenated with
# its accumulator elements) and restart from the initial value; then fold the input into the
# current accumulator.
# NOTE(review): None doubles as the "no group yet" marker, so inputs whose group value is None
# restart the accumulator on every input and are never sent.
200 newGroup = self._group_function(*object)
201 if self._group is None or self._group != newGroup:
202 if self._group is not None:
203 self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
204 self._group = newGroup
205 self._sum = self._initial_value
206 self._sum = _wrap_if_necessary(self._aggregate_function(*(tuple(self._sum) + tuple(object))))
207
# NOTE(review): def line (embedded line 208) missing; presumably receive_complete(self) -- confirm.
# Send the last group still being accumulated, if there is one, then signal completion.
209 if self._group is not None:
210 self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
211 self._command.send_complete()
212
# Class-level defaults of the ungrouped aggregate; __init__ rebinds each of them per instance.
# NOTE(review): the class statement itself (embedded line 213) is missing from this listing;
# presumably this is _SimpleAggregate, which _Agg constructs with a matching argument list -- confirm.
# Object whose send() / send_complete() are used to deliver output.
214 _command = None
# Starting value of the accumulator.
215 _initial_value = None
# Called with the accumulator elements followed by the input elements.
216 _aggregate_function = None
# The single accumulator (set to the initial value in __init__).
217 _sum = None
218
def __init__(self, command, initial_value, aggregate_function):
    """Set up an aggregate with a single accumulator.

    C{command} is the object output is sent through, C{initial_value} is the
    starting value of the accumulator, and C{aggregate_function} combines the
    accumulator with an input.
    """
    self._aggregate_function = aggregate_function
    self._command = command
    # The accumulator starts out as the initial value itself.
    self._initial_value = self._sum = initial_value
224
# NOTE(review): the def line for the next statement (embedded line 225) is missing from this
# listing; presumably it is the body of receive(self, object) -- confirm.
# Fold the input into the accumulator: the aggregation function gets the accumulator elements
# followed by the input elements, and its wrapped result becomes the new accumulator.
226 self._sum = _wrap_if_necessary(self._aggregate_function(*(tuple(self._sum) + tuple(object))))
227
# NOTE(review): def line (embedded line 228) missing; presumably receive_complete(self) -- confirm.
# Send the accumulator as the only output, then signal completion.
229 self._command.send(self._sum)
230 self._command.send_complete()
231