#6 ✓resolved
mfryde

Distinct not Working

Reported by mfryde | March 27th, 2009 @ 11:13 AM

Adding the following two lines:
group_by [ "event_type" , "message_type", "sender_id" ]
distinct [ "event_type" , "message_type", "sender_id" ]
to the code causes the error below and returns an empty result.

the code


require 'rubygems'
require 'cascading'
require 'cascading-ext/jdbc'
require 'utils'
require 'common'
require 'date'

input = "/home/brm/brm-current/brm-app-meo/brm-meo.log"
output = "/home/brm/brm-current/brm-app-meo/result.mfr"


# Invokes clean_timestamp, then asks format_date to render the "timestamp2"
# field with the "yyyy/MM/dd" pattern into a field named "day".
# NOTE(review): presumably clean_timestamp is what produces "timestamp2" --
# confirm against its definition (not visible here).
def timestamp_to_date
  clean_timestamp
  format_date("timestamp2", :pattern => "yyyy/MM/dd", :into => "day")
end


# Runs json_splitter over the "json" field using the mapping below, emitting
# all_fields, and then calls timestamp_to_date.
#
# Each mapping entry pairs a key (these look like colon-separated paths into
# the JSON document -- confirm against json_splitter) with the name of the
# field the value is stored under.
def get_json
  path_to_field = {
    "data:Action" => "action",
    "data:Agent2:id" => "agent2",
    "data:Buyer" => "buyer",
    "data:MessageType" => "message_type",
    "data:Recipients" => "recipients",
    "data:Resource:id" => "location_id",
    "data:Resource:type" => "location_type",
    "data:ResourceAttributes:Experience" => "xp_gained",
    "data:Sender:id" => "sender_id",
    "data:Stepid" => "stepid",
    "data:TransactionType" => "transaction_type",
    "data:User:id" => "user_id",
    "data:UseType" => "use_type",
    "metaData:eventType" => "event_type",
    "metaData:timestamp" => "timestamp",
    "data:ResourceAttributes:Amount" => "xp",
    "data:Status" => "Status"
  }

  splitter = json_splitter(path_to_field)
  each("json", :function => splitter, :output => all_fields)

  timestamp_to_date
end

 
# Defines the "olap" flow: sources from tap(input), sinks to tap(output), and
# declares a single assembly named "start".
# NOTE(review): the `end` that closes this Flow.new block is not visible in
# the pasted snippet (the only `end` below closes the assembly) -- the paste
# appears truncated; confirm against the reporter's original script.
flow = Cascading::Flow.new "olap" do


  source tap(input)
  # NOTE(review): :replace => true presumably overwrites an existing output --
  # TODO confirm against the tap/sink documentation.
  sink tap(output , :replace => true)
  #sink "count", jdbc_tap(connection_url, connection_props)
  
  assembly "start" do
  
    # Split "line" on "|" into four named fields. :output=>"json" presumably
    # passes only the json field on to the next step -- TODO confirm.
    split "line", 
		:into => ["timestamp", "queuename", "loglevel", "json"], 
		:pattern=>/\|/, 
		:output=>"json"
  
	get_json

	# Per the ticket text, the group_by and distinct lines below are the two
	# additions that trigger the reported error and the empty result.
	restrict_to [ "event_type" , "message_type", "sender_id" ]
	group_by [ "event_type" , "message_type", "sender_id" ]
	distinct [ "event_type" , "message_type", "sender_id" ]
	
   end

the error


 ./bin/run_jobs 05-test
