org.apache.hadoop.filecache
Class DistributedCache

java.lang.Object
  extended by org.apache.hadoop.filecache.DistributedCache

public class DistributedCache
extends Object

Distribute application-specific large, read-only files efficiently.

DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications.

Applications specify the files to be cached via URLs (hdfs:// or http://) in the JobConf. The DistributedCache assumes that files specified via hdfs:// URLs are already present on the FileSystem at the path given by the URL.

The framework copies the necessary files to the slave node before any tasks for the job are executed on that node. It is efficient because the files are copied only once per job and because it can cache archives, which are un-archived on the slaves.

DistributedCache can be used to distribute simple, read-only data/text files and/or more complex types such as archives, jars etc. Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. Jars may be optionally added to the classpath of the tasks, a rudimentary software distribution mechanism. Files have execution permissions. Optionally users can also direct it to symlink the distributed cache file(s) into the working directory of the task.

DistributedCache tracks modification timestamps of the cache files. Clearly the cache files should not be modified by the application or externally while the job is executing.

Here is an illustrative example of how to use the DistributedCache:

     // Setting up the cache for the application
     
     1. Copy the requisite files to the FileSystem:
     
     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat  
     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip  
     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
     
     2. Set up the application's JobConf:
     
     JobConf job = new JobConf();
     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
                                   job);
     DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
     
     3. Use the cached files in the Mapper or Reducer:
     
     public static class MapClass extends MapReduceBase  
     implements Mapper<K, V, K, V> {
     
       private Path[] localArchives;
       private Path[] localFiles;
       
       public void configure(JobConf job) {
         // Get the cached archives/files
         localArchives = DistributedCache.getLocalCacheArchives(job);
         localFiles = DistributedCache.getLocalCacheFiles(job);
       }
       
       public void map(K key, V value, 
                       OutputCollector<K, V> output, Reporter reporter) 
       throws IOException {
         // Use data from the cached archives/files here
         // ...
         // ...
         output.collect(key, value);
       }
     }
     
 

See Also:
JobConf, JobClient

Constructor Summary
DistributedCache()
           
 
Method Summary
static void addArchiveToClassPath(Path archive, Configuration conf)
          Add an archive path to the current set of classpath entries.
static void addCacheArchive(URI uri, Configuration conf)
          Add an archive to be localized to the conf
static void addCacheFile(URI uri, Configuration conf)
          Add a file to be localized to the conf
static void addFileToClassPath(Path file, Configuration conf)
          Add a file path to the current set of classpath entries. It adds the file to the cache as well.
static boolean checkURIs(URI[] uriFiles, URI[] uriArchives)
          This method checks if there is a conflict in the fragment names of the uris.
static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
          This method creates symlinks for all files in a given dir in another directory
static void createSymlink(Configuration conf)
          This method allows you to create symlinks in the current working directory of the task to all the cache files/archives
static Path[] getArchiveClassPaths(Configuration conf)
          Get the archive entries in classpath as an array of Path
static String[] getArchiveTimestamps(Configuration conf)
          Get the timestamps of the archives
static URI[] getCacheArchives(Configuration conf)
          Get cache archives set in the Configuration
static URI[] getCacheFiles(Configuration conf)
          Get cache files set in the Configuration
static Path[] getFileClassPaths(Configuration conf)
          Get the file entries in classpath as an array of Path
static String[] getFileTimestamps(Configuration conf)
          Get the timestamps of the files
static Path getLocalCache(URI cache, Configuration conf, Path baseDir, boolean isArchive, long confFileStamp, Path currentWorkDir)
          Get the locally cached file or archive; this is either a previously cached (and still valid) copy or one copied from the FileSystem now.
static Path getLocalCache(URI cache, Configuration conf, Path baseDir, FileStatus fileStatus, boolean isArchive, long confFileStamp, Path currentWorkDir)
          Get the locally cached file or archive; this is either a previously cached (and still valid) copy or one copied from the FileSystem now.
static Path[] getLocalCacheArchives(Configuration conf)
          Return the path array of the localized caches
static Path[] getLocalCacheFiles(Configuration conf)
          Return the path array of the localized files
static boolean getSymlink(Configuration conf)
          This method checks to see if symlinks are to be created for the localized cache files in the current working directory
static long getTimestamp(Configuration conf, URI cache)
          Returns mtime of a given cache file on hdfs.
static String makeRelative(URI cache, Configuration conf)
           
static void purgeCache(Configuration conf)
          Clear the entire contents of the cache and delete the backing files.
static void releaseCache(URI cache, Configuration conf)
          This is the opposite of getLocalCache.
static void setArchiveTimestamps(Configuration conf, String timestamps)
          This is to check the timestamp of the archives to be localized
static void setCacheArchives(URI[] archives, Configuration conf)
          Set the configuration with the given set of archives
static void setCacheFiles(URI[] files, Configuration conf)
          Set the configuration with the given set of files
static void setFileTimestamps(Configuration conf, String timestamps)
          This is to check the timestamp of the files to be localized
static void setLocalArchives(Configuration conf, String str)
          Set the conf to contain the location for localized archives
static void setLocalFiles(Configuration conf, String str)
          Set the conf to contain the location for localized files
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

DistributedCache

public DistributedCache()
Method Detail

getLocalCache

public static Path getLocalCache(URI cache,
                                 Configuration conf,
                                 Path baseDir,
                                 FileStatus fileStatus,
                                 boolean isArchive,
                                 long confFileStamp,
                                 Path currentWorkDir)
                          throws IOException
Get the locally cached file or archive; this is either a previously cached (and still valid) copy or one copied from the FileSystem now.

Parameters:
cache - the cache to be localized, this should be specified as new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no scheme or hostname:port is provided the file is assumed to be in the filesystem being used in the Configuration
conf - The Configuration file which contains the filesystem
baseDir - The base cache Dir where you want to localize the files/archives
fileStatus - The file status on the dfs.
isArchive - if the cache is an archive or a file. In case it is an archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will be unzipped/unjarred/untarred automatically and the directory where the archive is unzipped/unjarred/untarred is returned as the Path. In case of a file, the path to the file is returned
confFileStamp - this is the hdfs file modification timestamp to verify that the file to be cached hasn't changed since the job started
currentWorkDir - this is the directory where you would want to create symlinks for the locally cached files/archives
Returns:
the path to the directory where the archive is unpacked in the case of an archive, or the path to the local copy in the case of a file
Throws:
IOException

getLocalCache

public static Path getLocalCache(URI cache,
                                 Configuration conf,
                                 Path baseDir,
                                 boolean isArchive,
                                 long confFileStamp,
                                 Path currentWorkDir)
                          throws IOException
Get the locally cached file or archive; this is either a previously cached (and still valid) copy or one copied from the FileSystem now.

Parameters:
cache - the cache to be localized, this should be specified as new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no scheme or hostname:port is provided the file is assumed to be in the filesystem being used in the Configuration
conf - The Configuration file which contains the filesystem
baseDir - The base cache Dir where you want to localize the files/archives
isArchive - if the cache is an archive or a file. In case it is an archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will be unzipped/unjarred/untarred automatically and the directory where the archive is unzipped/unjarred/untarred is returned as the Path. In case of a file, the path to the file is returned
confFileStamp - this is the hdfs file modification timestamp to verify that the file to be cached hasn't changed since the job started
currentWorkDir - this is the directory where you would want to create symlinks for the locally cached files/archives
Returns:
the path to the directory where the archive is unpacked in the case of an archive, or the path to the local copy in the case of a file
Throws:
IOException
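
The cache parameter conventions above (a #LINKNAME fragment, an optional scheme and hostname:port, and the archive extensions that are expanded) can be illustrated with plain java.net.URI. This is a minimal sketch, not Hadoop's implementation: the class CacheUriSketch, its method names, and the namenode.example:8020 address are hypothetical, and it assumes the default filesystem can be represented as a URI to resolve against.

```java
import java.net.URI;
import java.util.Locale;

final class CacheUriSketch {

    // Extensions the parameter description lists as automatically expanded.
    private static final String[] ARCHIVE_SUFFIXES =
        {".zip", ".jar", ".tar", ".tgz", ".tar.gz"};

    /** Returns the #LINKNAME fragment, or null when the URI has none. */
    static String linkName(URI cache) {
        return cache.getFragment();
    }

    /** Fills in scheme and authority from defaultFs when the cache URI has no scheme. */
    static URI qualify(URI cache, URI defaultFs) {
        return cache.getScheme() != null ? cache : defaultFs.resolve(cache);
    }

    /** True when the URI path ends with one of the listed archive extensions. */
    static boolean hasArchiveSuffix(URI cache) {
        String path = cache.getPath().toLowerCase(Locale.ROOT);
        for (String suffix : ARCHIVE_SUFFIXES) {
            if (path.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical default filesystem address, for illustration only.
        URI defaultFs = URI.create("hdfs://namenode.example:8020/");
        URI cache = URI.create("/myapp/map.zip#map");
        System.out.println(qualify(cache, defaultFs));
        System.out.println(linkName(cache));
        System.out.println(hasArchiveSuffix(cache));
    }
}
```

The fragment is kept separate from the path, so the link name never affects which file is fetched.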

releaseCache

public static void releaseCache(URI cache,
                                Configuration conf)
                         throws IOException
This is the opposite of getLocalCache. When you are done using the cache, you need to release it.

Parameters:
cache - The cache URI to be released
conf - configuration which contains the filesystem the cache is contained in.
Throws:
IOException

makeRelative

public static String makeRelative(URI cache,
                                  Configuration conf)
                           throws IOException
Throws:
IOException

getTimestamp

public static long getTimestamp(Configuration conf,
                                URI cache)
                         throws IOException
Returns mtime of a given cache file on hdfs.

Parameters:
conf - configuration
cache - cache file
Returns:
mtime of a given cache file on hdfs
Throws:
IOException

createAllSymlink

public static void createAllSymlink(Configuration conf,
                                    File jobCacheDir,
                                    File workDir)
                             throws IOException
This method creates symlinks for all files in a given dir in another directory

Parameters:
conf - the configuration
jobCacheDir - the target directory for creating symlinks
workDir - the directory in which the symlinks are created
Throws:
IOException
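
The idea of linking every entry of one directory into another can be sketched with java.nio.file alone. This is an illustration under stated assumptions, not the method's actual implementation: SymlinkAllSketch, linkAll and demo are hypothetical names, existing names in the work directory are skipped (an assumption), and the platform must allow symbolic links.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;

final class SymlinkAllSketch {

    /** Links every entry of cacheDir into workDir; returns how many links were created. */
    static int linkAll(Path cacheDir, Path workDir) throws IOException {
        int created = 0;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(cacheDir)) {
            for (Path target : entries) {
                Path link = workDir.resolve(target.getFileName());
                // Assumption: skip names that already exist so a rerun does not fail.
                if (Files.notExists(link, LinkOption.NOFOLLOW_LINKS)) {
                    Files.createSymbolicLink(link, target.toAbsolutePath());
                    created++;
                }
            }
        }
        return created;
    }

    /** Builds two temporary directories, links one file, and reads it back through the link. */
    static String demo() {
        try {
            Path cacheDir = Files.createTempDirectory("jobcache");
            Path workDir = Files.createTempDirectory("work");
            Files.write(cacheDir.resolve("lookup.dat"),
                        "k=v".getBytes(StandardCharsets.UTF_8));
            linkAll(cacheDir, workDir);
            byte[] viaLink = Files.readAllBytes(workDir.resolve("lookup.dat"));
            return new String(viaLink, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here cacheDir plays the role of jobCacheDir (where the link targets live) and workDir is where the links appear.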

setCacheArchives

public static void setCacheArchives(URI[] archives,
                                    Configuration conf)
Set the configuration with the given set of archives

Parameters:
archives - The list of archives that need to be localized
conf - Configuration which will be changed

setCacheFiles

public static void setCacheFiles(URI[] files,
                                 Configuration conf)
Set the configuration with the given set of files

Parameters:
files - The list of files that need to be localized
conf - Configuration which will be changed

getCacheArchives

public static URI[] getCacheArchives(Configuration conf)
                              throws IOException
Get cache archives set in the Configuration

Parameters:
conf - The configuration which contains the archives
Returns:
A URI array of the caches set in the Configuration
Throws:
IOException

getCacheFiles

public static URI[] getCacheFiles(Configuration conf)
                           throws IOException
Get cache files set in the Configuration

Parameters:
conf - The configuration which contains the files
Returns:
A URI array of the files set in the Configuration
Throws:
IOException

getLocalCacheArchives

public static Path[] getLocalCacheArchives(Configuration conf)
                                    throws IOException
Return the path array of the localized caches

Parameters:
conf - Configuration that contains the localized archives
Returns:
A path array of localized caches
Throws:
IOException

getLocalCacheFiles

public static Path[] getLocalCacheFiles(Configuration conf)
                                 throws IOException
Return the path array of the localized files

Parameters:
conf - Configuration that contains the localized files
Returns:
A path array of localized files
Throws:
IOException

getArchiveTimestamps

public static String[] getArchiveTimestamps(Configuration conf)
Get the timestamps of the archives

Parameters:
conf - The configuration which stored the timestamps
Returns:
a string array of timestamps
Throws:
IOException

getFileTimestamps

public static String[] getFileTimestamps(Configuration conf)
Get the timestamps of the files

Parameters:
conf - The configuration which stored the timestamps
Returns:
a string array of timestamps
Throws:
IOException

setArchiveTimestamps

public static void setArchiveTimestamps(Configuration conf,
                                        String timestamps)
This is to check the timestamp of the archives to be localized

Parameters:
conf - Configuration which stores the timestamps
timestamps - comma separated list of timestamps of archives. The order should be the same as the order in which the archives are added.

setFileTimestamps

public static void setFileTimestamps(Configuration conf,
                                     String timestamps)
This is to check the timestamp of the files to be localized

Parameters:
conf - Configuration which stores the timestamps
timestamps - comma separated list of timestamps of files. The order should be the same as the order in which the files are added.
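
Because the timestamps parameter is a comma separated list whose order must match the order in which the files (or archives) were added, building and reading it is a matter of position-preserving joining and splitting. The following is a minimal sketch with hypothetical names (TimestampListSketch, join, split); treating the values as millisecond longs is an assumption.

```java
final class TimestampListSketch {

    /** Joins modification times into one comma separated string, preserving order. */
    static String join(long[] mtimes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < mtimes.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(mtimes[i]);
        }
        return sb.toString();
    }

    /** Splits the string back into longs; element i belongs to the i-th file added. */
    static long[] split(String timestamps) {
        if (timestamps.isEmpty()) {
            return new long[0];
        }
        String[] parts = timestamps.split(",");
        long[] out = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = Long.parseLong(parts[i].trim());
        }
        return out;
    }

    public static void main(String[] args) {
        // Example values only; index 0 pairs with the first file added, and so on.
        long[] mtimes = {1218240000000L, 1218243600000L};
        String encoded = join(mtimes);
        System.out.println(encoded);
        System.out.println(split(encoded).length);
    }
}
```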

setLocalArchives

public static void setLocalArchives(Configuration conf,
                                    String str)
Set the conf to contain the location for localized archives

Parameters:
conf - The conf to modify to contain the localized caches
str - a comma separated list of local archives

setLocalFiles

public static void setLocalFiles(Configuration conf,
                                 String str)
Set the conf to contain the location for localized files

Parameters:
conf - The conf to modify to contain the localized caches
str - a comma separated list of local files

addCacheArchive

public static void addCacheArchive(URI uri,
                                   Configuration conf)
Add an archive to be localized to the conf

Parameters:
uri - The uri of the cache to be localized
conf - Configuration to add the cache to

addCacheFile

public static void addCacheFile(URI uri,
                                Configuration conf)
Add a file to be localized to the conf

Parameters:
uri - The uri of the cache to be localized
conf - Configuration to add the cache to

addFileToClassPath

public static void addFileToClassPath(Path file,
                                      Configuration conf)
                               throws IOException
Add a file path to the current set of classpath entries. It adds the file to the cache as well.

Parameters:
file - Path of the file to be added
conf - Configuration that contains the classpath setting
Throws:
IOException

getFileClassPaths

public static Path[] getFileClassPaths(Configuration conf)
Get the file entries in classpath as an array of Path

Parameters:
conf - Configuration that contains the classpath setting

addArchiveToClassPath

public static void addArchiveToClassPath(Path archive,
                                         Configuration conf)
                                  throws IOException
Add an archive path to the current set of classpath entries. It adds the archive to cache as well.

Parameters:
archive - Path of the archive to be added
conf - Configuration that contains the classpath setting
Throws:
IOException

getArchiveClassPaths

public static Path[] getArchiveClassPaths(Configuration conf)
Get the archive entries in classpath as an array of Path

Parameters:
conf - Configuration that contains the classpath setting

createSymlink

public static void createSymlink(Configuration conf)
This method allows you to create symlinks in the current working directory of the task to all the cache files/archives

Parameters:
conf - the jobconf

getSymlink

public static boolean getSymlink(Configuration conf)
This method checks to see if symlinks are to be created for the localized cache files in the current working directory

Parameters:
conf - the jobconf
Returns:
true if symlinks are to be created, false otherwise

checkURIs

public static boolean checkURIs(URI[] uriFiles,
                                URI[] uriArchives)
This method checks if there is a conflict in the fragment names of the uris. Also makes sure that each uri has a fragment. It is only to be called if you want to create symlinks for the various archives and files.

Parameters:
uriFiles - the URI array of files
uriArchives - the URI array of archives
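
The two conditions described for this check (every URI carries a fragment, and no two fragments collide) can be sketched with java.net.URI and a set. This is an illustration, not the library's code: FragmentCheckSketch and fragmentsAreUsable are hypothetical names, and both the case-insensitive comparison and the convention that true means "no conflict" are assumptions.

```java
import java.net.URI;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

final class FragmentCheckSketch {

    /** True when every URI has a fragment and no fragment repeats across both arrays. */
    static boolean fragmentsAreUsable(URI[] files, URI[] archives) {
        Set<String> seen = new HashSet<>();
        return allDistinct(files, seen) && allDistinct(archives, seen);
    }

    private static boolean allDistinct(URI[] uris, Set<String> seen) {
        if (uris == null) {
            return true; // nothing to check
        }
        for (URI uri : uris) {
            String fragment = uri.getFragment();
            if (fragment == null) {
                return false; // no link name to create a symlink under
            }
            // Assumption: names differing only by case count as a conflict.
            if (!seen.add(fragment.toLowerCase(Locale.ROOT))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        URI[] files = {URI.create("/myapp/lookup.dat#lookup.dat")};
        URI[] archives = {URI.create("/myapp/map.zip#map")};
        System.out.println(fragmentsAreUsable(files, archives));
    }
}
```

Sharing one set across both arrays catches a file and an archive that ask for the same link name, which would otherwise collide in the task's working directory.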

purgeCache

public static void purgeCache(Configuration conf)
                       throws IOException
Clear the entire contents of the cache and delete the backing files. This should only be used when the server is reinitializing, because the users are going to lose their files.

Throws:
IOException


Copyright © 2008 The Apache Software Foundation