Configuring MR3

The behavior of MR3 is specified by the configuration file mr3-site.xml in the classpath. The configuration keys for MR3 are described below in 9 sections:

  • MR3Runtime: configuration keys relevant to all components (MR3Client, DAGAppMaster, ContainerWorker)
  • MR3Client: configuration keys that are consumed or set by MR3Client
  • DAGAppMaster: configuration keys that are consumed or set by DAGAppMaster
  • ContainerGroup: configuration keys that specify properties of ContainerGroups
  • ContainerWorker: configuration keys for ContainerWorkers
  • TokenRenewer: configuration keys related to Kerberos and token renewal
  • HistoryLogger: configuration keys for history logging
  • tez.common.counters.Limits: configuration keys for Tez counters (which MR3 borrows from Tez)
  • Kubernetes: configuration keys for running MR3 on Kubernetes
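
As a minimal sketch, and assuming that mr3-site.xml follows the usual Hadoop-style layout of property entries inside a single configuration element (an assumption, not something stated on this page), a file that sets two of the keys described below might look like this:

```xml
<?xml version="1.0"?>
<!-- mr3-site.xml: illustrative sketch; the Hadoop-style layout is an assumption -->
<configuration>
  <!-- Start DAGAppMaster as a container in the Hadoop cluster (the default) -->
  <property>
    <name>mr3.master.mode</name>
    <value>yarn</value>
  </property>
  <!-- Logging level for DAGAppMaster (the default) -->
  <property>
    <name>mr3.am.log.level</name>
    <value>INFO</value>
  </property>
</configuration>
```

Keys that are omitted keep the default values listed in each section.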

MR3Runtime

Name Default value Description
mr3.runtime tez tez: use Tez 0.9.1 runtime.
asm: use ASM runtime.
mr3.master.mode yarn local-thread: DAGAppMaster starts as a new thread inside MR3Client.
local-process: DAGAppMaster starts as a new process on the same machine where MR3Client is running.
yarn: DAGAppMaster starts as a new container in the Hadoop cluster.
kubernetes: DAGAppMaster starts as a pod in the Kubernetes cluster.
mr3.am.acls.enabled true true: enable ACLs for DAGAppMaster and DAGs.
false: disable ACLs for DAGAppMaster and DAGs.
mr3.cluster.additional.classpath   Additional classpath for DAGAppMaster and ContainerWorkers
mr3.cluster.use.hadoop-libs false true: include the classpath defined in YarnConfiguration.YARN_APPLICATION_CLASSPATH.
false: do not include the classpath defined in YarnConfiguration.YARN_APPLICATION_CLASSPATH.
mr3.am.max.java.heap.fraction 0.8 Fraction of memory to be allocated for Java heap in DAGAppMaster
mr3.container.max.java.heap.fraction 0.8 Fraction of memory to be allocated for Java heap in ContainerWorkers
mr3.async.logging true true: use asynchronous logging.
false: use synchronous logging.
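
The following sketch shows how these keys might be combined for a small test setup in which DAGAppMaster runs as a thread inside MR3Client. It assumes the Hadoop-style property layout for mr3-site.xml; the particular combination of values is illustrative, not a recommended configuration.

```xml
<configuration>
  <!-- Run DAGAppMaster as a new thread inside MR3Client -->
  <property>
    <name>mr3.master.mode</name>
    <value>local-thread</value>
  </property>
  <!-- Disable ACLs for DAGAppMaster and DAGs -->
  <property>
    <name>mr3.am.acls.enabled</name>
    <value>false</value>
  </property>
  <!-- Use synchronous logging -->
  <property>
    <name>mr3.async.logging</name>
    <value>false</value>
  </property>
</configuration>
```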

MR3Client

Name Default value Description
mr3.lib.uris ${liburis} URIs for the MR3 library jar files
mr3.aux.uris ${auxuris} URIs for the MR3 auxiliary jar files
mr3.queue.name   Name of the Yarn queue to which the MR3 job is submitted
mr3.credentials.path   Path to the credentials for MR3
mr3.am.staging-dir /tmp/${user.name}/staging Staging directory for DAGAppMaster
mr3.am.resource.memory.mb 5120 Size of memory in MB for DAGAppMaster
mr3.am.resource.cpu.cores 1 Number of cores for DAGAppMaster
mr3.am.launch.cmd-opts -server -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC Command-line options for launching DAGAppMaster
mr3.am.launch.env LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/ Environment variables for launching DAGAppMaster
mr3.am.max.app.attempts 2 Max number of Yarn ApplicationAttempts for the MR3 job
mr3.am.log.level INFO Logging level for DAGAppMaster
mr3.am.local.working-dir /tmp/${user.name}/mr3/working-dir Local working directory for DAGAppMaster running in LocalThread or LocalProcess mode
mr3.am.local.log-dir /tmp/${user.name}/mr3/log-dir Logging directory for DAGAppMaster running in LocalThread or LocalProcess mode
mr3.cancel.delegation.tokens.on.completion true true: cancel delegation tokens when the MR3 job completes.
false: do not cancel delegation tokens.
mr3.dag.status.pollinterval.ms 1000 Time interval in milliseconds for retrieving the status of running DAGs
mr3.am.session.mode false true: create MR3 SessionClient.
false: create MR3 JobClient.
mr3.session.client.timeout.secs 120 Time in seconds for terminating MR3 SessionClient with a timeout
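
As an illustration, the fragment below sketches an MR3Client configuration that creates an MR3 SessionClient and requests a larger DAGAppMaster. It assumes the Hadoop-style property layout; the queue name default and the resource sizes are hypothetical values chosen for the example.

```xml
<configuration>
  <!-- Create MR3 SessionClient instead of MR3 JobClient -->
  <property>
    <name>mr3.am.session.mode</name>
    <value>true</value>
  </property>
  <!-- Hypothetical Yarn queue name -->
  <property>
    <name>mr3.queue.name</name>
    <value>default</value>
  </property>
  <!-- 10240 MB of memory and 2 cores for DAGAppMaster (example values) -->
  <property>
    <name>mr3.am.resource.memory.mb</name>
    <value>10240</value>
  </property>
  <property>
    <name>mr3.am.resource.cpu.cores</name>
    <value>2</value>
  </property>
</configuration>
```

If mr3.am.max.java.heap.fraction (default 0.8) is applied to this memory size, which the key descriptions suggest but do not state explicitly, the Java heap of DAGAppMaster would be about 10240 × 0.8 = 8192 MB.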

DAGAppMaster

Name Default value Description
mr3.am.worker.mode local local: ContainerWorkers start as threads inside DAGAppMaster.
yarn: ContainerWorkers start as containers in a Hadoop cluster.
kubernetes: ContainerWorkers start as Pods in a Kubernetes cluster.
mr3.am.max.num.concurrent.dags 128 Max number of DAGs that can run concurrently in DAGAppMaster
mr3.am.shutdown.rightaway true true: DAGAppMaster does not wait until MR3Client retrieves the final states of all DAGs.
false: DAGAppMaster waits until MR3Client retrieves the final states of all DAGs.
mr3.am.shutdown.sleep.max.ms 5000 Time in milliseconds to wait until MR3Client retrieves the final states of all DAGs
mr3.am.min.cluster.resource.memory.mb 40960 Min size of memory in MB that DAGAppMaster assumes as the cluster resource when initializing Map Tasks
mr3.am.min.cluster.resource.cpu.cores 10 Min number of cores that DAGAppMaster assumes as the cluster resource when initializing Map Tasks
mr3.am.local.resourcescheduler.min.memory.mb 256 Min size of memory in MB to reserve for all local ContainerWorkers running in DAGAppMaster
mr3.am.local.resourcescheduler.max.memory.mb 4096 Max size of memory in MB to reserve for all local ContainerWorkers running in DAGAppMaster
mr3.am.local.resourcescheduler.max.cpu.cores 16 Max number of cores for all local ContainerWorkers running in DAGAppMaster
mr3.am.local.resourcescheduler.native.fraction 0.0d Fraction of memory to be allocated for native memory for all local ContainerWorkers running in DAGAppMaster
mr3.am.delete.local.working-dir true true: DAGAppMaster running in LocalThread or LocalProcess mode deletes its local working directory upon termination.
false: DAGAppMaster running in LocalThread or LocalProcess mode does not delete its local working directory upon termination.
mr3.am.taskcommunicator.type protobuf protobuf: use Protobuf for communication between TaskCommunicator and ContainerWorkers.
protowritable: use Protobuf + Writable for communication between TaskCommunicator and ContainerWorkers.
writable: use Writable for communication between TaskCommunicator and ContainerWorkers.
direct: use direct communication between TaskCommunicator and local ContainerWorkers.
mr3.am.taskcommunicator.thread.count 30 Number of threads in TaskCommunicator for serving requests from ContainerWorkers
mr3.am.resourcescheduler.max.requests.per.taskscheduler 10 Max number of containers that TaskScheduler can request from Yarn ResourceScheduler at once
mr3.am.rm.heartbeat.interval.ms 1000 Time interval in milliseconds of heartbeats in AMRMClientAsync
mr3.dag.priority.scheme fifo fifo: assign DAG priorities on FIFO basis.
concurrent: assign the same priority to all DAGs.
mr3.am.task.max.failed.attempts 3 Max number of TaskAttempts to create for Task
mr3.am.task.retry.on.fatal.error false true: retry even if TaskAttempts fail with fatal errors.
false: do not retry if TaskAttempts fail with fatal errors.
mr3.am.commit-all-outputs-on-dag-success true true: commit the output of all Vertexes when DAG completes successfully.
false: commit the output when Vertex completes successfully.
mr3.am.client.thread-count 8 Number of threads in DAGClientServer for serving requests from MR3Clients
mr3.heartbeat.timeout.ms 300000 Time in milliseconds for triggering heartbeat timeout for TaskAttempts and ContainerWorkers
mr3.task.heartbeat.timeout.check.ms 30000 Time interval in milliseconds for checking heartbeat timeout for TaskAttempts
mr3.container.heartbeat.timeout.check.ms 15000 Time interval in milliseconds for checking heartbeat timeout for ContainerWorkers
mr3.container.idle.timeout.ms 300000 Time in milliseconds for triggering timeout for idle ContainerWorkers
mr3.am.node-blacklisting.enabled false true: enable node blacklisting.
false: disable node blacklisting.
mr3.am.maxtaskfailure.percent 5 Percentage of TaskAttempt failures that triggers node blacklisting
mr3.am.max.safe.resource.percent.blacklisted 50 Max percentage of resource to be allocated to a node that is blacklisted
mr3.am.min.safe.resource.percent.blacklisted 10 Min percentage of resource to be allocated to a node that is blacklisted
mr3.dag.recovery.enabled true true: enable DAG recovery when DAGAppMaster restarts.
false: disable DAG recovery when DAGAppMaster restarts.
mr3.am.max.finished.reported.dags 10 Max number of DAGs whose final states are kept in DAGAppMaster after being reported to MR3Client
mr3.am.generate.dag.graph.viz false true: create DOT graph files showing the structure of DAGs on the working directory of DAGAppMaster.
false: do not create DOT graph files.
mr3.memory.usage.check.scheme none average: calculate the average memory usage of the current window.
maximum: calculate the maximum memory usage of the current window.
none: do not calculate memory usage.
mr3.memory.usage.check.window.length.secs 600 Window length in seconds for calculating memory usage
mr3.check.memory.usage.event.interval.secs 10 Time interval in seconds for generating events for calculating memory usage
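
The sketch below combines a few of these keys: ContainerWorkers run as Yarn containers, all DAGs share the same priority, and node blacklisting is enabled. It assumes the Hadoop-style property layout, and the combination of values is only an example.

```xml
<configuration>
  <!-- Start ContainerWorkers as containers in a Hadoop cluster -->
  <property>
    <name>mr3.am.worker.mode</name>
    <value>yarn</value>
  </property>
  <!-- Assign the same priority to all DAGs -->
  <property>
    <name>mr3.dag.priority.scheme</name>
    <value>concurrent</value>
  </property>
  <!-- Enable node blacklisting, triggered by the percentage of TaskAttempt failures -->
  <property>
    <name>mr3.am.node-blacklisting.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>mr3.am.maxtaskfailure.percent</name>
    <value>5</value>
  </property>
  <!-- Create at most 3 TaskAttempts for each Task (the default) -->
  <property>
    <name>mr3.am.task.max.failed.attempts</name>
    <value>3</value>
  </property>
</configuration>
```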

ContainerGroup

Name Default value Description
mr3.container.stop.cross.dag.reuse true true: stop cross-DAG container reuse for the current ContainerGroup.
false: do not update the current ContainerGroup with regard to cross-DAG container reuse.
mr3.container.reuse false true: reuse ContainerWorkers in the current ContainerGroup.
false: use each ContainerWorker only for a single TaskAttempt.
mr3.container.resourcescheduler.type local local: create local ContainerWorkers in DAGAppMaster for the current ContainerGroup.
yarn: create Yarn ContainerWorkers for the current ContainerGroup.
mr3.container.combine.taskattempts false true: allow multiple TaskAttempts to run concurrently in a ContainerWorker.
false: allow only one TaskAttempt to run at a time in a ContainerWorker.
mr3.container.mix.taskattempts true true: allow TaskAttempts from different DAGs to run concurrently in a ContainerWorker.
false: use each ContainerWorker for a single DAG exclusively.
mr3.container.launch.cmd-opts -server -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC Command-line options for launching ContainerWorkers
mr3.container.launch.env LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/ Environment variables for launching ContainerWorkers
mr3.container.log.level INFO Logging level for ContainerWorkers
mr3.container.kill.policy container.kill.wait.workervertex container.kill.wait.workervertex: stop a ContainerWorker only if no more TaskAttempts are to arrive.
container.kill.nowait: stop a ContainerWorker right away if it is not serving any TaskAttempt.
mr3.use.daemon.shufflehandler false true: use the shuffle handler as a DaemonTask.
false: do not use the shuffle handler.
mr3.daemon.shuffle.service-id   Service identifier for the shuffle handler
mr3.daemon.shuffle.port   Port number for the shuffle handler
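
To show how the reuse-related keys fit together, here is a sketch of a ContainerGroup configuration in which ContainerWorkers are reused and may run several TaskAttempts at once. It assumes the Hadoop-style property layout; the chosen values are illustrative only.

```xml
<configuration>
  <!-- Reuse ContainerWorkers instead of using each for a single TaskAttempt -->
  <property>
    <name>mr3.container.reuse</name>
    <value>true</value>
  </property>
  <!-- Allow multiple TaskAttempts to run concurrently in a ContainerWorker -->
  <property>
    <name>mr3.container.combine.taskattempts</name>
    <value>true</value>
  </property>
  <!-- Allow TaskAttempts from different DAGs in the same ContainerWorker -->
  <property>
    <name>mr3.container.mix.taskattempts</name>
    <value>true</value>
  </property>
  <!-- Stop a ContainerWorker right away if it is not serving any TaskAttempt -->
  <property>
    <name>mr3.container.kill.policy</name>
    <value>container.kill.nowait</value>
  </property>
</configuration>
```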

ContainerWorker

Name Default value Description
mr3.container.get.command.interval.ms 2000 Time interval in milliseconds for retrieving commands in ContainerWorkers that are currently serving TaskAttempts
mr3.container.busy.wait.interval.ms 100 Time interval in milliseconds for retrieving commands in idle ContainerWorkers
mr3.task.am.heartbeat.interval.ms 250 Time interval in milliseconds for sending heartbeats from TaskAttempts
mr3.task.am.heartbeat.counter.interval.ms 30000 Time interval in milliseconds for sending counters in heartbeats from TaskAttempts
mr3.task.max.events.per.heartbeat 500 Max number of Task events to include in a heartbeat reply
mr3.container.thread.keep.alive.time.ms 4000 Time in milliseconds for keeping threads serving TaskAttempts in ContainerWorkers
mr3.container.use.termination.checker true true: check whether TaskAttempts are terminated successfully or not after termination requests.
false: do not check.
mr3.container.termination.checker.timeout.ms 120000 Time in milliseconds before checking the termination of a TaskAttempt after a termination request. With the default value, the ContainerWorker checks whether a TaskAttempt has properly terminated 120 seconds after the termination request. If the TaskAttempt has not terminated, the whole ContainerWorker is shut down.
mr3.container.elastic.execution.memory.threshold.percent 0 Maximum percentage of heap memory that is safe for accepting new TaskAttempts. For example, a value of 90 means that once the heap usage exceeds 90 percent of the total heap memory, the ContainerWorker temporarily stops requesting new TaskAttempts. A value of 0 disables elastic execution. Not used if mr3.container.combine.taskattempts is set to false.
mr3.container.elastic.execution.memory.commit.ratio 1.0d Multiplier for memory to be allocated to each TaskAttempt. For example, a value of 1.5 means that a TaskAttempt created with a memory resource of 4GB is actually allocated 6GB of memory in a ContainerWorker. When overcommitting memory with a value larger than 1.0, use it only in conjunction with a value of mr3.container.elastic.execution.memory.threshold.percent strictly smaller than 100.
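
The two elastic execution keys are meant to be used together. The sketch below, which assumes the Hadoop-style property layout, overcommits memory by a factor of 1.5 (so a TaskAttempt created with 4GB is allocated 4 × 1.5 = 6GB) while keeping the heap threshold at 90 percent, strictly below 100 as required above.

```xml
<configuration>
  <!-- Elastic execution is not used unless TaskAttempts can run concurrently -->
  <property>
    <name>mr3.container.combine.taskattempts</name>
    <value>true</value>
  </property>
  <!-- Stop requesting new TaskAttempts once heap usage exceeds 90 percent -->
  <property>
    <name>mr3.container.elastic.execution.memory.threshold.percent</name>
    <value>90</value>
  </property>
  <!-- Allocate 1.5 times the requested memory to each TaskAttempt -->
  <property>
    <name>mr3.container.elastic.execution.memory.commit.ratio</name>
    <value>1.5</value>
  </property>
</configuration>
```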

TokenRenewer

Name Default value Description
mr3.principal   Kerberos principal
mr3.keytab   Location of the Kerberos keytab file
mr3.token.renewal.fraction 0.75 Fraction of the token renewal interval for renewing tokens conservatively
mr3.token.renewal.retry.interval.ms 3600000 Time interval in milliseconds for retrying token renewal
mr3.token.renewal.num.credentials.files 5 Max number of credential files to keep for token renewal
mr3.token.renewal.hdfs.enabled false true: automatically renew HDFS tokens.
false: do not renew HDFS tokens.
mr3.token.renewal.hive.enabled false true: automatically renew Hive tokens.
false: do not renew Hive tokens.
mr3.am.token.renewal.paths   Path that specifies FileSystem for token renewal. If empty, DAGAppMaster uses the staging directory.
mr3.token.renewal.pass.credentials.via.memory true true: DAGAppMaster passes credentials to ContainerWorkers directly via messages.
false: DAGAppMaster stores credentials on HDFS.
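
A sketch of a token renewal configuration for a Kerberos-enabled cluster follows. It assumes the Hadoop-style property layout; the principal mr3user@EXAMPLE.COM and the keytab path are hypothetical placeholders, not values taken from MR3.

```xml
<configuration>
  <!-- Hypothetical Kerberos principal -->
  <property>
    <name>mr3.principal</name>
    <value>mr3user@EXAMPLE.COM</value>
  </property>
  <!-- Hypothetical location of the keytab file -->
  <property>
    <name>mr3.keytab</name>
    <value>/etc/security/keytabs/mr3user.keytab</value>
  </property>
  <!-- Automatically renew HDFS tokens -->
  <property>
    <name>mr3.token.renewal.hdfs.enabled</name>
    <value>true</value>
  </property>
  <!-- Pass credentials to ContainerWorkers via messages (the default) -->
  <property>
    <name>mr3.token.renewal.pass.credentials.via.memory</name>
    <value>true</value>
  </property>
</configuration>
```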

HistoryLogger

Name Default value Description
mr3.app.history.logging.enabled false true: enable history logging for Yarn applications and ContainerWorkers.
false: disable history logging for Yarn applications and ContainerWorkers.
mr3.dag.history.logging.enabled false true: enable history logging for DAGs.
false: disable history logging for DAGs.
mr3.task.history.logging.enabled false true: enable history logging for Tasks.
false: disable history logging for Tasks.

tez.common.counters.Limits

Name Default value Description
tez.counters.max 1200 Max number of Tez counters
tez.counters.max.groups 500 Max number of Tez counter groups
tez.counters.counter-name.max-length 64 Max length of Tez counter names
tez.counters.group-name.max-length 256 Max length of Tez counter group names

Kubernetes

Name Default value Description
mr3.k8s.api.server.url https://kubernetes.default.svc URL for the Kubernetes API server
mr3.k8s.namespace mr3 Kubernetes namespace to use when creating a Kubernetes client
mr3.k8s.service.account.token.path /var/run/secrets/kubernetes.io/serviceaccount/token Token path for the service account
mr3.k8s.service.account.token.ca.cert.path /var/run/secrets/kubernetes.io/serviceaccount/ca.crt Certificate path for the service account
mr3.k8s.nodes.polling.interval.ms 60000 Time interval in milliseconds for querying states of Kubernetes Nodes
mr3.k8s.pods.polling.interval.ms 15000 Time interval in milliseconds for querying Pod states
mr3.k8s.pod.creation.timeout.ms 60000 Time in milliseconds to wait until a new Pod is created
mr3.k8s.pod.image.pull.policy IfNotPresent Image pull policy for Pods
mr3.k8s.pod.image.pull.secrets   Image pull secrets for Pods
mr3.k8s.pod.master.serviceaccount   ServiceAccount for DAGAppMaster Pod
mr3.k8s.pod.master.image   Docker image for DAGAppMaster containers
mr3.k8s.pod.master.user   User for DAGAppMaster Pod
mr3.k8s.pod.master.emptydirs   Comma-separated list of directories where emptyDir volumes are mounted for DAGAppMaster
mr3.k8s.pod.master.hostpaths   Comma-separated list of directories (on each node) to which hostPath volumes point for DAGAppMaster.
For example, /data1/k8s,/data2/k8s,/data3/k8s mounts three hostPath volumes created from three local directories of the node where DAGAppMaster Pod is to run.
mr3.k8s.pod.master.node.selector   Node selector for DAGAppMaster Pod
mr3.k8s.master.working.dir   Working directory for DAGAppMaster container
mr3.k8s.master.persistentvolumeclaim.mounts   Comma-separated list of pairs of a PersistentVolumeClaim and its mount point for DAGAppMaster Pod
mr3.k8s.master.command /usr/bin/java Command for launching JVM for DAGAppMaster container
mr3.k8s.master.pod.affinity.match.label   Label for specifying Pod affinity for DAGAppMaster Pod
mr3.k8s.pod.worker.image   Docker image for ContainerWorker containers
mr3.k8s.pod.worker.user   User for ContainerWorker Pods
mr3.k8s.pod.worker.emptydirs   Comma-separated list of directories where emptyDir volumes are mounted for ContainerWorkers
mr3.k8s.pod.worker.hostpaths   Comma-separated list of directories (on each node) to which hostPath volumes point for ContainerWorkers.
For example, /data1/k8s,/data2/k8s,/data3/k8s mounts three hostPath volumes created from three local directories of the node where ContainerWorker Pods are to run.
mr3.k8s.pod.worker.node.selector   Node selector for ContainerWorker Pods
mr3.k8s.worker.working.dir   Working directory for ContainerWorker containers
mr3.k8s.worker.persistentvolumeclaim.mounts   Comma-separated list of pairs of a PersistentVolumeClaim and its mount point.
For example, foo1=bar1,foo2=bar2,foo3=bar3 mounts PersistentVolumeClaim foo1 on directory bar1 in ContainerWorker containers, and so on.
mr3.k8s.worker.command /usr/bin/java Command for launching JVM for ContainerWorker containers
mr3.k8s.conf.dir.configmap   Name of the ConfigMap carrying all configuration files (such as mr3-site.xml)
mr3.k8s.conf.dir.mount.dir   Mount path for the ConfigMap carrying all configuration files
mr3.k8s.keytab.secret   Name of the Secret containing the keytab file to be passed from DAGAppMaster to ContainerWorkers
mr3.k8s.keytab.mount.dir   Mount path for the Secret containing the keytab file
mr3.k8s.keytab.mount.file   File name for the Secret containing the keytab file.
mr3.k8s.keytab.mount.dir and mr3.k8s.keytab.mount.file specify the full path for the keytab file mounted inside ContainerWorker containers.
mr3.k8s.host.aliases   Comma-separated list of pairs of a hostname and an IP address.
For example, foo=1.1.1.1,bar=2.2.2.2 registers host foo as IP address 1.1.1.1 in DAGAppMaster and ContainerWorker containers, and so on.
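
The list-valued keys above can be sketched as follows, reusing the example values from the descriptions (hostPath directories, the PersistentVolumeClaim foo1 mounted on bar1, and the host aliases foo and bar). The Hadoop-style property layout is an assumption, and the values are placeholders rather than a working cluster setup.

```xml
<configuration>
  <!-- Run both DAGAppMaster and ContainerWorkers as Pods -->
  <property>
    <name>mr3.master.mode</name>
    <value>kubernetes</value>
  </property>
  <property>
    <name>mr3.am.worker.mode</name>
    <value>kubernetes</value>
  </property>
  <!-- Kubernetes namespace (the default) -->
  <property>
    <name>mr3.k8s.namespace</name>
    <value>mr3</value>
  </property>
  <!-- Three hostPath volumes from three local directories on each worker node -->
  <property>
    <name>mr3.k8s.pod.worker.hostpaths</name>
    <value>/data1/k8s,/data2/k8s,/data3/k8s</value>
  </property>
  <!-- Mount PersistentVolumeClaim foo1 on directory bar1 in ContainerWorker containers -->
  <property>
    <name>mr3.k8s.worker.persistentvolumeclaim.mounts</name>
    <value>foo1=bar1</value>
  </property>
  <!-- Register host foo as 1.1.1.1 and host bar as 2.2.2.2 -->
  <property>
    <name>mr3.k8s.host.aliases</name>
    <value>foo=1.1.1.1,bar=2.2.2.2</value>
  </property>
</configuration>
```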