09/03/27 12:11:48 INFO flow.MultiMapReducePlanner: using application jar: /usr/local/share/cascading/cascading-1.0.3.jar
09/03/27 12:11:48 INFO flow.Flow: [start] starting
09/03/27 12:11:48 INFO flow.Flow: [start]  source: Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/brm-meo.log"]"]
09/03/27 12:11:48 INFO flow.Flow: [start]  sink: Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
09/03/27 12:11:48 INFO flow.Flow: [start]  parallel execution is enabled: false
09/03/27 12:11:48 INFO flow.Flow: [start]  starting jobs: 1
09/03/27 12:11:48 INFO flow.Flow: [start]  allocating threads: 1
09/03/27 12:11:48 INFO flow.FlowStep: [start] starting step: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
09/03/27 12:11:48 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
09/03/27 12:11:48 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/03/27 12:11:49 INFO mapred.FileInputFormat: Total input paths to process : 1
09/03/27 12:11:49 INFO mapred.FileInputFormat: Total input paths to process : 1
09/03/27 12:11:49 INFO mapred.MapTask: numReduceTasks: 1
09/03/27 12:11:49 INFO mapred.MapTask: io.sort.mb = 100
09/03/27 12:11:49 INFO mapred.MapTask: data buffer = 79691776/99614720
09/03/27 12:11:49 INFO mapred.MapTask: record buffer = 262144/327680
09/03/27 12:11:55 INFO mapred.LocalJobRunner: file:/home/brm/brm-current/brm-app-meo/brm-meo.log:0+33554432
09/03/27 12:11:57 INFO mapred.MapTask: Starting flush of map output
09/03/27 12:11:58 WARN mapred.LocalJobRunner: job_local_0001
cascading.CascadingException: unable to compare Tuples, likely a CoGroup is being attempted on fields of different types
        at cascading.tuple.hadoop.DeserializerComparator.compare(DeserializerComparator.java:108)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:656)
        at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:30)
        at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:84)
        at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:954)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:842)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
Caused by: java.lang.ClassCastException: cascading.utils.json.JSONWritable cannot be cast to net.sf.json.JSON
        at cascading.utils.json.JSONWritable.compareTo(Unknown Source)
        at cascading.tuple.Tuple.compareTo(Tuple.java:657)
        at cascading.tuple.TuplePair.compareTo(TuplePair.java:142)
        at cascading.tuple.hadoop.TuplePairComparator.compare(TuplePairComparator.java:38)
        at cascading.tuple.hadoop.TuplePairComparator.compare(TuplePairComparator.java:29)
        at cascading.tuple.hadoop.DeserializerComparator.compare(DeserializerComparator.java:104)
        ... 8 more
09/03/27 12:11:58 INFO mapred.LocalJobRunner: file:/home/brm/brm-current/brm-app-meo/brm-meo.log:0+33554432
09/03/27 12:11:59 WARN flow.FlowStep: [start] completion events count: 0
09/03/27 12:11:59 WARN flow.Flow: stopping jobs
09/03/27 12:11:59 INFO flow.FlowStep: [start] stopping: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
09/03/27 12:11:59 WARN flow.Flow: stopped jobs
09/03/27 12:11:59 WARN flow.Flow: shutting down job executor
09/03/27 12:11:59 WARN flow.Flow: shutdown complete
cascading/flow/FlowStep.java:458:in `call': cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"] (NativeException)
        from cascading/flow/FlowStep.java:389:in `call'
        from java/util/concurrent/FutureTask.java:303:in `innerRun'
        from java/util/concurrent/FutureTask.java:138:in `run'
        from java/util/concurrent/ThreadPoolExecutor.java:885:in `runTask'
        from java/util/concurrent/ThreadPoolExecutor.java:907:in `run'
        from java/lang/Thread.java:619:in `run'
        from ./bin/run_jobs:21
        from ./bin/run_jobs:20:in `each'
        from ./bin/run_jobs:20
Complete Java stackTrace
cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['line']->[ALL]]"]["/home/brm/brm-current/brm-app-meo/result.mfr"]"]
        at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:458)
        at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:389)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

Comments and changes to this ticket

Please Sign in or create a free account to add a new ticket.

With your very own profile, you can contribute to projects, track your activity, watch tickets, receive and update tickets through your email and much more.

New-ticket Create new ticket

Create your profile

Help contribute to this project by taking a few moments to create your personal profile. Create your profile »

People watching this ticket

Pages