#! /usr/bin/env python # Read pair-wise alignments in MAF or LAST tabular format: write an # "Oxford grid", a.k.a. dotplot. # TODO: Currently, pixels with zero aligned nt-pairs are white, and # pixels with one or more aligned nt-pairs are black. This can look # too crowded for large genome alignments. I tried shading each pixel # according to the number of aligned nt-pairs within it, but the # result is too faint. How can this be done better? import fnmatch, itertools, optparse, os, re, sys # Try to make PIL/PILLOW work: try: from PIL import Image, ImageDraw, ImageFont, ImageColor except ImportError: import Image, ImageDraw, ImageFont, ImageColor def myOpen(fileName): # faster than fileinput if fileName == "-": return sys.stdin return open(fileName) def warn(message): prog = os.path.basename(sys.argv[0]) sys.stderr.write(prog + ": " + message + "\n") def croppedBlocks(blocks, range1, range2): cropBeg1, cropEnd1 = range1 cropBeg2, cropEnd2 = range2 if blocks[0][0] < 0: cropBeg1, cropEnd1 = -cropEnd1, -cropBeg1 if blocks[0][1] < 0: cropBeg2, cropEnd2 = -cropEnd2, -cropBeg2 for beg1, beg2, size in blocks: b1 = max(cropBeg1, beg1) e1 = min(cropEnd1, beg1 + size) if b1 >= e1: continue offset = beg2 - beg1 b2 = max(cropBeg2, b1 + offset) e2 = min(cropEnd2, e1 + offset) if b2 >= e2: continue yield b2 - offset, b2, e2 - b2 def tabBlocks(beg1, beg2, blocks): '''Get the gapless blocks of an alignment, from LAST tabular format.''' for i in blocks.split(","): if ":" in i: x, y = i.split(":") beg1 += int(x) beg2 += int(y) else: size = int(i) yield beg1, beg2, size beg1 += size beg2 += size def mafBlocks(beg1, beg2, seq1, seq2): '''Get the gapless blocks of an alignment, from MAF format.''' size = 0 for x, y in itertools.izip(seq1, seq2): if x == "-": if size: yield beg1, beg2, size beg1 += size beg2 += size size = 0 beg2 += 1 elif y == "-": if size: yield beg1, beg2, size beg1 += size beg2 += size size = 0 beg1 += 1 else: size += 1 if size: yield beg1, beg2, size def alignmentInput(lines): '''Get alignments and sequence lengths, from MAF or tabular format.''' mafCount = 0 for line in lines: w = line.split() if line[0].isdigit(): # tabular format chr1, beg1, seqlen1 = w[1], int(w[2]), int(w[5]) if w[4] == "-": beg1 -= seqlen1 chr2, beg2, seqlen2 = w[6], int(w[7]), int(w[10]) if w[9] == "-": beg2 -= seqlen2 blocks = tabBlocks(beg1, beg2, w[11]) yield chr1, seqlen1, chr2, seqlen2, blocks elif line[0] == "s": # MAF format if mafCount == 0: chr1, beg1, seqlen1, seq1 = w[1], int(w[2]), int(w[5]), w[6] if w[4] == "-": beg1 -= seqlen1 mafCount = 1 else: chr2, beg2, seqlen2, seq2 = w[1], int(w[2]), int(w[5]), w[6] if w[4] == "-": beg2 -= seqlen2 blocks = mafBlocks(beg1, beg2, seq1, seq2) yield chr1, seqlen1, chr2, seqlen2, blocks mafCount = 0 def seqRangeFromText(text): if ":" in text: pattern, interval = text.rsplit(":", 1) if "-" in interval: beg, end = interval.rsplit("-", 1) return pattern, int(beg), int(end) # beg may be negative return text, 0, sys.maxsize def rangeFromSeqName(seqRanges, name, seqLen): if not seqRanges: return 0, seqLen base = name.split(".")[-1] # allow for names like hg19.chr7 for pat, beg, end in seqRanges: if fnmatch.fnmatchcase(name, pat) or fnmatch.fnmatchcase(base, pat): return max(beg, 0), min(end, seqLen) return None def updateSeqs(isTrim, seqNames, seqLimits, seqName, seqRange, blocks, index): if seqName not in seqLimits: seqNames.append(seqName) if isTrim: beg = blocks[0][index] end = blocks[-1][index] + blocks[-1][2] if beg < 0: beg, end = -end, -beg if seqName in seqLimits: b, e = seqLimits[seqName] seqLimits[seqName] = min(b, beg), max(e, end) else: seqLimits[seqName] = beg, end else: seqLimits[seqName] = seqRange def readAlignments(fileName, opts): '''Get alignments and sequence limits, from MAF or tabular format.''' seqRanges1 = map(seqRangeFromText, opts.seq1) seqRanges2 = map(seqRangeFromText, opts.seq2) alignments = [] seqNames1 = [] seqNames2 = [] seqLimits1 = {} seqLimits2 = {} lines = myOpen(fileName) for seqName1, seqLen1, seqName2, seqLen2, blocks in alignmentInput(lines): range1 = rangeFromSeqName(seqRanges1, seqName1, seqLen1) if not range1: continue range2 = rangeFromSeqName(seqRanges2, seqName2, seqLen2) if not range2: continue b = list(croppedBlocks(list(blocks), range1, range2)) if not b: continue aln = seqName1, seqName2, b alignments.append(aln) updateSeqs(opts.trim1, seqNames1, seqLimits1, seqName1, range1, b, 0) updateSeqs(opts.trim2, seqNames2, seqLimits2, seqName2, range2, b, 1) return alignments, seqNames1, seqNames2, seqLimits1, seqLimits2 def natural_sort_key(my_string): '''Return a sort key for "natural" ordering, e.g. chr9 < chr10.''' parts = re.split(r'(\d+)', my_string) parts[1::2] = map(int, parts[1::2]) return parts def get_text_sizes(my_strings, font, fontsize, image_mode): '''Get widths & heights, in pixels, of some strings.''' if fontsize == 0: return [(0, 0) for i in my_strings] image_size = 1, 1 im = Image.new(image_mode, image_size) draw = ImageDraw.Draw(im) return [draw.textsize(i, font=font) for i in my_strings] def sizeText(size): suffixes = "bp", "kb", "Mb", "Gb" for i, x in enumerate(suffixes): j = 10 ** (i * 3) if size < j * 10: return "%.2g" % (1.0 * size / j) + x if size < j * 1000 or i == len(suffixes) - 1: return "%.0f" % (1.0 * size / j) + x def seqNameAndSizeText(seqName, seqSize): return seqName + ": " + sizeText(seqSize) def getSeqInfo(sortOpt, seqNames, seqLimits, font, fontsize, image_mode, isShowSize): '''Return miscellaneous information about the sequences.''' if sortOpt == 1: seqNames.sort(key=natural_sort_key) seqSizes = [seqLimits[i][1] - seqLimits[i][0] for i in seqNames] if sortOpt == 2: seqRecords = sorted(zip(seqSizes, seqNames), reverse=True) seqSizes = [i[0] for i in seqRecords] seqNames = [i[1] for i in seqRecords] if isShowSize: seqLabels = map(seqNameAndSizeText, seqNames, seqSizes) else: seqLabels = seqNames labelSizes = get_text_sizes(seqLabels, font, fontsize, image_mode) margin = max(zip(*labelSizes)[1]) # maximum text height return seqNames, seqSizes, seqLabels, labelSizes, margin def div_ceil(x, y): '''Return x / y rounded up.''' q, r = divmod(x, y) return q + (r != 0) def tot_seq_pix(seq_sizes, bp_per_pix): '''Return the total pixels needed for sequences of the given sizes.''' return sum([div_ceil(i, bp_per_pix) for i in seq_sizes]) def get_bp_per_pix(seq_sizes, pix_tween_seqs, pix_limit): '''Get the minimum bp-per-pixel that fits in the size limit.''' seq_num = len(seq_sizes) seq_pix_limit = pix_limit - pix_tween_seqs * (seq_num - 1) if seq_pix_limit < seq_num: raise Exception("can't fit the image: too many sequences?") lower_bound = div_ceil(sum(seq_sizes), seq_pix_limit) for bp_per_pix in itertools.count(lower_bound): # slow linear search if tot_seq_pix(seq_sizes, bp_per_pix) <= seq_pix_limit: break return bp_per_pix def get_seq_starts(seq_pix, pix_tween_seqs, margin): '''Get the start pixel for each sequence.''' seq_starts = [] pix_tot = margin - pix_tween_seqs for i in seq_pix: pix_tot += pix_tween_seqs seq_starts.append(pix_tot) pix_tot += i return seq_starts def get_pix_info(seq_sizes, bp_per_pix, pix_tween_seqs, margin): '''Return pixel information about the sequences.''' seq_pix = [div_ceil(i, bp_per_pix) for i in seq_sizes] seq_starts = get_seq_starts(seq_pix, pix_tween_seqs, margin) tot_pix = seq_starts[-1] + seq_pix[-1] return seq_pix, seq_starts, tot_pix def drawLineForward(hits, width, bp_per_pix, beg1, beg2, size): while True: q1, r1 = divmod(beg1, bp_per_pix) q2, r2 = divmod(beg2, bp_per_pix) hits[q2 * width + q1] |= 1 next_pix = min(bp_per_pix - r1, bp_per_pix - r2) if next_pix >= size: break beg1 += next_pix beg2 += next_pix size -= next_pix def drawLineReverse(hits, width, bp_per_pix, beg1, beg2, size): beg2 = -1 - beg2 while True: q1, r1 = divmod(beg1, bp_per_pix) q2, r2 = divmod(beg2, bp_per_pix) hits[q2 * width + q1] |= 2 next_pix = min(bp_per_pix - r1, r2 + 1) if next_pix >= size: break beg1 += next_pix beg2 -= next_pix size -= next_pix def alignmentPixels(width, height, alignments, bp_per_pix, origins1, origins2): hits = [0] * (width * height) # the image data for seq1, seq2, blocks in alignments: ori1 = origins1[seq1] ori2 = origins2[seq2] for beg1, beg2, size in blocks: if beg1 < 0: beg1 = -(beg1 + size) beg2 = -(beg2 + size) if beg2 >= 0: drawLineForward(hits, width, bp_per_pix, beg1 + ori1, beg2 + ori2, size) else: drawLineReverse(hits, width, bp_per_pix, beg1 + ori1, beg2 - ori2, size) return hits def expandedSeqDict(seqDict): '''Allow lookup by short sequence names, e.g. chr7 as well as hg19.chr7.''' newDict = {} for name, x in seqDict.items(): base = name.split(".")[-1] newDict[name] = x newDict[base] = x return newDict def readBed(fileName, seqLimits): if not fileName: return for line in myOpen(fileName): w = line.split() if not w: continue seqName = w[0] if seqName not in seqLimits: continue cropBeg, cropEnd = seqLimits[seqName] beg = int(w[1]) end = int(w[2]) b = max(beg, cropBeg) e = min(end, cropEnd) if b >= e: continue color = "#ffe4ff" if len(w) > 5: if len(w) > 8 and w[8].count(",") == 2: color = "rgb(" + w[8] + ")" elif w[5] == "+": color = "#fff4f4" elif w[5] == "-": color = "#f4f4ff" yield seqName, b, e, color def isExtraFirstGapField(fields): return fields[4].isdigit() def readGaps(fileName, seqLimits): '''Read locations of unsequenced gaps, from an agp or gap file.''' if not fileName: return for line in myOpen(fileName): w = line.split() if not w or w[0][0] == "#": continue if isExtraFirstGapField(w): w = w[1:] if w[4] not in "NU": continue seqName = w[0] if seqName not in seqLimits: continue cropBeg, cropEnd = seqLimits[seqName] end = int(w[2]) beg = end - int(w[5]) # zero-based coordinate b = max(beg, cropBeg) e = min(end, cropEnd) if b >= e: continue bridgedText = w[7] yield seqName, b, e, bridgedText def drawAnnotations(im, beds, origins, margin, limit, isTop, bp_per_pix): # XXX no consideration of different-color overlaps for seqName, beg, end, color in beds: ori = origins[seqName] b = (ori + beg) // bp_per_pix e = div_ceil(ori + end, bp_per_pix) if isTop: box = b, margin, e, limit else: box = margin, b, limit, e im.paste(color, box) def drawUnsequencedGaps(im, gaps, origins, margin, limit, isTop, bridgedText, bp_per_pix, color): '''Draw rectangles representing unsequenced gaps.''' for seqName, beg, end, b in gaps: if b != bridgedText: continue ori = origins[seqName] b = div_ceil(ori + beg, bp_per_pix) # use fully-covered pixels only e = (ori + end) // bp_per_pix if e <= b: continue if isTop: box = b, margin, e, limit else: box = margin, b, limit, e im.paste(color, box) def make_label(text, text_size, range_start, range_size): '''Return an axis label with endpoint & sort-order information.''' text_width = text_size[0] label_start = range_start + (range_size - text_width) // 2 label_end = label_start + text_width sort_key = text_width - range_size return sort_key, label_start, label_end, text def get_nonoverlapping_labels(labels, label_space): '''Get a subset of non-overlapping axis labels, greedily.''' nonoverlapping_labels = [] for i in labels: if True not in [i[1] < j[2] + label_space and j[1] < i[2] + label_space for j in nonoverlapping_labels]: nonoverlapping_labels.append(i) return nonoverlapping_labels def get_axis_image(seqNames, name_sizes, seq_starts, seq_pix, font, image_mode, opts): '''Make an image of axis labels.''' min_pos = seq_starts[0] max_pos = seq_starts[-1] + seq_pix[-1] height = max(zip(*name_sizes)[1]) labels = map(make_label, seqNames, name_sizes, seq_starts, seq_pix) labels = [i for i in labels if i[1] >= min_pos and i[2] <= max_pos] labels.sort() labels = get_nonoverlapping_labels(labels, opts.label_space) image_size = max_pos, height im = Image.new(image_mode, image_size, opts.border_color) draw = ImageDraw.Draw(im) for i in labels: position = i[1], 0 draw.text(position, i[3], font=font, fill=opts.text_color) return im def seqOrigins(seqNames, seq_starts, seqLimits, bp_per_pix): for i, j in zip(seqNames, seq_starts): yield i, bp_per_pix * j - seqLimits[i][0] def lastDotplot(opts, args): if opts.fontfile: font = ImageFont.truetype(opts.fontfile, opts.fontsize) else: font = ImageFont.load_default() image_mode = 'RGB' forward_color = ImageColor.getcolor(opts.forwardcolor, image_mode) reverse_color = ImageColor.getcolor(opts.reversecolor, image_mode) zipped_colors = zip(forward_color, reverse_color) overlap_color = tuple([(i + j) // 2 for i, j in zipped_colors]) warn("reading alignments...") alignmentInfo = readAlignments(args[0], opts) alignments, seqNames1, seqNames2, seqLimits1, seqLimits2 = alignmentInfo warn("done") if not alignments: raise Exception("there are no alignments") i1 = getSeqInfo(opts.sort1, seqNames1, seqLimits1, font, opts.fontsize, image_mode, opts.lengths1) seqNames1, seqSizes1, seqLabels1, labelSizes1, margin1 = i1 i2 = getSeqInfo(opts.sort2, seqNames2, seqLimits2, font, opts.fontsize, image_mode, opts.lengths2) seqNames2, seqSizes2, seqLabels2, labelSizes2, margin2 = i2 warn("choosing bp per pixel...") pix_limit1 = opts.width - margin1 pix_limit2 = opts.height - margin2 bpPerPix1 = get_bp_per_pix(seqSizes1, opts.border_pixels, pix_limit1) bpPerPix2 = get_bp_per_pix(seqSizes2, opts.border_pixels, pix_limit2) bpPerPix = max(bpPerPix1, bpPerPix2) warn("bp per pixel = " + str(bpPerPix)) seq_pix1, seq_starts1, width = get_pix_info(seqSizes1, bpPerPix, opts.border_pixels, margin1) seq_pix2, seq_starts2, height = get_pix_info(seqSizes2, bpPerPix, opts.border_pixels, margin2) warn("width: " + str(width)) warn("height: " + str(height)) origins1 = dict(seqOrigins(seqNames1, seq_starts1, seqLimits1, bpPerPix)) origins2 = dict(seqOrigins(seqNames2, seq_starts2, seqLimits2, bpPerPix)) warn("processing alignments...") hits = alignmentPixels(width, height, alignments, bpPerPix, origins1, origins2) warn("done") image_size = width, height im = Image.new(image_mode, image_size, opts.background_color) seqLimits1 = expandedSeqDict(seqLimits1) seqLimits2 = expandedSeqDict(seqLimits2) origins1 = expandedSeqDict(origins1) origins2 = expandedSeqDict(origins2) beds1 = list(readBed(opts.bed1, seqLimits1)) beds2 = list(readBed(opts.bed2, seqLimits2)) drawAnnotations(im, beds1, origins1, margin2, height, True, bpPerPix) drawAnnotations(im, beds2, origins2, margin1, width, False, bpPerPix) gaps1 = list(readGaps(opts.gap1, seqLimits1)) gaps2 = list(readGaps(opts.gap2, seqLimits2)) # draw bridged gaps first, then unbridged gaps on top: drawUnsequencedGaps(im, gaps1, origins1, margin2, height, True, "yes", bpPerPix, opts.bridged_color) drawUnsequencedGaps(im, gaps2, origins2, margin1, width, False, "yes", bpPerPix, opts.bridged_color) drawUnsequencedGaps(im, gaps1, origins1, margin2, height, True, "no", bpPerPix, opts.unbridged_color) drawUnsequencedGaps(im, gaps2, origins2, margin1, width, False, "no", bpPerPix, opts.unbridged_color) for i in range(height): for j in range(width): store_value = hits[i * width + j] xy = j, i if store_value == 1: im.putpixel(xy, forward_color) elif store_value == 2: im.putpixel(xy, reverse_color) elif store_value == 3: im.putpixel(xy, overlap_color) if opts.fontsize != 0: axis1 = get_axis_image(seqLabels1, labelSizes1, seq_starts1, seq_pix1, font, image_mode, opts) axis2 = get_axis_image(seqLabels2, labelSizes2, seq_starts2, seq_pix2, font, image_mode, opts) axis2 = axis2.transpose(Image.ROTATE_270) # !!! bug hotspot im.paste(axis1, (0, 0)) im.paste(axis2, (0, 0)) for i in seq_starts1[1:]: box = i - opts.border_pixels, margin2, i, height im.paste(opts.border_color, box) for i in seq_starts2[1:]: box = margin1, i - opts.border_pixels, width, i im.paste(opts.border_color, box) im.save(args[1]) if __name__ == "__main__": usage = """%prog --help or: %prog [options] maf-or-tab-alignments dotplot.png or: %prog [options] maf-or-tab-alignments dotplot.gif or: ...""" description = "Draw a dotplot of pair-wise sequence alignments in MAF or tabular format." op = optparse.OptionParser(usage=usage, description=description) op.add_option("-1", "--seq1", metavar="PATTERN", action="append", default=[], help="which sequences to show from the 1st genome") op.add_option("-2", "--seq2", metavar="PATTERN", action="append", default=[], help="which sequences to show from the 2nd genome") # Replace "width" & "height" with a single "length" option? op.add_option("-x", "--width", type="int", default=1000, help="maximum width in pixels (default: %default)") op.add_option("-y", "--height", type="int", default=1000, help="maximum height in pixels (default: %default)") op.add_option("-c", "--forwardcolor", metavar="COLOR", default="red", help="color for forward alignments (default: %default)") op.add_option("-r", "--reversecolor", metavar="COLOR", default="blue", help="color for reverse alignments (default: %default)") op.add_option("--sort1", type="int", default=1, metavar="N", help="genome1 sequence order: 0=input order, 1=name order, " "2=length order (default=%default)") op.add_option("--sort2", type="int", default=1, metavar="N", help="genome2 sequence order: 0=input order, 1=name order, " "2=length order (default=%default)") op.add_option("--trim1", action="store_true", help="trim unaligned sequence flanks from the 1st genome") op.add_option("--trim2", action="store_true", help="trim unaligned sequence flanks from the 2nd genome") op.add_option("--border-pixels", metavar="INT", type="int", default=1, help="number of pixels between sequences (default=%default)") op.add_option("--border-color", metavar="COLOR", default="#dcdcdc", help="color for pixels between sequences (default=%default)") # xxx --margin-color? op.add_option("--bed1", metavar="FILE", help="read genome1 annotations from bed file") op.add_option("--bed2", metavar="FILE", help="read genome2 annotations from bed file") og = optparse.OptionGroup(op, "Text options") og.add_option("-f", "--fontfile", metavar="FILE", help="TrueType or OpenType font file") og.add_option("-s", "--fontsize", metavar="SIZE", type="int", default=11, help="TrueType or OpenType font size (default: %default)") og.add_option("--lengths1", action="store_true", help="show sequence lengths for the 1st (horizontal) genome") og.add_option("--lengths2", action="store_true", help="show sequence lengths for the 2nd (vertical) genome") op.add_option_group(og) og = optparse.OptionGroup(op, "Unsequenced gap options") og.add_option("--gap1", metavar="FILE", help="read genome1 unsequenced gaps from agp or gap file") og.add_option("--gap2", metavar="FILE", help="read genome2 unsequenced gaps from agp or gap file") og.add_option("--bridged-color", metavar="COLOR", default="yellow", help="color for bridged gaps (default: %default)") og.add_option("--unbridged-color", metavar="COLOR", default="pink", help="color for unbridged gaps (default: %default)") op.add_option_group(og) (opts, args) = op.parse_args() if len(args) != 2: op.error("2 arguments needed") opts.text_color = "black" opts.background_color = "white" opts.label_space = 5 # minimum number of pixels between axis labels try: lastDotplot(opts, args) except KeyboardInterrupt: pass # avoid silly error message except Exception, e: prog = os.path.basename(sys.argv[0]) sys.exit(prog + ": error: " + str(e))