RDD.
saveAsHadoopDataset
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys/values are converted for output using either user specified converters or, by default, “org.apache.spark.api.python.JavaToWritableConverter”.
RDD[(K, V)]
New in version 1.1.0.
Hadoop job configuration
fully qualified classname of key converter (None by default)
fully qualified classname of value converter (None by default)
See also
SparkContext.hadoopRDD()
RDD.saveAsNewAPIHadoopDataset()
RDD.saveAsHadoopFile()
RDD.saveAsNewAPIHadoopFile()
RDD.saveAsSequenceFile()
Examples
>>> import os >>> import tempfile
Set the related classes
>>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat" >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat" >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d: ... path = os.path.join(d, "old_hadoop_file") ... ... # Create the conf for writing ... write_conf = { ... "mapred.output.format.class": output_format_class, ... "mapreduce.job.output.key.class": key_class, ... "mapreduce.job.output.value.class": value_class, ... "mapreduce.output.fileoutputformat.outputdir": path, ... } ... ... # Write a temporary Hadoop file ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) ... rdd.saveAsHadoopDataset(conf=write_conf) ... ... # Create the conf for reading ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path} ... ... # Load this Hadoop file as an RDD ... loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf) ... sorted(loaded.collect()) [(0, '1\t'), (0, '1\ta'), (0, '3\tx')]