-- pick the engine available on your cluster (one of the three):
set hive.execution.engine=mr;
set hive.execution.engine=tez;
set hive.execution.engine=spark;
set mapreduce.job.queuename=<yourqueuename>;
set tez.queue.name=<yourqueuename>;
set spark.yarn.queue=<yourqueuename>;
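If you are not sure which engine or queue your session is currently using, running set with just the property name prints back its current value; a quick check, assuming a Hive CLI or Beeline session:

set hive.execution.engine;
set tez.queue.name;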
Vectorization processes rows in batches of 1024, which improves performance. For example, your joins will no longer be processed one row at a time but in batches of 1024: think of reading one entry from Right and comparing it to 1024 entries from Left before moving to the next entry from Right.
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
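If you want to verify that a given query is actually vectorized, Hive 2.3 and later can annotate the explain plan; a minimal check, with yourTable as a placeholder:

EXPLAIN VECTORIZATION SUMMARY
SELECT count(*) FROM yourTable;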
Running this list of commands will gather statistics about the distribution of the data within your tables. Using this new knowledge, Hive's cost-based optimizer (CBO) can pick a better execution plan for your queries.
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

analyze table yourTable compute statistics for columns;
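To see what was collected, you can inspect the statistics of a single column (yourColumn is a placeholder for one of the analyzed columns):

DESCRIBE FORMATTED yourTable yourColumn;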
By default, Hive stores tables as plain text files (TextFile format). ORC is a highly efficient way to store Hive data: it supports datetime, decimal, list, and map types; chunks of data are sized by the amount of memory needed to read/write them; and it saves meta-information and a lightweight index for each chunk, which makes most operations faster. (E.g., it stores the row count of each chunk, so counting the rows of a dataset only requires adding up the already-summarized per-chunk counts.)
STORED AS ORC tblproperties ("orc.compress" = "SNAPPY")
CREATE TABLE lib.myTable
STORED AS ORC tblproperties ("orc.compress" = "ZLIB")
AS SELECT * FROM myOtherTable
WHERE filteringColumn IN (filter1, filter2, ..., filtern);
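As an illustration of the type support mentioned above, here is a sketch of an ORC table definition (table and column names are hypothetical):

CREATE TABLE lib.orcTypesDemo (
  eventTime  timestamp,
  amount     decimal(10,2),
  tags       array<string>,
  attributes map<string,string>
)
STORED AS ORC tblproperties ("orc.compress" = "SNAPPY");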
Behind the scenes, Hive will hash the specified columns and distribute the rows into N buckets according to the hash value. For example, you would not partition your data by ID, but you could bucket it by ID, and Hive would store the same IDs from 2 different tables in corresponding buckets of hashed IDs. Thus, it only needs to compare the entries from the corresponding buckets to perform its join. Note: joinID can be several columns, in which case we will write: joinID1, joinID2, ..., joinIDn.
set hive.enforce.bucketing=true;

CREATE TABLE lib.left LIKE lib.formerLeft;
ALTER TABLE lib.left SET FILEFORMAT ORC;
ALTER TABLE lib.left CLUSTERED BY (joinID) INTO 256 buckets;

INSERT OVERWRITE TABLE lib.left
SELECT * FROM lib.formerLeft;

-- repeat these steps for the Right table
-- Note: your Right table has to have a number of buckets that is a multiple of the number of buckets that your Left table has
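To double-check that the bucketing was applied, inspect the table metadata; the output should show the bucket column and Num Buckets: 256:

DESCRIBE FORMATTED lib.left;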
Now we join Right on Left.
SET hive.optimize.bucketmapjoin=true;

CREATE TABLE lib.joinedTable
STORED AS ORC TBLPROPERTIES('orc.compress' = 'SNAPPY')
AS SELECT /*+ MAPJOIN(a,b) */ a.*, b.colName
FROM lib.left AS a
LEFT OUTER JOIN lib.right AS b
ON a.joinID = b.joinID;

-- You have just conducted a highly parallelized join, congrats.

DROP TABLE IF EXISTS lib.left;
DROP TABLE IF EXISTS lib.right;
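If your tables are additionally sorted by the join key, you can go one step further and enable a sort-merge-bucket (SMB) join; a sketch of the usual settings (availability depends on your Hive version, so treat this as a starting point):

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;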
Bucketing also lets you sample your tables: very useful if you want to join 2 large tables and quickly test your procedure.
-- using lib.left and lib.right from above
SELECT a.*, b.*
FROM lib.left TABLESAMPLE(bucket 8 out of 256 on joinID) AS a,
     lib.right TABLESAMPLE(bucket 8 out of 256 on joinID) AS b
WHERE a.joinID=b.joinID;
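If a table is not bucketed, you can still sample it by hashing a random number instead of a column, at the cost of a full table scan; a sketch, reusing myOtherTable from above:

SELECT * FROM myOtherTable TABLESAMPLE(bucket 1 out of 100 on rand()) AS s;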
Warning: allocating this much memory takes a while (in terms of runtime), so use these settings only if you get out-of-memory errors.
set mapreduce.map.memory.mb=32768;
set mapreduce.reduce.memory.mb=32768;

-- By default, hive.tez.container.size is equal to mapreduce.map.memory.mb
set hive.tez.container.size=32768;

-- I read somewhere that the rule of thumb was to set it equal to 1/3 of hive.tez.container.size
set hive.auto.convert.join.noconditionaltask.size=16384;

-- SET hive.tez.java.opts=-Xmx5120m;
-- Increase min and max split size to reduce the number of mappers.
set tez.grouping.min-size=16777216;   -- 16 MB min split
set tez.grouping.max-size=1073741824; -- 1 GB max split
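For instance, with the max split size at 1 GB, a 100 GB input will be grouped into roughly 100 splits, so Tez will launch on the order of 100 mappers instead of several hundred.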
The most important thing of all: dropping old, useless, space-clogging, and visually distracting tables.
DROP TABLE IF EXISTS lib.tableToRetire;
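If the table is managed and you want the space back immediately (skipping the trash folder), Hive also accepts a PURGE clause; a sketch:

DROP TABLE IF EXISTS lib.tableToRetire PURGE;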