Clayton - 11 months ago
Java Question

Run a simple Cascading application in local mode

I'm new to Cascading/Hadoop and am trying to run a simple example in local mode (i.e. in memory). The example just copies a file:

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class CascadingTest {

    public static void main(String[] args) {
        Properties properties = new Properties();

        AppProps.setApplicationJarClass( properties, CascadingTest.class );
        FlowConnector flowConnector = new LocalFlowConnector();

        // create the source tap
        Tap inTap = new Hfs( new TextLine(), "D:\\git_workspace\\Impatient\\part1\\data\\rain.txt" );

        // create the sink tap
        Tap outTap = new Hfs( new TextLine(), "D:\\git_workspace\\Impatient\\part1\\data\\out.txt" );

        // specify a pipe to connect the taps
        Pipe copyPipe = new Pipe( "copy" );

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef()
            .addSource( copyPipe, inTap )
            .addTailSink( copyPipe, outTap );

        // run the flow
        Flow flow = flowConnector.connect( flowDef );
    }
}

Here is the error I'm getting:

09-25-12 11:30:38,114 INFO - AppProps - using 9C82C76AC667FDAA2F6969A0DF3949C6
Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [java.util.Properties cannot be cast to org.apache.hadoop.mapred.JobConf]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(
at cascading.flow.local.planner.LocalPlanner.buildFlow(
at cascading.flow.FlowConnector.connect(
at com.x.y.CascadingTest.main(
Caused by: java.lang.ClassCastException: java.util.Properties cannot be cast to org.apache.hadoop.mapred.JobConf
at cascading.tap.hadoop.Hfs.sourceConfInit(
at cascading.flow.local.LocalFlowStep.initTaps(
at cascading.flow.local.LocalFlowStep.getInitializedConfig(
at cascading.flow.local.LocalFlowStep.createFlowStepJob(
at cascading.flow.local.LocalFlowStep.createFlowStepJob(
at cascading.flow.planner.BaseFlowStep.getFlowStepJob(
at cascading.flow.BaseFlow.initializeNewJobsMap(
at cascading.flow.BaseFlow.initialize(
at cascading.flow.local.planner.LocalPlanner.buildFlow(
... 2 more

Answer

You can't mix local and Hadoop classes in Cascading, because the two modes assume different and incompatible environments. In your case you are building a local flow (LocalFlowConnector) out of Hadoop taps (Hfs). A Hadoop tap expects a Hadoop JobConf as its configuration, but the local planner hands it the java.util.Properties object used to configure local taps, hence the ClassCastException raised in Hfs.sourceConfInit.
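The mismatch can be reproduced without Cascading at all. The following is a toy model, not Cascading's actual code: ToyTap, ToyHadoopTap and FakeJobConf are hypothetical stand-ins, and it assumes the real failure arises in a broadly similar way, with a tap typed for one configuration class being handed another through an untyped call.

```java
import java.util.Properties;

class ConfigMismatchDemo {

    // Hypothetical stand-in for a Hadoop-style configuration class.
    static class FakeJobConf {
    }

    // Hypothetical tap interface, generic over the configuration type it accepts.
    interface ToyTap<Config> {
        void sourceConfInit(Config conf);
    }

    // Hypothetical "Hadoop" tap: it only accepts a FakeJobConf.
    static class ToyHadoopTap implements ToyTap<FakeJobConf> {
        @Override
        public void sourceConfInit(FakeJobConf conf) {
            // nothing to do; the failure happens before this body runs
        }
    }

    // Hypothetical "local planner": it always passes a Properties object.
    // The raw type lets this compile; the type check is deferred to run time.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void initLocally(ToyTap tap) {
        tap.sourceConfInit(new Properties());
    }

    // Returns "ok" if initialisation succeeds, or the name of the failure.
    static String tryInit() {
        try {
            initLocally(new ToyHadoopTap());
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInit()); // prints "ClassCastException"
    }
}
```

The code compiles because the planner side uses the raw type, so the compiler cannot see that the wrong configuration class is being passed. The error only surfaces when the flow is planned, which matches where the stack trace in the question fails.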

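Your code will work if you use cascading.tap.local.FileTap instead of cascading.tap.hadoop.Hfs. The scheme has the same local/Hadoop split, so also switch the import from cascading.scheme.hadoop.TextLine to cascading.scheme.local.TextLine.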
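A local-only version of the copy might look roughly like the sketch below. It assumes a Cascading 2.x jar on the classpath. The class name LocalCopy, the copy helper, taking the paths from the command line, and the use of SinkMode.REPLACE to overwrite an existing output file are choices made for this illustration and are not part of the original question. It also calls flow.complete(), which the question's snippet never reaches, so that the flow actually runs.

```java
import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;

class LocalCopy {

    // Copies inPath to outPath using only local-mode classes (hypothetical helper).
    static void copy(String inPath, String outPath) {
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, LocalCopy.class);

        // local connector, configured with plain java.util.Properties
        FlowConnector flowConnector = new LocalFlowConnector(properties);

        // local taps with the local TextLine scheme instead of Hfs
        Tap inTap = new FileTap(new TextLine(), inPath);
        Tap outTap = new FileTap(new TextLine(), outPath, SinkMode.REPLACE);

        // specify a pipe to connect the taps
        Pipe copyPipe = new Pipe("copy");

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef()
            .addSource(copyPipe, inTap)
            .addTailSink(copyPipe, outTap);

        // plan the flow, then run it to completion
        Flow flow = flowConnector.connect(flowDef);
        flow.complete();
    }

    public static void main(String[] args) {
        // args[0] = input file, args[1] = output file
        copy(args[0], args[1]);
    }
}
```

Every class here comes from a `local` package or is mode-neutral (Pipe, FlowDef, AppProps), so nothing in the flow expects a JobConf.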