Min on date not working
Reported by mfryde | March 27th, 2009 @ 10:07 AM
When I try to compute a Min on a date field, I get an error.
The code:
require 'rubygems'
require 'cascading'
require 'cascading-ext/jdbc'
require 'utils'
require 'common'
require 'date'
playersource = "/home/brm/brm-current/meo_playerProfile_small.csv"
# playersource = "/home/brm/brm-current/meo_playerProfile_20090323_OVH.csv"
horsesource = "/home/brm/brm-current/meo_horse_20090323_OVH.csv"
#cardsource = "/home/brm/brm-current/meo_card_20090323_OVH.csv"
cardsource = "/home/brm/brm-current/meo_card_small.csv"
output = "/home/brm/brm-current/brm-app-meo/player.dim"
flow = Cascading::Flow.new "olap" do
  source tap(cardsource)
  sink tap(output, :replace => true)
  assembly "main" do
    split "line",
      :into => ["code","transactionId","playerProfile_id","valid","country","lot","subType","premium","creationTime","registrationTime","useTime","sourceCode","multiple","original_server"],
      :pattern => /\t/,
      :output => ["playerProfile_id","useTime"]
    filter "playerProfile_id.equals(\"NULL\")"
    group_by "playerProfile_id"
    # Min on Date
    every "useTime", :aggregator => Java::CascadingOperationAggregator::Min.new(Cascading.fields("first_transaction_date"))
  end
end
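After the split step every field is still plain text: nothing in the assembly converts useTime to a date or a number before it reaches Min. The sketch below mimics the split and :output projection in plain Ruby to make that visible. It is an illustration only, not cascading.jruby's implementation, and the `project` helper is hypothetical.

```ruby
# Field names copied from the :into list in the assembly above.
FIELDS = %w[code transactionId playerProfile_id valid country lot subType
            premium creationTime registrationTime useTime sourceCode multiple
            original_server].freeze

# Hypothetical helper mimicking split + :output: cut the line on tabs
# (keeping empty trailing fields), name the pieces, and keep only the
# two projected fields. Both values come back as Strings.
def project(line)
  record = FIELDS.zip(line.split("\t", -1)).to_h
  record.values_at("playerProfile_id", "useTime")
end
```

For a made-up input line whose useTime column holds "2008-10-31 18:44:34", `project` returns that value unchanged as a String, which is exactly what the Min aggregator later receives.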
The stack trace:
brm@wormee1:~/brm-current/brm-app-meo$ ./bin/run_jobs 05-test
09/03/27 11:05:17 INFO flow.MultiMapReducePlanner: using application jar: /usr/local/share/cascading/cascading-1.0.3.jar
09/03/27 11:05:17 INFO flow.Flow: [main] starting
09/03/27 11:05:17 INFO flow.Flow: [main] source: Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/meo_card_small.csv"]"]
09/03/27 11:05:17 INFO flow.Flow: [main] sink: Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/player.dim"]"]
09/03/27 11:05:18 INFO flow.Flow: [main] parallel execution is enabled: false
09/03/27 11:05:18 INFO flow.Flow: [main] starting jobs: 1
09/03/27 11:05:18 INFO flow.Flow: [main] allocating threads: 1
09/03/27 11:05:18 INFO flow.FlowStep: [main] starting step: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/player.dim"]"]
09/03/27 11:05:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
09/03/27 11:05:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/03/27 11:05:18 INFO mapred.FileInputFormat: Total input paths to process : 1
09/03/27 11:05:18 INFO mapred.FileInputFormat: Total input paths to process : 1
09/03/27 11:05:18 INFO mapred.MapTask: numReduceTasks: 1
09/03/27 11:05:18 INFO mapred.MapTask: io.sort.mb = 100
09/03/27 11:05:18 INFO mapred.MapTask: data buffer = 79691776/99614720
09/03/27 11:05:18 INFO mapred.MapTask: record buffer = 262144/327680
09/03/27 11:05:18 INFO mapred.MapTask: Starting flush of map output
09/03/27 11:05:18 INFO mapred.MapTask: Finished spill 0
09/03/27 11:05:18 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
09/03/27 11:05:18 INFO mapred.LocalJobRunner: file:/home/brm/brm-current/meo_card_small.csv:0+64805
09/03/27 11:05:18 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
09/03/27 11:05:18 INFO mapred.LocalJobRunner:
09/03/27 11:05:18 INFO mapred.Merger: Merging 1 sorted segments
09/03/27 11:05:18 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1094 bytes
09/03/27 11:05:18 INFO mapred.LocalJobRunner:
09/03/27 11:05:18 WARN mapred.LocalJobRunner: job_local_0001
cascading.pipe.OperatorException: [main][sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)] operator Every failed executing aggregator: Min[decl:'first_transaction_date'][args:1]
at cascading.pipe.Every$EveryAggregatorHandler.operate(Every.java:403)
at cascading.flow.stack.EveryAllAggregatorReducerStackElement.operateEveryHandlers(EveryAllAggregatorReducerStackElement.java:89)
at cascading.flow.stack.EveryAllAggregatorReducerStackElement.collect(EveryAllAggregatorReducerStackElement.java:64)
at cascading.flow.stack.GroupReducerStackElement.operateGroup(GroupReducerStackElement.java:76)
at cascading.flow.stack.GroupReducerStackElement.collect(GroupReducerStackElement.java:60)
at cascading.flow.stack.FlowReducerStack.reduce(FlowReducerStack.java:152)
at cascading.flow.FlowReducer.reduce(FlowReducer.java:77)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:436)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:170)
Caused by: java.lang.NumberFormatException: For input string: "2008-10-31 18:44:34"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1224)
at java.lang.Double.parseDouble(Double.java:510)
at cascading.tuple.Tuple.getDouble(Tuple.java:262)
at cascading.tuple.TupleEntry.getDouble(TupleEntry.java:286)
at cascading.operation.aggregator.ExtremaBase.aggregate(ExtremaBase.java:111)
at cascading.pipe.Every$EveryAggregatorHandler.operate(Every.java:399)
... 8 more
09/03/27 11:05:23 WARN flow.FlowStep: [main] completion events count: 0
09/03/27 11:05:23 WARN flow.Flow: stopping jobs
09/03/27 11:05:23 INFO flow.FlowStep: [main] stopping: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/player.dim"]"]
09/03/27 11:05:23 WARN flow.Flow: stopped jobs
09/03/27 11:05:23 WARN flow.Flow: shutting down job executor
09/03/27 11:05:23 WARN flow.Flow: shutdown complete
cascading/flow/FlowStep.java:458:in `call': cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/player.dim"]"] (NativeException)
from cascading/flow/FlowStep.java:389:in `call'
from java/util/concurrent/FutureTask.java:303:in `innerRun'
from java/util/concurrent/FutureTask.java:138:in `run'
from java/util/concurrent/ThreadPoolExecutor.java:885:in `runTask'
from java/util/concurrent/ThreadPoolExecutor.java:907:in `run'
from java/lang/Thread.java:619:in `run'
from ./bin/run_jobs:21
from ./bin/run_jobs:20:in `each'
from ./bin/run_jobs:20
Complete Java stack trace:
cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/player.dim"]"]
at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:458)
at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:389)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)
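The root cause is in the "Caused by" section: ExtremaBase.aggregate calls TupleEntry.getDouble, which hands the raw field text to Double.parseDouble. The plain-Ruby analogue below (not Cascading code; `parse_as_double` is a hypothetical helper) shows why a formatted date string fails where a numeric timestamp would pass.

```ruby
# Illustration only: Cascading's Min reads its argument via getDouble, i.e.
# Java's Double.parseDouble. Ruby's strict Float() rejects the same input.
def parse_as_double(text)
  Float(text)
rescue ArgumentError
  nil # the text is not a valid number
end

parse_as_double("2008-10-31 18:44:34") # => nil (a date string, not a number)
parse_as_double("1225478674000")       # => 1225478674000.0 (epoch millis parse fine)
```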
Comments and changes to this ticket

gmarabout (at gmail) March 27th, 2009 @ 02:38 PM
- State changed from new to invalid
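That is expected: useTime is still a string, and Min reads its argument with TupleEntry.getDouble() (see the "Caused by" part of the stack trace), so "2008-10-31 18:44:34" cannot be parsed as a number. You must parse the date into a numeric value before computing a min/max/average on it.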
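A minimal sketch of that advice, written in plain Ruby rather than cascading.jruby because the ticket does not show which pipe operation should perform the conversion: parse each useTime into epoch milliseconds, take the minimum per playerProfile_id, then format the result back as a date string. The helper names are hypothetical, and treating the timestamps as UTC is an assumption, since the ticket does not state the data's time zone.

```ruby
# Hypothetical helper: "yyyy-MM-dd HH:mm:ss" -> epoch milliseconds (assumes UTC).
def to_epoch_millis(text)
  year, month, day, hour, minute, second = text.scan(/\d+/).map(&:to_i)
  Time.utc(year, month, day, hour, minute, second).to_i * 1000
end

# Hypothetical helper: epoch milliseconds -> "yyyy-MM-dd HH:mm:ss" (UTC).
def from_epoch_millis(millis)
  Time.at(millis / 1000).utc.strftime("%Y-%m-%d %H:%M:%S")
end

# rows: [playerProfile_id, useTime] pairs. Returns the earliest useTime per
# player, computed numerically rather than on the raw strings.
def first_transaction_dates(rows)
  result = {}
  rows.group_by(&:first).each do |player, pairs|
    earliest = pairs.map { |_, use_time| to_epoch_millis(use_time) }.min
    result[player] = from_epoch_millis(earliest)
  end
  result
end
```

For example, given the pairs ["42", "2008-10-31 18:44:34"], ["42", "2008-09-01 08:00:00"] and ["7", "2009-01-15 12:30:00"], this yields {"42" => "2008-09-01 08:00:00", "7" => "2009-01-15 12:30:00"}. Converting to a number first is what lets a numeric aggregator such as Min work; the formatting step afterwards only restores a readable date.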