Distinct not Working
Reported by mfryde | March 27th, 2009 @ 11:13 AM
Adding these two lines:
group_by [ "event_type", "message_type", "sender_id" ]
distinct [ "event_type", "message_type", "sender_id" ]
to the code causes the error below and returns an empty result.
The code:
require 'rubygems'
require 'cascading'
require 'cascading-ext/jdbc'
require 'utils'
require 'common'
require 'date'
input = "/home/brm/brm-current/brm-app-meo/brm-meo.log"
output = "/home/brm/brm-current/brm-app-meo/result.mfr"
def timestamp_to_date
  clean_timestamp
  format_date "timestamp2", :pattern => "yyyy/MM/dd", :into => "day"
end
def get_json
  json_extraction = {
    "data:Action" => "action",
    "data:Agent2:id" => "agent2",
    "data:Buyer" => "buyer",
    "data:MessageType" => "message_type",
    "data:Recipients" => "recipients",
    "data:Resource:id" => "location_id",
    "data:Resource:type" => "location_type",
    "data:ResourceAttributes:Experience" => "xp_gained",
    "data:Sender:id" => "sender_id",
    "data:Stepid" => "stepid",
    "data:TransactionType" => "transaction_type",
    "data:User:id" => "user_id",
    "data:UseType" => "use_type",
    "metaData:eventType" => "event_type",
    "metaData:timestamp" => "timestamp",
    "data:ResourceAttributes:Amount" => "xp",
    "data:Status" => "Status" }
  each "json", :function => json_splitter(json_extraction), :output => all_fields
  timestamp_to_date
end
flow = Cascading::Flow.new "olap" do
  source tap(input)
  sink tap(output, :replace => true)
  #sink "count", jdbc_tap(connection_url, connection_props)
  assembly "start" do
    split "line",
      :into => ["timestamp", "queuename", "loglevel", "json"],
      :pattern => /\|/,
      :output => "json"
    get_json
    restrict_to ["event_type", "message_type", "sender_id"]
    group_by ["event_type", "message_type", "sender_id"]
    distinct ["event_type", "message_type", "sender_id"]
  end
end
The error:
./bin/run_jobs 05-test
09/03/27 12:11:48 INFO flow.MultiMapReducePlanner: using application jar: /usr/local/share/cascading/cascading-1.0.3.jar
09/03/27 12:11:48 INFO flow.Flow: [start] starting
09/03/27 12:11:48 INFO flow.Flow: [start] source: Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/brm-meo.log"]"]
09/03/27 12:11:48 INFO flow.Flow: [start] sink: Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
09/03/27 12:11:48 INFO flow.Flow: [start] parallel execution is enabled: false
09/03/27 12:11:48 INFO flow.Flow: [start] starting jobs: 1
09/03/27 12:11:48 INFO flow.Flow: [start] allocating threads: 1
09/03/27 12:11:48 INFO flow.FlowStep: [start] starting step: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
09/03/27 12:11:48 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
09/03/27 12:11:48 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/03/27 12:11:49 INFO mapred.FileInputFormat: Total input paths to process : 1
09/03/27 12:11:49 INFO mapred.FileInputFormat: Total input paths to process : 1
09/03/27 12:11:49 INFO mapred.MapTask: numReduceTasks: 1
09/03/27 12:11:49 INFO mapred.MapTask: io.sort.mb = 100
09/03/27 12:11:49 INFO mapred.MapTask: data buffer = 79691776/99614720
09/03/27 12:11:49 INFO mapred.MapTask: record buffer = 262144/327680
09/03/27 12:11:55 INFO mapred.LocalJobRunner: file:/home/brm/brm-current/brm-app-meo/brm-meo.log:0+33554432
09/03/27 12:11:57 INFO mapred.MapTask: Starting flush of map output
09/03/27 12:11:58 WARN mapred.LocalJobRunner: job_local_0001
cascading.CascadingException: unable to compare Tuples, likely a CoGroup is being attempted on fields of different types
at cascading.tuple.hadoop.DeserializerComparator.compare(DeserializerComparator.java:108)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:656)
at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:30)
at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:84)
at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:954)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:842)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
Caused by: java.lang.ClassCastException: cascading.utils.json.JSONWritable cannot be cast to net.sf.json.JSON
at cascading.utils.json.JSONWritable.compareTo(Unknown Source)
at cascading.tuple.Tuple.compareTo(Tuple.java:657)
at cascading.tuple.TuplePair.compareTo(TuplePair.java:142)
at cascading.tuple.hadoop.TuplePairComparator.compare(TuplePairComparator.java:38)
at cascading.tuple.hadoop.TuplePairComparator.compare(TuplePairComparator.java:29)
at cascading.tuple.hadoop.DeserializerComparator.compare(DeserializerComparator.java:104)
... 8 more
09/03/27 12:11:58 INFO mapred.LocalJobRunner: file:/home/brm/brm-current/brm-app-meo/brm-meo.log:0+33554432
09/03/27 12:11:59 WARN flow.FlowStep: [start] completion events count: 0
09/03/27 12:11:59 WARN flow.Flow: stopping jobs
09/03/27 12:11:59 INFO flow.FlowStep: [start] stopping: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
09/03/27 12:11:59 WARN flow.Flow: stopped jobs
09/03/27 12:11:59 WARN flow.Flow: shutting down job executor
09/03/27 12:11:59 WARN flow.Flow: shutdown complete
cascading/flow/FlowStep.java:458:in `call': cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"] (NativeException)
from cascading/flow/FlowStep.java:389:in `call'
from java/util/concurrent/FutureTask.java:303:in `innerRun'
from java/util/concurrent/FutureTask.java:138:in `run'
from java/util/concurrent/ThreadPoolExecutor.java:885:in `runTask'
from java/util/concurrent/ThreadPoolExecutor.java:907:in `run'
from java/lang/Thread.java:619:in `run'
from ./bin/run_jobs:21
from ./bin/run_jobs:20:in `each'
from ./bin/run_jobs:20
Complete Java stack trace:
cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:458)
at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:389)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)
Comments and changes to this ticket
-

gmarabout (at gmail) March 27th, 2009 @ 02:44 PM
- Assigned user set to gmarabout (at gmail)
-

gmarabout (at gmail) March 27th, 2009 @ 02:56 PM
- State changed from new to invalid
This problem is a bug in JSONWritable, a class in the cascading.utils project.
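The stack trace in the report shows JSONWritable.compareTo throwing a ClassCastException while the sort compares two wrapped values. The JSONWritable source is not part of this ticket, so the Ruby sketch below only illustrates that general failure mode. WrappedValue, buggy_compare and the sample strings are hypothetical names made up for the illustration; they are not the cascading.utils API or the actual fix.

```ruby
# Hypothetical stand-in for a wrapper type such as JSONWritable.
# This is NOT the real cascading.utils class, only an illustration.
class WrappedValue
  include Comparable
  attr_reader :value

  def initialize(value)
    @value = value
  end

  # Buggy variant: assumes `other` is already the raw payload and
  # fails when it is handed another wrapper, which is what a sort does.
  def buggy_compare(other)
    raise TypeError, "#{other.class} is not a String" unless other.is_a?(String)
    value <=> other
  end

  # Safer variant: unwrap the other side first when it is also a wrapper.
  def <=>(other)
    other_value = other.is_a?(WrappedValue) ? other.value : other
    value <=> other_value
  end
end

a = WrappedValue.new("click")
b = WrappedValue.new("view")

# Sorting relies on <=>, so wrapper-to-wrapper comparison must work.
sorted = [b, a].sort.map(&:value)
```

A group_by or distinct step sorts tuples on the grouping fields, so any field type that cannot compare itself against another instance of the same type will fail at that point, which would be consistent with the error only appearing once those two lines are added.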
-

gmarabout (at gmail) March 27th, 2009 @ 02:59 PM
- State changed from invalid to open
-

gmarabout (at gmail) March 27th, 2009 @ 03:11 PM
- State changed from open to resolved
Fixed in cascading.utils